NodeProcessor API

Base class for node executors. Node executors handle specific node types in the workflow graph.

NodeProcessor

abstract

Class Signature

Methods

  • execute returns Future<NodeResult>
    Future<NodeResult> execute(WorkflowContext context)

    Execute the node and return the result.

  • onSignalReceived returns Future<NodeResult>
    Future<NodeResult> onSignalReceived(WorkflowContext context, String signalName, Map<String, dynamic>? payload)

    Handle a received signal (used by wait nodes).

  • validateNode returns List<String>
    List<String> validateNode(LeafWorkflowNode node)

    Validate node configuration. Returns a list of validation errors (empty if valid).
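
The validateNode contract (an empty list means valid, otherwise one message per problem) can be illustrated with a minimal sketch. It uses a plain map as a stand-in for the node's configuration so that it runs on its own. The function name `validateTimerConfig` and the keys `timerType`, `durationMs` and `dateTime` are assumptions made for this example, not part of the library.

```dart
/// Hypothetical stand-in for a validateNode body: checks a raw config map
/// and returns one message per problem (an empty list means "valid").
List<String> validateTimerConfig(Map<String, dynamic> config) {
  final errors = <String>[];
  final timerType = config['timerType'];

  if (timerType != 'duration' && timerType != 'dateTime') {
    errors.add('timerType must be "duration" or "dateTime"');
  }
  if (timerType == 'duration') {
    final durationMs = config['durationMs'];
    if (durationMs is! int || durationMs <= 0) {
      errors.add('durationMs must be a positive integer');
    }
  }
  if (timerType == 'dateTime') {
    final raw = config['dateTime'];
    if (raw is! String || DateTime.tryParse(raw) == null) {
      errors.add('dateTime must be an ISO 8601 timestamp string');
    }
  }
  return errors;
}
```

Collecting every problem before returning, rather than stopping at the first one, lets a workflow author fix the whole configuration in a single pass.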


TypedNodeProcessor

abstract

Class Signature

Type-safe base class with compile-time safety for node configuration.


WorkflowContext

Class Signature

Properties

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| workflow | Workflow | - | The complete workflow definition |
| instance | WorkflowInstance | - | Current workflow instance state |
| currentNode | LeafWorkflowNode | - | The node being executed |
| currentToken | WorkflowToken | - | Current execution token |
| incomingEdges | List<Edge> | - | Edges leading into this node |
| outgoingEdges | List<Edge> | - | Edges leading out of this node |
| output | Map<String, dynamic> | - | Current workflow output (accumulated state) |

Context provided to node executors.


NodeResult Types

| Result Type | Engine Action |
| --- | --- |
| ContinueResult | Move token to target node(s) |
| WaitForSignalResult | Pause, set status to waitingForSignal |
| WaitForUserTaskResult | Create user task, then wait |
| WaitForJoinResult | Wait for parallel branches |
| CompleteWorkflowResult | Mark workflow as completed |
| FailWorkflowResult | Mark workflow as failed |
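
To make the mapping above concrete, here is a minimal sketch of how a dispatcher could branch on the result type, using a Dart 3 sealed class and an exhaustive switch. The classes below are simplified stand-ins that only borrow the names from the table. Their fields and constructors, and the helper `describeEngineAction`, are assumptions for this example and may not match the library.

```dart
// Stand-in types for illustration only; the real result classes carry more
// fields and their constructors may differ.
sealed class NodeResult {}

class ContinueResult extends NodeResult {
  ContinueResult(this.targetNodeIds);
  final List<String> targetNodeIds;
}

class WaitForSignalResult extends NodeResult {
  WaitForSignalResult(this.signalName);
  final String signalName;
}

class WaitForUserTaskResult extends NodeResult {}

class WaitForJoinResult extends NodeResult {}

class CompleteWorkflowResult extends NodeResult {}

class FailWorkflowResult extends NodeResult {
  FailWorkflowResult(this.message);
  final String message;
}

/// Hypothetical helper: describes the engine action for each result type.
/// Because NodeResult is sealed, the compiler checks that no case is missed.
String describeEngineAction(NodeResult result) {
  return switch (result) {
    ContinueResult(:final targetNodeIds) =>
      'move token to ${targetNodeIds.join(', ')}',
    WaitForSignalResult(:final signalName) =>
      'pause (waitingForSignal) until "$signalName"',
    WaitForUserTaskResult() => 'create user task, then wait',
    WaitForJoinResult() => 'wait for parallel branches',
    CompleteWorkflowResult() => 'mark workflow as completed',
    FailWorkflowResult(:final message) => 'mark workflow as failed: $message',
  };
}
```

Whether the actual NodeResult hierarchy is sealed is not stated here; the sketch only shows why one action per result type is a convenient shape for the engine to consume.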

ContinueResult

Class Signature

WaitForSignalResult

Class Signature

FailWorkflowResult

Class Signature

Built-in Node Executors

| Executor | Node Type | Description |
| --- | --- | --- |
| StartEventNodeProcessor | start | Entry point, continues to first node |
| EndEventNodeProcessor | end | Terminal point, completes workflow |
| TaskNodeProcessor | task | Executes task via TaskExecutor |
| UserTaskNodeProcessor | userTask | Creates user task, waits for signal |
| SignalEventNodeProcessor | signalWait | Waits for external signal |
| OneOfGatewayNodeProcessor | oneOf | Exclusive routing (XOR) |
| AnyOfGatewayNodeProcessor | anyOf | Race routing (first wins) |
| AllOfGatewayNodeProcessor | allOf | Parallel routing (AND) |
| SubflowNodeProcessor | subflow | Invokes child workflow |
| TimerEventNodeProcessor | timerWait | Waits for timer |

StartEventNodeProcessor

dart
class StartEventNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.start;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    final outgoing = context.outgoingEdges;
    if (outgoing.isEmpty) {
      return FailWorkflowResult.validation('Start node has no outgoing edges');
    }
    return ContinueResult.single(outgoing.first.targetNodeId);
  }
}

EndEventNodeProcessor

dart
class EndEventNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.end;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    return CompleteWorkflowResult(output: context.output);
  }
}

Examples

Custom NodeProcessor

dart
class TimerNodeProcessor extends NodeProcessor {
  @override
  NodeType get type => NodeType.timerWait;

  @override
  Future<NodeResult> execute(WorkflowContext context) async {
    // Get typed configuration
    final config = context.currentNode.config as TimerWaitNodeConfiguration;

    // Calculate fire time based on config
    DateTime fireAt;
    if (config.timerType == TimerType.duration) {
      fireAt = DateTime.now().add(config.duration!);
    } else {
      fireAt = config.dateTime!;
    }

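    // timerService is assumed to be a scheduling service made available to
    // this processor (for example via its constructor); it is not defined here.
    await timerService.schedule(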
      workflowInstanceId: context.instance.id,
      signalName: 'timer_fired',
      fireAt: fireAt,
    );

    return WaitForSignalResult(signalName: 'timer_fired');
  }

  @override
  Future<NodeResult> onSignalReceived(
    WorkflowContext context,
    String signalName,
    Map<String, dynamic>? payload,
  ) async {
    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: {'timerFired': true, 'firedAt': DateTime.now().toIso8601String()},
    );
  }
}

TypedNodeProcessor

dart
@JsonSerializable()
class RetryTaskConfig extends NodeConfiguration {
  static const schemaTypeName = 'config.task.retry';

  const RetryTaskConfig({
    required this.schemaType,
    this.maxRetries = 3,
    this.retryDelayMs = 1000,
    this.storeAs,
  });

  factory RetryTaskConfig.fromJson(Map<String, dynamic> json) =>
      _$RetryTaskConfigFromJson(json);

  @override
  final String schemaType;

  final int maxRetries;
  final int retryDelayMs;

  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$RetryTaskConfigToJson(this);
}

class RetryTaskNodeProcessor extends TypedNodeProcessor<RetryTaskConfig> {
  @override
  NodeType get type => NodeType.task;

  @override
  RetryTaskConfig fromConfig(Map<String, dynamic> config) =>
      RetryTaskConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    RetryTaskConfig config,
    WorkflowContext context,
  ) async {
    // Fully typed access to configuration!
    for (var attempt = 1; attempt <= config.maxRetries; attempt++) {
      try {
        final result = await executeTask(context);
        return result;
      } catch (e) {
        if (attempt == config.maxRetries) {
          return FailWorkflowResult.internal(
            'Task failed after ${config.maxRetries} attempts: $e',
          );
        }
        await Future.delayed(Duration(milliseconds: config.retryDelayMs));
      }
    }
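    // Reached only when maxRetries < 1, in which case the loop never ran.
    return FailWorkflowResult.validation('maxRetries must be at least 1');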
  }

  Future<NodeResult> executeTask(WorkflowContext context) async {
    // Actual task implementation
    return ContinueResult.single(context.outgoingEdges.first.targetNodeId);
  }
}

Batch Processing Node

dart
@JsonSerializable()
class BatchConfig extends NodeConfiguration {
  const BatchConfig({
    required this.schemaType,
    this.batchSize = 100,
    this.parallelism = 4,
    this.storeAs,
  });

  factory BatchConfig.fromJson(Map<String, dynamic> json) =>
      _$BatchConfigFromJson(json);

  @override
  final String schemaType;
  final int batchSize;
  final int parallelism;
  @override
  final String? storeAs;

  @override
  Map<String, dynamic> toJson() => _$BatchConfigToJson(this);
}

class BatchProcessingNodeProcessor extends TypedNodeProcessor<BatchConfig> {
  @override
  NodeType get type => NodeType.task;

  @override
  BatchConfig fromConfig(Map<String, dynamic> config) =>
      BatchConfig.fromJson(config);

  @override
  Future<NodeResult> executeTyped(
    BatchConfig config,
    WorkflowContext context,
  ) async {
    final items = context.output['items'] as List? ?? [];

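    // Process in batches with typed config. batchSize must be positive,
    // otherwise the loop index never advances.
    for (var i = 0; i < items.length; i += config.batchSize) {
      final batch = items.skip(i).take(config.batchSize).toList();
      // processInParallel is an application-defined helper.
      await processInParallel(batch, config.parallelism);
    }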

    return ContinueResult.single(
      context.outgoingEdges.first.targetNodeId,
      output: {'processed': items.length},
    );
  }
}
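
The example above calls processInParallel without defining it. One possible shape for such a helper is sketched below as a small worker pool that keeps at most `parallelism` items in flight. This is an assumption, not the library's implementation, and the extra `handler` parameter is added here only so the sketch is self-contained.

```dart
/// Hypothetical helper: runs [handler] over every item in [batch] with at
/// most [parallelism] calls in flight at a time.
Future<void> processInParallel<T>(
  List<T> batch,
  int parallelism,
  Future<void> Function(T item) handler,
) async {
  if (parallelism < 1) {
    throw ArgumentError.value(parallelism, 'parallelism', 'must be >= 1');
  }
  var next = 0;

  // Each worker claims the next unprocessed index until the batch is drained.
  // Claiming is safe without locks because Dart runs this code on a single
  // isolate and there is no await between the bounds check and the increment.
  Future<void> worker() async {
    while (next < batch.length) {
      final item = batch[next++];
      await handler(item);
    }
  }

  final workerCount = parallelism < batch.length ? parallelism : batch.length;
  await Future.wait([for (var i = 0; i < workerCount; i++) worker()]);
}
```

If any handler call throws, Future.wait completes with that error, so a caller such as executeTyped would need its own try/catch to turn the failure into a FailWorkflowResult.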

Registration

dart
final descriptor = WorkflowDescriptor(
  title: 'Custom Node Executors',
  nodeProcessors: [
    RetryTaskNodeProcessor(),
    BatchProcessingNodeProcessor(),
  ],
);

// Create deserialization context with all descriptors
final context = RegistryTypeResolver(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

Comparison

| Feature | NodeProcessor | TypedNodeProcessor |
| --- | --- | --- |
| Config access | context.currentNode.config['key'] | config.key |
| Type safety | Runtime | Compile-time |
| IDE support | Limited | Full autocomplete |
| Error handling | Manual | Auto-wrapped |
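
The sketch below illustrates what the typed column could look like in practice: a generic base class parses the raw map once through fromConfig, hands the typed object to executeTyped, and converts exceptions into a failure value. All types here (`SketchProcessor`, `TypedSketchProcessor`, `RetryConfig`, `RetrySketchProcessor`) are hypothetical stand-ins, results are plain strings for brevity, and the wrapping behaviour is an assumption about what "auto-wrapped" means rather than a description of the library's internals.

```dart
// Illustrative stand-ins only; none of these types come from the library.
abstract class SketchProcessor {
  Future<String> execute(Map<String, dynamic> rawConfig);
}

abstract class TypedSketchProcessor<C> extends SketchProcessor {
  /// Parses the raw config map into a typed object.
  C fromConfig(Map<String, dynamic> config);

  /// Receives the already-parsed configuration.
  Future<String> executeTyped(C config);

  @override
  Future<String> execute(Map<String, dynamic> rawConfig) async {
    try {
      final config = fromConfig(rawConfig);
      return await executeTyped(config);
    } catch (e) {
      // Assumed behaviour: any error from parsing or execution becomes a
      // failure result instead of an uncaught exception.
      return 'failed: $e';
    }
  }
}

class RetryConfig {
  RetryConfig(this.maxRetries);

  factory RetryConfig.fromJson(Map<String, dynamic> json) =>
      RetryConfig(json['maxRetries'] as int);

  final int maxRetries;
}

class RetrySketchProcessor extends TypedSketchProcessor<RetryConfig> {
  @override
  RetryConfig fromConfig(Map<String, dynamic> config) =>
      RetryConfig.fromJson(config);

  @override
  Future<String> executeTyped(RetryConfig config) async =>
      'continue (maxRetries=${config.maxRetries})';
}
```

With this shape, subclasses only touch `config.maxRetries` and never index into a map, which is where the compile-time checking and autocomplete in the table come from.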

When to Use

Use NodeProcessor

  • Simple nodes with minimal configuration
  • Built-in node type overrides
  • Prototyping and testing
  • One-off custom node types

Use TypedNodeProcessor

  • Complex configurations with many fields
  • Custom node types with specific settings
  • Production code requiring type safety
  • Reusable node executor libraries

Best Practices

  1. Override buildNode for custom node building logic
  2. Implement onSignalReceived for wait nodes
  3. Use TypedNodeProcessor for complex configurations
  4. Return detailed FailWorkflowResult on errors
  5. Register via WorkflowDescriptor for engine integration
