Workflow Effects
Effects capture side effects declaratively, enabling pure executors that are easy to test and reason about.
Overview
Instead of calling mutation methods directly, executors return WorkflowEffect objects that describe what should happen. The engine's EffectProcessor interprets these effects and applies them to storage.
Benefits
- Pure executors: Node executors return effects instead of mutating state
- Testability: Effects can be asserted without mocking storage
- Batching: Multiple effects can be processed together
- Replay: Effects form an audit trail that can be replayed
// Instead of calling mutation methods directly from the executor...
await storage.userTaskInstances.cancel(taskId);
await storage.events.record(event);

// ...return effects declaratively; the engine's EffectProcessor applies them.
return TaskSuccess(
  output: {'processed': true},
  effects: [
    const CancelUserTasksEffect(),
    RecordEventEffect(event: event),
  ],
);

Effect Hierarchy
WorkflowEffect (sealed)
├── Output Effects
│ └── SetOutputEffect
├── Status Effects
│ ├── UpdateStatusEffect
│ ├── CompleteWorkflowEffect
│ └── FailWorkflowEffect
├── Token Effects
│ ├── CreateTokensEffect
│ ├── ConsumeTokenEffect
│ ├── ConsumeJoinTokensEffect
│ ├── DropTokenEffect
│ ├── WaitTokenEffect
│ └── ActivateTokenEffect
├── User Task Effects
│ ├── CreateUserTaskEffect
│ └── CancelUserTasksEffect
├── Event Effects
│ ├── RecordEventEffect
│ └── EmitErrorEffect
├── Scheduling Effects
│ ├── ScheduleExecutionEffect
│ ├── ScheduleRetryEffect
│ └── StoreAttemptNumberEffect
└── Cleanup Effects
    └── CleanupResourcesEffect

Output Effects
SetOutputEffect
Set or merge output data into the workflow instance.
/// Sets or merges output data into the workflow instance.
class SetOutputEffect extends WorkflowEffect {
  /// The output data to merge into the workflow output.
  final Map<String, dynamic> output;

  /// Optional path under which [output] is namespaced.
  final String? path;

  const SetOutputEffect({required this.output, this.path});
}

Usage:
// Merge the map at the root level of the workflow output (no path given)
SetOutputEffect(output: {'validated': true})
// Namespace the map under a path to prevent key collisions
SetOutputEffect(
  output: {'count': 10, 'total': 500},
  path: 'processingStats',
)
// Result: { processingStats: { count: 10, total: 500 } }

Status Effects
UpdateStatusEffect
Update the workflow instance status.
/// Updates the workflow instance status.
class UpdateStatusEffect extends WorkflowEffect {
  /// The status to set on the workflow instance.
  final WorkflowStatus status;

  const UpdateStatusEffect({required this.status});
}

CompleteWorkflowEffect
Complete the workflow with final output.
/// Completes the workflow with final output.
class CompleteWorkflowEffect extends WorkflowEffect {
  /// Final output to merge before completing; an empty map when omitted.
  final Map<String, dynamic> output;

  const CompleteWorkflowEffect({this.output = const {}});
}

FailWorkflowEffect
Fail the workflow with an error.
/// Fails the workflow with an error.
class FailWorkflowEffect extends WorkflowEffect {
  /// The error that caused the failure.
  final WorkflowError error;

  /// The node where the failure occurred (optional).
  final String? nodeId;

  const FailWorkflowEffect({required this.error, this.nodeId});
}

Token Effects
Tokens track execution position in the workflow. These effects manage token lifecycle.
CreateTokensEffect
Create new tokens for target nodes. Used when continuing to next nodes or splitting into parallel branches.
/// Creates new tokens for target nodes.
///
/// Used when continuing to next nodes or splitting into parallel branches.
class CreateTokensEffect extends WorkflowEffect {
  const CreateTokensEffect({
    required this.targetNodeIds,
    required this.fromNodeId,
    this.fromTokenId,
    this.isParallelSplit = false,
    this.output = const {},
    this.branchId,
    this.parentTokenId,
  });

  /// Target node IDs to create tokens for.
  final List<String> targetNodeIds;

  /// The node ID that is the source of these tokens.
  final String fromNodeId;

  /// Optional ID of the token the new tokens are created from.
  // NOTE(review): typed String? to match the String token IDs used by the
  // other token effects — confirm against the library source.
  final String? fromTokenId;

  /// Whether this is a parallel split (creates unique branchIds per token).
  final bool isParallelSplit;

