All files / src/mongodb/functions observable.model.ts

100% Statements 35/35
100% Branches 2/2
100% Functions 7/7
100% Lines 35/35

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96    12x     12x                   14x 14x 14x   14x       19x 19x   19x   19x 2x 2x 1x   1x 1x                 19x 3x 3x           3x     19x 1x 1x         1x     19x 1x 1x         1x     19x               5x 5x 5x 4x     1x     5x     12x  
'use strict';
 
import mongoose from 'mongoose';
import {ChangeStream} from 'mongodb';
 
import {ReplaySubject, Subject} from 'rxjs';
 
import LifecycleEvent from '../../types/lifecycle.event.type';
 
/**
 * RxJS `Subject` that republishes MongoDB change-stream events for a single collection.
 *
 * On construction a change stream is opened on the named collection of the current
 * mongoose connection. Every `change` event is trimmed to
 * `{ns, documentKey, operationType, updateDescription, fullDocument}` and pushed to
 * subscribers of this subject.
 *
 * Stream health is reported separately through {@link ObservableModel.lifecycle}, a
 * `ReplaySubject` with a buffer of one, so late subscribers receive the most recent
 * lifecycle event (`live`, `error`, `close` or `end`).
 *
 * When the stream reports `error`, `close` or `end`, the old stream is detached and closed
 * and a new one is opened immediately. If opening the replacement fails, the failure is
 * published as a lifecycle `error` event and no further automatic retry is scheduled.
 */
class ObservableModel extends Subject<any> {
	/** Name of the MongoDB collection being watched. */
	private readonly _collection: string;
	/** The change stream currently feeding this subject. */
	private _stream: ChangeStream;
	/** Emits lifecycle events describing the state of the underlying change stream. */
	public readonly lifecycle: ReplaySubject<LifecycleEvent>;

	/**
	 * @param collection Name of the collection to watch on `mongoose.connection.db`.
	 * @throws Whatever `_initializeStream` throws when the first stream cannot be opened
	 *         (for example when the mongoose connection has no `db` yet).
	 */
	constructor(collection: string) {
		super();
		this._collection = collection;
		this.lifecycle = new ReplaySubject<LifecycleEvent>(1);

		this._initializeStream();
	}

	/**
	 * Opens a change stream on the collection, wires its events to this subject and to
	 * `lifecycle`, and finally publishes a `live` lifecycle event.
	 */
	private _initializeStream(): void {
		const db: mongoose.mongo.Db = mongoose.connection.db;
		const collectionObj: mongoose.mongo.Collection = db.collection(this._collection);

		// `updateLookup` asks the server to attach the current full document to update events.
		this._stream = collectionObj.watch([], {fullDocument: 'updateLookup'});

		this._stream.on('change', (change: any): void => {
			try {
				// Forward only the fields consumers rely on, not the raw driver event.
				const {ns, documentKey, operationType, updateDescription, fullDocument} = change;
				this.next({ns, documentKey, operationType, updateDescription, fullDocument});
			} catch (error) {
				console.error(`[@owservable] -> ObservableModel[${this._collection}] Error in change event:`, error);
				this.lifecycle.next({
					type: 'error',
					collection: this._collection,
					timestamp: new Date(),
					error
				});
			}
		});

		this._stream.on('error', (error: any): void => {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream error event:`, error, ', attempting reconnection...');
			this.lifecycle.next({
				type: 'error',
				collection: this._collection,
				timestamp: new Date(),
				error
			});
			this._reconnect();
		});

		this._stream.on('close', (): void => {
			console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream close event: stream has closed, attempting reconnection...`);
			this.lifecycle.next({
				type: 'close',
				collection: this._collection,
				timestamp: new Date()
			});
			this._reconnect();
		});

		this._stream.on('end', (): void => {
			console.warn(`[@owservable] -> ObservableModel[${this._collection}] ChangeStream end event: stream has ended, attempting reconnection...`);
			this.lifecycle.next({
				type: 'end',
				collection: this._collection,
				timestamp: new Date()
			});
			this._reconnect();
		});

		this.lifecycle.next({
			type: 'live',
			collection: this._collection,
			timestamp: new Date()
		});
	}

	/**
	 * Detaches and closes the current stream, then opens a fresh one.
	 *
	 * This runs inside event-listener callbacks, so it never throws: a failure to open the
	 * replacement stream is logged and published as a lifecycle `error` event instead of
	 * escaping as an uncaught exception.
	 */
	private _reconnect(): void {
		console.info(`[@owservable] -> ObservableModel[${this._collection}] Reconnecting ChangeStream...`);
		this._disposeStream();

		try {
			this._initializeStream();
		} catch (error) {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] Error reinitializing ChangeStream:`, error);
			this.lifecycle.next({
				type: 'error',
				collection: this._collection,
				timestamp: new Date(),
				error
			});
		}
	}

	/**
	 * Removes every listener from the current stream and closes it so that its cursor is
	 * released. All failures are logged and swallowed; disposal is best-effort.
	 */
	private _disposeStream(): void {
		const stale: ChangeStream = this._stream;
		if (!stale) return;

		try {
			// Listeners go first so that closing cannot re-enter `_reconnect` via `close`/`end`.
			stale.removeAllListeners();
			// An EventEmitter throws on an `error` event nobody listens to; keep a no-op handler.
			stale.on('error', (): void => undefined);
			// `close()` is asynchronous; normalise its result so a rejection is always handled.
			Promise.resolve(stale.close()).catch((error: unknown): void => {
				console.error(`[@owservable] -> ObservableModel[${this._collection}] Error closing old stream:`, error);
			});
		} catch (error) {
			console.error(`[@owservable] -> ObservableModel[${this._collection}] Error cleaning up old stream:`, error);
		}
	}
}
export default ObservableModel;