Example: Real-Time Content Platform
A content moderation + publishing platform with live event delivery. Demonstrates patterns that go beyond plain REST:
- A custom
StreamingPluginthat mounts Server-Sent Events (SSE) - A
ModerationPluginthat owns a policy registry - Pub/sub fan-out from Postgres
NOTIFYto connected clients - Mixed auth (session cookies for the web app, API keys for crawlers)
- A custom
Descriptorfor "rule-based content classifiers"
What This Server Does
| Route | Purpose |
|---|---|
POST /posts | Create a post (draft) — JWT auth |
POST /posts/:id/publish | Submit for moderation |
GET /posts/feed | Public feed of approved posts |
GET /events/posts | SSE stream of new posts |
POST /moderation/decisions | Moderator approves / rejects |
POST /classifiers/run | Run a custom rule classifier |
Custom Descriptor: ContentClassifierDescriptor
Features can declare classifiers as data. A ModerationPlugin claims the descriptor at boot.
// lib/moderation/content_classifier_descriptor.dart
class ContentClassifierDescriptor extends Descriptor {
const ContentClassifierDescriptor({
required this.name,
required this.classify,
});
final String name;
final Future<ClassificationResult> Function(Post post) classify;
}
class ClassificationResult {
ClassificationResult({required this.action, this.reason});
final ClassifierAction action; // approve | reject | flag
final String? reason;
}Then a feature ships classifiers:
// lib/moderation/safety_classifiers_feature.dart
final safetyClassifiersFeature = FeatureDescriptor(
name: 'app.classifiers.safety',
descriptors: [
ContentClassifierDescriptor(
name: 'profanity-filter',
classify: (post) async {
if (profanityWords.any(post.body.contains)) {
return ClassificationResult(
action: ClassifierAction.flag,
reason: 'Contains profanity',
);
}
return ClassificationResult(action: ClassifierAction.approve);
},
),
ContentClassifierDescriptor(
name: 'spam-link-density',
classify: (post) async {
final linkCount = RegExp(r'https?://').allMatches(post.body).length;
if (linkCount > 5) {
return ClassificationResult(
action: ClassifierAction.reject,
reason: 'Too many links',
);
}
return ClassificationResult(action: ClassifierAction.approve);
},
),
],
);The ModerationPlugin
// lib/moderation/moderation_plugin.dart
class ModerationPlugin extends Plugin
implements DescriptorHandler<ContentClassifierDescriptor> {
final _classifiers = <ContentClassifierDescriptor>[];
@override
String get name => 'app.moderation';
@override
Future<bool> tryHandle(Descriptor d, MountContext ctx) async {
if (d is! ContentClassifierDescriptor) return false;
if (ctx.phase == DispatchPhase.register) {
_classifiers.add(d);
}
return true;
}
Future<ClassificationResult> runAll(Post post) async {
for (final c in _classifiers) {
final r = await c.classify(post);
if (r.action != ClassifierAction.approve) return r;
}
return ClassificationResult(action: ClassifierAction.approve);
}
@override
List<DiRegistration> get di => [
DiRegistration.factory<ModerationPlugin>((_) => this),
];
}A handler classifies a post:
Future<Response> publishPost(Request req) async {
final post = await loadPost(req.pathParameters['id']!);
final moderation = vyuh.di.get<ModerationPlugin>();
final result = await moderation.runAll(post);
switch (result.action) {
case ClassifierAction.approve:
await _markPublished(post.id);
case ClassifierAction.flag:
await _queueForHumanReview(post.id, result.reason);
case ClassifierAction.reject:
throw StructuredException(
code: 'moderation.rejected',
message: result.reason ?? 'Rejected by classifier',
);
}
return jsonResponse(body: {'success': true, 'data': {'status': 'published'}});
}The StreamingPlugin (SSE)
Server-Sent Events is one Relic handler that holds the connection open and writes data: frames. Wrap it in a plugin so multiple features can reuse the wiring.
// lib/streaming/streaming_plugin.dart
class StreamingPlugin extends Plugin {
final _channels = <String, _StreamChannel>{};
@override
String get name => 'app.streaming';
@override
List<DiRegistration> get di => [
DiRegistration.factory<StreamingBroker>((_) => _Broker(this)),
];
StreamSubscription? _pgListener;
@override
Future<void> init(Object runtime) async {
// One Postgres LISTEN per channel; fan out to attached clients.
_pgListener = vyuh.db.listen('streaming_events').listen((payload) {
final event = jsonDecode(payload) as Map<String, Object?>;
final channel = event['channel'] as String;
_channels[channel]?.broadcast(event['data']);
});
}
@override
Future<void> dispose(Object runtime) async {
await _pgListener?.cancel();
for (final c in _channels.values) c.close();
}
}A feature mounts an SSE endpoint:
// lib/posts/posts_feature.dart
final postsFeature = FeatureDescriptor(
name: 'app.posts',
routes: [
RouteSpec(
method: 'GET',
path: '/events/posts',
handler: (req) async {
final broker = vyuh.di.get<StreamingBroker>();
return broker.openStream('posts.published');
},
),
],
);When a post is published:
await vyuh.db.execute(
"SELECT pg_notify('streaming_events', @payload)",
parameters: {
'payload': jsonEncode({
'channel': 'posts.published',
'data': {'post_id': post.id, 'title': post.title},
}),
},
);Every connected SSE client receives a frame. The same fan-out works across multiple server instances — Postgres NOTIFY reaches every listener.
Wire It All Together
final runtime = await VyuhServer.bootstrap(
name: 'content-platform',
plugins: [
PostgresDbPlugin(),
JwtAuthPlugin(verifier: verifyToken), // web app
ApiKeyAuthPlugin(lookup: lookupApiKey), // crawlers
OtelTelemetryPlugin(defaultServiceName: 'content-platform'),
PostgresQueryPlugin(),
EntityCrudPlugin(),
OpenApiPlugin(title: 'Content Platform'),
// Custom plugins
ModerationPlugin(),
StreamingPlugin(),
],
features: [
postsFeature,
safetyClassifiersFeature, // ships ContentClassifierDescriptors
aiClassifiersFeature, // another feature shipping more
moderationDashboardFeature,
],
);
await runtime.start(port: 8080);A third feature can ship more classifiers — say, a paid AI-based classifier. The ModerationPlugin claims them automatically; the publishing handler runs the full chain in order.
Patterns This Example Highlights
- Open extension via descriptors —
ContentClassifierDescriptorisn't built into the framework; it's a domain-specific descriptor a feature defines and a feature ships. - Plugin owns a registry — the same shape as
EntityCrudPluginbut for a different domain. - DI for handler-time access — handlers reach into the plugin's registry via
vyuh.di.get<ModerationPlugin>(). - Postgres pub/sub for cross-instance fan-out —
LISTEN/NOTIFYbeats process-local pubsub for multi-replica deployments. - Multiple auth strategies — JWT for users, API key for crawlers, both on the same server.
Where to Go Next
- Descriptors — the open extension model
- Writing a Plugin — a step-by-step walkthrough
- SaaS API Example — a different shape, same primitives