Skip to content

NodeExecutor API

Base class for node executors. Node executors handle specific node types in the workflow graph.

NodeExecutor

dart
abstract class NodeExecutor {
  /// The node type this executor supports
  NodeType get type;

  /// Execute the node and return the result
  Future<NodeResult> execute(WorkflowContext context);

  /// Handle signal received (for wait nodes)
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  );

  /// Validate node configuration
  List<String> validateNode(WorkflowNode node) => [];
}

TypedNodeExecutor

Type-safe base class with compile-time safety for node configuration:

dart
abstract class TypedNodeExecutor<TConfig extends NodeConfiguration>
    extends NodeExecutor {
  /// Deserialize configuration from raw JSON map
  TConfig fromConfig(Map<String, dynamic> config);

  /// Type-safe execute method with typed configuration
  Future<NodeResult> executeTyped(TConfig config, WorkflowContext context);
}

WorkflowContext

Context provided to node executors:

dart
class WorkflowContext {
  /// The complete workflow definition
  final Workflow workflow;

  /// Current workflow instance state
  final WorkflowInstance instance;

  /// The node being executed
  final WorkflowNode currentNode;

  /// Current execution token
  final WorkflowToken currentToken;

  /// Edges leading into this node
  final List<Edge> incomingEdges;

  /// Edges leading out of this node
  final List<Edge> outgoingEdges;

  /// Current workflow output (accumulated state)
  Map<String, dynamic> get output => instance.output;
}

NodeResult Types

Result TypeEngine Action
ContinueResultMove token to target node(s)
WaitForSignalResultPause, set status to waitingForSignal
WaitForUserTaskResultCreate user task, then wait
WaitForJoinResultWait for parallel branches
CompleteWorkflowResultMark workflow as completed
FailWorkflowResultMark workflow as failed

ContinueResult

dart
class ContinueResult extends NodeResult {
  const ContinueResult({
    required this.targetNodeIds,
    this.output = const {},
  });

  factory ContinueResult.single(String targetNodeId, {Map<String, dynamic>? output});

  final List<String> targetNodeIds;
  final Map<String, dynamic> output;
}

WaitForSignalResult

dart
class WaitForSignalResult extends NodeResult {
  const WaitForSignalResult({
    required this.signalName,
    this.storeAs,
    this.timeout,
  });

  final String signalName;
  final String? storeAs;
  final Duration? timeout;
}

FailWorkflowResult

dart
class FailWorkflowResult extends NodeResult {
  const FailWorkflowResult({
    required this.errorType,
    required this.message,
    this.isRetryable = true,
    this.details,
  });

  // Factory constructors
  factory FailWorkflowResult.validation(String message, {...});
  factory FailWorkflowResult.internal(String message, {...});
  factory FailWorkflowResult.timeout(String message, {...});

  final ErrorType errorType;
  final String message;
  final bool isRetryable;
  final Map<String, dynamic>? details;
}

Built-in Node Executors

ExecutorNode TypeDescription
StartEventNodeExecutorstartEntry point, continues to first node
EndEventNodeExecutorendTerminal point, completes workflow
TaskNodeExecutortaskExecutes task via TaskExecutor
UserTaskNodeExecutoruserTaskCreates user task, waits for signal
SignalEventNodeExecutorsignalWaitWaits for external signal
OneOfGatewayNodeExecutoroneOfExclusive routing (XOR)
AnyOfGatewayNodeExecutoranyOfRace routing (first wins)
AllOfGatewayNodeExecutorallOfParallel routing (AND)
SubflowNodeExecutorsubflowInvokes child workflow
TimerEventNodeExecutortimerWaitWaits for timer

StartEventNodeExecutor

dart
class StartEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.start;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final outgoing = context.outgoingEdges;
    if (outgoing.isEmpty) {
      return FailWorkflowResult.validation('Start node has no outgoing edges');
    }
    return ContinueResult.single(outgoing.first.targetNodeId);
  }
}

EndEventNodeExecutor

dart
class EndEventNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.end;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    return CompleteWorkflowResult(output: context.output);
  }
}

Examples

Custom NodeExecutor

dart
class TimerNodeExecutor extends NodeExecutor {
  @override
  NodeType get type => NodeType.timerWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Get typed configuration
    final config = context.currentNode.config as TimerWaitNodeConfiguration;

