ɳSelfɳSELFDOCS
  • Getting Started

    • Introduction
    • Quick Start
    • Installation
    • Your First Project
  • Core Concepts

    • Architecture Overview
    • Project Structure
    • Configuration
    • Environments
  • Services

    • PostgreSQL Database
    • Hasura GraphQL
    • Authentication
    • Real-Time Communication
    • Storage (MinIO)
    • Email Configuration
    • Redis Cache
    • Search Engines
    • Functions
    • MLflow (ML Tracking)
    • Monitoring & Metrics
    • Admin UI
    • Dashboard
  • Database Tools

    • Schema Management
    • Migrations
    • Seeding Data
    • Backup & Restore
    • dbdiagram.io Sync
  • Microservices

    • NestJS Services
    • BullMQ Workers
    • Go Services
    • Python Services
  • CLI Reference

    • Complete Command Reference
    • Core Commands
    • Database Commands
    • Service Management
    • Production Commands
  • Deployment

    • Local Development
    • Production Setup
    • SSL/TLS Configuration
    • Domain Configuration
    • Environment Variables
  • Advanced Topics

    • Multi-Tenancy & SaaS
    • Security & Hardening
    • Custom Actions
    • Webhooks
    • Performance Tuning
    • Troubleshooting
  • Migration Guides

    • From Supabase
    • From Nhost
    • From Firebase
  • Resources

    • Changelog
    • Licensing
    • FAQ
    • Contributing
    • Support

Real-Time Communication System

v0.9.5Production ReadyComplete in Sprint 20

The ɳSelf real-time system provides comprehensive real-time communication capabilities including database subscriptions (Change Data Capture), channel broadcasting, presence tracking, and WebSocket connections. Built on PostgreSQL LISTEN/NOTIFY for low-latency updates with compatibility for Supabase and Nhost clients.

Major v0.9.5 Feature

Real-time communication is a major feature addition in v0.9.5, providing enterprise-grade capabilities with 40+ CLI commands for complete real-time infrastructure management. This is a drop-in replacement for Supabase Realtime with additional features like message replay.

Overview

The ɳSelf real-time system is built on PostgreSQL LISTEN/NOTIFY and provides:

  • Database Subscriptions (CDC): Real-time change data capture from any table
  • Channels: Public, private, and presence-enabled channels
  • Broadcasting: Send messages to channels with event types
  • Presence Tracking: Track user online/away/offline status
  • Message Replay: Replay historical messages since a timestamp
  • PostgreSQL Native: Uses LISTEN/NOTIFY for low-latency updates
  • Supabase Compatible: Drop-in replacement for Supabase Realtime

Feature Comparison

FeatureɳSelfSupabaseNhost
Database CDC✅✅✅
Channels✅✅✅
Presence✅✅✅
Broadcast✅✅✅
Message Replay✅❌❌
CLI Management✅ 40+ commands⚠️ Limited⚠️ Limited
Self-hosted✅ Primary⚠️ Optional⚠️ Optional

Quick Start

1. Initialize Real-Time System

# Initialize the real-time database schema
ɳSelf realtime init

# This creates:
# - realtime schema
# - Channels, messages, presence, and subscriptions tables
# - Helper functions for cleanup and management

2. Subscribe to Table Changes

# Subscribe to all changes on a table
ɳSelf realtime subscribe public.users

# Subscribe to specific events
ɳSelf realtime subscribe public.posts INSERT,UPDATE

# Listen to changes in real-time
ɳSelf realtime listen public.users

3. Create a Channel

# Create a public channel
ɳSelf realtime channel create "general" public

# Create a private channel
ɳSelf realtime channel create "support" private

# Create a presence-enabled channel
ɳSelf realtime channel create "lobby" presence

4. Broadcast a Message

# Send a message to a channel
ɳSelf realtime broadcast general user.joined '{"user_id": "123", "name": "John Doe"}'

# Get recent messages
ɳSelf realtime messages general 50

# Replay messages since timestamp
ɳSelf realtime replay general 1643723400

5. Track Presence

# Track user online
ɳSelf realtime presence track user-123 general online

# List online users
ɳSelf realtime presence online general

