Type Registries
Type registries enable schema-driven deserialization where a schemaType field in JSON maps to a registered Dart class.
Overview
The engine uses TypeRegistry<T> to manage mappings from schemaType strings to executor classes. When a workflow JSON references a task with schemaType: 'task.email.send', the engine looks up the corresponding executor from its registries.
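For example, a minimal task node referencing that executor might look like this in workflow JSON (only the schemaType lookup is the point here; the surrounding node fields such as id and type are illustrative):
```json
{
  "id": "sendEmail",
  "type": "task",
  "config": { "schemaType": "task.email.send" }
}
```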
```dart
// Registries are populated from WorkflowDescriptors via the context
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(), // Built-in executors
    myCustomDescriptor,          // Your custom executors
  ],
);

final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();
```
TypeDescriptor
Executors are registered using TypeDescriptor:
```dart
class TypeDescriptor<T> {
  const TypeDescriptor({
    required this.schemaType, // Unique type identifier
    required this.fromJson,   // Factory function
    this.title,               // Human-readable name (optional)
  });
}
```
Registering Executors
Task Executor
TaskExecutor is an abstract class with schemaType as a getter:
```dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  // Type descriptor for registration
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final to = context.getRequired<String>('recipientEmail');
    final subject = context.get<String>('subject') ?? 'Notification';

    await emailService.send(to: to, subject: subject);

    return TaskSuccess(output: {
      'sent': true,
      'sentAt': DateTime.now().toIso8601String(),
    });
  }
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Email Tasks',
  tasks: [SendEmailTaskExecutor.typeDescriptor],
);
```
User Task Executor
UserTaskExecutor returns a WaitForUserTaskResult (it always waits for a human):
```dart
class ApprovalTaskExecutor extends UserTaskExecutor {
  static const _schemaType = 'userTask.approval';

  static final typeDescriptor = TypeDescriptor<UserTaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ApprovalTaskExecutor(),
    title: 'Approval Task',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Approval Task';

  @override
  Future<WaitForUserTaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final entityType = context.getRequired<String>('entityType');
    final roleId = context.getRequired<String>('approverRoleId');

    return WaitForUserTaskResult(
      signalName: context.signalName!,
      config: UserTaskConfiguration(
        title: 'Approve $entityType',
        description: 'Please review and approve this ${entityType.toLowerCase()}',
        schemaType: schemaType,
        assignedToRoleId: roleId,
        priority: UserTaskPriority.high,
        input: context.input,
      ),
    );
  }
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Approval Tasks',
  userTasks: [ApprovalTaskExecutor.typeDescriptor],
);
```
Condition Executor
ConditionExecutor is both configuration and executor: it is deserialized directly from JSON:
```dart
class RequiresNextLevelCondition extends ConditionExecutor {
  static const schemaName = 'condition.approval.requiresNextLevel';

  static final typeDescriptor = TypeDescriptor<ConditionExecutor>(
    schemaType: schemaName,
    fromJson: RequiresNextLevelCondition.fromJson,
    title: 'Requires Next Approval Level',
  );

  const RequiresNextLevelCondition({
    this.maxLevels = 3,
    super.name,
  }) : super(schemaType: schemaName);

  factory RequiresNextLevelCondition.fromJson(Map<String, dynamic> json) {
    return RequiresNextLevelCondition(
      maxLevels: json['maxLevels'] as int? ?? 3,
      name: json['name'] as String?,
    );
  }

  final int maxLevels;

  @override
  Future<bool> execute(ExecutionContext context) async {
    // Use get<T> for previous node output
    final currentLevel = context.get<int>('currentLevel') ?? 0;
    return currentLevel < maxLevels;
  }

  @override
  Map<String, dynamic> toJson() => {
        'schemaType': schemaType,
        'maxLevels': maxLevels,
        if (name != null) 'name': name,
      };
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Conditions',
  conditions: [RequiresNextLevelCondition.typeDescriptor],
);
```
TypeRegistry API
The TypeRegistry<T> class manages type descriptors:
```dart
class TypeRegistry<T> {
  /// Register a single descriptor
  void register(TypeDescriptor<T> descriptor);

  /// Register multiple descriptors
  void registerAll(Iterable<TypeDescriptor<T>> descriptors);

  /// Check if a type is registered
  bool hasType(String schemaType);

  /// Get descriptor by schema type
  TypeDescriptor<T>? getDescriptor(String schemaType);

  /// Create instance by schema type (with optional config)
  T? create(String schemaType, {Map<String, dynamic>? config});

  /// Create instance, throw if not found
  T createRequired(String schemaType, {Map<String, dynamic>? config});

  /// Deserialize from JSON using schemaType field
  T? fromJson(Map<String, dynamic>? json);

  /// Deserialize, throw if type not found
  T fromJsonRequired(Map<String, dynamic> json);

  /// Get all registered schema types
  Set<String> get registeredTypes;

  /// Get all descriptors
  Iterable<TypeDescriptor<T>> get descriptors;
}
```
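A rough usage sketch of this API on a standalone registry (assuming the implicit default constructor shown above; in practice the engine creates and populates these registries from your descriptors):
```dart
// Sketch only: the engine normally fills these registries from the
// WorkflowDescriptors passed to RegistryDeserializationContext.
final registry = TypeRegistry<TaskExecutor>();
registry.register(SendEmailTaskExecutor.typeDescriptor);

print(registry.hasType('task.email.send')); // true
print(registry.registeredTypes);            // {task.email.send}

// Resolve an executor from a JSON fragment containing a schemaType field.
final executor = registry.fromJsonRequired({'schemaType': 'task.email.send'});
print(executor.name); // Send Email
```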
Deserialization Context
The engine passes a WorkflowDeserializationContext directly during JSON deserialization. This context provides access to the type registries needed to resolve schemaType fields:
```dart
// Engine handles this automatically via loadWorkflow()
final workflow = await engine.loadWorkflow(workflowId);

// Under the hood, the engine does:
final context = WorkflowDeserializationContext(registry: registry);
final workflow = Workflow.fromJson(jsonData, context: context);
```
The fromJson methods accept the context directly:
```dart
// Workflow.fromJson passes context to nested types
factory Workflow.fromJson(
  Map<String, dynamic> json, {
  required WorkflowDeserializationContext context,
}) {
  return Workflow(
    id: json['id'] as String,
    code: json['code'] as String,
    nodes: (json['nodes'] as List)
        .map((n) => WorkflowNode.fromJson(n, context: context))
        .toList(),
    edges: (json['edges'] as List)
        .map((e) => WorkflowEdge.fromJson(e, context: context))
        .toList(),
    // ...
  );
}

// NodeConfiguration.fromJson uses the context to look up types
factory NodeConfiguration.fromJson(
  Map<String, dynamic> json, {
  required WorkflowDeserializationContext context,
}) {
  // Look up the correct configuration type from the registry,
  // which resolves it via the schemaType field in the JSON.
  return context.registry.nodeConfigurations.fromJson(json) ??
      EmptyNodeConfiguration();
}
```
Always Use engine.loadWorkflow()
Direct Workflow.fromJson() calls require a WorkflowDeserializationContext. Always use engine.loadWorkflow() for automatic context handling and proper type resolution.
Node Configurations Registry
Node configurations provide type-safe access to node-specific settings. Custom configurations are registered via WorkflowDescriptor:
```dart
/// Custom node configuration for a specialized task
@JsonSerializable()
class RetryTaskNodeConfiguration extends NodeConfiguration {
  static const schemaTypeName = 'config.task.retry';

  const RetryTaskNodeConfiguration({
    required this.schemaType,
    this.maxRetries = 3,
    this.retryDelayMs = 1000,
    this.storeAs,
  });

  factory RetryTaskNodeConfiguration.fromJson(Map<String, dynamic> json) =>
      _$RetryTaskNodeConfigurationFromJson(json);

  @override
  final String schemaType;

  final int maxRetries;
  final int retryDelayMs;

  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$RetryTaskNodeConfigurationToJson(this);
}

// Register via descriptor
final descriptor = WorkflowDescriptor(
  title: 'Custom Configurations',
  nodeConfigurations: [
    TypeDescriptor<NodeConfiguration>(
      schemaType: RetryTaskNodeConfiguration.schemaTypeName,
      fromJson: RetryTaskNodeConfiguration.fromJson,
      title: 'Retry Task Configuration',
    ),
  ],
);
```
Using Node Configurations
Access typed configurations via pattern matching:
```dart
for (final node in workflow.nodes) {
  switch (node.config) {
    case TaskNodeConfiguration config:
      print('Task: ${config.schemaType}');
      print('Store as: ${config.storeAs}');
    case UserTaskNodeConfiguration config:
      print('User Task: ${config.title}');
      print('Priority: ${config.priority}');
    case SignalWaitNodeConfiguration config:
      print('Waiting for signal: ${config.signalName}');
    case GatewayNodeConfiguration config:
      print('Output ports: ${config.outputPorts?.length}');
    case TimerWaitNodeConfiguration config:
      print('Timer: ${config.timerType}');
    case SubflowNodeConfiguration config:
      print('Subflow: ${config.workflowCode}');
    case EmptyNodeConfiguration _:
      print('Start/End node');
    case RetryTaskNodeConfiguration config:
      print('Retry: max ${config.maxRetries} attempts');
  }
}
```
Built-in Node Configurations
| Configuration | Node Type | Key Properties |
|---|---|---|
| TaskNodeConfiguration | task | schemaType, storeAs, input, outputPorts |
| UserTaskNodeConfiguration | userTask | schemaType, title, assignToRole, priority, signalName |
| SignalWaitNodeConfiguration | signalWait | signalName, storeAs, timeout |
| TimerWaitNodeConfiguration | timerWait | timerType, duration, dateTime |
| GatewayNodeConfiguration | oneOf/anyOf/allOf | storeAs, outputPorts |
| SubflowNodeConfiguration | subflow | workflowCode, inputMappings, outputMappings |
| EmptyNodeConfiguration | start/end | (none) |
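For example, a userTask node carrying a UserTaskNodeConfiguration might appear in workflow JSON roughly like this (node-level fields such as id and type are illustrative; the config keys follow the table above):
```json
{
  "id": "managerApproval",
  "type": "userTask",
  "config": {
    "schemaType": "userTask.approval",
    "title": "Manager Approval",
    "assignToRole": "managers",
    "priority": "high",
    "signalName": "managerApproval.completed"
  }
}
```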
Using in WorkflowBuilder
With Executor Instance
```dart
// Use executor instance directly
builder.task('sendNotification',
  name: 'Send Notification',
  executor: SendEmailTaskExecutor(),
);

// For user tasks
builder.userTask('approval',
  name: 'Approval Decision',
  executor: ApprovalTaskExecutor(),
);
```
With Inline Execute
```dart
// Inline execute function (creates anonymous executor)
builder.task('sendNotification',
  name: 'Send Notification',
  execute: (ctx) async {
    // inline logic
    return TaskSuccess(output: {'sent': true});
  },
);

// User tasks can also use inline execute
builder.userTask('approval',
  name: 'Approval',
  execute: (ctx) async {
    return UserTaskConfiguration(
      title: 'Review Request',
      schemaType: 'approval',
      assignedToRoleId: 'managers',
    );
  },
);
```
Gateway Conditions
Expression-Based
```dart
builder.oneOf('routeDecision', [
  Branch.when("decision == 'approved'", then: 'handleApproved'),
  Branch.when("amount > 1000", then: 'requiresExtraApproval'),
  Branch.otherwise(then: 'handleOther'),
]);
```
Custom Condition
Conditions on edges are deserialized directly to ConditionExecutor instances:
```json
{
  "id": "e1",
  "sourceNodeId": "gateway",
  "targetNodeId": "nextLevel",
  "condition": {
    "schemaType": "condition.approval.requiresNextLevel",
    "maxLevels": 3
  }
}
```
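Conceptually, that condition object is resolved through the condition registry during deserialization. A rough sketch, assuming the registry exposes a conditions sub-registry (mirroring the nodeConfigurations lookup shown earlier):
```dart
// Sketch only: shows how the condition JSON above maps back to
// RequiresNextLevelCondition via its registered TypeDescriptor.
final conditionJson = <String, dynamic>{
  'schemaType': 'condition.approval.requiresNextLevel',
  'maxLevels': 3,
};

final condition = context.registry.conditions.fromJsonRequired(conditionJson);
print(condition is RequiresNextLevelCondition); // true
```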
Parameterized Executors
Executors can read configuration from the node:
```dart
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Configuration from node (design-time)
    final template = context.getConfig<String>('template');
    final priority = context.getConfig<String>('priority') ?? 'normal';

    // Runtime data from previous node output
    final recipient = context.getRequired<String>('recipientEmail');

    await emailService.send(
      template: template,
      to: recipient,
      priority: priority,
    );

    return TaskSuccess(output: {'sent': true});
  }
}
```
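To illustrate where those design-time values might live, here is a hypothetical task node definition; the exact key layout depends on your node configuration type, so treat the shape as a sketch (here the values are shown in the task node's input map from the built-in configurations table):
```json
{
  "id": "sendConfirmation",
  "type": "task",
  "config": {
    "schemaType": "task.email.send",
    "input": {
      "template": "orderConfirmation",
      "priority": "high"
    }
  }
}
```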
Best Practices
1. Use Namespaced Schema Types
```dart
// GOOD - namespaced and descriptive
static final typeDescriptor = TypeDescriptor<TaskExecutor>(
  schemaType: 'task.notification.sendApprovalEmail',
  ...
);

// AVOID - generic names that may conflict
static final typeDescriptor = TypeDescriptor<TaskExecutor>(
  schemaType: 'task1',
  ...
);
```
2. Group Related Executors
```dart
// Create descriptors for related executors
final notificationDescriptor = WorkflowDescriptor(
  title: 'Notification Tasks',
  tasks: [
    SendEmailTaskExecutor.typeDescriptor,
    SendSmsTaskExecutor.typeDescriptor,
    SendPushTaskExecutor.typeDescriptor,
  ],
);

final approvalDescriptor = WorkflowDescriptor(
  title: 'Approval Tasks',
  userTasks: [
    ApprovalTaskExecutor.typeDescriptor,
    ReviewTaskExecutor.typeDescriptor,
  ],
  conditions: [
    RequiresNextLevelCondition.typeDescriptor,
    ApprovedCondition.typeDescriptor,
  ],
);

// Combine in context and create engine
final context = RegistryDeserializationContext(
  descriptors: [
    DefaultWorkflowDescriptor(),
    notificationDescriptor,
    approvalDescriptor,
  ],
);

final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();
```
3. Use Typed Executors for Type Safety
The workflow engine provides typed base classes for compile-time type safety:
| Base Class | Generic Parameters | Use Case |
|---|---|---|
| TypedTaskExecutor<TInput, TOutput> | Input/output models | Task executors with complex I/O |
| TypedUserTaskExecutor<TInput> | Input model | User tasks with typed input |
| TypedConditionExecutor<TInput> | Input model | Conditions with typed input |
TypedTaskExecutor Example
```dart
// TypedTaskExecutor provides compile-time type safety for I/O
class ProcessOrderExecutor extends TypedTaskExecutor<OrderInput, OrderOutput> {
  @override
  String get schemaType => 'task.order.process';

  @override
  String get name => 'Process Order';

  @override
  OrderInput fromInput(Map<String, dynamic> input) => OrderInput.fromJson(input);

  @override
  Map<String, dynamic> toOutput(OrderOutput output) => output.toJson();

  @override
  Future<OrderOutput> executeTyped(OrderInput input, ExecutionContext context) async {
    // Fully typed access!
    final result = await orderService.process(input.orderId, input.items);
    return OrderOutput(orderId: result.id, status: result.status);
  }
}
```
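The OrderInput and OrderOutput models are not part of the engine. A minimal hand-written sketch of what they might look like (json_serializable would work just as well):
```dart
// Hypothetical input/output models for the executor above.
class OrderInput {
  const OrderInput({required this.orderId, required this.items});

  factory OrderInput.fromJson(Map<String, dynamic> json) => OrderInput(
        orderId: json['orderId'] as String,
        items: (json['items'] as List).cast<String>(),
      );

  final String orderId;
  final List<String> items;
}

class OrderOutput {
  const OrderOutput({required this.orderId, required this.status});

  final String orderId;
  final String status;

  Map<String, dynamic> toJson() => {
        'orderId': orderId,
        'status': status,
      };
}
```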
Next Steps
- Task Executors - Detailed task implementation
- User Task Executors - Human task executors
- WorkflowDescriptor - Descriptor organization
- WorkflowDescriptor API - Complete API reference