TaskExecutor API
Base class for automated task executors. Tasks perform business logic without human interaction.
TaskExecutor
Abstract class.
TypedTaskExecutor
Abstract class.
Type-safe base class with compile-time safety for input/output.
TaskResult
Sealed class returned by task execution.
ExecutionContext
Properties
| Property | Type | Default | Description |
|---|---|---|---|
| workflow | Workflow | -- | The executable workflow with nodes and edges |
| workflowInstance | WorkflowInstance | -- | The current workflow instance |
| currentNode | LeafWorkflowNode | -- | The node being executed |
| currentToken | WorkflowToken | -- | The token that triggered this execution |
| output | Map<String, dynamic> | -- | Current accumulated output (read-only snapshot) |
| config | Map<String, dynamic> | -- | Executor-specific configuration from the node definition |
| signalName | String? | -- | Signal name for user tasks (null for other executor types) |
| edge | WorkflowEdge? | -- | Edge being evaluated (for condition executors only) |
| workflowInput | Map<String, dynamic> | -- | Original input when workflow started (immutable) |
| accumulated | Map<String, dynamic> | -- | All output accumulated during workflow execution |
| attemptNumber | int | -- | Current attempt number (1-based: 1 = first try, 2 = first retry) |
| isRetry | bool | -- | Whether this is a retry attempt (attemptNumber > 1) |
| outgoingEdges | List<WorkflowEdge> | -- | Outgoing edges from current node |
| incomingEdges | List<WorkflowEdge> | -- | Incoming edges to current node |
| isStartNode | bool | -- | True if the current node is a start node |
| isEndNode | bool | -- | True if the current node is an end node |
Methods
| Method | Returns | Signature | Description |
|---|---|---|---|
| get | T? | T? get<T>(String path) | Get a value from input by key or path (supports dot notation). |
| getRequired | T | T getRequired<T>(String path) | Get a required value from input. Throws if missing. |
| getInitial | T? | T? getInitial<T>(String path) | Get a value from the original workflow input. |
| getInitialRequired | T | T getInitialRequired<T>(String path) | Get a required value from the original workflow input. Throws if missing. |
| getAny | T? | T? getAny<T>(String path) | Get a value from accumulated output (any previous node). |
| getConfig | T? | T? getConfig<T>(String key) | Get a value from node configuration. |
| getConfigRequired | T | T getConfigRequired<T>(String key) | Get a required value from node configuration. Throws if missing. |
| getNode | LeafWorkflowNode? | LeafWorkflowNode? getNode(String nodeId) | Get a node by ID. |
| getTokensAtNode | List<WorkflowToken> | List<WorkflowToken> getTokensAtNode(String nodeId) | Get all active tokens at a specific node. |
A read-only context providing access to all execution state. Executors return results with effects instead of calling mutation methods directly.
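The effect-based contract described here can be pictured with a small, self-contained sketch. Every name in it (SketchEffect, SetOutput, RecordNote, approve, applyEffects) is a hypothetical stand-in written for this illustration, not one of the library's types. The only point is that the executor returns a description of the changes it wants, and a separate step applies them.

```dart
// Hypothetical stand-ins for illustration; not the library's effect types.
sealed class SketchEffect {
  const SketchEffect();
}

/// Asks the engine to merge values into the node output.
class SetOutput extends SketchEffect {
  final Map<String, dynamic> output;
  const SetOutput(this.output);
}

/// Asks the engine to append an audit note.
class RecordNote extends SketchEffect {
  final String note;
  const RecordNote(this.note);
}

/// A pure "executor": reads input, returns effects, mutates nothing.
List<SketchEffect> approve(Map<String, dynamic> input) {
  final decision = input['decision'] as String;
  return [
    SetOutput({'processed': true, 'decision': decision}),
    RecordNote('approval_processed: $decision'),
  ];
}

/// The "engine" side: the only place where state is changed.
void applyEffects(
  List<SketchEffect> effects,
  Map<String, dynamic> output,
  List<String> auditLog,
) {
  for (final effect in effects) {
    switch (effect) {
      case SetOutput(output: final values):
        output.addAll(values);
      case RecordNote(note: final note):
        auditLog.add(note);
    }
  }
}

void main() {
  final output = <String, dynamic>{};
  final auditLog = <String>[];
  applyEffects(approve({'decision': 'approved'}), output, auditLog);
  print(output);
  print(auditLog);
}
```

Because approve only builds a list, it can be tested without an engine or storage, which is the practical benefit of keeping executors free of direct mutation.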
Data Model
The context provides three levels of data access:
| Property | Description |
|---|---|
| input | Output from the previous node (your primary data source) |
| workflowInput | Original input when workflow started (immutable) |
| accumulated | All output accumulated during workflow execution |
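To make the three scopes concrete, here is a minimal sketch of how lookups against them could behave. SketchContext and resolvePath are hypothetical stand-ins written for this example; the real ExecutionContext may resolve paths differently. In particular, treating a numeric path segment as a list index is an assumption inferred from paths such as approval.levels.0.level.

```dart
/// Walks [path] through nested maps and lists.
/// Returns null when any segment is missing.
Object? resolvePath(Map<String, dynamic> data, String path) {
  Object? current = data;
  for (final segment in path.split('.')) {
    if (current is Map) {
      current = current[segment];
    } else if (current is List) {
      // Assumption: numeric segments index into lists.
      final index = int.tryParse(segment);
      if (index == null || index < 0 || index >= current.length) return null;
      current = current[index];
    } else {
      return null;
    }
  }
  return current;
}

/// Hypothetical stand-in exposing the three data scopes.
class SketchContext {
  final Map<String, dynamic> input; // output of the previous node
  final Map<String, dynamic> workflowInput; // original workflow input
  final Map<String, dynamic> accumulated; // everything produced so far

  const SketchContext({
    required this.input,
    required this.workflowInput,
    required this.accumulated,
  });

  T? get<T>(String path) => _cast<T>(resolvePath(input, path));
  T? getInitial<T>(String path) => _cast<T>(resolvePath(workflowInput, path));
  T? getAny<T>(String path) => _cast<T>(resolvePath(accumulated, path));

  T getRequired<T>(String path) {
    final value = get<T>(path);
    if (value == null) {
      throw ArgumentError('Missing required input: $path');
    }
    return value;
  }

  /// Returns [value] only when it has the requested type.
  static T? _cast<T>(Object? value) {
    if (value is T) return value;
    return null;
  }
}

void main() {
  final context = SketchContext(
    input: {
      'approval': {
        'decision': 'approved',
        'levels': [
          {'level': 2},
        ],
      },
    },
    workflowInput: {'tenantId': 'tenant-1'},
    accumulated: {'tenantId': 'tenant-1', 'validated': true},
  );

  print(context.get<String>('approval.decision'));
  print(context.get<int>('approval.levels.0.level'));
  print(context.getInitial<String>('tenantId'));
  print(context.getAny<bool>('validated'));
  // tenantId is not part of the previous node's output in this sketch,
  // so the input scope yields null.
  print(context.get<String>('tenantId'));
}
```

The last lookup shows why the scope matters: a key that exists in the workflow input or in accumulated output is not visible through get unless the previous node also emitted it.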
Path Notation Examples
```dart
// Simple key access
final entityId = context.get<String>('entityId');

// Nested path access (dot notation)
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approval.levels.0.level');

// Required values (throws ArgumentError if missing)
final tenantId = context.getRequired<String>('tenantId');
```
Examples
Basic TaskExecutor
```dart
class ValidateEntityExecutor extends TaskExecutor {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Get data from previous node output
    final entityId = context.getRequired<String>('entityId');
    final entityType = context.getRequired<String>('entityType');

    // validator is not defined in this snippet; it stands for whatever
    // validation service the executor has access to.
    final errors = await validator.validate(entityType, entityId);
    if (errors.isNotEmpty) {
      return TaskFailure.validation(
        'Validation failed',
        details: {'errors': errors},
      );
    }

    return TaskSuccess([
      SetOutputEffect(output: {
        'validated': true,
        'validatedAt': DateTime.now().toIso8601String(),
      }),
    ]);
  }
}
```
TypedTaskExecutor
```dart
import 'package:json_annotation/json_annotation.dart';

// The _$...FromJson / _$...ToJson helpers are generated by json_serializable,
// so this file also needs a matching part directive for the generated file.

@JsonSerializable()
class ValidateInput {
  final String entityId;
  final String entityType;

  ValidateInput({required this.entityId, required this.entityType});

  factory ValidateInput.fromJson(Map<String, dynamic> json) =>
      _$ValidateInputFromJson(json);
}

@JsonSerializable()
class ValidateOutput {
  final bool validated;
  final List<String> errors;
  final DateTime validatedAt;

  ValidateOutput({
    required this.validated,
    required this.errors,
    required this.validatedAt,
  });

  Map<String, dynamic> toJson() => _$ValidateOutputToJson(this);
}

class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';

  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  // Deserialize the raw node input into the typed input object.
  @override
  ValidateInput fromInput(Map<String, dynamic> input) =>
      ValidateInput.fromJson(input);

  // Serialize the typed result back into the node output map.
  @override
  Map<String, dynamic> toOutput(ValidateOutput output) => output.toJson();

  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    // validator is not defined in this snippet (see Basic TaskExecutor).
    final errors = await validator.validate(input.entityType, input.entityId);
    return ValidateOutput(
      validated: errors.isEmpty,
      errors: errors,
      validatedAt: DateTime.now(),
    );
  }
}
```
Multi-Output Task
```dart
class RouteByAmountExecutor extends TaskExecutor {
  // The static typeDescriptor is omitted here for brevity. Registration
  // expects one, declared as in the Basic TaskExecutor example.

  @override
  String get schemaType => 'task.routing.byAmount';

  @override
  String get name => 'Route by Amount';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final amount = context.getRequired<num>('amount');

    // Choose the output port ID used for routing
    final portId = amount > 10000 ? 'high' : 'low';

    return TaskSuccess([
      SetOutputEffect(output: {'routedAmount': amount}),
      RouteToPortEffect(portId: portId), // Maps to edge via output port
    ]);
  }
}
```
Task with Effects
Tasks can return effects for declarative side effects:
```dart
class ProcessApprovalExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.approval.process';

  @override
  String get name => 'Process Approval';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');

    // Return output with effects
    return TaskSuccess([
      SetOutputEffect(output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      }),
      // Cancel any pending user tasks
      const CancelUserTasksEffect(),
      // Record an event for audit
      RecordEventEffect(
        event: WorkflowEvent.custom(
          instanceId: context.workflowInstanceId,
          nodeId: context.nodeId,
          eventType: 'approval_processed',
          data: {'decision': decision, 'approvedBy': approvedBy},
        ),
      ),
    ]);
  }
}
```
Registration
```dart
final descriptor = WorkflowDescriptor(
  title: 'Entity Tasks',
  tasks: [
    ValidateEntityExecutor.typeDescriptor,
    RouteByAmountExecutor.typeDescriptor,
  ],
);

// Create context with descriptors
final context = RegistryTypeResolver(
  descriptors: [DefaultWorkflowDescriptor(), descriptor],
);

// Create engine with context and storage
final engine = WorkflowEngine(
  context: context,
  storage: InMemoryStorage(),
  executionMode: ExecutionMode.production,
);
await engine.initialize();
```
Comparison
| Feature | TaskExecutor | TypedTaskExecutor |
|---|---|---|
| Input access | context.get<T>('key') | input.key |
| Output | TaskSuccess([SetOutputEffect(...)]) | Return typed object |
| Type safety | Runtime | Compile-time |
| Serialization | Manual | Via fromInput/toOutput |
| Error handling | Manual | Auto-wrapped |
| Effects | Manual via effects list | Same |
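One way to read the table is to think of a typed executor as a thin adapter around the untyped contract: deserialize the input, run the typed logic, serialize the output, and turn any thrown error into a failure result. The sketch below illustrates that idea with hypothetical stand-ins (SketchResult, SketchTypedExecutor, DoubleAmount). It is an assumption about how such wrapping could work, not the library's implementation, and the real executeTyped also receives an ExecutionContext.

```dart
// Hypothetical stand-ins; the library's TaskResult and TypedTaskExecutor
// may behave differently.
sealed class SketchResult {
  const SketchResult();
}

class SketchSuccess extends SketchResult {
  final Map<String, dynamic> output;
  const SketchSuccess(this.output);
}

class SketchFailure extends SketchResult {
  final String message;
  const SketchFailure(this.message);
}

abstract class SketchTypedExecutor<I, O> {
  I fromInput(Map<String, dynamic> input);
  Map<String, dynamic> toOutput(O output);
  Future<O> executeTyped(I input);

  /// Untyped entry point: deserialize, run, serialize, wrap errors.
  Future<SketchResult> execute(Map<String, dynamic> rawInput) async {
    try {
      final typedInput = fromInput(rawInput);
      final typedOutput = await executeTyped(typedInput);
      return SketchSuccess(toOutput(typedOutput));
    } catch (error) {
      // Any thrown object, including a failed cast, becomes a failure.
      return SketchFailure(error.toString());
    }
  }
}

/// Example executor: doubles the numeric 'amount' field.
class DoubleAmount extends SketchTypedExecutor<num, num> {
  @override
  num fromInput(Map<String, dynamic> input) => input['amount'] as num;

  @override
  Map<String, dynamic> toOutput(num output) => {'doubled': output};

  @override
  Future<num> executeTyped(num input) async => input * 2;
}

Future<void> main() async {
  final executor = DoubleAmount();
  final ok = await executor.execute({'amount': 21});
  final bad = await executor.execute({'amount': 'not a number'});

  if (ok is SketchSuccess) print(ok.output);
  if (bad is SketchFailure) print(bad.message);
}
```

In this sketch the subclass never constructs a result object itself, which mirrors the "Return typed object" and "Auto-wrapped" rows: the typed logic stays focused on data, and result construction lives in one place.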
Best Practices
- Use namespaced schema types - task.domain.action format
- Prefer TypedTaskExecutor for complex I/O
- Include static typeDescriptor for registration
- Return detailed failures with error types and details
- Use output port IDs for multi-path routing instead of gateways - Return RouteToPortEffect(portId: ...) as in the Multi-Output Task example
- Use effects for side effects - Keep executors pure by returning effects instead of calling mutation methods
- Use get<T> for previous node output - This is your primary data source
- Use getInitial<T> for original workflow input - For configuration that should persist throughout the workflow
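For the multi-path routing practice in the list above, the following sketch shows how a port ID chosen by an executor could select one outgoing edge without a separate gateway node. SketchEdge, its sourcePortId and targetNodeId fields, and the node IDs are hypothetical names made up for this illustration; the actual WorkflowEdge model and the engine's edge selection may differ.

```dart
// Hypothetical edge model for illustration; WorkflowEdge may differ.
class SketchEdge {
  final String sourcePortId;
  final String targetNodeId;
  const SketchEdge({required this.sourcePortId, required this.targetNodeId});
}

/// Executor side: pick a port from the data
/// (same threshold rule as RouteByAmountExecutor).
String choosePort(num amount) => amount > 10000 ? 'high' : 'low';

/// Engine side: follow the outgoing edge attached to the chosen port.
SketchEdge? edgeForPort(List<SketchEdge> outgoingEdges, String portId) {
  for (final edge in outgoingEdges) {
    if (edge.sourcePortId == portId) return edge;
  }
  return null; // no edge is wired to this port
}

void main() {
  const edges = [
    SketchEdge(sourcePortId: 'high', targetNodeId: 'managerApproval'),
    SketchEdge(sourcePortId: 'low', targetNodeId: 'autoApprove'),
  ];

  for (final amount in [250, 25000]) {
    final edge = edgeForPort(edges, choosePort(amount));
    print('$amount -> ${edge?.targetNodeId}');
  }
}
```

The routing decision stays inside the task that already has the data, so the workflow graph needs one node with two ports rather than a task followed by a gateway.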
See Also
- Task Executors - Detailed guide
- Workflow Effects - Effect types reference
- Type Registries - Registration patterns
- WorkflowDescriptor - Registration API