# Count online users
ɳSelf realtime presence count general

# Set user offline
ɳSelf realtime presence offline user-123 general

Database Subscriptions (CDC)

Change Data Capture allows you to subscribe to table changes in real-time. When you subscribe to a table, PostgreSQL triggers automatically capture INSERT, UPDATE, and DELETE operations and send notifications to listening clients.

Subscribe to Table Changes

# Subscribe to all events (INSERT, UPDATE, DELETE)
ɳSelf realtime subscribe public.users

# Subscribe to specific events
ɳSelf realtime subscribe public.posts INSERT,UPDATE

# Subscribe with filter (future feature)
ɳSelf realtime subscribe public.comments INSERT,UPDATE "post_id = '123'"

Listen to Changes

# Listen indefinitely (Ctrl+C to stop)
ɳSelf realtime listen public.users

# Listen for 30 seconds
ɳSelf realtime listen public.users 30

Example Output

Asynchronous notification "realtime:table:public.users" received from server process with PID 12345.
Payload: {"timestamp":1643723456,"operation":"INSERT","schema":"public","table":"users","new":{"id":"user-123","email":"john@example.com","created_at":"2026-01-30T10:00:00Z"}}

List Subscriptions

# Show all active subscriptions
ɳSelf realtime subscriptions

# Get subscription stats
ɳSelf realtime stats

Unsubscribe

# Remove subscription by table name
ɳSelf realtime unsubscribe public.users

# Remove by subscription ID
ɳSelf realtime unsubscribe <uuid>

How It Works

When you subscribe to a table:

  1. Trigger Function Created: A PostgreSQL function is created to capture changes
  2. Triggers Attached: INSERT/UPDATE/DELETE triggers are added to the table
  3. NOTIFY Sent: On each change, PostgreSQL sends a notification
  4. Clients Listen: WebSocket clients listen via LISTEN realtime:table:schema.table
-- Subscription record
INSERT INTO realtime.subscriptions (table_name, events)
VALUES ('public.users', ARRAY['INSERT', 'UPDATE', 'DELETE']);

-- Trigger function
CREATE FUNCTION realtime.notify_public_users() ...

-- Triggers
CREATE TRIGGER realtime_INSERT_users AFTER INSERT ON public.users ...
CREATE TRIGGER realtime_UPDATE_users AFTER UPDATE ON public.users ...
CREATE TRIGGER realtime_DELETE_users AFTER DELETE ON public.users ...

Channel Management

Channels organize real-time communication. ɳSelf supports three channel types:

TypeDescriptionUse Case
publicAnyone can joinGeneral chat, announcements
privateRequires membershipTeam channels, DMs
presenceTracks who's onlineMultiplayer games, collaborative editing

Create Channel

# Public channel
ɳSelf realtime channel create "general" public

# Private channel
ɳSelf realtime channel create "team-alpha" private

# Presence channel
ɳSelf realtime channel create "game-lobby" presence

# With metadata (JSON)
ɳSelf realtime channel create "support" private '{"max_members": 10}'

List Channels

# List all channels
ɳSelf realtime channel list

# Filter by type
ɳSelf realtime channel list public
ɳSelf realtime channel list private
ɳSelf realtime channel list presence

# JSON output
ɳSelf realtime channel list --format json

Example Output

 id  | slug        | name        | type    | members | online | created_at
-----+-------------+-------------+---------+---------+--------+------------
 abc | general     | General     | public  | 45      | 12     | 2026-01-30
 def | support     | Support     | private | 8       | 3      | 2026-01-30

Get Channel Details

# Get by slug
ɳSelf realtime channel get general

# Get by ID
ɳSelf realtime channel get <uuid>

Delete Channel

# Delete with confirmation
ɳSelf realtime channel delete general

# Force delete (no confirmation)
ɳSelf realtime channel delete general true

Manage Members

# Add member to channel
ɳSelf realtime channel join general user-123

# Add moderator
ɳSelf realtime channel join support user-456 moderator

# Add admin
ɳSelf realtime channel join team-alpha user-789 admin

# Remove member
ɳSelf realtime channel leave general user-123

# List members
ɳSelf realtime channel members general