  /// Output to carry on the new tokens.
  final Map<String, dynamic> output;

  /// Optional branch ID for the new tokens.
  // NOTE(review): type assumed to be String? — confirm against the library
  // source.
  final String? branchId;

  /// Optional parent token ID for the new tokens.
  // NOTE(review): type assumed to be String? — confirm against the library
  // source.
  final String? parentTokenId;
}

ConsumeTokenEffect
Consume a token (mark as terminal). Used when a token reaches an end node or is merged at a join.
/// Consumes a token, marking it as terminal.
///
/// Used when a token reaches an end node or is merged at a join.
class ConsumeTokenEffect extends WorkflowEffect {
  /// ID of the token to consume.
  final String tokenId;

  const ConsumeTokenEffect({required this.tokenId});
}

ConsumeJoinTokensEffect
Consume all tokens at a join node when all branches have arrived.
/// Consumes all tokens at a join node when all branches have arrived.
class ConsumeJoinTokensEffect extends WorkflowEffect {
  /// ID of the join node.
  final String joinNodeId;

  /// IDs of the tokens to consume.
  final List<String> tokenIds;

  const ConsumeJoinTokensEffect({
    required this.joinNodeId,
    required this.tokenIds,
  });
}

DropTokenEffect
Drop a token (late arrival at join or explicit drop).
/// Drops a token (late arrival at join or explicit drop).
class DropTokenEffect extends WorkflowEffect {
  /// ID of the token to drop.
  final String tokenId;

  /// Optional reason for dropping the token.
  final String? reason;

  const DropTokenEffect({required this.tokenId, this.reason});
}

WaitTokenEffect
Mark a token as waiting for an external event.
/// Marks a token as waiting for an external event.
class WaitTokenEffect extends WorkflowEffect {
  /// ID of the token to mark as waiting.
  final String tokenId;

  const WaitTokenEffect({required this.tokenId});
}

ActivateTokenEffect
Activate a waiting token (resume from wait state).
/// Activates a waiting token (resumes it from the wait state).
class ActivateTokenEffect extends WorkflowEffect {
  /// ID of the token to activate.
  final String tokenId;

  const ActivateTokenEffect({required this.tokenId});
}

User Task Effects
CreateUserTaskEffect
Create a user task instance.
/// Creates a user task instance.
// NOTE(review): this snippet shows only the constructor; the field
// declarations backing these initializing formals are not shown here and
// their types should be confirmed against the library source.
class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({
    // Required parameters.
    required this.taskId,
    required this.nodeId,
    required this.signalName,
    required this.title,
    required this.schemaType,
    // Optional parameters.
    this.description,
    this.assignedToRoleId,
    this.assignedToUserId,
    this.assignedToGroupId,
    this.priority = UserTaskPriority.normal, // normal priority when omitted
    this.dueAt,
    this.input = const {}, // empty map when omitted
    this.storeAs,
  });
}

CancelUserTasksEffect
Cancel pending user tasks for a workflow instance.
/// Cancels pending user tasks for a workflow instance.
class CancelUserTasksEffect extends WorkflowEffect {
  /// Optional: Cancel only tasks for this node. If null, cancel all.
  final String? nodeId;

  const CancelUserTasksEffect({this.nodeId});
}

Usage:
// Cancel all pending user tasks (nodeId omitted, so it is null)
const CancelUserTasksEffect()
// Cancel only the tasks that belong to a specific node
CancelUserTasksEffect(nodeId: 'approvalNode')

Common Use Case: Cancel Before Revision
/// Example executor that waits on a revision task and cancels pending user
/// tasks through an effect.
class RevisionTaskExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    // The task is assigned to the user stored under 'submitterId'.
    final revisionConfig = UserTaskConfiguration(
      title: 'Revise Document',
      schemaType: 'revision',
      assignedToUserId: context.getRequired<String>('submitterId'),
    );

    return WaitForUserTaskResult(
      signalName: 'revision_submitted',
      config: revisionConfig,
      // Cancel any pending approval tasks before creating revision task
      effects: [const CancelUserTasksEffect()],
    );
  }
}

Event Effects
RecordEventEffect
Record a workflow event to the event log.
/// Records a workflow event to the event log.
class RecordEventEffect extends WorkflowEffect {
  /// The event to record.
  final WorkflowEvent event;

  const RecordEventEffect({required this.event});
}

Usage:
// Record a custom 'approval_completed' event for the current instance and node.
RecordEventEffect(
  event: WorkflowEvent.custom(
    instanceId: context.workflowInstanceId,
    nodeId: context.nodeId,
    eventType: 'approval_completed',
    data: {'decision': 'approved', 'approvedBy': userId},
  ),
)

