NodeProcessor API
Base class for node executors. Node executors handle specific node types in the workflow graph.
NodeProcessor
abstractClass Signature
Methods
executereturnsFuture<NodeResult>Future<NodeResult> execute(WorkflowContext context)Execute the node and return the result.
onSignalReceivedreturnsFuture<NodeResult>Future<NodeResult> onSignalReceived(WorkflowContext context, String signalName, Map<String, dynamic>? payload)Handle a received signal (used by wait nodes).
validateNodereturnsList<String>List<String> validateNode(LeafWorkflowNode node)Validate node configuration. Returns a list of validation errors (empty if valid).
TypedNodeProcessor
abstractClass Signature
Type-safe base class with compile-time safety for node configuration.
WorkflowContext
Class Signature
Properties
| Property | Type | Default | Description |
|---|---|---|---|
workflow | Workflow | -- | The complete workflow definition |
instance | WorkflowInstance | -- | Current workflow instance state |
currentNode | LeafWorkflowNode | -- | The node being executed |
currentToken | WorkflowToken | -- | Current execution token |
incomingEdges | List<Edge> | -- | Edges leading into this node |
outgoingEdges | List<Edge> | -- | Edges leading out of this node |
output | Map<String, dynamic> | -- | Current workflow output (accumulated state) |
Context provided to node executors.
NodeResult Types
| Result Type | Engine Action |
|---|---|
ContinueResult | Move token to target node(s) |
WaitForSignalResult | Pause, set status to waitingForSignal |
WaitForUserTaskResult | Create user task, then wait |
WaitForJoinResult | Wait for parallel branches |
CompleteWorkflowResult | Mark workflow as completed |
FailWorkflowResult | Mark workflow as failed |
ContinueResult
Class Signature
WaitForSignalResult
Class Signature
FailWorkflowResult
Class Signature
Built-in Node Executors
| Executor | Node Type | Description |
|---|---|---|
StartEventNodeProcessor | start | Entry point, continues to first node |
EndEventNodeProcessor | end | Terminal point, completes workflow |
TaskNodeProcessor | task | Executes task via TaskExecutor |
UserTaskNodeProcessor | userTask | Creates user task, waits for signal |
SignalEventNodeProcessor | signalWait | Waits for external signal |
OneOfGatewayNodeProcessor | oneOf | Exclusive routing (XOR) |
AnyOfGatewayNodeProcessor | anyOf | Race routing (first wins) |
AllOfGatewayNodeProcessor | allOf | Parallel routing (AND) |
SubflowNodeProcessor | subflow | Invokes child workflow |
TimerEventNodeProcessor | timerWait | Waits for timer |
StartEventNodeProcessor
dart
class StartEventNodeProcessor extends NodeProcessor {
@override
NodeType get type => NodeType.start;
@override
Future<NodeResult> execute(WorkflowContext context) async {
final outgoing = context.outgoingEdges;
if (outgoing.isEmpty) {
return FailWorkflowResult.validation('Start node has no outgoing edges');
}
return ContinueResult.single(outgoing.first.targetNodeId);
}
}EndEventNodeProcessor
dart
class EndEventNodeProcessor extends NodeProcessor {
@override
NodeType get type => NodeType.end;
@override
Future<NodeResult> execute(WorkflowContext context) async {
return CompleteWorkflowResult(output: context.output);
}
}Examples
Custom NodeProcessor
dart
class TimerNodeProcessor extends NodeProcessor {
@override
NodeType get type => NodeType.timerWait;
@override
Future<NodeResult> execute(WorkflowContext context) async {
// Get typed configuration
final config = context.currentNode.config as TimerWaitNodeConfiguration;
// Calculate fire time based on config
DateTime fireAt;
if (config.timerType == TimerType.duration) {
fireAt = DateTime.now().add(config.duration!);
} else {
fireAt = config.dateTime!;
}
await timerService.schedule(
workflowInstanceId: context.instance.id,
signalName: 'timer_fired',
fireAt: fireAt,
);
return WaitForSignalResult(signalName: 'timer_fired');
}
@override
Future<NodeResult> onSignalReceived(
WorkflowContext context,
String signalName,
Map<String, dynamic>? payload,
) async {
return ContinueResult.single(
context.outgoingEdges.first.targetNodeId,
output: {'timerFired': true, 'firedAt': DateTime.now().toIso8601String()},
);
}
}TypedNodeProcessor
dart
@JsonSerializable()
class RetryTaskConfig extends NodeConfiguration {
static const schemaTypeName = 'config.task.retry';
const RetryTaskConfig({
required this.schemaType,
this.maxRetries = 3,
this.retryDelayMs = 1000,
this.storeAs,
});
factory RetryTaskConfig.fromJson(Map<String, dynamic> json) =>
_$RetryTaskConfigFromJson(json);
@override
final String schemaType;
final int maxRetries;
final int retryDelayMs;
@override
final String? storeAs;
@override
Map<String, dynamic> toJson() => _$RetryTaskConfigToJson(this);
}
class RetryTaskNodeProcessor extends TypedNodeProcessor<RetryTaskConfig> {
@override
NodeType get type => NodeType.task;
@override
RetryTaskConfig fromConfig(Map<String, dynamic> config) =>
RetryTaskConfig.fromJson(config);
@override
Future<NodeResult> executeTyped(
RetryTaskConfig config,
WorkflowContext context,
) async {
// Fully typed access to configuration!
for (var attempt = 1; attempt <= config.maxRetries; attempt++) {
try {
final result = await executeTask(context);
return result;
} catch (e) {
if (attempt == config.maxRetries) {
return FailWorkflowResult.internal(
'Task failed after ${config.maxRetries} retries: $e',
);
}
await Future.delayed(Duration(milliseconds: config.retryDelayMs));
}
}
throw StateError('Unreachable');
}
Future<NodeResult> executeTask(WorkflowContext context) async {
// Actual task implementation
return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
}
}Batch Processing Node
dart
@JsonSerializable()
class BatchConfig extends NodeConfiguration {
const BatchConfig({
required this.schemaType,
this.batchSize = 100,
this.parallelism = 4,
this.storeAs,
});
factory BatchConfig.fromJson(Map<String, dynamic> json) =>
_$BatchConfigFromJson(json);
@override
final String schemaType;
final int batchSize;
final int parallelism;
@override
final String? storeAs;
@override
Map<String, dynamic> toJson() => _$BatchConfigToJson(this);
}
class BatchProcessingNodeProcessor extends TypedNodeProcessor<BatchConfig> {
@override
NodeType get type => NodeType.task;
@override
BatchConfig fromConfig(Map<String, dynamic> config) =>
BatchConfig.fromJson(config);
@override
Future<NodeResult> executeTyped(
BatchConfig config,
WorkflowContext context,
) async {
final items = context.output['items'] as List? ?? [];
// Process in batches with typed config
for (var i = 0; i < items.length; i += config.batchSize) {
final batch = items.skip(i).take(config.batchSize).toList();
await processInParallel(batch, config.parallelism);
}
return ContinueResult.single(
context.outgoingEdges.first.targetNodeId,
output: {'processed': items.length},
);
}
}Registration
dart
final descriptor = WorkflowDescriptor(
title: 'Custom Node Executors',
nodeProcessors: [
RetryTaskNodeProcessor(),
BatchProcessingNodeProcessor(),
],
);
// Create deserialization context with all descriptors
final context = RegistryTypeResolver(
descriptors: [DefaultWorkflowDescriptor(), descriptor],
);
// Create engine with context and storage
final engine = WorkflowEngine(
context: context,
storage: InMemoryStorage(context: context),
);
await engine.initialize();Comparison
| Feature | NodeProcessor | TypedNodeProcessor |
|---|---|---|
| Config access | context.currentNode.config['key'] | config.key |
| Type safety | Runtime | Compile-time |
| IDE support | Limited | Full autocomplete |
| Error handling | Manual | Auto-wrapped |
When to Use
Use NodeProcessor
- Simple nodes with minimal configuration
- Built-in node type overrides
- Prototyping and testing
- One-off custom node types
Use TypedNodeProcessor
- Complex configurations with many fields
- Custom node types with specific settings
- Production code requiring type safety
- Reusable node executor libraries
Best Practices
- Override buildNode for custom node building logic
- Implement onSignalReceived for wait nodes
- Use TypedNodeProcessor for complex configurations
- Return detailed FailWorkflowResult on errors
- Register via WorkflowDescriptor for engine integration
See Also
- Node Executors - Detailed guide
- Type Registries - Registration patterns
- NodeResult - Result types
- WorkflowDescriptor - Registration API