Member Roles

  • member: Can read and send messages
  • moderator: Can moderate messages and members
  • admin: Full control over channel

Broadcasting Messages

Send real-time messages to channels. All broadcasts are stored for history and message replay.

Send Message

# Basic broadcast
ɳSelf realtime broadcast general user.joined '{"user_id": "123"}'

# With sender
ɳSelf realtime broadcast general message.sent '{"text": "Hello"}' user-123

# Complex payload
ɳSelf realtime broadcast game-lobby player.moved '{
  "player_id": "player-1",
  "position": {"x": 100, "y": 200},
  "timestamp": 1643723456
}'

Get Messages

# Get last 50 messages (default)
ɳSelf realtime messages general

# Get last 100 messages
ɳSelf realtime messages general 100

# JSON format
ɳSelf realtime messages general 50 json

Message Replay

Replay messages since a specific timestamp (unique to ɳSelf, not available in Supabase/Nhost):

# Replay from UNIX timestamp
ɳSelf realtime replay general 1643723400

# Replay from ISO 8601
ɳSelf realtime replay general "2026-01-30T10:00:00Z"

# Get JSON output
ɳSelf realtime replay general 1643723400 json

Use Cases for Message Replay

  • Reconnecting clients can get missed messages
  • Debugging message flow
  • Auditing communication history
  • Syncing offline clients

Event Types

# List event types in last 24 hours
ɳSelf realtime events general

# List events in last 48 hours
ɳSelf realtime events general 48

Example Output

 event_type      | count | last_sent
-----------------+-------+---------------------
 user.joined     | 234   | 2026-01-30 10:30:00
 message.sent    | 1843  | 2026-01-30 10:29:45
 user.left       | 189   | 2026-01-30 10:28:12

Cleanup Old Messages

# Delete messages older than 24 hours (default)
ɳSelf realtime cleanup

# Custom retention (48 hours)
ɳSelf realtime broadcast cleanup 48

Presence Tracking

Track user online/away/offline status in channels. Perfect for chat applications, multiplayer games, and collaborative tools.

Track Presence

# Set user online in channel
ɳSelf realtime presence track user-123 general online

# Set user away
ɳSelf realtime presence track user-123 general away

# Set user offline
ɳSelf realtime presence track user-123 general offline

# Track with metadata
ɳSelf realtime presence track user-123 game-lobby online '{"level": 5, "score": 1000}'

Status Values

StatusDescription
onlineUser is actively present
awayUser is idle/inactive
offlineUser has disconnected

Get Presence

# Get user presence in specific channel
ɳSelf realtime presence get user-123 general

# Get global user presence
ɳSelf realtime presence get user-123

# JSON format
ɳSelf realtime presence get user-123 general json

List Online Users

# List online users in channel
ɳSelf realtime presence online general

# List all online users (global)
ɳSelf realtime presence online

# JSON format
ɳSelf realtime presence online general json

Example Output

 user_id   | channel | status | metadata              | last_seen_at        | seconds_ago
-----------+---------+--------+-----------------------+---------------------+-------------
 user-123  | general | online | {"device": "mobile"}  | 2026-01-30 10:30:00 | 5
 user-456  | general | away   | {"device": "desktop"} | 2026-01-30 10:25:00 | 305

Count Online Users

# Count in channel
ɳSelf realtime presence count general

# Count global
ɳSelf realtime presence count

Set User Offline

# Set offline in specific channel
ɳSelf realtime presence offline user-123 general

# Set offline in all channels
ɳSelf realtime presence offline user-123

Presence Cleanup

# Cleanup users inactive for 5 minutes (default: 300s)
ɳSelf realtime presence cleanup

# Custom timeout (10 minutes)
ɳSelf realtime presence cleanup 600

Presence Statistics

nself realtime presence stats

Example Output

 online | away | offline | total_users | total_channels
--------+------+---------+-------------+----------------
 45     | 12   | 234     | 291         | 8

Connection Management

Monitor active WebSocket connections for debugging and monitoring.

View Connections

# Show all active connections
ɳSelf realtime connections

# JSON output
ɳSelf realtime connections --json

