
Node Executors

A node executor runs the logic for one node type. The engine maintains a registry that maps each NodeType to its executor.

Executor Registry

| NodeType | Executor |
|---|---|
| start | StartEventNodeProcessor |
| end | EndEventNodeProcessor |
| task | TaskNodeProcessor |
| userTask | UserTaskNodeProcessor |
| signalWait | SignalEventNodeProcessor |
| timerWait | TimerEventNodeProcessor |
| oneOf | OneOfGatewayNodeProcessor |
| anyOf | AnyOfGatewayNodeProcessor |
| allOf | AllOfGatewayNodeProcessor |
| subflow | SubflowNodeProcessor |

NodeProcessor Interface

dart
abstract class NodeProcessor {
  /// The node type this executor supports
  NodeType get type;

  /// Execute the node logic
  Future<NodeResult> execute(WorkflowContext context);

  /// Handle signal received (for wait nodes)
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  );

  /// Validate node configuration
  List<String> validateNode(LeafWorkflowNode node);
}

Executor Resolution

Node executors are resolved automatically when loading workflows:

dart
// When you load a workflow, executors are resolved
final workflow = engine.loadWorkflow(jsonData);

// Each node now has its executor attached
for (final node in workflow.nodes) {
  print('${node.name}: ${node.executor?.runtimeType}');
  // Start: StartEventNodeProcessor
  // Validate: TaskNodeProcessor
  // etc.
}

The engine uses the NodeProcessorRegistry to match each node's type field to the appropriate executor.
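
The lookup described above can be pictured with a small stand-alone sketch. Everything here is hypothetical: the `Sketch*` types are stand-ins, not the engine's classes, and the rule that a later registration replaces an earlier one for the same type is an assumption of this sketch rather than documented engine behavior.

```dart
// Stand-in types for illustration only; the engine's real classes differ.
enum SketchNodeType { start, end, task }

abstract class SketchProcessor {
  SketchNodeType get type;
}

class StartSketch extends SketchProcessor {
  @override
  SketchNodeType get type => SketchNodeType.start;
}

class TaskSketch extends SketchProcessor {
  @override
  SketchNodeType get type => SketchNodeType.task;
}

/// Hypothetical registry: at most one processor per node type.
/// Assumption: a later registration replaces an earlier one.
class SketchRegistry {
  final Map<SketchNodeType, SketchProcessor> _byType = {};

  void register(SketchProcessor processor) {
    _byType[processor.type] = processor;
  }

  /// Returns the processor for [type], or null when none is registered.
  SketchProcessor? resolve(SketchNodeType type) => _byType[type];
}

/// Builds a registry by registering [processors] in list order.
SketchRegistry buildRegistry(List<SketchProcessor> processors) {
  final registry = SketchRegistry();
  for (final processor in processors) {
    registry.register(processor);
  }
  return registry;
}
```

Keying the map on each processor's own `type` getter means a processor declares what it handles, so registration needs no separate mapping table.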

WorkflowContext

Context provided to executors:

dart
class WorkflowContext {
  final Workflow workflow;
  final WorkflowInstance instance;
  final LeafWorkflowNode currentNode;
  final WorkflowToken currentToken;
  final List<WorkflowEdge> incomingEdges;
  final List<WorkflowEdge> outgoingEdges;

  /// Current workflow output (accumulated state)
  Map<String, dynamic> get output => instance.output;
}

Built-in Executors

StartEventNodeProcessor

dart
class StartEventNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.start;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final outgoing = context.outgoingEdges;
    if (outgoing.isEmpty) {
      return FailWorkflowResult.validation('Start node has no outgoing edges');
    }
    return ContinueResult.single(outgoing.first.targetNodeId);
  }
}

EndEventNodeProcessor

dart
class EndEventNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.end;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    return CompleteWorkflowResult(output: context.output);
  }
}

TaskNodeProcessor

dart
class TaskNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.task;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Get typed configuration
    final config = context.currentNode.config as TaskNodeConfiguration;

    // Get executor from registry by schemaType
    final executor = registry.tasks.createRequired(config.schemaType);

    // Create execution context
    final executionContext = ExecutionContext(
      workflowContext: context,
      input: context.output,
      currentNode: context.currentNode,
    );

    // Execute task
    final result = await executor.execute(executionContext);

    // Handle result
    switch (result) {
      case TaskSuccess(:final output):
        return ContinueResult.single(
          context.outgoingEdges.first.targetNodeId,
          output: output,
        );
      case TaskFailure(:final errorType, :final message):
        return FailWorkflowResult(
          errorType: errorType,
          message: message,
        );
    }
  }
}

SignalEventNodeProcessor

dart
class SignalEventNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.signalWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Typed config access
    final config = context.currentNode.config as SignalWaitNodeConfiguration;

    return WaitForSignalResult(
      signalName: config.signalName,
      storeAs: config.storeAs,
    );
  }

  @override
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  ) async {
    final config = context.currentNode.config as SignalWaitNodeConfiguration;
    final output = config.storeAs != null
        ? {config.storeAs!: payload}
        : payload ?? {};

    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: output,
    );
  }
}
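
To make the effect of storeAs concrete, the output expression from onSignalReceived can be pulled out into a stand-alone function. `signalOutput` is a hypothetical helper written for this illustration; it only mirrors the expression shown above and says nothing about how the engine later merges the result into the workflow output.

```dart
/// Mirrors the output expression in onSignalReceived as a pure function.
/// Hypothetical helper, not part of the engine API.
Map<String, dynamic> signalOutput(
  String? storeAs,
  Map<String, dynamic>? payload,
) {
  if (storeAs != null) {
    // The payload (possibly null) is nested under the storeAs key.
    return {storeAs: payload};
  }
  // Without storeAs, the payload itself becomes the output.
  return payload ?? <String, dynamic>{};
}
```

With storeAs set to 'callbackData' and a payload of {'ok': true}, the result is {'callbackData': {'ok': true}}. Without storeAs, the payload is returned as is, and a null payload becomes an empty map.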

NodeResult Types

Executors return results that determine the engine's next action:

| Result Type | Engine Action |
|---|---|
| ContinueResult | Move token to target node(s) |
| WaitForSignalResult | Pause, set status to waitingForSignal |
| WaitForUserTaskResult | Create user task, then wait |
| WaitForJoinResult | Wait for parallel branches |
| CompleteWorkflowResult | Mark workflow as completed |
| FailWorkflowResult | Mark workflow as failed |
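
One way to picture the table is as a single exhaustive switch over a sealed result hierarchy. The sketch below uses hypothetical stand-in classes (all names ending in `Sketch`) and covers only four of the result types. It assumes nothing about the engine's actual dispatch loop.

```dart
// Stand-in result types; the real NodeResult classes carry more fields.
sealed class ResultSketch {
  const ResultSketch();
}

class ContinueSketch extends ResultSketch {
  const ContinueSketch(this.targetNodeIds);
  final List<String> targetNodeIds;
}

class WaitForSignalSketch extends ResultSketch {
  const WaitForSignalSketch(this.signalName);
  final String signalName;
}

class CompleteSketch extends ResultSketch {
  const CompleteSketch();
}

class FailSketch extends ResultSketch {
  const FailSketch(this.message);
  final String message;
}

/// Describes the action an engine loop might take for each result.
/// Because ResultSketch is sealed, the compiler checks that every
/// subtype is handled.
String describeAction(ResultSketch result) => switch (result) {
      ContinueSketch(:final targetNodeIds) =>
        'move token to ${targetNodeIds.join(", ")}',
      WaitForSignalSketch(:final signalName) =>
        'pause until signal "$signalName"',
      CompleteSketch() => 'mark workflow completed',
      FailSketch(:final message) => 'mark workflow failed: $message',
    };
```

Adding a new subtype to the sealed class without extending the switch becomes a compile-time error, which is the main reason to model results this way.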

ContinueResult

dart
// Single target
return ContinueResult.single('nextNode', output: {'data': 'value'});

// Multiple targets (for parallel fork)
return ContinueResult(
  targetNodeIds: ['branchA', 'branchB', 'branchC'],
  output: {'forkedAt': DateTime.now().toIso8601String()},
);

WaitForSignalResult

dart
return WaitForSignalResult(
  signalName: 'external_callback',
  storeAs: 'callbackData',
);

WaitForJoinResult

dart
// Parallel gateway join waiting for branches
return WaitForJoinResult(
  waitingForBranches: ['branchA', 'branchB'],
);
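
The bookkeeping behind a join can be sketched as a set of pending branches that shrinks as tokens arrive. `JoinTracker` is a hypothetical class for illustration. It assumes that waitingForBranches names the branches that have not arrived yet, which this document does not state explicitly.

```dart
/// Hypothetical join bookkeeping: tracks which expected branches are
/// still outstanding. Not part of the engine API.
class JoinTracker {
  JoinTracker(Iterable<String> expectedBranches)
      : _pending = expectedBranches.toSet();

  final Set<String> _pending;

  /// Branches that have not arrived yet, in sorted order.
  List<String> get waitingForBranches => _pending.toList()..sort();

  /// Records that [branchId] arrived; returns true once no branch is pending.
  bool arrive(String branchId) {
    _pending.remove(branchId);
    return _pending.isEmpty;
  }
}
```

In this sketch a join node would keep returning a wait result while arrive returns false, and continue to its outgoing edge once it returns true.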

Custom Node Executors

Create custom executors for specialized node types:

dart
class CustomTimerNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.timerWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final config = context.currentNode.config as TimerWaitNodeConfiguration;

    // Schedule timer based on configuration.
    // timerService is a scheduling service supplied by your application (not shown).
    if (config.timerType == TimerType.duration) {
      await timerService.schedule(
        workflowInstanceId: context.instance.id,
        signalName: 'timer_fired',
        fireAt: DateTime.now().add(config.duration!),
      );
    } else {
      await timerService.schedule(
        workflowInstanceId: context.instance.id,
        signalName: 'timer_fired',
        fireAt: config.dateTime!,
      );
    }

    return WaitForSignalResult(
      signalName: 'timer_fired',
    );
  }
}

// Register custom executor via WorkflowDescriptor
final descriptor = WorkflowDescriptor(
  title: 'Custom Executors',
  nodeProcessors: [CustomTimerNodeProcessor()],
);

final engine = WorkflowEngine(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
  storage: storage,
);

TypedNodeProcessor

For type-safe access to node configurations, use TypedNodeProcessor<TConfig>:

dart
abstract class TypedNodeProcessor<TConfig extends NodeConfiguration>
    extends NodeProcessor {
  /// Deserialize configuration from raw JSON map
  TConfig fromConfig(Map<String, dynamic> config);

  /// Type-safe execute method with typed configuration
  Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);
}
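
The relationship between the untyped and typed entry points can be sketched as a base class whose execute deserializes the configuration once and then delegates. This is one plausible wiring, not the engine's confirmed implementation. All types below are hypothetical stand-ins, and the real execute takes a WorkflowContext rather than a raw map.

```dart
// Stand-in types; names ending in "Sketch" are not part of the engine.
abstract class ConfigSketch {}

abstract class ProcessorSketch {
  /// Untyped entry point, receiving the raw configuration map.
  Future<String> execute(Map<String, dynamic> rawConfig);
}

abstract class TypedProcessorSketch<TConfig extends ConfigSketch>
    extends ProcessorSketch {
  /// Deserializes the raw map into a typed configuration.
  TConfig fromConfig(Map<String, dynamic> config);

  /// Typed entry point that subclasses implement.
  Future<String> executeTyped(TConfig config);

  /// Deserializes once, then hands the typed object to executeTyped.
  @override
  Future<String> execute(Map<String, dynamic> rawConfig) =>
      executeTyped(fromConfig(rawConfig));
}

class DelayConfig extends ConfigSketch {
  DelayConfig(this.seconds);
  final int seconds;
}

class DelayProcessor extends TypedProcessorSketch<DelayConfig> {
  @override
  DelayConfig fromConfig(Map<String, dynamic> config) =>
      DelayConfig(config['seconds'] as int);

  @override
  Future<String> executeTyped(DelayConfig config) async =>
      'delay ${config.seconds}s';
}
```

The single cast lives in fromConfig, so executeTyped and everything it calls work with typed fields only.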

Example: Custom Gateway Executor

dart
@JsonSerializable()
class PriorityGatewayConfig extends NodeConfiguration {
  static const schemaTypeName = 'config.gateway.priority';

  const PriorityGatewayConfig({
    this.priorityField = 'priority',
    this.defaultRoute,
    this.storeAs,
  });

  factory PriorityGatewayConfig.fromJson(Map<String, dynamic> json) =>
      _$PriorityGatewayConfigFromJson(json);

  final String priorityField;
  final String? defaultRoute;

  @override
  final String? storeAs;

  @override
  String? get schemaType => schemaTypeName;

  @override
  Map<String, dynamic> toJson() => _$PriorityGatewayConfigToJson(this);
}

class PriorityGatewayExecutor extends TypedNodeProcessor<PriorityGatewayConfig> {
  @override
  NodeType get type => NodeType.oneOf;

  @override
  PriorityGatewayConfig fromConfig(Map<String, dynamic> config) =>
      PriorityGatewayConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    PriorityGatewayConfig config,
    WorkflowContext context,
  ) async {
    // Fully typed access to configuration!
    final priority = (context.output[config.priorityField] as String?)?.toLowerCase();

    // Find the edge whose label matches the priority (case-insensitive).
    // Skip matching when no priority is set, so an unlabeled edge
    // cannot match a missing priority (null == null).
    if (priority != null) {
      for (final edge in context.outgoingEdges) {
        if (edge.label?.toLowerCase() == priority) {
          return ContinueResult.single(edge.targetNodeId);
        }
      }
    }

    // Use default route or first edge
    final targetId = config.defaultRoute ?? context.outgoingEdges.first.targetNodeId;
    return ContinueResult.single(targetId);
  }
}

// Register the custom configuration type
final descriptor = WorkflowDescriptor(
  title: 'Custom Gateway',
  nodeProcessors: [PriorityGatewayExecutor()],
  nodeConfigurations: [
    TypeDescriptor<NodeConfiguration>(
      schemaType: PriorityGatewayConfig.schemaTypeName,
      fromJson: PriorityGatewayConfig.fromJson,
    ),
  ],
);

Benefits of TypedNodeProcessor

| Feature | NodeProcessor | TypedNodeProcessor |
|---|---|---|
| Config access | config['key'] cast | config.key |
| Type safety | Runtime casting | Compile-time |
| IDE support | Limited | Full autocomplete |
| Refactoring | Error-prone | Safe |

When to Use

  • Use TypedNodeProcessor for:
    • Complex configurations with many fields
    • Custom node types with specific settings
    • Production code requiring type safety
  • Use NodeProcessor for:
    • Simple nodes with minimal config
    • Prototyping and testing
    • One-off custom executors

Type-Safe Configuration Access

With the unified model, node configurations are already type-safe. Access them via pattern matching:

dart
// In your custom executor
@override
Future<NodeResult> execute(WorkflowContext context) async {
  final node = context.currentNode;

  switch (node.config) {
    case TaskNodeConfiguration config:
      print('Task schemaType: ${config.schemaType}');
      print('Store as: ${config.storeAs}');

    case UserTaskNodeConfiguration config:
      print('User task title: ${config.title}');
      print('Assigned to: ${config.assignToRole}');

    case SignalWaitNodeConfiguration config:
      print('Waiting for signal: ${config.signalName}');

    case GatewayNodeConfiguration config:
      print('Gateway with ${config.outputPorts?.length ?? 0} ports');

    case TimerWaitNodeConfiguration config:
      print('Timer type: ${config.timerType}');

    case SubflowNodeConfiguration config:
      print('Subflow: ${config.workflowCode}');

    case NoOpNodeConfiguration _:
      print('Start/End node');
  }

  // Continue execution...
  return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
}

Next Steps