Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | 3x 3x 4x 4x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 3x | 'use strict';
import {ReplaySubject, Subject} from 'rxjs';
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
import LifecycleEvent from '../../types/lifecycle.event.type';
class ObservableDatabase extends Subject<any> {
private _stream: ChangeStream;
private static _instance: ObservableDatabase;
public readonly lifecycle: ReplaySubject<LifecycleEvent>;
public static init(): ObservableDatabase {
if (!ObservableDatabase._instance) ObservableDatabase._instance = new ObservableDatabase();
return ObservableDatabase._instance;
}
constructor() {
super();
this.lifecycle = new ReplaySubject<LifecycleEvent>(1);
this._initializeStream();
}
private _initializeStream(): void {
const db: mongoose.mongo.Db = mongoose.connection.db;
this._stream = db.watch([], {fullDocument: 'updateLookup'});
this._stream.on('change', (change: any): void => {
try {
const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
this.next({ns, documentKey, operationType, updateDescription, fullDocument});
} catch (error) {
console.error('[@owservable] -> ObservableDatabase Error in change event:', error);
this.lifecycle.next({
type: 'error',
collection: '*',
timestamp: new Date(),
error
});
}
});
this._stream.on('error', (error: any): void => {
console.error('[@owservable] -> ObservableDatabase ChangeStream error event:', error, ', attempting reconnection...');
this.lifecycle.next({
type: 'error',
collection: '*',
timestamp: new Date(),
error
});
this._reconnect();
});
this._stream.on('close', (): void => {
console.warn('[@owservable] -> ObservableDatabase ChangeStream close event: stream has closed, attempting reconnection...');
this.lifecycle.next({
type: 'close',
collection: '*',
timestamp: new Date()
});
this._reconnect();
});
this._stream.on('end', (): void => {
console.warn('[@owservable] -> ObservableDatabase ChangeStream end event: stream has ended, attempting reconnection...');
this.lifecycle.next({
type: 'end',
collection: '*',
timestamp: new Date()
});
this._reconnect();
});
this.lifecycle.next({
type: 'live',
collection: '*',
timestamp: new Date()
});
}
private _reconnect(): void {
console.info('[@owservable] -> ObservableDatabase Reconnecting ChangeStream...');
try {
if (this._stream) {
this._stream.removeAllListeners();
}
} catch (error) {
console.error('[@owservable] -> ObservableDatabase Error cleaning up old stream:', error);
}
this._initializeStream();
}
}
export default ObservableDatabase;
|