Example Output

 user_id   | connection_id        | status     | connected_at        | seconds_since_seen
-----------+----------------------+------------+---------------------+-------------------
 user-123  | conn-abc-123         | connected  | 2026-01-30 10:00:00 | 45
 user-456  | conn-def-456         | connected  | 2026-01-30 09:55:00 | 345

Cleanup Stale Connections

# Clean connections inactive for 5 minutes
ɳSelf realtime cleanup

Complete CLI Reference

The ɳSelf real-time system provides 40+ CLI commands for complete infrastructure management:

System Commands

CommandDescription
ɳSelf realtime initInitialize real-time system
ɳSelf realtime statusShow system status
ɳSelf realtime logs [--follow]Show logs
ɳSelf realtime cleanupClean up stale data
ɳSelf realtime statsShow detailed statistics

Database Subscriptions

CommandDescription
ɳSelf realtime subscribe <table> [events]Subscribe to table changes
ɳSelf realtime unsubscribe <table>Unsubscribe
ɳSelf realtime listen <table> [seconds]Listen to changes
ɳSelf realtime subscriptionsList subscriptions

Channel Management

CommandDescription
ɳSelf realtime channel create <name> [type]Create channel
ɳSelf realtime channel list [type]List channels
ɳSelf realtime channel get <id>Get channel details
ɳSelf realtime channel delete <id>Delete channel
ɳSelf realtime channel members <id>List members
ɳSelf realtime channel join <ch> <user>Add member
ɳSelf realtime channel leave <ch> <user>Remove member

Broadcasting

CommandDescription
ɳSelf realtime broadcast <ch> <event> <payload>Send message
ɳSelf realtime messages <ch> [limit]Get messages
ɳSelf realtime replay <ch> <timestamp>Replay messages
ɳSelf realtime events <ch> [hours]List event types

Presence

CommandDescription
ɳSelf realtime presence track <user> <ch> [status]Track presence
ɳSelf realtime presence get <user> [ch]Get presence
ɳSelf realtime presence online [ch]List online users
ɳSelf realtime presence count [ch]Count online
ɳSelf realtime presence offline <user> [ch]Set offline
ɳSelf realtime presence statsGet stats
ɳSelf realtime presence cleanup [timeout]Cleanup stale

Client SDK Examples

JavaScript/TypeScript

import { RealtimeClient } from '@nself/client';

const client = new RealtimeClient({
  url: 'ws://localhost:3100',
  apiKey: 'your-api-key'
});

// Subscribe to table changes
const subscription = client
  .channel('realtime:table:public.users')
  .on('INSERT', (payload) => {
    console.log('New user:', payload.new);
  })
  .on('UPDATE', (payload) => {
    console.log('Updated user:', payload.new);
  })
  .on('DELETE', (payload) => {
    console.log('Deleted user:', payload.old);
  })
  .subscribe();

// Broadcast message
const channel = client.channel('general');
channel.send({
  type: 'broadcast',
  event: 'message',
  payload: { text: 'Hello World!' }
});

// Track presence
const presence = client.channel('lobby');
presence.track({ user_id: 'user-123', status: 'online' });
presence.on('presence', { event: 'join' }, ({ key, currentPresence }) => {
  console.log('User joined:', key, currentPresence);
});

React Hook

import { useChannel } from '@nself/react';

function ChatRoom({ roomId }) {
  const { messages, send, online } = useChannel(roomId, {
    events: ['message.sent', 'user.joined', 'user.left'],
    presence: true
  });

  const sendMessage = (text: string) => {
    send('message.sent', { text, user_id: currentUser.id });
  };

  return (
    <div>
      <div>Online: {online.length}</div>
      {messages.map(msg => (
        <div key={msg.id}>{msg.payload.text}</div>
      ))}
    </div>
  );
}

PostgreSQL LISTEN (Direct)

-- Listen to channel
LISTEN "realtime:table:public.users";

-- In another session, trigger change
INSERT INTO public.users (email) VALUES ('new@example.com');

-- First session receives:
-- Asynchronous notification "realtime:table:public.users" received
-- Payload: {"operation":"INSERT","new":{...}}

Use Case Examples

Chat Application