EmitErrorEffect
Emit an error to the error stream.
/// Emits an error to the error stream.
class EmitErrorEffect extends WorkflowEffect {
  /// The instance ID associated with the error.
  final String instanceId;

  /// The error to emit.
  final WorkflowError error;

  const EmitErrorEffect({required this.instanceId, required this.error});
}

Scheduling Effects
ScheduleExecutionEffect
Schedule execution of target nodes. Used internally by the orchestrator.
/// Schedules execution of target nodes.
///
/// Used internally by the orchestrator.
class ScheduleExecutionEffect extends WorkflowEffect {
  /// IDs of the nodes to schedule.
  final List<String> nodeIds;

  /// Token IDs passed along with the schedule request.
  final List<String> tokenIds;

  const ScheduleExecutionEffect({
    required this.nodeIds,
    required this.tokenIds,
  });
}

ScheduleRetryEffect
Schedule a retry with delay.
/// Schedules a retry with a delay.
class ScheduleRetryEffect extends WorkflowEffect {
  /// ID of the node to retry.
  final String nodeId;

  /// ID of the token involved in the retry.
  final String tokenId;

  /// Delay applied to the retry.
  final Duration delay;

  /// Attempt number associated with the retry.
  final int attemptNumber;

  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });
}

StoreAttemptNumberEffect
Store the attempt number for retry tracking.
/// Stores the attempt number for retry tracking.
class StoreAttemptNumberEffect extends WorkflowEffect {
  /// ID of the node the attempt number belongs to.
  final String nodeId;

  /// The attempt number to store.
  final int attemptNumber;

  const StoreAttemptNumberEffect({
    required this.nodeId,
    required this.attemptNumber,
  });
}

Cleanup Effects
CleanupResourcesEffect
Clean up workflow resources on termination. Deactivates all tokens and cancels pending user tasks.
/// Cleans up workflow resources on termination.
///
/// Deactivates all tokens and cancels pending user tasks.
class CleanupResourcesEffect extends WorkflowEffect {
  /// Creates the effect; it carries no parameters.
  const CleanupResourcesEffect();
}

Using Effects in Executors
In TaskExecutor
/// Example task executor that returns effects alongside its output.
class MyTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.myTask';

  @override
  String get name => 'My Task';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final processed = await processData(context.input);

    // Set namespaced output
    final statsEffect = SetOutputEffect(
      output: {'stats': processed.stats},
      path: 'taskResult',
    );

    // Record audit event
    final auditEffect = RecordEventEffect(
      event: WorkflowEvent.custom(
        instanceId: context.workflowInstanceId,
        nodeId: context.nodeId,
        eventType: 'task_completed',
        data: {'itemCount': processed.count},
      ),
    );

    return TaskSuccess(
      output: {'processed': true},
      effects: [statsEffect, auditEffect],
    );
  }
}

In NodeExecutor
/// Example node executor that attaches effects to a continue result.
class MyNodeExecutor extends NodeExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    // Process node...
    return ContinueResult.single(
      'nextNode',
      output: {'processed': true},
      // Effects returned together with the result.
      effects: [
        const CancelUserTasksEffect(),
        // `...` is a placeholder for the event to record.
        RecordEventEffect(event: ...),
      ],
    );
  }
}

In UserTaskExecutor
/// Example user-task executor that records an event when approval is
/// requested.
class ApprovalExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    final approvalConfig = UserTaskConfiguration(
      title: 'Approve Request',
      schemaType: 'approval',
      assignedToRoleId: 'managers',
    );

    final requestedEvent = WorkflowEvent.custom(
      instanceId: context.workflowInstanceId,
      nodeId: context.nodeId,
      eventType: 'approval_requested',
      data: {},
    );

    return WaitForUserTaskResult(
      signalName: 'approval_decision',
      config: approvalConfig,
      // Effects run before user task is created
      effects: [RecordEventEffect(event: requestedEvent)],
    );
  }
}

Best Practices
- Keep executors pure: Return effects instead of calling mutation methods
- Use SetOutputEffect with paths: Namespace output to prevent collisions
- Cancel tasks before creating new ones: Use CancelUserTasksEffect when transitioning between user tasks
- Record events for audit: Use RecordEventEffect for important state changes
- Order effects appropriately: Effects are processed in order
See Also
- Task Executors - Using effects in tasks
- Node Results - Result types with effects
- Data Flow - How data moves through workflows