    // Calculate fire time based on config
    DateTime fireAt;
    if (config.timerType == TimerType.duration) {
      fireAt = DateTime.now().add(config.duration!);
    } else {
      fireAt = config.dateTime!;
    }

    await timerService.schedule(
      workflowInstanceId: context.instance.id,
      signalName: 'timer_fired',
      fireAt: fireAt,
    );

    return WaitForSignalResult(signalName: 'timer_fired');
  }

  @override
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  ) async {
    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: {'timerFired': true, 'firedAt': DateTime.now().toIso8601String()},
    );
  }
}

TypedNodeExecutor

dart
@JsonSerializable()
class RetryTaskConfig extends NodeConfiguration {
  static const schemaTypeName = 'config.task.retry';

  const RetryTaskConfig({
    required this.schemaType,
    this.maxRetries = 3,
    this.retryDelayMs = 1000,
    this.storeAs,
  });

  factory RetryTaskConfig.fromJson(Map<String, dynamic> json) =>
      _$RetryTaskConfigFromJson(json);

  @override
  final String schemaType;

  final int maxRetries;
  final int retryDelayMs;

  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$RetryTaskConfigToJson(this);
}

class RetryTaskNodeExecutor extends TypedNodeExecutor<RetryTaskConfig> {
  @override
  NodeType get type => NodeType.task;

  @override
  RetryTaskConfig fromConfig(Map<String, dynamic> config) =>
      RetryTaskConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    RetryTaskConfig config,
    WorkflowContext context,
  ) async {
    // Fully typed access to configuration!
    for (var attempt = 1; attempt <= config.maxRetries; attempt++) {
      try {
        final result = await executeTask(context);
        return result;
      } catch (e) {
        if (attempt == config.maxRetries) {
          return FailWorkflowResult.internal(
            'Task failed after ${config.maxRetries} retries: $e',
          );
        }
        await Future.delayed(Duration(milliseconds: config.retryDelayMs));
      }
    }
    throw StateError('Unreachable');
  }

  Future<NodeResult> executeTask(WorkflowContext context) async {
    // Actual task implementation
    return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
  }
}

Batch Processing Node

dart
@JsonSerializable()
class BatchConfig extends NodeConfiguration {
  const BatchConfig({
    required this.schemaType,
    this.batchSize = 100,
    this.parallelism = 4,
    this.storeAs,
  });

  factory BatchConfig.fromJson(Map<String, dynamic> json) =>
      _$BatchConfigFromJson(json);

  @override
  final String schemaType;
  final int batchSize;
  final int parallelism;
  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$BatchConfigToJson(this);
}

class BatchProcessingNodeExecutor extends TypedNodeExecutor<BatchConfig> {
  @override
  NodeType get type => NodeType.task;

  @override
  BatchConfig fromConfig(Map<String, dynamic> config) =>
      BatchConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    BatchConfig config,
    WorkflowContext context,
  ) async {
    final items = context.output['items'] as List? ?? [];

    // Process in batches with typed config
    for (var i = 0; i < items.length; i += config.batchSize) {
      final batch = items.skip(i).take(config.batchSize).toList();
      await processInParallel(batch, config.parallelism);
    }

    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: {'processed': items.length},
    );
  }
}

Registration

dart
final descriptor = WorkflowDescriptor(
  title: 'Custom Node Executors',
  nodeExecutors: [
    RetryTaskNodeExecutor(),
    BatchProcessingNodeExecutor(),
  ],
);

// Create deserialization context with all descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

Comparison

FeatureNodeExecutorTypedNodeExecutor
Config accesscontext.currentNode.config['key']config.key
Type safetyRuntimeCompile-time
IDE supportLimitedFull autocomplete
Error handlingManualAuto-wrapped

When to Use

Use NodeExecutor

  • Simple nodes with minimal configuration
  • Built-in node type overrides
  • Prototyping and testing
  • One-off custom node types

Use TypedNodeExecutor

  • Complex configurations with many fields
  • Custom node types with specific settings
  • Production code requiring type safety
  • Reusable node executor libraries

Best Practices

  1. Override buildNode for custom node building logic
  2. Implement onSignalReceived for wait nodes
  3. Use TypedNodeExecutor for complex configurations
  4. Return detailed FailWorkflowResult on errors
  5. Register via WorkflowDescriptor for engine integration

See Also