All files / src/mongodb/functions observable.model.ts

42.85% Statements 15/35
0% Branches 0/2
28.57% Functions 2/7
42.85% Lines 15/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96    12x     12x                   7x 7x 7x   7x       7x 7x   7x   7x                             7x                     7x                   7x                   7x                                       12x  
'use strict';
 
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
 
import {ReplaySubject, Subject} from 'rxjs';
 
import LifecycleEvent from '../../types/lifecycle.event.type';
 
/**
 * RxJS Subject that forwards MongoDB change stream events for one collection.
 *
 * Each `change` event of the underlying ChangeStream is re-emitted through
 * `next()` as `{ns, documentKey, operationType, updateDescription, fullDocument}`.
 * Stream state transitions (`live`, `error`, `close`, `end`) are published on
 * `lifecycle`, a ReplaySubject with a buffer of 1, so a late subscriber
 * receives the most recent state.
 *
 * When the stream reports `error`, `close` or `end`, the previous stream is
 * detached and closed and a new one is opened on the same collection.
 */
class ObservableModel extends Subject<any> {
	private readonly _collection: string;
	private _stream: ChangeStream;
	public readonly lifecycle: ReplaySubject<LifecycleEvent>;

	/**
	 * @param collection Name of the MongoDB collection to watch.
	 * @throws Whatever the initial stream setup throws (e.g. when
	 *         `mongoose.connection.db` is not available yet).
	 */
	constructor(collection: string) {
		super();
		this._collection = collection;
		this.lifecycle = new ReplaySubject<LifecycleEvent>(1);

		this._initializeStream();
	}

	/**
	 * Opens a change stream on the collection, wires its events to this
	 * subject and to `lifecycle`, then publishes a `live` lifecycle event.
	 */
	private _initializeStream(): void {
		const db: mongoose.mongo.Db = mongoose.connection.db;
		const collectionObj: mongoose.mongo.Collection = db.collection(this._collection);

		// 'updateLookup' asks the server to attach the current full document to update events.
		this._stream = collectionObj.watch([], {fullDocument: 'updateLookup'});

		this._stream.on('change', (change: any): void => {
			try {
				const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
				this.next({ns, documentKey, operationType, updateDescription, fullDocument});
			} catch (error) {
				// A failure while forwarding must not break the stream listener; report it instead.
				console.error(`[@owservable] -> ObservableModel[${this._collection}] Error in change event:`, error);
				this.lifecycle.next({
					type: 'error',
					collection: this._collection,
					timestamp: new Date(),
					error
				});
			}
		});

		this._stream.on('error', (error: any): void => {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream error event:`, error, ', attempting reconnection...');
			this.lifecycle.next({
				type: 'error',
				collection: this._collection,
				timestamp: new Date(),
				error
			});
			this._reconnect();
		});

		this._stream.on('close', (): void => {
			console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream close event: stream has closed, attempting reconnection...`);
			this.lifecycle.next({
				type: 'close',
				collection: this._collection,
				timestamp: new Date()
			});
			this._reconnect();
		});

		this._stream.on('end', (): void => {
			console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream end event: stream has ended, attempting reconnection...`);
			this.lifecycle.next({
				type: 'end',
				collection: this._collection,
				timestamp: new Date()
			});
			this._reconnect();
		});

		this.lifecycle.next({
			type: 'live',
			collection: this._collection,
			timestamp: new Date()
		});
	}

	/**
	 * Replaces the current change stream with a fresh one.
	 *
	 * The old stream is disposed first so it is not left open. Because this
	 * method runs inside stream event handlers, a failure to open the new
	 * stream is logged and published as a lifecycle `error` instead of being
	 * thrown as an uncaught exception. No further retry is scheduled after
	 * such a failure.
	 */
	private _reconnect(): void {
		console.info(`[@owservable] -> ObservableModel[${this._collection}] Reconnecting ChangeStream...`);
		this._disposeStream(this._stream);

		try {
			this._initializeStream();
		} catch (error) {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] Error re-initializing ChangeStream:`, error);
			this.lifecycle.next({
				type: 'error',
				collection: this._collection,
				timestamp: new Date(),
				error
			});
		}
	}

	/**
	 * Detaches and closes a stream that is being replaced. Best effort:
	 * cleanup failures are logged and never propagated.
	 *
	 * @param stream The stream to dispose of; ignored when falsy.
	 */
	private _disposeStream(stream: ChangeStream | undefined): void {
		if (!stream) return;

		try {
			// Remove our handlers first so closing the stream below cannot
			// trigger another reconnect through its 'close' event.
			stream.removeAllListeners();
			// An EventEmitter throws when 'error' is emitted without a listener;
			// keep a no-op sink for late errors from the abandoned stream.
			stream.on('error', (): void => undefined);
			// close() is asynchronous; log a rejection rather than leaving it unhandled.
			void stream.close().catch((error: unknown): void => {
				console.error(`[@owservable] -> ObservableModel[${this._collection}] Error cleaning up old stream:`, error);
			});
		} catch (error) {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] Error cleaning up old stream:`, error);
		}
	}
}
export default ObservableModel;