Skip to content

Type Registries

Type registries enable schema-driven deserialization where a schemaType field in JSON maps to a registered Dart class.

Overview

The engine uses TypeRegistry<T> to manage mappings from schemaType strings to executor classes. When a workflow JSON references a task with schemaType: 'task.email.send', the engine looks up the corresponding executor from its registries.

dart
// Registries are populated from WorkflowDescriptors via the context
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(),  // Built-in executors
    myCustomDescriptor,           // Your custom executors
  ],
);

final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

TypeDescriptor

Executors are registered using TypeDescriptor:

dart
class TypeDescriptor<T> {
  const TypeDescriptor({
    required this.schemaType,  // Unique type identifier
    required this.fromJson,    // Factory function
    this.title,                // Human-readable name (optional)
  });
}

Registering Executors

Task Executor

TaskExecutor is an abstract class with schemaType as a getter:

dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  // Type descriptor for registration
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final to = context.getRequired<String>('recipientEmail');
    final subject = context.get<String>('subject') ?? 'Notification';

    await emailService.send(to: to, subject: subject);

    return TaskSuccess(output: {'sent': true, 'sentAt': DateTime.now().toIso8601String()});
  }
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Email Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);

User Task Executor

UserTaskExecutor returns WaitForUserTaskResult (always waits for human):

dart
class ApprovalTaskExecutor extends UserTaskExecutor {
  static const _schemaType = 'userTask.approval';

  static final typeDescriptor = TypeDescriptor<UserTaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ApprovalTaskExecutor(),
    title: 'Approval Task',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Approval Task';

  @override
  Future<WaitForUserTaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final entityType = context.getRequired<String>('entityType');
    final roleId = context.getRequired<String>('approverRoleId');

    return WaitForUserTaskResult(
      signalName: context.signalName!,
      config: UserTaskConfiguration(
        title: 'Approve $entityType',
        description: 'Please review and approve this ${entityType.toLowerCase()}',
        schemaType: schemaType,
        assignedToRoleId: roleId,
        priority: UserTaskPriority.high,
        input: context.input,
      ),
    );
  }
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Approval Tasks',
  userTasks: [ApprovalTaskExecutor.typeDescriptor],
);

Condition Executor

ConditionExecutor is both configuration AND executor - it's deserialized directly from JSON:

dart
class RequiresNextLevelCondition extends ConditionExecutor {
  static const schemaName = 'condition.approval.requiresNextLevel';

  static final typeDescriptor = TypeDescriptor<ConditionExecutor>(
    schemaType: schemaName,
    fromJson: RequiresNextLevelCondition.fromJson,
    title: 'Requires Next Approval Level',
  );

  const RequiresNextLevelCondition({
    this.maxLevels = 3,
    super.name,
  }) : super(schemaType: schemaName);

  factory RequiresNextLevelCondition.fromJson(Map<String, dynamic> json) {
    return RequiresNextLevelCondition(
      maxLevels: json['maxLevels'] as int? ?? 3,
      name: json['name'] as String?,
    );
  }

  final int maxLevels;

  @override
  Future<bool> execute(ExecutionContext context) async {
    // Use get<T> for previous node output
    final currentLevel = context.get<int>('currentLevel') ?? 0;
    return currentLevel < maxLevels;
  }

  @override
  Map<String, dynamic> toJson() => {
    'schemaType': schemaType,
    'maxLevels': maxLevels,
    if (name != null) 'name': name,
  };
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Conditions',
  conditions: [RequiresNextLevelCondition.typeDescriptor],
);

TypeRegistry API

The TypeRegistry<T> class manages type descriptors:

dart
class TypeRegistry<T> {
  /// Register a single descriptor
  void register(TypeDescriptor<T> descriptor);

  /// Register multiple descriptors
  void registerAll(Iterable<TypeDescriptor<T>> descriptors);

  /// Check if a type is registered
  bool hasType(String schemaType);

  /// Get descriptor by schema type
  TypeDescriptor<T>? getDescriptor(String schemaType);

  /// Create instance by schema type (with optional config)
  T? create(String schemaType, {Map<String, dynamic>? config});

  /// Create instance, throw if not found
  T createRequired(String schemaType, {Map<String, dynamic>? config});

  /// Deserialize from JSON using schemaType field
  T? fromJson(Map<String, dynamic>? json);

  /// Deserialize, throw if type not found
  T fromJsonRequired(Map<String, dynamic> json);

