owservable

owservable fastify backend

This project is maintained by owservable

owservable

Owservable

A reactive backend library for Node.js applications that provides real-time MongoDB change streams, reactive data stores, and automated task scheduling. Built with RxJS and TypeScript.

Owservable is a replacement for Reactive Stack JS.

πŸš€ Features

πŸ“¦ Installation

npm install owservable

or

yarn add owservable

or

pnpm add owservable

πŸ—οΈ Requirements

πŸ”§ MongoDB Setup

Owservable requires MongoDB to be running as a Replica Set to enable change streams:

# Start MongoDB as a single-node replica set
mongod --replSet rs0

# Initialize the replica set
mongo --eval "rs.initiate()"

For more details, see: MongoDB Change Streams on localhost with Node.js

πŸš€ Quick Start

1. Basic Server Setup

import { 
  OwservableClient, 
  MongoDBConnector, 
  processModels,
  IConnectionManager 
} from 'owservable';

// Initialize MongoDB connection
const mongoConnector = new MongoDBConnector();
await mongoConnector.connect('mongodb://localhost:27017/myapp');

// Process your Mongoose models
await processModels('./models');

// Implement connection manager
class MyConnectionManager implements IConnectionManager {
  async connected(jwt: string): Promise<void> {
    // Handle client connection
  }
  
  async disconnected(): Promise<void> {
    // Handle client disconnection
  }
  
  async checkSession(): Promise<any> {
    // Handle session validation
    return { refresh_in: 300000 };
  }
  
  ping(ms: number): void {
    // Handle ping updates
  }
  
  location(path: string): void {
    // Handle location updates
  }
  
  get user(): any {
    // Return current user
    return this.currentUser;
  }
}

// Create owservable client
const connectionManager = new MyConnectionManager();
const client = new OwservableClient(connectionManager);

// Handle client messages
client.subscribe({
  next: (message) => {
    // Forward to WebSocket clients
    websocket.send(JSON.stringify(message));
  },
  error: (error) => console.error('Client error:', error)
});

2. WebSocket Integration

import { WebSocketServer } from 'ws';

const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  const client = new OwservableClient(connectionManager);
  
  // Subscribe to client updates
  client.subscribe({
    next: (message) => ws.send(JSON.stringify(message)),
    error: (error) => console.error('Error:', error)
  });
  
  // Handle incoming messages
  ws.on('message', async (data) => {
    const message = JSON.parse(data.toString());
    await client.consume(message);
  });
  
  // Handle disconnect
  ws.on('close', () => {
    client.disconnected();
  });
  
  // Start ping
  client.ping();
});

3. Reactive Data Stores

import { storeFactory, EStoreType } from 'owservable';

// Count store - returns only document count
const countStore = storeFactory(EStoreType.COUNT, 'users', 'user-count');
countStore.config = {
  query: { active: true },
  fields: {}
};

// Collection store - returns array of documents
const collectionStore = storeFactory(EStoreType.COLLECTION, 'users', 'user-list');
collectionStore.config = {
  query: { active: true },
  fields: { name: 1, email: 1 },
  sort: { name: 1 },
  page: 1,
  pageSize: 10
};

// Document store - returns single document
const documentStore = storeFactory(EStoreType.DOCUMENT, 'users', 'user-profile');
documentStore.config = {
  query: { _id: 'user123' },
  fields: { name: 1, email: 1, profile: 1 }
};

4. Task Scheduling

import { 
  initiateCronjobs, 
  initiateWatchers, 
  initiateWorkers 
} from 'owservable';

// Initialize cronjobs
await initiateCronjobs('./cronjobs');

// Initialize file watchers
await initiateWatchers('./watchers');

// Initialize background workers
await initiateWorkers('./workers');

5. Data Middleware

import { DataMiddlewareMap } from 'owservable';

// Register middleware for collection
DataMiddlewareMap.set('users', async (payload, user) => {
  // Filter sensitive data based on user permissions
  if (!user.isAdmin) {
    payload.data = payload.data.map(doc => ({
      ...doc,
      email: '***@***.***' // Hide emails for non-admins
    }));
  }
  return payload;
});

