TaskExecutor API
Base class for automated task executors. Tasks perform business logic without human interaction.
TaskExecutor
dart
abstract class TaskExecutor implements SchemaItem {
/// Unique schema type identifier for registry lookup
/// Example: 'task.email.send', 'task.entity.validate'
String get schemaType;
/// Human-readable name
String get name;
/// Optional description
String get description => '';
/// Execute the task
Future<TaskResult> execute(ExecutionContext context);
/// Validate task configuration (optional)
List<String> validateConfig(Map<String, dynamic> config) => [];
}
TypedTaskExecutor
Type-safe base class with compile-time safety for input/output:
dart
abstract class TypedTaskExecutor<TInput, TOutput> extends TaskExecutor {
/// Deserialize input from workflow variables
TInput fromInput(Map<String, dynamic> input);
/// Serialize output to workflow variables
Map<String, dynamic> toOutput(TOutput output);
/// Type-safe execute method
Future<TOutput> executeTyped(TInput input, ExecutionContext context);
}
TaskResult
Sealed class returned by task execution:
dart
sealed class TaskResult {}
class TaskSuccess extends TaskResult {
const TaskSuccess({
this.output = const {},
this.outputPortId,
this.effects = const [], // NEW: Declarative side effects
});
/// Output data merged into workflow variables
final Map<String, dynamic> output;
/// Optional output port ID for multi-path routing
final String? outputPortId;
/// Optional side effects to apply after task completion.
///
/// Effects allow executors to declaratively request side effects
/// without directly calling mutation methods. The engine will
/// process these effects after the task completes successfully.
///
/// Common effects include:
/// - [CancelUserTasksEffect] - Cancel pending user tasks
/// - [SetOutputEffect] - Set additional output data
/// - [RecordEventEffect] - Record workflow events
final List<WorkflowEffect> effects;
}
class TaskFailure extends TaskResult {
const TaskFailure({
required this.errorType,
required this.message,
this.isRetryable = true,
this.details,
});
final ErrorType errorType;
final String message;
final bool isRetryable;
final Map<String, dynamic>? details;
// Factory constructors
factory TaskFailure.permanent({...});
factory TaskFailure.validation(String message, {...});
factory TaskFailure.internal(String message, {...});
factory TaskFailure.timeout(String message, {...});
}
ExecutionContext
A read-only context providing access to all execution state. Executors return results with effects instead of calling mutation methods directly.
Data Model
The context provides three levels of data access:
| Property | Description |
|---|---|
| `input` | Output from the previous node (your primary data source) |
| `workflowInput` | Original input when workflow started (immutable) |
| `accumulated` | All output accumulated during workflow execution |
Core Properties
dart
class ExecutionContext {
/// The executable workflow with nodes and edges
final Workflow workflow;
/// The current workflow instance
final WorkflowInstance workflowInstance;
/// The node being executed
final WorkflowNode currentNode;
/// The token that triggered this execution
final WorkflowToken currentToken;
/// Current accumulated output (read-only snapshot)
final Map<String, dynamic> output;
/// Deserialization context for resolving executors
final WorkflowDeserializationContext deserializationContext;
/// Executor-specific configuration from the node definition
final Map<String, dynamic> config;
/// Signal name for user tasks (null for other executor types)
final String? signalName;
/// Edge being evaluated (for condition executors only)
final WorkflowEdge? edge;
}
Data Access Methods
dart
// ═══════════════════════════════════════════════════════════════════
// INPUT - Data from previous node (your primary data source)
// ═══════════════════════════════════════════════════════════════════
/// Get a value from input by key or path (supports dot notation)
T? get<T>(String path); // e.g., get<String>('entityId')
T getRequired<T>(String path); // Throws if missing
// ═══════════════════════════════════════════════════════════════════
// WORKFLOW INPUT - Original input when workflow started (immutable)
// ═══════════════════════════════════════════════════════════════════
Map<String, dynamic> get workflowInput; // Original workflow input
T? getInitial<T>(String path); // Get from workflowInput
T getInitialRequired<T>(String path); // Throws if missing
// ═══════════════════════════════════════════════════════════════════
// ACCUMULATED - All output from workflow execution
// ═══════════════════════════════════════════════════════════════════
Map<String, dynamic> get accumulated; // All accumulated output
T? getAny<T>(String path); // Get from accumulated
// ═══════════════════════════════════════════════════════════════════
// CONFIG - Node configuration
// ═══════════════════════════════════════════════════════════════════
T? getConfig<T>(String key); // Get config value
T getConfigRequired<T>(String key); // Throws if missing
Navigation Helpers
dart
/// Get outgoing edges from current node
List<WorkflowEdge> get outgoingEdges;
/// Get incoming edges to current node
List<WorkflowEdge> get incomingEdges;
/// Get a node by ID
WorkflowNode? getNode(String nodeId);
/// Get all active tokens at a specific node
List<WorkflowToken> getTokensAtNode(String nodeId);
/// Check node type
bool get isStartNode;
bool get isEndNode;
Retry Information
dart
/// Current attempt number (1-based: 1 = first try, 2 = first retry)
int get attemptNumber;
/// Whether this is a retry attempt
bool get isRetry => attemptNumber > 1;
Path Notation Examples
dart
// Simple key access
final entityId = context.get<String>('entityId');
// Nested path access (dot notation)
final decision = context.get<String>('approval.decision');
final level = context.get<int>('approval.levels.0.level');
// Required values (throws ArgumentError if missing)
final tenantId = context.getRequired<String>('tenantId');
Examples
Basic TaskExecutor
dart
/// Example task that validates the entity named by the previous node's output.
class ValidateEntityExecutor extends TaskExecutor {
  static const _schemaType = 'task.entity.validate';

