TaskExecutor API

Base class for automated task executors. Tasks perform business logic without human interaction.

TaskExecutor

dart
abstract class TaskExecutor implements SchemaItem {
  /// Unique schema type identifier for registry lookup
  /// Example: 'task.email.send', 'task.entity.validate'
  String get schemaType;

  /// Human-readable name
  String get name;

  /// Optional description
  String get description => '';

  /// Execute the task
  Future<TaskResult> execute(ExecutionContext context);

  /// Validate task configuration (optional)
  List<String> validateConfig(Map<String, dynamic> config) => [];
}
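
As a sketch of how `validateConfig` might be overridden, the example below checks two config keys and returns one message per problem. It assumes that an empty list means the configuration is valid, which the default `=> []` suggests. The `TaskExecutor` here is a trimmed stand-in so the snippet compiles on its own; `SendEmailExecutor` and the `recipient` and `maxRetries` keys are hypothetical.

```dart
// Trimmed stand-in for the library's TaskExecutor (only the method used here).
abstract class TaskExecutor {
  List<String> validateConfig(Map<String, dynamic> config) => [];
}

// Hypothetical executor; only config validation is shown.
class SendEmailExecutor extends TaskExecutor {
  @override
  List<String> validateConfig(Map<String, dynamic> config) {
    final errors = <String>[];

    // 'recipient' is an assumed required key.
    final recipient = config['recipient'];
    if (recipient is! String || recipient.isEmpty) {
      errors.add('recipient must be a non-empty string');
    }

    // 'maxRetries' is an assumed optional key.
    final retries = config['maxRetries'];
    if (retries != null && (retries is! int || retries < 0)) {
      errors.add('maxRetries must be a non-negative integer');
    }

    return errors;
  }
}
```

Collecting every problem instead of stopping at the first lets a caller report all configuration errors in one pass.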

TypedTaskExecutor

Generic base class that adds compile-time type checking for task input and output:

dart
abstract class TypedTaskExecutor<TInput, TOutput> extends TaskExecutor {
  /// Deserialize input from workflow variables
  TInput fromInput(Map<String, dynamic> input);

  /// Serialize output to workflow variables
  Map<String, dynamic> toOutput(TOutput output);

  /// Type-safe execute method
  Future<TOutput> executeTyped(TInput input, ExecutionContext context);
}
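
The declaration above does not show how `execute` is connected to `executeTyped`. One plausible arrangement, sketched here with simplified stand-in types, is to deserialize the input, run the typed method, serialize the result, and convert any thrown exception into a failure. The real base class may work differently; `context.input`, the reduced `TaskSuccess` and `TaskFailure` classes, the error message format, and `DoubleExecutor` are all assumptions made for illustration.

```dart
// Simplified stand-ins; the library's real types carry more fields.
sealed class TaskResult {}

class TaskSuccess extends TaskResult {
  TaskSuccess(this.output);
  final Map<String, dynamic> output;
}

class TaskFailure extends TaskResult {
  TaskFailure(this.message);
  final String message;
}

class ExecutionContext {
  ExecutionContext(this.input);
  final Map<String, dynamic> input; // assumed: output of the previous node
}

abstract class TypedTaskExecutor<TInput, TOutput> {
  TInput fromInput(Map<String, dynamic> input);
  Map<String, dynamic> toOutput(TOutput output);
  Future<TOutput> executeTyped(TInput input, ExecutionContext context);

  // Assumed bridge: deserialize, run, serialize, and wrap thrown errors.
  Future<TaskResult> execute(ExecutionContext context) async {
    try {
      final typedInput = fromInput(context.input);
      final typedOutput = await executeTyped(typedInput, context);
      return TaskSuccess(toOutput(typedOutput));
    } catch (e) {
      return TaskFailure('Task failed: $e');
    }
  }
}

// Hypothetical executor that doubles a number.
class DoubleExecutor extends TypedTaskExecutor<int, int> {
  @override
  int fromInput(Map<String, dynamic> input) => input['value'] as int;

  @override
  Map<String, dynamic> toOutput(int output) => {'doubled': output};

  @override
  Future<int> executeTyped(int input, ExecutionContext context) async =>
      input * 2;
}
```

With this arrangement a subclass never builds a `TaskResult` itself, which is where the compile-time safety comes from: a missing or mistyped `value` surfaces as a `TaskFailure` rather than an uncaught exception.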

TaskResult

Sealed class returned by task execution:

dart
sealed class TaskResult {}

class TaskSuccess extends TaskResult {
  const TaskSuccess({
    this.output = const {},
    this.outputPortId,
    this.effects = const [],  // Declarative side effects
  });

  /// Output data merged into workflow variables
  final Map<String, dynamic> output;

  /// Optional output port ID for multi-path routing
  final String? outputPortId;

  /// Optional side effects to apply after task completion.
  ///
  /// Effects allow executors to declaratively request side effects
  /// without directly calling mutation methods. The engine will
  /// process these effects after the task completes successfully.
  ///
  /// Common effects include:
  /// - [CancelUserTasksEffect] - Cancel pending user tasks
  /// - [SetOutputEffect] - Set additional output data
  /// - [RecordEventEffect] - Record workflow events
  final List<WorkflowEffect> effects;
}

class TaskFailure extends TaskResult {
  const TaskFailure({
    required this.errorType,
    required this.message,
    this.isRetryable = true,
    this.details,
  });

  final ErrorType errorType;
  final String message;
  final bool isRetryable;
  final Map<String, dynamic>? details;

  // Factory constructors
  factory TaskFailure.permanent({...});
  factory TaskFailure.validation(String message, {...});
  factory TaskFailure.internal(String message, {...});
  factory TaskFailure.timeout(String message, {...});
}
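
Because `TaskResult` is sealed, calling code can branch on it with an exhaustive `switch` and the compiler will flag any unhandled subtype. The sketch below uses reduced stand-in classes with only the fields it needs; `describe` is a hypothetical helper, not part of the library.

```dart
// Reduced stand-ins for the result types, so the snippet compiles alone.
sealed class TaskResult {}

class TaskSuccess extends TaskResult {
  TaskSuccess({this.output = const {}, this.outputPortId});
  final Map<String, dynamic> output;
  final String? outputPortId;
}

class TaskFailure extends TaskResult {
  TaskFailure({required this.message, this.isRetryable = true});
  final String message;
  final bool isRetryable;
}

// Hypothetical helper: summarizes a result with an exhaustive switch.
String describe(TaskResult result) => switch (result) {
      // Matches only when outputPortId is non-null.
      TaskSuccess(outputPortId: final port?) => 'success via port "$port"',
      TaskSuccess(:final output) => 'success with ${output.length} value(s)',
      TaskFailure(isRetryable: true, :final message) => 'retryable: $message',
      TaskFailure(:final message) => 'permanent: $message',
    };
```

No `default` branch is needed: the two unconditional cases cover both subtypes, so adding a third subtype later would produce a compile error here rather than a silent fall-through.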

ExecutionContext

A read-only context providing access to all execution state. Executors return results with effects instead of calling mutation methods directly.

Data Model

The context provides three levels of data access:

| Property | Description |
| --- | --- |
| input | Output from the previous node (your primary data source) |
| workflowInput | Original input when the workflow started (immutable) |
| accumulated | All output accumulated during workflow execution |
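
To make the difference between the three levels concrete, the sketch below models how they might evolve as tokens move from node to node. It assumes that each node's output becomes the next node's input and is also merged into the accumulated map; the engine's actual merge rules are not described here. `DataLevels` and `advance` are hypothetical names and are not part of `ExecutionContext`.

```dart
// Stand-in holding the three data levels; not the library's ExecutionContext.
class DataLevels {
  DataLevels({
    required this.workflowInput,
    required this.input,
    required this.accumulated,
  });

  final Map<String, dynamic> workflowInput; // fixed at workflow start
  final Map<String, dynamic> input; // output of the previous node
  final Map<String, dynamic> accumulated; // everything produced so far
}

// Assumed model: a node's output replaces `input` for the next node
// and is merged into `accumulated`; `workflowInput` never changes.
DataLevels advance(DataLevels current, Map<String, dynamic> nodeOutput) =>
    DataLevels(
      workflowInput: current.workflowInput,
      input: nodeOutput,
      accumulated: {...current.accumulated, ...nodeOutput},
    );
```

Under this model, a value produced two nodes back is no longer visible through `input` but can still be read from `accumulated`, which is why the context offers separate accessors for each level.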

Core Properties

dart
class ExecutionContext {
  /// The executable workflow with nodes and edges
  final Workflow workflow;

  /// The current workflow instance
  final WorkflowInstance workflowInstance;

  /// The node being executed
  final WorkflowNode currentNode;

  /// The token that triggered this execution
  final WorkflowToken currentToken;

  /// Current accumulated output (read-only snapshot)
  final Map<String, dynamic> output;

  /// Deserialization context for resolving executors
  final WorkflowDeserializationContext deserializationContext;

  /// Executor-specific configuration from the node definition
  final Map<String, dynamic> config;

  /// Signal name for user tasks (null for other executor types)
  final String? signalName;

  /// Edge being evaluated (for condition executors only)
  final WorkflowEdge? edge;
}

Data Access Methods

dart
// ═══════════════════════════════════════════════════════════════════
// INPUT - Data from previous node (your primary data source)
// ═══════════════════════════════════════════════════════════════════

/// Get a value from input by key or path (supports dot notation)
T? get<T>(String path);              // e.g., get<String>('entityId')
T getRequired<T>(String path);       // Throws if missing

// ═══════════════════════════════════════════════════════════════════
// WORKFLOW INPUT - Original input when workflow started (immutable)
// ═══════════════════════════════════════════════════════════════════

Map<String, dynamic> get workflowInput;  // Original workflow input
T? getInitial<T>(String path);           // Get from workflowInput
T getInitialRequired<T>(String path);    // Throws if missing

// ═══════════════════════════════════════════════════════════════════
// ACCUMULATED - All output from workflow execution
// ═══════════════════════════════════════════════════════════════════

Map<String, dynamic> get accumulated;    // All accumulated output
T? getAny<T>(String path);               // Get from accumulated

// ═══════════════════════════════════════════════════════════════════
// CONFIG - Node configuration
// ═══════════════════════════════════════════════════════════════════

T? getConfig<T>(String key);             // Get config value
T getConfigRequired<T>(String key);      // Throws if missing

dart
/// Get outgoing edges from current node
List<WorkflowEdge> get outgoingEdges;

/// Get incoming edges to current node
List<WorkflowEdge> get incomingEdges;

/// Get a node by ID
WorkflowNode? getNode(String nodeId);

/// Get all active tokens at a specific node
List<WorkflowToken> getTokensAtNode(String nodeId);

/// Check node type
bool get isStartNode;
bool get isEndNode;

Retry Information

dart
/// Current attempt number (1-based: 1 = first try, 2 = first retry)
int get attemptNumber;

/// Whether this is a retry attempt
bool get isRetry => attemptNumber > 1;
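
One way an executor could use `attemptNumber` is to stop asking for retries once a limit is reached. The sketch below builds a failure whose `isRetryable` flag depends on the attempts left. `TaskFailure` is a reduced stand-in, and `failureFor` and `maxAttempts` are hypothetical; the limit could, for example, come from node configuration via `getConfig<int>`.

```dart
// Reduced stand-in for TaskFailure with only the fields used here.
class TaskFailure {
  const TaskFailure({required this.message, this.isRetryable = true});
  final String message;
  final bool isRetryable;
}

// Hypothetical helper: marks the failure retryable only while attempts remain.
// attemptNumber is 1-based, matching ExecutionContext.attemptNumber.
TaskFailure failureFor(
  Object error, {
  required int attemptNumber,
  int maxAttempts = 3,
}) {
  final attemptsLeft = maxAttempts - attemptNumber;
  return TaskFailure(
    message: 'Attempt $attemptNumber of $maxAttempts failed: $error',
    isRetryable: attemptsLeft > 0,
  );
}
```

Inside `execute`, this would be called as `failureFor(e, attemptNumber: context.attemptNumber)` from a `catch` block.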

Path Notation Examples

dart
// Simple key access
final entityId = context.get<String>('entityId');

// Nested path access (dot notation)
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approval.levels.0.level');

// Required values (throws ArgumentError if missing)
final tenantId = context.getRequired<String>('tenantId');
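
How the library resolves paths internally is not shown. As a rough sketch of what dot notation implies, the resolver below walks nested maps and treats numeric segments as list indices. `resolvePath` and `getPath` are hypothetical helpers, and returning `null` on a type mismatch is an assumption; the real `get<T>` may behave differently.

```dart
// Walks a dot-separated path through nested maps and lists.
// Numeric segments index into lists; any miss yields null.
Object? resolvePath(Map<String, dynamic> data, String path) {
  Object? current = data;
  for (final segment in path.split('.')) {
    if (current is Map) {
      current = current[segment];
    } else if (current is List) {
      final index = int.tryParse(segment);
      if (index == null || index < 0 || index >= current.length) return null;
      current = current[index];
    } else {
      return null; // cannot descend into a scalar or null
    }
  }
  return current;
}

// Typed wrapper: returns null when the value is missing or not a T.
T? getPath<T>(Map<String, dynamic> data, String path) {
  final value = resolvePath(data, path);
  return value is T ? value : null;
}
```

With data shaped like `{'approval': {'levels': [{'level': 2}]}}`, the path `approval.levels.0.level` descends through a map, a list index, and another map before reaching the value.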

Examples

Basic TaskExecutor

dart
class ValidateEntityExecutor extends TaskExecutor {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final entityId = context.getRequired<String>('entityId');
    final entityType = context.getRequired<String>('entityType');

    // `validator` is assumed to be a dependency available to this executor (not shown)
    final errors = await validator.validate(entityType, entityId);

    if (errors.isNotEmpty) {
      return TaskFailure.validation(
        'Validation failed',
        details: {'errors': errors},
      );
    }

    return TaskSuccess(output: {
      'validated': true,
      'validatedAt': DateTime.now().toIso8601String(),
    });
  }
}

TypedTaskExecutor

dart
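import 'package:json_annotation/json_annotation.dart';

// The _$...FromJson/_$...ToJson helpers come from a generated `part` file (not shown).
@JsonSerializable()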
class ValidateInput {
  final String entityId;
  final String entityType;

  ValidateInput({required this.entityId, required this.entityType});

  factory ValidateInput.fromJson(Map<String, dynamic> json) =>
      _$ValidateInputFromJson(json);
}

@JsonSerializable()
class ValidateOutput {
  final bool validated;
  final List<String> errors;
  final DateTime validatedAt;

  ValidateOutput({
    required this.validated,
    required this.errors,
    required this.validatedAt,
  });

  Map<String, dynamic> toJson() => _$ValidateOutputToJson(this);
}

class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  ValidateInput fromInput(Map<String, dynamic> input) =>
      ValidateInput.fromJson(input);

  @override
  Map<String, dynamic> toOutput(ValidateOutput output) => output.toJson();

  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    // `validator` is assumed to be a dependency available to this executor (not shown)
    final errors = await validator.validate(input.entityType, input.entityId);

    return ValidateOutput(
      validated: errors.isEmpty,
      errors: errors,
      validatedAt: DateTime.now(),
    );
  }
}

Multi-Output Task

dart
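class RouteByAmountExecutor extends TaskExecutor {
  static const _schemaType = 'task.routing.byAmount';

  // Descriptor referenced in the Registration section
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => RouteByAmountExecutor(),
    title: 'Route by Amount',
  );

  @override
  String get schemaType => _schemaType;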

  @override
  String get name => 'Route by Amount';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final amount = context.getRequired<num>('amount');

    // Return output port ID for routing
    final portId = amount > 10000 ? 'high' : 'low';

    return TaskSuccess(
      output: {'routedAmount': amount},
      outputPortId: portId,  // Maps to edge via output port
    );
  }
}

Task with Effects

Tasks can request side effects declaratively by returning them in the effects list:

dart
class ProcessApprovalExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');

    // Return output with effects
    return TaskSuccess(
      output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      },
      // Declarative side effects - engine processes these
      effects: [
        // Cancel any pending user tasks
        const CancelUserTasksEffect(),
        // Record an event for audit
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_processed',
            data: {'decision': decision, 'approvedBy': approvedBy},
          ),
        ),
      ],
    );
  }
}

Registration

dart
final descriptor = WorkflowDescriptor(
  title: 'Entity Tasks',
  tasks: [
    ValidateEntityExecutor.typeDescriptor,
    RouteByAmountExecutor.typeDescriptor,
  ],
);

// Create context with descriptors
final context = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(context: context),
);
await engine.initialize();

Comparison

| Feature | TaskExecutor | TypedTaskExecutor |
| --- | --- | --- |
| Input access | context.get<T>('key') | input.key |
| Output | TaskSuccess(output: {...}) | Return typed object |
| Type safety | Runtime | Compile-time |
| Serialization | Manual | Via fromInput/toOutput |
| Error handling | Manual | Auto-wrapped |
| Effects | Manual via effects list | Same |

Best Practices

  1. Use namespaced schema types - Follow the task.domain.action format, for example task.email.send
  2. Prefer TypedTaskExecutor for complex I/O
  3. Include a static typeDescriptor for registration
  4. Return detailed failures with error types and details
  5. Use outputPortId for multi-path routing instead of gateways
  6. Use effects for side effects - Keep executors pure by returning effects instead of calling mutation methods
  7. Use get<T> for previous node output - This is your primary data source
  8. Use getInitial<T> for original workflow input - It is immutable, so use it for values that must stay the same for the whole workflow run

See Also