All files / src/mongodb/functions observable.database.ts

100% Statements 36/36
100% Branches 4/4
100% Functions 8/8
100% Lines 35/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98    3x 3x                     11x 11x       10x 10x   10x       15x   15x   15x 2x 2x 1x   1x 1x                 15x 3x 3x           3x     15x 1x 1x         1x     15x 1x 1x         1x     15x               5x 5x 5x 4x     1x     5x     3x  
'use strict';
 
import {ReplaySubject, Subject} from 'rxjs';
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
 
import LifecycleEvent from '../../types/lifecycle.event.type';
 
/**
 * Singleton RxJS subject that republishes every change event of a MongoDB
 * change stream opened on the whole database of the default mongoose connection.
 *
 * Subscribers of the subject itself receive a trimmed change payload
 * (`ns`, `documentKey`, `operationType`, `updateDescription`, `fullDocument`).
 * Subscribers of {@link ObservableDatabase.lifecycle} receive stream state
 * notifications (`live`, `error`, `close`, `end`); the lifecycle subject is a
 * `ReplaySubject` with a buffer of one, so a late subscriber gets the latest state.
 */
class ObservableDatabase extends Subject<any> {
	// The change stream currently feeding this subject; replaced on every reconnect.
	private _stream: ChangeStream;
	// Lazily created shared instance, see init().
	private static _instance: ObservableDatabase;
	// Stream state notifications; collection is always '*' because the whole database is watched.
	public readonly lifecycle: ReplaySubject<LifecycleEvent>;

	/**
	 * Returns the shared instance, creating it on first use.
	 *
	 * If the constructor throws, no instance is stored, so a later call tries again.
	 *
	 * @returns the process-wide ObservableDatabase
	 */
	public static init(): ObservableDatabase {
		if (!ObservableDatabase._instance) ObservableDatabase._instance = new ObservableDatabase();
		return ObservableDatabase._instance;
	}

	/**
	 * Creates the lifecycle subject and opens the change stream immediately.
	 * Reads `mongoose.connection.db`, so the connection is presumably expected
	 * to be established before construction — TODO confirm with callers.
	 */
	constructor() {
		super();
		this.lifecycle = new ReplaySubject<LifecycleEvent>(1);

		this._initializeStream();
	}

	/**
	 * Opens a database-wide change stream, wires its events to this subject and
	 * to the lifecycle subject, and announces the `live` state.
	 */
	private _initializeStream(): void {
		const db: mongoose.mongo.Db = mongoose.connection.db;

		// Empty pipeline: no server-side filtering. 'updateLookup' asks for the
		// full document on update events as well.
		this._stream = db.watch([], {fullDocument: 'updateLookup'});

		this._stream.on('change', (change: any): void => {
			try {
				// Forward only the fields consumers use instead of the raw driver event.
				const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
				this.next({ns, documentKey, operationType, updateDescription, fullDocument});
			} catch (error) {
				// A malformed event or a throwing subscriber must not break the stream listener.
				console.error('[@owservable] -> ObservableDatabase Error in change event:', error);
				this.lifecycle.next({
					type: 'error',
					collection: '*',
					timestamp: new Date(),
					error
				});
			}
		});

		this._stream.on('error', (error: any): void => {
			console.error('[@owservable] -> ObservableDatabase ChangeStream error event:', error, ', attempting reconnection...');
			this.lifecycle.next({
				type: 'error',
				collection: '*',
				timestamp: new Date(),
				error
			});
			this._reconnect();
		});

		this._stream.on('close', (): void => {
			console.warn('[@owservable] -> ObservableDatabase ChangeStream close event: stream has closed, attempting reconnection...');
			this.lifecycle.next({
				type: 'close',
				collection: '*',
				timestamp: new Date()
			});
			this._reconnect();
		});

		this._stream.on('end', (): void => {
			console.warn('[@owservable] -> ObservableDatabase ChangeStream end event: stream has ended, attempting reconnection...');
			this.lifecycle.next({
				type: 'end',
				collection: '*',
				timestamp: new Date()
			});
			this._reconnect();
		});

		this.lifecycle.next({
			type: 'live',
			collection: '*',
			timestamp: new Date()
		});
	}

	/**
	 * Discards the current change stream and opens a fresh one.
	 *
	 * The old stream is detached, given a no-op 'error' listener and closed, so
	 * that it neither triggers another reconnect nor stays open. A failure while
	 * opening the new stream is logged and published as a lifecycle `error`
	 * event instead of being thrown, because this method runs inside stream
	 * event handlers where a throw would surface as an uncaught exception.
	 */
	private _reconnect(): void {
		console.info('[@owservable] -> ObservableDatabase Reconnecting ChangeStream...');
		const staleStream: ChangeStream = this._stream;
		try {
			if (staleStream) {
				// Detach first so events from the stale stream cannot start a second reconnect.
				staleStream.removeAllListeners();
				// An EventEmitter throws when 'error' is emitted with no listener attached;
				// keep a no-op listener so a late error on the discarded stream is harmless.
				staleStream.on('error', (): void => undefined);
				// Close the stale stream instead of merely abandoning it; a close
				// failure is logged only and must not prevent the reconnect.
				void Promise.resolve(staleStream.close()).catch((error: unknown): void => {
					console.error('[@owservable] -> ObservableDatabase Error closing old stream:', error);
				});
			}
		} catch (error) {
			console.error('[@owservable] -> ObservableDatabase Error cleaning up old stream:', error);
		}

		try {
			this._initializeStream();
		} catch (error) {
			console.error('[@owservable] -> ObservableDatabase Error reconnecting ChangeStream:', error);
			this.lifecycle.next({
				type: 'error',
				collection: '*',
				timestamp: new Date(),
				error
			});
		}
	}
}
export default ObservableDatabase;