Realtime — vyuh_server_plugin_realtime
RealtimePlugin is the shipped SSE fan-out plugin for Vyuh Server. Features contribute RealtimeChannelDescriptors; the plugin claims them, starts each ChannelSource, and broadcasts source events to connected clients.
The Supabase source is included, but the plugin boundary is source agnostic: implement ChannelSource to feed events from polling, webhooks, queues, or another realtime backend.
Install
dependencies:
  vyuh_server_plugin_realtime:
    hosted: https://pub.vyuh.tech
    version: ^0.2.1

Wiring
RealtimePlugin needs a SupabaseClient in vyuh.di when using SupabaseChannelSource and replay. Product apps typically register that client through their own Supabase client plugin before realtime.
import 'package:vyuh_server/vyuh_server.dart';
import 'package:vyuh_server_plugin_realtime/vyuh_server_plugin_realtime.dart';

final activityFeature = FeatureDescriptor(
  name: 'elog.activities',
  descriptors: [
    RealtimeChannelDescriptor(
      topic: 'activities',
      source: SupabaseChannelSource(schema: 'elog', table: 'activities'),
      metadataExtractor: (record) => {
        if (record['site_id'] case final siteId?) 'site_id': siteId,
        if (record['status'] case final status?) 'status': status,
      },
    ),
  ],
);

final runtime = await VyuhServer.bootstrap(
  name: 'elog-api',
  plugins: [
    YourSupabaseClientPlugin(), // registers SupabaseClient in vyuh.di
    RealtimePlugin(
      basePath: '/live',
      maxConnections: 500,
      eventLogSchema: 'public',
      replayLimit: 50,
    ),
  ],
  features: [activityFeature],
);

The plugin registers ConnectionPool, RealtimeBroadcaster, and RealtimeRegistry in vyuh.di.
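The metadataExtractor in the wiring example relies on Dart 3 null-check patterns (`case final x?`), so a column that is missing or null is simply left out of the emitted metadata. The standalone sketch below copies that extractor into a plain function to show the effect; `extractMetadata` and the sample records are illustrative names and data, not part of the plugin.

```dart
/// Same shape as the metadataExtractor in the wiring example.
/// `case final x?` only matches non-null values, so null or absent
/// columns produce no metadata entry at all.
Map<String, Object?> extractMetadata(Map<String, Object?> record) => {
      if (record['site_id'] case final siteId?) 'site_id': siteId,
      if (record['status'] case final status?) 'status': status,
    };

void main() {
  // Sample rows, made up for illustration.
  final full = extractMetadata(
    {'id': 1, 'site_id': 'site-123', 'status': 'open'},
  );
  final partial = extractMetadata({'id': 2, 'site_id': null});

  print(full); // {site_id: site-123, status: open}
  print(partial); // {}
}
```

Since subscription filters are applied to this metadata, a field that clients are expected to filter on presumably needs to be emitted by the extractor.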
Mounted Routes
| Method | Path | Purpose |
|---|---|---|
| GET | /live | Authenticated SSE stream |
| GET | /live/status | Connection-pool diagnostics |
Both paths are protected by scope.protect. Clients authenticate with the normal auth middleware: Authorization: Bearer <token> or ?token=<jwt> for browser EventSource compatibility.
Subscribing
The SSE endpoint requires a subscriptions query parameter: a base64url-encoded JSON array of topic subscriptions. Before encoding, the array looks like this:
[
  {
    "topic": "activities",
    "filter": {
      "schema_type": "cdx.query.filter.condition",
      "field": "site_id",
      "operator": "equals",
      "value": "site-123"
    }
  }
]

Each event contains the topic, change type, entity id when available, and metadata emitted by the descriptor's metadataExtractor. The server applies metadata filters before delivering events to the client.
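A Dart client could build the subscriptions parameter with `dart:convert`, roughly as sketched here. The helper names (`encodeSubscriptions`, `buildStreamUri`) and the `api.example.com` host are placeholders. Whether the server expects padded or unpadded base64url is not stated above; this sketch keeps the `=` padding that `base64Url.encode` produces, which is an assumption.

```dart
import 'dart:convert';

/// Serializes the subscriptions to JSON, then base64url-encodes the bytes.
/// Assumption: padded base64url is accepted by the server.
String encodeSubscriptions(List<Map<String, Object?>> subscriptions) =>
    base64Url.encode(utf8.encode(jsonEncode(subscriptions)));

/// Builds the stream URI. Pass [token] only when the client cannot send an
/// Authorization header (for example a browser-style EventSource).
Uri buildStreamUri(
  Uri base,
  List<Map<String, Object?>> subscriptions, {
  String? token,
}) =>
    base.replace(queryParameters: {
      'subscriptions': encodeSubscriptions(subscriptions),
      if (token != null) 'token': token,
    });

void main() {
  final uri = buildStreamUri(
    Uri.parse('https://api.example.com/live'), // placeholder host
    [
      {
        'topic': 'activities',
        'filter': {
          'schema_type': 'cdx.query.filter.condition',
          'field': 'site_id',
          'operator': 'equals',
          'value': 'site-123',
        },
      },
    ],
  );
  print(uri);
}
```

`Uri.replace` percent-encodes the query values, so the padding characters survive the round trip.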
Reconnects may send Last-Event-ID or ?last_event_id=. The plugin replays recent rows from <eventLogSchema>.realtime_events and emits a replayUnavailable sentinel if the gap is too old.
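To resume after a dropped connection, a client has to remember the id of the last frame it received. The sketch below parses standard SSE `field: value` lines, tracks the most recent id, and builds a reconnect URI with `last_event_id`. `SseFrame`, `parseSse`, and `resumeUri` are illustrative names, and the sample ids and payloads are made up; the plugin's actual event payload is not shown here.

```dart
/// One parsed SSE frame.
class SseFrame {
  const SseFrame({required this.data, this.id, this.event});
  final String data;
  final String? id;
  final String? event;
}

/// Parses already-split SSE lines into frames. A blank line ends a frame;
/// lines starting with ':' are comments (often used as keep-alives).
Stream<SseFrame> parseSse(Stream<String> lines) async* {
  String? lastId; // the last seen id persists across frames
  String? event;
  final data = <String>[];
  await for (final line in lines) {
    if (line.isEmpty) {
      if (data.isNotEmpty) {
        yield SseFrame(data: data.join('\n'), id: lastId, event: event);
      }
      event = null;
      data.clear();
    } else if (!line.startsWith(':')) {
      final sep = line.indexOf(':');
      final field = sep < 0 ? line : line.substring(0, sep);
      var value = sep < 0 ? '' : line.substring(sep + 1);
      if (value.startsWith(' ')) value = value.substring(1);
      if (field == 'id') lastId = value;
      if (field == 'event') event = value;
      if (field == 'data') data.add(value);
    }
  }
}

/// Adds `last_event_id` to an existing stream URI for a reconnect.
Uri resumeUri(Uri streamUri, String lastEventId) => streamUri.replace(
      queryParameters: {
        ...streamUri.queryParameters,
        'last_event_id': lastEventId,
      },
    );

Future<void> main() async {
  // Hypothetical frames for illustration only.
  final lines = Stream.fromIterable([
    ': keep-alive',
    'id: 41',
    'data: {"topic":"activities"}',
    '',
    'id: 42',
    'data: {"topic":"activities"}',
    '',
  ]);
  String? lastId;
  await for (final frame in parseSse(lines)) {
    lastId = frame.id ?? lastId;
  }
  final base = Uri.parse('https://api.example.com/live?subscriptions=abc');
  print(resumeUri(base, lastId!));
}
```

Clients that can set request headers may send the same value as Last-Event-ID instead of the query parameter.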
Custom Sources
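import 'dart:async';

// QueueClient is assumed to be your application's own queue client type,
// exposing an `events` stream; it is not part of this package.
final class QueueChannelSource implements ChannelSource {
  QueueChannelSource(this.queue);

  final QueueClient queue;
  StreamSubscription? _sub;

  @override
  Future<void> start({
    required void Function(SourceEvent event) onEvent,
    required TelemetrySink log,
  }) async {
    // Forward every queue message to the plugin as a SourceEvent.
    _sub = queue.events.listen((event) {
      onEvent(
        SourceEvent(
          changeType: event.kind,
          entityId: event.id,
          record: event.payload,
        ),
      );
    });
  }

  @override
  Future<void> dispose() async {
    // Stop listening and drop the subscription reference.
    await _sub?.cancel();
    _sub = null;
  }
}

Then contribute it exactly like a Supabase-backed channel: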
RealtimeChannelDescriptor(
  topic: 'notifications',
  source: QueueChannelSource(queue),
);

Where to Go Next
- Descriptors — how plugins claim feature data
- Middleware & Context — auth enforcement on protected paths
- Real-Time Content Example — end-to-end SSE shape