// Set up real-time chat
const setupChat = async () => {
  // Initialize channel
  await runCommand('ɳSelf realtime channel create chat-room-1 presence');

  // Subscribe to database for message persistence
  await runCommand('ɳSelf realtime subscribe public.messages');

  // Client code
  const channel = client.channel('chat-room-1');

  // Track presence
  channel.track({ user_id: currentUser.id, status: 'online' });

  // Listen for messages
  channel.on('message.sent', (payload) => {
    displayMessage(payload);
  });

  // Send message
  const sendMessage = (text) => {
    channel.send({
      type: 'broadcast',
      event: 'message.sent',
      payload: {
        text,
        user_id: currentUser.id,
        timestamp: Date.now()
      }
    });
  };
};

Collaborative Editing

// Real-time collaborative document editing
const setupCollaboration = async (documentId: string) => {
  const channel = client.channel(`doc-${documentId}`);

  // Track user cursor position
  channel.track({
    user_id: currentUser.id,
    cursor: { line: 0, column: 0 },
    selection: null
  });

  // Broadcast operations
  const broadcastOperation = (operation: Operation) => {
    channel.send({
      type: 'broadcast',
      event: 'operation',
      payload: operation
    });
  };

  // Listen for operations from other users
  channel.on('operation', (payload) => {
    applyOperation(payload.operation);
  });

  // Show collaborators
  channel.on('presence', { event: 'sync' }, () => {
    const collaborators = channel.presenceState();
    displayCollaborators(collaborators);
  });
};

Live Dashboard

// Real-time dashboard with live metrics
const setupDashboard = async () => {
  // Subscribe to metrics tables
  await runCommand('ɳSelf realtime subscribe public.metrics');
  await runCommand('ɳSelf realtime subscribe public.events');

  // Listen for updates
  const metricsChannel = client.channel('realtime:table:public.metrics');

  metricsChannel.on('INSERT', (payload) => {
    updateChart(payload.new);
  });

  metricsChannel.on('UPDATE', (payload) => {
    refreshMetric(payload.new);
  });

  // Subscribe to alert channel
  const alertChannel = client.channel('system-alerts');
  alertChannel.on('alert', (payload) => {
    showNotification(payload);
  });
};

Multiplayer Game

// Real-time multiplayer game lobby
const setupGameLobby = async (gameId: string) => {
  const lobby = client.channel(`game-${gameId}`);

  // Track player state
  lobby.track({
    player_id: currentPlayer.id,
    status: 'ready',
    character: currentPlayer.character,
    level: currentPlayer.level
  });

  // Broadcast player actions
  const broadcastAction = (action: GameAction) => {
    lobby.send({
      type: 'broadcast',
      event: 'player.action',
      payload: {
        player_id: currentPlayer.id,
        action: action.type,
        data: action.data,
        timestamp: Date.now()
      }
    });
  };

  // Listen for other players' actions
  lobby.on('player.action', (payload) => {
    handlePlayerAction(payload);
  });

  // Monitor player presence
  lobby.on('presence', { event: 'join' }, ({ key }) => {
    announcePlayerJoined(key);
  });

  lobby.on('presence', { event: 'leave' }, ({ key }) => {
    announcePlayerLeft(key);
  });
};

Architecture

System Components

┌─────────────────────────────────────────────────────────────┐
│                       Client Applications                    │
│  (Browser, Mobile, Server-side)                             │
└─────────────────────┬───────────────────────────────────────┘
                      │ WebSocket / HTTP
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                   WebSocket Server (Optional)                │
│  - Connection handling                                       │
│  - Authentication                                            │
│  - Message routing                                           │
└─────────────────────┬───────────────────────────────────────┘
                      │ PostgreSQL LISTEN/NOTIFY
                      ▼
┌─────────────────────────────────────────────────────────────┐
│                  PostgreSQL (Core Engine)                    │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ realtime.channels      - Channel definitions         │   │
│  │ realtime.messages      - Broadcast messages          │   │
│  │ realtime.presence      - User presence               │   │
│  │ realtime.subscriptions - Table CDC config            │   │
│  │ realtime.connections   - Active connections          │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
│  ┌──────────────────────────────────────────────────────┐   │
│  │ Triggers on user tables (when subscribed)            │   │
│  │ - notify_<schema>_<table>()                          │   │
│  │ - Sends NOTIFY events on INSERT/UPDATE/DELETE        │   │
│  └──────────────────────────────────────────────────────┘   │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Database Schema