  /// Get all registered schema types
  Set<String> get registeredTypes;

  /// Get all descriptors
  Iterable<TypeDescriptor<T>> get descriptors;
}

Deserialization Context

The engine passes a WorkflowDeserializationContext directly during JSON deserialization. This context provides access to the type registries needed to resolve schemaType fields:

dart
// Engine handles this automatically via loadWorkflow()
final workflow = await engine.loadWorkflow(workflowId);

// Under the hood, the engine does:
final context = WorkflowDeserializationContext(registry: registry);
final workflow = Workflow.fromJson(jsonData, context: context);

The fromJson methods accept the context directly:

dart
// Workflow.fromJson passes context to nested types
factory Workflow.fromJson(
  Map<String, dynamic> json, {
  required WorkflowDeserializationContext context,
}) {
  return Workflow(
    id: json['id'] as String,
    code: json['code'] as String,
    nodes: (json['nodes'] as List)
        .map((n) => WorkflowNode.fromJson(n, context: context))
        .toList(),
    edges: (json['edges'] as List)
        .map((e) => WorkflowEdge.fromJson(e, context: context))
        .toList(),
    // ...
  );
}

// NodeConfiguration.fromJson uses the context to look up types
factory NodeConfiguration.fromJson(
  Map<String, dynamic> json, {
  required WorkflowDeserializationContext context,
}) {
  final schemaType = json['schemaType'] as String?;

  // Look up the correct configuration type from registry
  return context.registry.nodeConfigurations.fromJson(json)
      ?? EmptyNodeConfiguration();
}

Always Use engine.loadWorkflow()

Direct Workflow.fromJson() calls require a WorkflowDeserializationContext. Always use engine.loadWorkflow() for automatic context handling and proper type resolution.

Node Configurations Registry

Node configurations provide type-safe access to node-specific settings. Custom configurations are registered via WorkflowDescriptor:

dart
/// Custom node configuration for a specialized task
@JsonSerializable()
class RetryTaskNodeConfiguration extends NodeConfiguration {
  static const schemaTypeName = 'config.task.retry';

  const RetryTaskNodeConfiguration({
    required this.schemaType,
    this.maxRetries = 3,
    this.retryDelayMs = 1000,
    this.storeAs,
  });

  factory RetryTaskNodeConfiguration.fromJson(Map<String, dynamic> json) =>
      _$RetryTaskNodeConfigurationFromJson(json);

  @override
  final String schemaType;

  final int maxRetries;
  final int retryDelayMs;

  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$RetryTaskNodeConfigurationToJson(this);
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Custom Configurations',
  nodeConfigurations: [
    TypeDescriptor<NodeConfiguration>(
      schemaType: RetryTaskNodeConfiguration.schemaTypeName,
      fromJson: RetryTaskNodeConfiguration.fromJson,
      title: 'Retry Task Configuration',
    ),
  ],
);

Using Node Configurations

Access typed configurations via pattern matching:

dart
for (final node in workflow.nodes) {
  switch (node.config) {
    case TaskNodeConfiguration config:
      print('Task: ${config.schemaType}');
      print('Store as: ${config.storeAs}');

    case UserTaskNodeConfiguration config:
      print('User Task: ${config.title}');
      print('Priority: ${config.priority}');

    case SignalWaitNodeConfiguration config:
      print('Waiting for signal: ${config.signalName}');

    case GatewayNodeConfiguration config:
      print('Output ports: ${config.outputPorts?.length}');

    case TimerWaitNodeConfiguration config:
      print('Timer: ${config.timerType}');

    case SubflowNodeConfiguration config:
      print('Subflow: ${config.workflowCode}');

    case EmptyNodeConfiguration _:
      print('Start/End node');

    case RetryTaskNodeConfiguration config:
      print('Retry: max ${config.maxRetries} attempts');
  }
}

Built-in Node Configurations

ConfigurationNode TypeKey Properties
TaskNodeConfigurationtaskschemaType, storeAs, input, outputPorts
UserTaskNodeConfigurationuserTaskschemaType, title, assignToRole, priority, signalName
SignalWaitNodeConfigurationsignalWaitsignalName, storeAs, timeout
TimerWaitNodeConfigurationtimerWaittimerType, duration, dateTime
GatewayNodeConfigurationoneOf/anyOf/allOfstoreAs, outputPorts
SubflowNodeConfigurationsubflowworkflowCode, inputMappings, outputMappings
EmptyNodeConfigurationstart/end(none)