  /// Descriptor used to register this executor under [_schemaType].
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  /// Reads `entityId` and `entityType` from the node input, runs the
  /// validator, and returns either a validation failure carrying the errors
  /// or a success marker with a timestamp.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Both values come from the previous node's output and are mandatory.
    final id = context.getRequired<String>('entityId');
    final type = context.getRequired<String>('entityType');
    final problems = await validator.validate(type, id);

    // Success path first: no reported problems means the entity is valid.
    if (problems.isEmpty) {
      return TaskSuccess(output: {
        'validated': true,
        'validatedAt': DateTime.now().toIso8601String(),
      });
    }

    return TaskFailure.validation(
      'Validation failed',
      details: {'errors': problems},
    );
  }
}
TypedTaskExecutor
dart
/// Typed input for the validation task: which entity to validate.
@JsonSerializable()
class ValidateInput {
  /// Identifier of the entity to validate.
  final String entityId;

  /// Type name of the entity to validate.
  final String entityType;

  ValidateInput({
    required this.entityId,
    required this.entityType,
  });

  /// Builds the input from a JSON map via the generated helper.
  factory ValidateInput.fromJson(Map<String, dynamic> json) {
    return _$ValidateInputFromJson(json);
  }
}
/// Typed result of the validation task.
@JsonSerializable()
class ValidateOutput {
  /// Whether validation passed.
  final bool validated;

  /// Validation errors that were reported.
  final List<String> errors;

  /// When the validation result was produced.
  final DateTime validatedAt;

  ValidateOutput({
    required this.validated,
    required this.errors,
    required this.validatedAt,
  });

  /// Converts this result to a JSON map via the generated helper.
  Map<String, dynamic> toJson() {
    return _$ValidateOutputToJson(this);
  }
}
/// Typed variant of the validation task: input and output are plain Dart
/// objects instead of raw maps.
class ValidateEntityExecutor
    extends TypedTaskExecutor<ValidateInput, ValidateOutput> {
  static const _schemaType = 'task.entity.validate';

  /// Descriptor used to register this executor under [_schemaType].
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ValidateEntityExecutor(),
    title: 'Validate Entity',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Validate Entity';

  /// Deserializes the raw input map into a [ValidateInput].
  @override
  ValidateInput fromInput(Map<String, dynamic> input) {
    return ValidateInput.fromJson(input);
  }

  /// Serializes a [ValidateOutput] back into a map.
  @override
  Map<String, dynamic> toOutput(ValidateOutput output) {
    return output.toJson();
  }

  /// Runs the validator for the entity described by [input] and packages the
  /// outcome, including any reported errors, into a [ValidateOutput].
  @override
  Future<ValidateOutput> executeTyped(
    ValidateInput input,
    ExecutionContext context,
  ) async {
    final problems = await validator.validate(
      input.entityType,
      input.entityId,
    );
    final result = ValidateOutput(
      validated: problems.isEmpty,
      errors: problems,
      validatedAt: DateTime.now(),
    );
    return result;
  }
}
Multi-Output Task
dart
/// Example task that picks an output port (`high` or `low`) from the `amount`
/// in the node input, so the workflow can branch without a gateway.
class RouteByAmountExecutor extends TaskExecutor {
  static const _schemaType = 'task.routing.byAmount';

