Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 | 12x 12x 14x 14x 14x 14x 19x 19x 19x 19x 2x 2x 1x 1x 1x 19x 3x 3x 3x 19x 1x 1x 1x 19x 1x 1x 1x 19x 5x 5x 5x 4x 1x 5x 12x | 'use strict';
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
import {ReplaySubject, Subject} from 'rxjs';
import LifecycleEvent from '../../types/lifecycle.event.type';
class ObservableModel extends Subject<any> {
private readonly _collection: string;
private _stream: ChangeStream;
public readonly lifecycle: ReplaySubject<LifecycleEvent>;
constructor(collection: string) {
super();
this._collection = collection;
this.lifecycle = new ReplaySubject<LifecycleEvent>(1);
this._initializeStream();
}
private _initializeStream(): void {
const db: mongoose.mongo.Db = mongoose.connection.db;
const collectionObj: mongoose.mongo.Collection = db.collection(this._collection);
this._stream = collectionObj.watch([], {fullDocument: 'updateLookup'});
this._stream.on('change', (change: any): void => {
try {
const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
this.next({ns, documentKey, operationType, updateDescription, fullDocument});
} catch (error) {
console.error(`[@owservable] -> ObservableModel[${this._collection}] Error in change event:`, error);
this.lifecycle.next({
type: 'error',
collection: this._collection,
timestamp: new Date(),
error
});
}
});
this._stream.on('error', (error: any): void => {
console.error(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream error event:`, error, ', attempting reconnection...');
this.lifecycle.next({
type: 'error',
collection: this._collection,
timestamp: new Date(),
error
});
this._reconnect();
});
this._stream.on('close', (): void => {
console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream close event: stream has closed, attempting reconnection...`);
this.lifecycle.next({
type: 'close',
collection: this._collection,
timestamp: new Date()
});
this._reconnect();
});
this._stream.on('end', (): void => {
console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream end event: stream has ended, attempting reconnection...`);
this.lifecycle.next({
type: 'end',
collection: this._collection,
timestamp: new Date()
});
this._reconnect();
});
this.lifecycle.next({
type: 'live',
collection: this._collection,
timestamp: new Date()
});
}
private _reconnect(): void {
console.info(`[@owservable] -> ObservableModel[${this._collection}] Reconnecting ChangeStream...`);
try {
if (this._stream) {
this._stream.removeAllListeners();
}
} catch (error) {
console.error(`[@owservable] -> ObservableModel[${this._collection}] Error cleaning up old stream:`, error);
}
this._initializeStream();
}
}
export default ObservableModel;
|