Skip to content

Realtime — vyuh_server_plugin_realtime

RealtimePlugin is the shipped SSE fan-out plugin for Vyuh Server. Features contribute RealtimeChannelDescriptors; the plugin claims them, starts each ChannelSource, and broadcasts source events to connected clients.

The Supabase source is included, but the plugin boundary is source agnostic: implement ChannelSource to feed events from polling, webhooks, queues, or another realtime backend.

Install

yaml
dependencies:
  vyuh_server_plugin_realtime:
    hosted: https://pub.vyuh.tech
    version: ^0.2.1

Wiring

RealtimePlugin needs a SupabaseClient in vyuh.di when using SupabaseChannelSource and replay. Product apps typically register that client through their own Supabase client plugin before realtime.

dart
import 'package:vyuh_server/vyuh_server.dart';
import 'package:vyuh_server_plugin_realtime/vyuh_server_plugin_realtime.dart';

final activityFeature = FeatureDescriptor(
  name: 'elog.activities',
  descriptors: [
    RealtimeChannelDescriptor(
      topic: 'activities',
      source: SupabaseChannelSource(schema: 'elog', table: 'activities'),
      metadataExtractor: (record) => {
        if (record['site_id'] case final siteId?) 'site_id': siteId,
        if (record['status'] case final status?) 'status': status,
      },
    ),
  ],
);

final runtime = await VyuhServer.bootstrap(
  name: 'elog-api',
  plugins: [
    YourSupabaseClientPlugin(), // registers SupabaseClient in vyuh.di
    RealtimePlugin(
      basePath: '/live',
      maxConnections: 500,
      eventLogSchema: 'public',
      replayLimit: 50,
    ),
  ],
  features: [activityFeature],
);

The plugin registers ConnectionPool, RealtimeBroadcaster, and RealtimeRegistry in vyuh.di.

Mounted Routes

MethodPathPurpose
GET/liveAuthenticated SSE stream
GET/live/statusConnection-pool diagnostics

Both paths are protected by scope.protect. Clients authenticate with the normal auth middleware: Authorization: Bearer <token> or ?token=<jwt> for browser EventSource compatibility.

Subscribing

The SSE endpoint requires a subscriptions query parameter: base64url-encoded JSON array of topic subscriptions.

json
[
  {
    "topic": "activities",
    "filter": {
      "schema_type": "cdx.query.filter.condition",
      "field": "site_id",
      "operator": "equals",
      "value": "site-123"
    }
  }
]

Each event contains the topic, change type, entity id when available, and metadata emitted by the descriptor's metadataExtractor. The server applies metadata filters before delivering events to the client.

Reconnects may send Last-Event-ID or ?last_event_id=. The plugin replays recent rows from <eventLogSchema>.realtime_events and emits a replayUnavailable sentinel if the gap is too old.

Custom Sources

dart
final class QueueChannelSource implements ChannelSource {
  QueueChannelSource(this.queue);

  final QueueClient queue;
  StreamSubscription? _sub;

  @override
  Future<void> start({
    required void Function(SourceEvent event) onEvent,
    required TelemetrySink log,
  }) async {
    _sub = queue.events.listen((event) {
      onEvent(
        SourceEvent(
          changeType: event.kind,
          entityId: event.id,
          record: event.payload,
        ),
      );
    });
  }

  @override
  Future<void> dispose() async {
    await _sub?.cancel();
  }
}

Then contribute it exactly like a Supabase-backed channel:

dart
RealtimeChannelDescriptor(
  topic: 'notifications',
  source: QueueChannelSource(queue),
);

Where to Go Next