Skip to content

TaskExecutor API

Base class for automated task executors. Tasks perform business logic without human interaction.

TaskExecutor

abstract

Class Signature


TypedTaskExecutor

abstract

Class Signature

Type-safe base class with compile-time safety for input/output.


TaskResult

Class Signature

Sealed class returned by task execution.


ExecutionContext

Class Signature

Properties

PropertyTypeDefaultDescription
workflowWorkflow--The executable workflow with nodes and edges
workflowInstanceWorkflowInstance--The current workflow instance
currentNodeLeafWorkflowNode--The node being executed
currentTokenWorkflowToken--The token that triggered this execution
outputMap<String, dynamic>--Current accumulated output (read-only snapshot)
configMap<String, dynamic>--Executor-specific configuration from the node definition
signalNameString?--Signal name for user tasks (null for other executor types)
edgeWorkflowEdge?--Edge being evaluated (for condition executors only)
workflowInputMap<String, dynamic>--Original input when workflow started (immutable)
accumulatedMap<String, dynamic>--All output accumulated during workflow execution
attemptNumberint--Current attempt number (1-based: 1 = first try, 2 = first retry)
isRetrybool--Whether this is a retry attempt (attemptNumber > 1)
outgoingEdgesList<WorkflowEdge>--Outgoing edges from current node
incomingEdgesList<WorkflowEdge>--Incoming edges to current node
isStartNodebool--True if the current node is a start node
isEndNodebool--True if the current node is an end node

Methods

  • get returns T?
    T? get<T>(String path)

    Get a value from input by key or path (supports dot notation).

  • getRequired returns T
    T getRequired<T>(String path)

    Get a required value from input. Throws if missing.

  • getInitial returns T?
    T? getInitial<T>(String path)

    Get a value from the original workflow input.

  • getInitialRequired returns T
    T getInitialRequired<T>(String path)

    Get a required value from the original workflow input. Throws if missing.

  • getAny returns T?
    T? getAny<T>(String path)

    Get a value from accumulated output (any previous node).

  • getConfig returns T?
    T? getConfig<T>(String key)

    Get a value from node configuration.

  • getConfigRequired returns T
    T getConfigRequired<T>(String key)

    Get a required value from node configuration. Throws if missing.

  • getNode returns LeafWorkflowNode?
    LeafWorkflowNode? getNode(String nodeId)

    Get a node by ID.

  • getTokensAtNode returns List<WorkflowToken>
    List<WorkflowToken> getTokensAtNode(String nodeId)

    Get all active tokens at a specific node.

A read-only context providing access to all execution state. Executors return results with effects instead of calling mutation methods directly.

Data Model

The context provides three levels of data access:

PropertyDescription
inputOutput from the previous node (your primary data source)
workflowInputOriginal input when workflow started (immutable)
accumulatedAll output accumulated during workflow execution

Path Notation Examples

dart
// Simple key access
final entityId = context.get<String>('entityId');

// Nested path access (dot notation)
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approval.levels.0.level');

// Required values (throws ArgumentError if missing)
final tenantId = context.getRequired<String>('tenantId');

Examples

Basic TaskExecutor

dart
class ValidateEntityExecutor extends TaskExecutor {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final entityId = context.getRequired<String>('entityId');
    final entityType = context.getRequired<String>('entityType');

    final errors = await validator.validate(entityType, entityId);

    if (errors.isNotEmpty) {
      return TaskFailure.validation(
        'Validation failed',
        details: {'errors': errors},
      );
    }

    return TaskSuccess([SetOutputEffect(output: {
      'validated': true,
      'validatedAt': DateTime.now().toIso8601String(),
    })]);
  }
}

TypedTaskExecutor

dart
@JsonSerializable()
class ValidateInput {
  final String entityId;
  final String entityType;

  ValidateInput({required this.entityId, required this.entityType});

  factory ValidateInput.fromJson(Map<String, dynamic> json) =>
      _$ValidateInputFromJson(json);
}

@JsonSerializable()
class ValidateOutput {
  final bool validated;
  final List<String> errors;
  final DateTime validatedAt;

  ValidateOutput({
    required this.validated,
    required this.errors,
    required this.validatedAt,
  });

  Map<String, dynamic> toJson() => _$ValidateOutputToJson(this);
}

class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  ValidateInput fromInput(Map<String, dynamic> input) =>
      ValidateInput.fromJson(input);

  @override
  Map<String, dynamic> toOutput(ValidateOutput output) => output.toJson();

  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    final errors = await validator.validate(input.entityType, input.entityId);

    return ValidateOutput(
      validated: errors.isEmpty,
      errors: errors,
      validatedAt: DateTime.now(),
    );
  }
}

Multi-Output Task

dart
class RouteByAmountExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.routing.byAmount';

  @override
  String get name => 'Route by Amount';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final amount = context.getRequired<num>('amount');

    // Return output port ID for routing
    final portId = amount > 10000 ? 'high' : 'low';

    return TaskSuccess([
      SetOutputEffect(output: {'routedAmount': amount}),
      RouteToPortEffect(portId: portId),  // Maps to edge via output port
    ]);
  }
}

Task with Effects

Tasks can return effects for declarative side effects:

dart
class ProcessApprovalExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');

    // Return output with effects
    return TaskSuccess([
      SetOutputEffect(output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      }),
      // Cancel any pending user tasks
      const CancelUserTasksEffect(),
      // Record an event for audit
      RecordEventEffect(
        event: WorkflowEvent.custom(
          instanceId: context.workflowInstanceId,
          nodeId: context.nodeId,
          eventType: 'approval_processed',
          data: {'decision': decision, 'approvedBy': approvedBy},
        ),
      ),
    ]);
  }
}

Registration

dart
final descriptor = WorkflowDescriptor(
  title: 'Entity Tasks',
  tasks: [
    ValidateEntityExecutor.typeDescriptor,
    RouteByAmountExecutor.typeDescriptor,
  ],
);

// Create context with descriptors
final context = RegistryTypeResolver(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(),
  executionMode: ExecutionMode.production,
);
await engine.initialize();

Comparison

FeatureTaskExecutorTypedTaskExecutor
Input accesscontext.get<T>('key')input.key
OutputTaskSuccess([SetOutputEffect(...)])Return typed object
Type safetyRuntimeCompile-time
SerializationManualVia fromInput/toOutput
Error handlingManualAuto-wrapped
EffectsManual via effects listSame

Best Practices

  1. Use namespaced schema types - task.domain.action format
  2. Prefer TypedTaskExecutor for complex I/O
  3. Include static typeDescriptor for registration
  4. Return detailed failures with error types and details
  5. Use outputPortId for multi-path routing instead of gateways
  6. Use effects for side effects - Keep executors pure by returning effects instead of calling mutation methods
  7. Use get<T> for previous node output - This is your primary data source
  8. Use getInitial<T> for original workflow input - For configuration that should persist throughout

See Also