πŸ“š Core Components

OwservableClient

The main client class that manages WebSocket connections and subscriptions:

const client = new OwservableClient(connectionManager);

// Handle different message types
await client.consume({
  type: 'subscribe',
  target: 'user-list',
  scope: 'collection',
  observe: 'users',
  config: {
    query: { active: true },
    fields: { name: 1, email: 1 }
  }
});

Store Types

CollectionStore

Returns arrays of documents with real-time updates:

const store = storeFactory(EStoreType.COLLECTION, 'posts', 'post-list');
store.config = {
  query: { published: true },
  fields: { title: 1, content: 1, author: 1 },
  sort: { createdAt: -1 },
  populate: ['author'],
  page: 1,
  pageSize: 20,
  incremental: true // Enable incremental updates
};

CountStore

Returns document counts with real-time updates:

const store = storeFactory(EStoreType.COUNT, 'users', 'user-count');
store.config = {
  query: { active: true }
};

DocumentStore

Returns single documents with real-time updates:

const store = storeFactory(EStoreType.DOCUMENT, 'users', 'current-user');
store.config = {
  query: { _id: userId },
  fields: { name: 1, email: 1, preferences: 1 }
};

MongoDB Integration

Observable Models

Monitor MongoDB collections for changes:

import { observableModel } from 'owservable';
import UserModel from './models/User';

const userObservable = observableModel(UserModel);
userObservable.subscribe({
  next: (change) => {
    console.log('User collection changed:', change);
  }
});

Observable Database

Monitor entire database for changes:

import { observableDatabase } from 'owservable';

const dbObservable = observableDatabase();
dbObservable.subscribe({
  next: (change) => {
    console.log('Database changed:', change);
  }
});

Action Pattern

Create structured business logic with the action pattern:

import { Action, ActionInterface } from 'owservable';

class SendEmailAction extends Action implements ActionInterface {
  protected _description = 'Send email notification';
  
  async handle(to: string, subject: string, body: string): Promise<void> {
    // Email sending logic
    console.log(`Sending email to ${to}: ${subject}`);
  }
  
  description(): string {
    return this._description;
  }
}

// Use in cronjobs, workers, or watchers
const emailAction = new SendEmailAction();
await emailAction.handle('user@example.com', 'Welcome!', 'Hello World');

πŸ”§ Configuration

Environment Variables

# MongoDB connection
MONGODB_URI=mongodb://localhost:27017/myapp

# WebSocket server
WS_PORT=8080

# JWT settings
JWT_SECRET=your-secret-key
JWT_EXPIRATION=24h

TypeScript Configuration

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "lib": ["ES2020"],
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "experimentalDecorators": true,
    "emitDecoratorMetadata": true
  }
}

πŸ“– Advanced Usage

Custom Store Implementation

import { AStore, EStoreType } from 'owservable';

class CustomStore extends AStore {
  constructor(model: Model<any>, target: string) {
    super(model, target);
    this._type = EStoreType.COLLECTION;
  }
  
  protected async load(change: any): Promise<void> {
    // Custom loading logic
    const data = await this.customQuery();
    this.emitMany(Date.now(), this._subscriptionId, { data });
  }
}

Performance Optimization

// Use incremental updates for large collections
store.config = {
  incremental: true,
  page: 1,
  pageSize: 50
};

// Optimize queries with indexes
await addIndexToAttributes(model, ['field1', 'field2']);

// Use field projection
store.config = {
  fields: { 
    name: 1, 
    email: 1,
    _id: 0 // Exclude _id
  }
};

πŸ§ͺ Testing

npm test

πŸ“– Documentation

πŸ›£οΈ Roadmap

πŸ“„ License

Licensed under The Unlicense.

🀝 Contributors


Predrag Stojadinović

πŸ’» πŸ“– πŸ€” 🚧 πŸ“† ⚠️

This project follows the all-contributors specification.

Contributions of any kind welcome!

πŸ“Š UML Diagram

Checkout the UML diagram for a visual overview of the architecture.