
Example: Real-Time Content Platform

This example builds a content moderation and publishing platform with live event delivery. It demonstrates patterns that go beyond plain REST:

  • A custom StreamingPlugin that mounts Server-Sent Events (SSE)
  • A ModerationPlugin that owns a policy registry
  • Pub/sub fan-out from Postgres NOTIFY to connected clients
  • Mixed auth (JWT for the web app, API keys for crawlers)
  • A custom Descriptor for "rule-based content classifiers"

What This Server Does

| Route | Purpose |
| --- | --- |
| POST /posts | Create a post (draft); JWT auth |
| POST /posts/:id/publish | Submit for moderation |
| GET /posts/feed | Public feed of approved posts |
| GET /events/posts | SSE stream of new posts |
| POST /moderation/decisions | Moderator approves / rejects |
| POST /classifiers/run | Run a custom rule classifier |

Custom Descriptor: ContentClassifierDescriptor

Features can declare classifiers as data. A ModerationPlugin claims the descriptor at boot.

dart
// lib/moderation/content_classifier_descriptor.dart
class ContentClassifierDescriptor extends Descriptor {
  const ContentClassifierDescriptor({
    required this.name,
    required this.classify,
  });

  final String name;
  final Future<ClassificationResult> Function(Post post) classify;
}

class ClassificationResult {
  ClassificationResult({required this.action, this.reason});
  final ClassifierAction action; // approve | reject | flag
  final String? reason;
}

Then a feature ships classifiers:

dart
// lib/moderation/safety_classifiers_feature.dart
final safetyClassifiersFeature = FeatureDescriptor(
  name: 'app.classifiers.safety',
  descriptors: [
    ContentClassifierDescriptor(
      name: 'profanity-filter',
      classify: (post) async {
        if (profanityWords.any(post.body.contains)) {
          return ClassificationResult(
            action: ClassifierAction.flag,
            reason: 'Contains profanity',
          );
        }
        return ClassificationResult(action: ClassifierAction.approve);
      },
    ),
    ContentClassifierDescriptor(
      name: 'spam-link-density',
      classify: (post) async {
        final linkCount = RegExp(r'https?://').allMatches(post.body).length;
        if (linkCount > 5) {
          return ClassificationResult(
            action: ClassifierAction.reject,
            reason: 'Too many links',
          );
        }
        return ClassificationResult(action: ClassifierAction.approve);
      },
    ),
  ],
);
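
Because a classifier is just an async function from a post to a result, it can be exercised without booting the server. The sketch below restates the link-density rule as a standalone function. `Post`, `ClassifierAction`, and `ClassificationResult` here are pared-down local stand-ins (assumptions made so the block runs on its own), and `classifyLinkDensity` is a hypothetical name, not part of the framework.

```dart
// Local stand-ins for the app's real types (assumed shapes).
enum ClassifierAction { approve, reject, flag }

class Post {
  const Post({required this.body});
  final String body;
}

class ClassificationResult {
  ClassificationResult({required this.action, this.reason});
  final ClassifierAction action;
  final String? reason;
}

/// Same rule as the 'spam-link-density' classifier:
/// more than five http(s) links rejects the post.
Future<ClassificationResult> classifyLinkDensity(Post post) async {
  final linkCount = RegExp(r'https?://').allMatches(post.body).length;
  if (linkCount > 5) {
    return ClassificationResult(
      action: ClassifierAction.reject,
      reason: 'Too many links',
    );
  }
  return ClassificationResult(action: ClassifierAction.approve);
}
```

Keeping the rule in a named function like this lets a unit test call it directly, while the descriptor simply passes it as `classify: classifyLinkDensity`.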

The ModerationPlugin

dart
// lib/moderation/moderation_plugin.dart
class ModerationPlugin extends Plugin
    implements DescriptorHandler<ContentClassifierDescriptor> {
  final _classifiers = <ContentClassifierDescriptor>[];

  @override
  String get name => 'app.moderation';

  @override
  Future<bool> tryHandle(Descriptor d, MountContext ctx) async {
    if (d is! ContentClassifierDescriptor) return false;
    if (ctx.phase == DispatchPhase.register) {
      _classifiers.add(d);
    }
    return true;
  }

  Future<ClassificationResult> runAll(Post post) async {
    for (final c in _classifiers) {
      final r = await c.classify(post);
      if (r.action != ClassifierAction.approve) return r;
    }
    return ClassificationResult(action: ClassifierAction.approve);
  }

  @override
  List<DiRegistration> get di => [
        DiRegistration.factory<ModerationPlugin>((_) => this),
      ];
}
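
Note that `runAll` returns the first result that is not `approve`, so the outcome depends on registration order: a post that would be flagged by one classifier and rejected by a later one comes back as flagged. If that matters, one possible variation (not part of the original plugin) is to keep the most severe result. The sketch below assumes a severity ordering of approve < flag < reject; `runStrictest` and the `Classifier` typedef are hypothetical names.

```dart
// Local stand-ins for the app's real types (assumed shapes).
enum ClassifierAction { approve, flag, reject }

class ClassificationResult {
  ClassificationResult({required this.action, this.reason});
  final ClassifierAction action;
  final String? reason;
}

typedef Classifier<T> = Future<ClassificationResult> Function(T post);

/// Assumed severity ordering: approve < flag < reject.
int severityOf(ClassifierAction action) => switch (action) {
      ClassifierAction.approve => 0,
      ClassifierAction.flag => 1,
      ClassifierAction.reject => 2,
    };

/// Runs the classifiers in order and keeps the most severe result.
/// Stops early on a reject, since nothing can outrank it.
Future<ClassificationResult> runStrictest<T>(
  List<Classifier<T>> classifiers,
  T post,
) async {
  var worst = ClassificationResult(action: ClassifierAction.approve);
  for (final classify in classifiers) {
    final result = await classify(post);
    if (severityOf(result.action) > severityOf(worst.action)) {
      worst = result;
    }
    if (worst.action == ClassifierAction.reject) break;
  }
  return worst;
}
```

The trade-off is cost: the first-match version skips the remaining classifiers as soon as one objects, which is attractive when later classifiers are slow or paid.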

A handler classifies a post:

dart
Future<Response> publishPost(Request req) async {
  final post = await loadPost(req.pathParameters['id']!);
  final moderation = vyuh.di.get<ModerationPlugin>();
  final result = await moderation.runAll(post);

  // Report the status that matches the moderation outcome, so a flagged
  // post is not reported as published. Status strings are illustrative.
  final String status;
  switch (result.action) {
    case ClassifierAction.approve:
      await _markPublished(post.id);
      status = 'published';
    case ClassifierAction.flag:
      await _queueForHumanReview(post.id, result.reason);
      status = 'pending_review';
    case ClassifierAction.reject:
      throw StructuredException(
        code: 'moderation.rejected',
        message: result.reason ?? 'Rejected by classifier',
      );
  }
  return jsonResponse(body: {'success': true, 'data': {'status': status}});
}

The StreamingPlugin (SSE)

A Server-Sent Events (SSE) endpoint is a single Relic handler that holds the connection open and writes data: frames. Wrap it in a plugin so multiple features can reuse the wiring.

dart
// lib/streaming/streaming_plugin.dart
class StreamingPlugin extends Plugin {
  final _channels = <String, _StreamChannel>{};

  @override
  String get name => 'app.streaming';

  @override
  List<DiRegistration> get di => [
        DiRegistration.factory<StreamingBroker>((_) => _Broker(this)),
      ];

  StreamSubscription? _pgListener;

  @override
  Future<void> init(Object runtime) async {
    // One Postgres LISTEN for all channels; each event names its channel
    // and is fanned out to the clients attached to it.
    _pgListener = vyuh.db.listen('streaming_events').listen((payload) {
      final event = jsonDecode(payload) as Map<String, Object?>;
      final channel = event['channel'] as String;
      _channels[channel]?.broadcast(event['data']);
    });
  }

  @override
  Future<void> dispose(Object runtime) async {
    await _pgListener?.cancel();
    for (final c in _channels.values) c.close();
  }
}
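
The plugin above relies on a `_StreamChannel` that is not shown. As a rough sketch of what such a channel could look like, the block below keeps one `StreamController` per connected client and writes each event as an SSE frame (a `data:` line followed by a blank line). `StreamChannel`, `sseFrame`, `attach`, and `clientCount` are hypothetical names chosen for illustration; the real class may differ.

```dart
import 'dart:async';
import 'dart:convert';

/// Formats one SSE frame. jsonEncode emits compact single-line JSON,
/// so the payload always fits on one `data:` line.
String sseFrame(Object? data) => 'data: ${jsonEncode(data)}\n\n';

/// Hypothetical stand-in for the plugin's channel type.
class StreamChannel {
  final _clients = <StreamController<String>>{};

  /// Registers a client and returns the stream of frames to write to it.
  /// The client is dropped automatically when its subscription is cancelled.
  Stream<String> attach() {
    late final StreamController<String> controller;
    controller = StreamController<String>(
      onCancel: () {
        _clients.remove(controller);
      },
    );
    _clients.add(controller);
    return controller.stream;
  }

  /// Sends the same frame to every attached client.
  void broadcast(Object? data) {
    final frame = sseFrame(data);
    for (final client in _clients.toList()) {
      client.add(frame);
    }
  }

  /// Ends every client stream, for example when the plugin is disposed.
  void close() {
    for (final client in _clients.toList()) {
      client.close();
    }
    _clients.clear();
  }

  int get clientCount => _clients.length;
}
```

A handler would then pipe the stream from `attach()` into a response with the `text/event-stream` content type; how the framework exposes a streaming response body is not covered here.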

A feature mounts an SSE endpoint:

dart
// lib/posts/posts_feature.dart
final postsFeature = FeatureDescriptor(
  name: 'app.posts',
  routes: [
    RouteSpec(
      method: 'GET',
      path: '/events/posts',
      handler: (req) async {
        final broker = vyuh.di.get<StreamingBroker>();
        return broker.openStream('posts.published');
      },
    ),
  ],
);

When a post is published:

dart
await vyuh.db.execute(
  "SELECT pg_notify('streaming_events', @payload)",
  parameters: {
    'payload': jsonEncode({
      'channel': 'posts.published',
      'data': {'post_id': post.id, 'title': post.title},
    }),
  },
);

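Every SSE client attached to the posts.published channel receives a frame. The same fan-out works across multiple server instances, because Postgres delivers each NOTIFY to every session listening on that channel, and each instance's StreamingPlugin relays it to its own clients. Keep payloads small: in the default Postgres configuration a NOTIFY payload must be shorter than 8000 bytes.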
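
Since any database session can issue a NOTIFY on `streaming_events`, the listener may want to validate the envelope before casting it. The following is a minimal sketch of defensive parsing, assuming the `{"channel": ..., "data": ...}` shape used above; `StreamEvent` and `parseStreamEvent` are hypothetical names.

```dart
import 'dart:convert';

/// Parsed form of one `streaming_events` notification.
class StreamEvent {
  const StreamEvent(this.channel, this.data);
  final String channel;
  final Object? data;
}

/// Returns null instead of throwing when the payload is not the expected
/// `{"channel": <string>, "data": <any>}` envelope.
StreamEvent? parseStreamEvent(String payload) {
  Object? decoded;
  try {
    decoded = jsonDecode(payload);
  } on FormatException {
    return null; // Not valid JSON.
  }
  if (decoded is! Map<String, dynamic>) return null;
  final channel = decoded['channel'];
  if (channel is! String) return null;
  return StreamEvent(channel, decoded['data']);
}
```

The listener could then skip (and perhaps log) a `null` result rather than letting a malformed payload raise an error inside the subscription callback.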

Wire It All Together

dart
final runtime = await VyuhServer.bootstrap(
  name: 'content-platform',
  plugins: [
    PostgresDbPlugin(),
    JwtAuthPlugin(verifier: verifyToken),       // web app
    ApiKeyAuthPlugin(lookup: lookupApiKey),     // crawlers
    OtelTelemetryPlugin(defaultServiceName: 'content-platform'),
    PostgresQueryPlugin(),
    EntityCrudPlugin(),
    OpenApiPlugin(title: 'Content Platform'),

    // Custom plugins
    ModerationPlugin(),
    StreamingPlugin(),
  ],
  features: [
    postsFeature,
    safetyClassifiersFeature,       // ships ContentClassifierDescriptors
    aiClassifiersFeature,            // another feature shipping more
    moderationDashboardFeature,
  ],
);

await runtime.start(port: 8080);

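A further feature, such as aiClassifiersFeature above, can ship more classifiers (say, a paid AI-based one). The ModerationPlugin claims them automatically, and the publishing handler runs the chain in the order the classifiers were registered, stopping at the first result that is not approve.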

Patterns This Example Highlights

  • Open extension via descriptors: ContentClassifierDescriptor isn't built into the framework; it's a domain-specific descriptor that the app defines, features ship, and a plugin claims.
  • Plugin owns a registry: the same shape as EntityCrudPlugin, but for a different domain.
  • DI for handler-time access: handlers reach into the plugin's registry via vyuh.di.get<ModerationPlugin>().
  • Postgres pub/sub for cross-instance fan-out: LISTEN/NOTIFY reaches every replica, which process-local pub/sub cannot do in multi-replica deployments.
  • Multiple auth strategies: JWT for users, API key for crawlers, both on the same server.

Where to Go Next