Using in WorkflowBuilder

With Executor Instance

dart
// Use executor instance directly
builder.task('sendNotification',
  name: 'Send Notification',
  executor: SendEmailTaskExecutor(),
);

// For user tasks
builder.userTask('approval',
  name: 'Approval Decision',
  executor: ApprovalTaskExecutor(),
);

With Inline Execute

dart
// Inline execute function (creates anonymous executor)
builder.task('sendNotification',
  name: 'Send Notification',
  execute: (ctx) async {
    // inline logic
    return TaskSuccess(output: {'sent': true});
  },
);

// User tasks can also use inline execute
builder.userTask('approval',
  name: 'Approval',
  execute: (ctx) async {
    return UserTaskConfiguration(
      title: 'Review Request',
      schemaType: 'approval',
      assignedToRoleId: 'managers',
    );
  },
);

Gateway Conditions

Expression-Based

dart
builder.oneOf('routeDecision', [
  Branch.when("decision == 'approved'", then: 'handleApproved'),
  Branch.when("amount > 1000", then: 'requiresExtraApproval'),
  Branch.otherwise(then: 'handleOther'),
]);

Custom Condition

Conditions on edges are deserialized directly to ConditionExecutor instances:

json
{
  "id": "e1",
  "sourceNodeId": "gateway",
  "targetNodeId": "nextLevel",
  "condition": {
    "schemaType": "condition.approval.requiresNextLevel",
    "maxLevels": 3
  }
}

Parameterized Executors

Executors can read configuration from the node:

dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Configuration from node (design-time)
    final template = context.getConfig<String>('template');
    final priority = context.getConfig<String>('priority') ?? 'normal';

    // Runtime data from previous node output
    final recipient = context.getRequired<String>('recipientEmail');

    await emailService.send(
      template: template,
      to: recipient,
      priority: priority,
    );

    return TaskSuccess(output: {'sent': true});
  }
}

Best Practices

1. Use Namespaced Schema Types

dart
// GOOD - namespaced and descriptive
static final typeDescriptor = TypeDescriptor<TaskExecutor>(
  schemaType: 'task.notification.sendApprovalEmail',
  ...
);

// AVOID - generic names that may conflict
static final typeDescriptor = TypeDescriptor<TaskExecutor>(
  schemaType: 'task1',
  ...
);
dart
// Create descriptors for related executors
final notificationDescriptor = WorkflowDescriptor(
  title: 'Notification Tasks',
  tasks: [
    SendEmailTaskExecutor.typeDescriptor,
    SendSmsTaskExecutor.typeDescriptor,
    SendPushTaskExecutor.typeDescriptor,
  ],
);

final approvalDescriptor = WorkflowDescriptor(
  title: 'Approval Tasks',
  userTasks: [
    ApprovalTaskExecutor.typeDescriptor,
    ReviewTaskExecutor.typeDescriptor,
  ],
  conditions: [
    RequiresNextLevelCondition.typeDescriptor,
    ApprovedCondition.typeDescriptor,
  ],
);

// Combine in context and create engine
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(),
    notificationDescriptor,
    approvalDescriptor,
  ],
);

final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

3. Use Typed Executors for Type Safety

The workflow engine provides typed base classes for compile-time type safety:

Base ClassGeneric ParametersUse Case
TypedTaskExecutor<TInput, TOutput>Input/output modelsTask executors with complex I/O
TypedUserTaskExecutor<TInput>Input modelUser tasks with typed input
TypedConditionExecutor<TInput>Input modelConditions with typed input

TypedTaskExecutor Example

dart
// TypedTaskExecutor provides compile-time type safety for I/O
class ProcessOrderExecutor extends TypedTaskExecutor<OrderInput, OrderOutput> {
  @override
  String get schemaType => 'task.order.process';

  @override
  String get name => 'Process Order';

  @override
  OrderInput fromInput(Map<String, dynamic> input) =>
      OrderInput.fromJson(input);

  @override
  Map<String, dynamic> toOutput(OrderOutput output) =>
      output.toJson();

  @override
  Future<OrderOutput> executeTyped(OrderInput input, ExecutionContext context) async {
    // Fully typed access!
    final result = await orderService.process(input.orderId, input.items);
    return OrderOutput(orderId: result.id, status: result.status);
  }
}

Next Steps