-- Real-time schema tables
realtime.channels          -- Channel definitions
realtime.channel_members   -- Channel membership
realtime.messages          -- Broadcast message history
realtime.presence          -- User presence tracking
realtime.subscriptions     -- Table CDC subscriptions
realtime.connections       -- Active WebSocket connections

PostgreSQL NOTIFY Flow

  1. Table Change: INSERT INTO public.users (...)
  2. Trigger Fires: realtime_INSERT_users trigger executes
  3. Function Runs: realtime.notify_public_users() function
  4. NOTIFY Sent: pg_notify('realtime:table:public.users', payload)
  5. Clients Receive: All listeners get notification instantly

Message Format

{
  "timestamp": 1643723456,
  "operation": "INSERT",
  "schema": "public",
  "table": "users",
  "new": {
    "id": "user-123",
    "email": "john@example.com",
    "created_at": "2026-01-30T10:00:00Z"
  }
}

Best Practices

Performance

  • Cleanup Old Messages: Run regular cleanups to prevent table bloat
    # Add to cron: cleanup messages older than 24h
    0 */6 * * * ɳSelf realtime cleanup
  • Index Critical Columns: Already indexed in migration (messages.sent_at, presence.user_id)
  • Limit Message Payload Size: Keep payloads under 1MB for optimal performance
  • Use Presence Cleanup: Auto-cleanup stale presence every 5 minutes

Security

  • Channel Access Control: Use private channels for sensitive data
  • Row Level Security: Add RLS policies to realtime tables
    ALTER TABLE realtime.channels ENABLE ROW LEVEL SECURITY;
    
    CREATE POLICY "Users can only see channels they're members of"
    ON realtime.channels FOR SELECT
    USING (
      id IN (
        SELECT channel_id FROM realtime.channel_members
        WHERE user_id = auth.uid()
      )
    );
  • Validate Payloads: Always validate message payloads on the server

Scalability

  • Message Retention: Set appropriate retention periods for high-volume channels
  • Connection Limits: Monitor active connections with ɳSelf realtime connections
  • Presence Cleanup: Auto-cleanup stale presence to reduce database load

Monitoring

# Check system status
ɳSelf realtime status
ɳSelf realtime stats

# Monitor subscription health
ɳSelf realtime subscriptions

# Track message volume
ɳSelf realtime events general 24

Troubleshooting

Subscriptions Not Working

# Check if subscription exists
ɳSelf realtime subscriptions

# Verify triggers exist
psql -c "\d+ public.users"  # Should show triggers

# Test manually
ɳSelf realtime listen public.users &
psql -c "INSERT INTO public.users (email) VALUES ('test@example.com');"

Messages Not Arriving

# Check channel exists
ɳSelf realtime channel list

# Verify messages are being stored
ɳSelf realtime messages <channel> 10

# Check broadcast stats
ɳSelf realtime stats

Presence Not Updating

# Check presence records
ɳSelf realtime presence online

# Cleanup stale presence
ɳSelf realtime presence cleanup

# Track presence manually
ɳSelf realtime presence track <user> <channel> online

Production Deployment

Production Checklist

  • Set up automated cleanup cron jobs
  • Enable Row Level Security on realtime tables
  • Configure appropriate message retention periods
  • Set up monitoring and alerting
  • Test reconnection and message replay scenarios
  • Configure WebSocket server for production load

Next Steps

Now that you understand real-time communication in ɳSelf:

  • Hasura Integration - Use real-time with GraphQL subscriptions
  • Authentication - Secure real-time channels
  • Webhooks - Trigger webhooks from real-time events
  • Production Setup - Deploy real-time features to production
  • Monitoring - Monitor real-time system health

The real-time communication system provides enterprise-grade capabilities with comprehensive CLI management. Use it to build chat applications, collaborative tools, live dashboards, multiplayer games, and any application requiring instant updates.