Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 | 3x 3x 11x 11x 10x 10x 10x 15x 15x 15x 2x 2x 1x 1x 1x 15x 3x 3x 3x 15x 1x 1x 1x 15x 1x 1x 1x 15x 5x 5x 5x 4x 1x 5x 3x | 'use strict';
import {ReplaySubject, Subject} from 'rxjs';
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
import LifecycleEvent from '../../types/lifecycle.event.type';
/**
 * Singleton RxJS subject that republishes MongoDB change events.
 *
 * A change stream is opened on the database behind the default mongoose
 * connection (empty pipeline, `fullDocument: 'updateLookup'`). Every change is
 * forwarded to subscribers as
 * `{ns, documentKey, operationType, updateDescription, fullDocument}`.
 *
 * Stream state transitions (`live`, `error`, `close`, `end`) are published on
 * {@link ObservableDatabase.lifecycle}, which replays the most recent event to
 * late subscribers.
 */
class ObservableDatabase extends Subject<any> {
	/** Delay before retrying when re-opening the change stream fails. */
	private static readonly _RECONNECT_DELAY_MS: number = 1000;

	private _stream: ChangeStream;
	/** Handle of the pending retry, if one has been scheduled. */
	private _reconnectTimer: ReturnType<typeof setTimeout> | undefined;
	private static _instance: ObservableDatabase;
	public readonly lifecycle: ReplaySubject<LifecycleEvent>;

	/**
	 * Returns the shared instance, creating it (and opening the change stream)
	 * on first use.
	 *
	 * @returns the process-wide ObservableDatabase instance
	 * @throws Error when the instance has to be created and mongoose has no database handle yet
	 */
	public static init(): ObservableDatabase {
		if (!ObservableDatabase._instance) ObservableDatabase._instance = new ObservableDatabase();
		return ObservableDatabase._instance;
	}

	/**
	 * Creates the lifecycle subject and opens the change stream immediately.
	 *
	 * @throws Error when mongoose has no database handle yet
	 */
	constructor() {
		super();
		this.lifecycle = new ReplaySubject<LifecycleEvent>(1);
		this._initializeStream();
	}

	/**
	 * Opens a new change stream, wires its event handlers and announces `live`.
	 *
	 * @throws Error when `mongoose.connection.db` is not available; anything `db.watch` throws is propagated as well
	 */
	private _initializeStream(): void {
		const db = mongoose.connection.db;
		// Fail with an explicit message instead of a bare TypeError on `undefined.watch`.
		if (!db) throw new Error('[@owservable] -> ObservableDatabase cannot open ChangeStream: mongoose has no database connection');

		const stream: ChangeStream = db.watch([], {fullDocument: 'updateLookup'});
		this._stream = stream;

		stream.on('change', (change: any): void => {
			try {
				const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
				this.next({ns, documentKey, operationType, updateDescription, fullDocument});
			} catch (error) {
				console.error('[@owservable] -> ObservableDatabase Error in change event:', error);
				this.lifecycle.next({
					type: 'error',
					collection: '*',
					timestamp: new Date(),
					error
				});
			}
		});

		stream.on('error', (error: any): void => {
			console.error('[@owservable] -> ObservableDatabase ChangeStream error event:', error, ', attempting reconnection...');
			this.lifecycle.next({
				type: 'error',
				collection: '*',
				timestamp: new Date(),
				error
			});
			this._reconnect();
		});

		stream.on('close', (): void => {
			console.warn('[@owservable] -> ObservableDatabase ChangeStream close event: stream has closed, attempting reconnection...');
			this.lifecycle.next({
				type: 'close',
				collection: '*',
				timestamp: new Date()
			});
			this._reconnect();
		});

		stream.on('end', (): void => {
			console.warn('[@owservable] -> ObservableDatabase ChangeStream end event: stream has ended, attempting reconnection...');
			this.lifecycle.next({
				type: 'end',
				collection: '*',
				timestamp: new Date()
			});
			this._reconnect();
		});

		this.lifecycle.next({
			type: 'live',
			collection: '*',
			timestamp: new Date()
		});
	}

	/**
	 * Replaces the current change stream with a fresh one.
	 *
	 * The old stream is detached and closed first. If the new stream cannot be
	 * opened, the failure is published on `lifecycle` and another attempt is
	 * scheduled, so an exception never escapes into the event emitter that
	 * triggered the reconnect.
	 */
	private _reconnect(): void {
		console.info('[@owservable] -> ObservableDatabase Reconnecting ChangeStream...');

		// This attempt supersedes any retry that is still waiting.
		if (this._reconnectTimer !== undefined) {
			clearTimeout(this._reconnectTimer);
			this._reconnectTimer = undefined;
		}

		this._disposeStream();

		try {
			this._initializeStream();
		} catch (error) {
			console.error('[@owservable] -> ObservableDatabase Error re-initializing ChangeStream:', error);
			this.lifecycle.next({
				type: 'error',
				collection: '*',
				timestamp: new Date(),
				error
			});
			this._scheduleReconnect();
		}
	}

	/** Detaches the handlers from the current stream and closes it; cleanup failures are only logged. */
	private _disposeStream(): void {
		const stale: ChangeStream = this._stream;
		if (!stale) return;

		try {
			// Drop our handlers first so closing the stream cannot re-trigger a reconnect.
			stale.removeAllListeners();
			// An 'error' emitted with no listener is thrown by the EventEmitter, so keep a no-op one.
			stale.on('error', (): void => undefined);
			// Release the underlying cursor; without this each reconnect leaks an open stream.
			Promise.resolve(stale.close()).catch((error: unknown): void => {
				console.error('[@owservable] -> ObservableDatabase Error cleaning up old stream:', error);
			});
		} catch (error) {
			console.error('[@owservable] -> ObservableDatabase Error cleaning up old stream:', error);
		}
	}

	/** Schedules a single delayed reconnect attempt unless one is already pending. */
	private _scheduleReconnect(): void {
		if (this._reconnectTimer !== undefined) return;

		this._reconnectTimer = setTimeout((): void => {
			this._reconnectTimer = undefined;
			this._reconnect();
		}, ObservableDatabase._RECONNECT_DELAY_MS);
	}
}
export default ObservableDatabase;
|