  /// Descriptor used to register this executor, e.g. in the `tasks` list of
  /// a `WorkflowDescriptor`.
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => RouteByAmountExecutor(),
    title: 'Route by Amount',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Route by Amount';

  /// Reads the required `amount` from the input and returns a success result
  /// whose `outputPortId` is `high` for amounts above 10000 and `low`
  /// otherwise. The amount is echoed back as `routedAmount`.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final amount = context.getRequired<num>('amount');

    // Return output port ID for routing
    final portId = amount > 10000 ? 'high' : 'low';

    return TaskSuccess(
      output: {'routedAmount': amount},
      outputPortId: portId, // Maps to edge via output port
    );
  }
}
Task with Effects
Tasks can return effects for declarative side effects:
dart
/// Example task that records an approval decision and asks the engine, via
/// effects, to cancel pending user tasks and record an audit event.
class ProcessApprovalExecutor extends TaskExecutor {
  static const _schemaType = 'task.approval.process';

  /// Descriptor used to register this executor, e.g. in the `tasks` list of
  /// a `WorkflowDescriptor`.
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (json) => ProcessApprovalExecutor(),
    title: 'Process Approval',
  );

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Process Approval';

  /// Reads the required `decision` and `approvedBy` values from the input and
  /// returns a success result carrying the processed output plus the effects
  /// for the engine to apply.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final decision = context.getRequired<String>('decision');
    final approvedBy = context.getRequired<String>('approvedBy');

    // Return output with effects
    return TaskSuccess(
      output: {
        'processed': true,
        'decision': decision,
        'processedAt': DateTime.now().toIso8601String(),
      },
      // Declarative side effects - engine processes these
      effects: [
        // Cancel any pending user tasks
        const CancelUserTasksEffect(),
        // Record an event for audit
        // NOTE(review): `workflowInstanceId` and `nodeId` are not among the
        // ExecutionContext members listed in this reference - confirm they
        // exist on the context.
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_processed',
            data: {'decision': decision, 'approvedBy': approvedBy},
          ),
        ),
      ],
    );
  }
}
Registration
dart
// Group the task executors of this domain under one descriptor.
final entityTasks = WorkflowDescriptor(
  title: 'Entity Tasks',
  tasks: [
    ValidateEntityExecutor.typeDescriptor,
    RouteByAmountExecutor.typeDescriptor,
  ],
);

// Deserialization context that knows the default descriptor and ours.
final registryContext = RegistryDeserializationContext(
  descriptors: [DefaultWorkflowDescriptor(), entityTasks],
);

// The storage and the engine share the same context.
final storage = InMemoryStorage(context: registryContext);
final engine = WorkflowEngine(
  context: registryContext,
  storage: storage,
);
await engine.initialize();
Comparison
| Feature | TaskExecutor | TypedTaskExecutor |
|---|---|---|
| Input access | context.get<T>('key') | input.key |
| Output | TaskSuccess(output: {...}) | Return typed object |
| Type safety | Runtime | Compile-time |
| Serialization | Manual | Via fromInput/toOutput |
| Error handling | Manual | Auto-wrapped |
| Effects | Manual via effects list | Same |
Best Practices
- Use namespaced schema types - `task.domain.action` format
- Prefer TypedTaskExecutor for complex I/O
- Include static typeDescriptor for registration
- Return detailed failures with error types and details
- Use outputPortId for multi-path routing instead of gateways
- Use effects for side effects - Keep executors pure by returning effects instead of calling mutation methods
- Use `get<T>` for previous node output - This is your primary data source
- Use `getInitial<T>` for original workflow input - For configuration that should persist throughout the workflow
See Also
- Task Executors - Detailed guide
- Workflow Effects - Effect types reference
- Type Registries - Registration patterns
- WorkflowDescriptor - Registration API