Workflow Effects

Effects capture side effects declaratively, enabling pure executors that are easy to test and reason about.

Overview

Instead of calling mutation methods directly, executors return WorkflowEffect objects that describe what should happen. The engine's EffectProcessor interprets these effects and applies them to storage.

Benefits

  • Pure executors: Node executors return effects instead of mutating state
  • Testability: Effects can be asserted without mocking storage
  • Batching: Multiple effects can be processed together
  • Replay: Effects form an audit trail that can be replayed

dart
// Instead of calling mutation methods...
await storage.userTaskInstances.cancel(taskId);
await storage.events.record(event);

// Return effects declaratively
return TaskSuccess(
  output: {'processed': true},
  effects: [
    const CancelUserTasksEffect(),
    RecordEventEffect(event: event),
  ],
);
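
The testability benefit can be sketched as follows. This is a minimal illustration, not the library's code: the classes below are hypothetical stand-ins for WorkflowEffect, CancelUserTasksEffect, RecordEventEffect, and TaskSuccess (RecordEventEffect is simplified to carry a plain eventType string), and approve is an invented function. The point is that the returned effects are plain values that can be inspected directly.

```dart
// Hypothetical stand-ins for the library types; the real classes may differ.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect({this.nodeId});
  final String? nodeId;
}

class RecordEventEffect extends WorkflowEffect {
  const RecordEventEffect({required this.eventType});
  final String eventType; // simplified: the real effect carries a WorkflowEvent
}

class TaskSuccess {
  const TaskSuccess({required this.output, this.effects = const []});
  final Map<String, dynamic> output;
  final List<WorkflowEffect> effects;
}

// A pure function: it only describes what should happen.
TaskSuccess approve(String orderId) {
  return TaskSuccess(
    output: {'orderId': orderId, 'processed': true},
    effects: const [
      CancelUserTasksEffect(),
      RecordEventEffect(eventType: 'order_approved'),
    ],
  );
}

void main() {
  final result = approve('1234');

  // No storage and no mocks: inspect the returned description directly.
  assert(result.effects.length == 2);
  assert(result.effects.first is CancelUserTasksEffect);
  final recorded = result.effects.whereType<RecordEventEffect>().single;
  assert(recorded.eventType == 'order_approved');
  print('effects: ${result.effects.map((e) => e.runtimeType).join(', ')}');
}
```

Dart only evaluates assert statements when assertions are enabled (for example with dart run --enable-asserts); in a real test suite the same checks would typically be written with a test framework's expect.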

Effect Hierarchy

WorkflowEffect hierarchy

A sealed WorkflowEffect root groups output, status, token, user task, notification, event, scheduling, and cleanup effects. Each group lists the concrete effects it contains.

WorkflowEffect (sealed)

  • Output Effects: SetOutputEffect, SetVariableEffect, RouteToPortEffect
  • Status Effects: UpdateStatusEffect, CompleteWorkflowEffect, FailWorkflowEffect
  • Token Effects: CreateTokensEffect, ConsumeTokenEffect, ConsumeJoinTokensEffect, DropTokenEffect, WaitTokenEffect, ActivateTokenEffect
  • User Task Effects: CreateUserTaskEffect, CancelUserTasksEffect, ReassignUserTaskEffect
  • Notification Effects: SendNotificationEffect
  • Event Effects: RecordEventEffect, EmitErrorEffect
  • Scheduling Effects: ScheduleExecutionEffect, ScheduleRetryEffect, StoreAttemptNumberEffect
  • Cleanup Effects: CleanupResourcesEffect
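
Because the root is sealed, code that interprets effects can use an exhaustive switch. The sketch below assumes a trimmed-down hierarchy of three stand-in classes and an invented describe function. It does not show how the engine's EffectProcessor is actually written, only the language feature a sealed root makes available.

```dart
// Hypothetical, trimmed-down stand-ins; the real hierarchy has many more effects.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({required this.output});
  final Map<String, dynamic> output;
}

class ConsumeTokenEffect extends WorkflowEffect {
  const ConsumeTokenEffect({required this.tokenId});
  final String tokenId;
}

class CleanupResourcesEffect extends WorkflowEffect {
  const CleanupResourcesEffect();
}

// Because WorkflowEffect is sealed, the compiler checks that every subtype
// is handled; a missing case is a compile-time error.
String describe(WorkflowEffect effect) => switch (effect) {
      SetOutputEffect(:final output) =>
        'merge output keys ${output.keys.toList()}',
      ConsumeTokenEffect(:final tokenId) => 'consume token $tokenId',
      CleanupResourcesEffect() => 'clean up resources',
    };

void main() {
  const effects = <WorkflowEffect>[
    SetOutputEffect(output: {'validated': true}),
    ConsumeTokenEffect(tokenId: 'token-1'),
    CleanupResourcesEffect(),
  ];
  effects.map(describe).forEach(print);
}
```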

Output Effects

SetOutputEffect

Constructor Parameters

Parameter | Type | Required | Description
output | Map<String, dynamic> | Yes | The output data to merge into workflow output
path | String? | No | Optional path to namespace the output

Set or merge output data into the workflow instance.

Usage:

dart
// Merge at root level
SetOutputEffect(output: {'validated': true})

// Namespace under a path to prevent collisions
SetOutputEffect(
  output: {'count': 10, 'total': 500},
  path: 'processingStats',
)
// Result: { processingStats: { count: 10, total: 500 } }
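
To make the effect of path concrete, here is a sketch of one plausible merge rule. It assumes a shallow merge and treats path as a single top-level key; applySetOutput is an invented helper, and the engine's actual behavior (for example with nested or dotted paths) may differ.

```dart
// Sketch of the merge semantics described above; not the library's code.
// Assumptions: the merge is shallow, and `path` is a single top-level key.
Map<String, dynamic> applySetOutput(
  Map<String, dynamic> current,
  Map<String, dynamic> output, {
  String? path,
}) {
  if (path == null) {
    // Root-level merge: new keys overwrite existing ones with the same name.
    return {...current, ...output};
  }
  // Namespaced merge: only the map stored under `path` is touched.
  final existing = current[path];
  final nested =
      existing is Map<String, dynamic> ? existing : <String, dynamic>{};
  return {
    ...current,
    path: {...nested, ...output},
  };
}

void main() {
  var state = <String, dynamic>{'count': 1};
  state = applySetOutput(state, {'validated': true});
  state = applySetOutput(
    state,
    {'count': 10, 'total': 500},
    path: 'processingStats',
  );
  // The root-level 'count' is untouched because the second write is namespaced.
  print(state);
}
```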

SetVariableEffect

Constructor Parameters

Parameter | Type | Required | Description
name | String | Yes | Variable name
value | dynamic | Yes | Variable value
scope | VariableScope | Yes | Variable scope (local, inherited, shared)

Set or update a workflow variable with explicit scoping.

Usage:

dart
// Set a local variable
SetVariableEffect(name: 'retryCount', value: 3, scope: VariableScope.local)

// Convenience factories
SetVariableEffect.local('retryCount', 3)
SetVariableEffect.inherited('approvalChain', chain)

RouteToPortEffect

Route execution to a specific output port. Used by executors that support multiple outcome paths (e.g., subflow nodes with success/failure/timeout ports).

Usage:

dart
// Route to the success port
RouteToPortEffect(portId: 'success')

// Route to a custom port
RouteToPortEffect(portId: 'needsReview')

Status Effects

UpdateStatusEffect

Update the workflow instance status.

dart
class UpdateStatusEffect extends WorkflowEffect {
  const UpdateStatusEffect({required this.status});

  final WorkflowStatus status;
}

CompleteWorkflowEffect

Complete the workflow with final output.

dart
class CompleteWorkflowEffect extends WorkflowEffect {
  const CompleteWorkflowEffect({this.output = const {}});

  /// Final output to merge before completing
  final Map<String, dynamic> output;
}

FailWorkflowEffect

Fail the workflow with an error.

dart
class FailWorkflowEffect extends WorkflowEffect {
  const FailWorkflowEffect({
    required this.error,
    this.nodeId,
  });

  /// The error that caused the failure
  final WorkflowError error;

  /// The node where the failure occurred
  final String? nodeId;
}

Token Effects

Tokens track the execution position in the workflow. These effects manage the token lifecycle.

CreateTokensEffect

Constructor Parameters

Parameter | Type | Required | Description
targetNodeIds | List<String> | Yes | Target node IDs to create tokens for
fromNodeId | String | Yes | The node ID that is the source of these tokens
fromTokenId | String? | No | The token ID that triggered this creation
isParallelSplit | bool | No | Whether this is a parallel split (creates SplitTokens)
output | Map<String, dynamic> | No | Output to carry on the new tokens
splitContext | SplitContext? | No | Split context for parallel/inclusive splits
splitType | SplitType? | No | Type of split (exclusive, parallel, inclusive)

Create new tokens for target nodes. Used when continuing to next nodes or splitting into parallel branches.
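
The two cases, continuing to a single node and fanning out, might look like the sketch below. The class is a hypothetical stand-in built from the parameter table: splitContext is left out, the default values are assumptions, and the node IDs are invented. The real constructor may differ.

```dart
// Hypothetical stand-in that mirrors the parameter table (splitContext is
// omitted); the real class may differ in defaults and parameter style.
enum SplitType { exclusive, parallel, inclusive }

class CreateTokensEffect {
  const CreateTokensEffect({
    required this.targetNodeIds,
    required this.fromNodeId,
    this.fromTokenId,
    this.isParallelSplit = false, // assumed default
    this.output = const {}, // assumed default
    this.splitType,
  });

  final List<String> targetNodeIds;
  final String fromNodeId;
  final String? fromTokenId;
  final bool isParallelSplit;
  final Map<String, dynamic> output;
  final SplitType? splitType;
}

void main() {
  // Continue to a single next node.
  const next = CreateTokensEffect(
    targetNodeIds: ['reviewOrder'],
    fromNodeId: 'validateOrder',
    fromTokenId: 'token-1',
  );

  // Fan out into two parallel branches, carrying output on each new token.
  const fanOut = CreateTokensEffect(
    targetNodeIds: ['chargeCard', 'reserveStock'],
    fromNodeId: 'parallelGateway',
    isParallelSplit: true,
    splitType: SplitType.parallel,
    output: {'orderId': '1234'},
  );

  print('continue: ${next.targetNodeIds}, fan out: ${fanOut.targetNodeIds}');
}
```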

ConsumeTokenEffect

Consume a token (mark as terminal). Used when a token reaches an end node or is merged at a join.

dart
class ConsumeTokenEffect extends WorkflowEffect {
  const ConsumeTokenEffect({required this.tokenId});

  final String tokenId;
}

ConsumeJoinTokensEffect

Consume all tokens at a join node when all branches have arrived.

dart
class ConsumeJoinTokensEffect extends WorkflowEffect {
  const ConsumeJoinTokensEffect({
    required this.joinNodeId,
    required this.tokenIds,
  });

  final String joinNodeId;
  final List<String> tokenIds;
}

DropTokenEffect

Drop a token, for example when it arrives late at a join or is dropped explicitly.

dart
class DropTokenEffect extends WorkflowEffect {
  const DropTokenEffect({
    required this.tokenId,
    this.reason,
  });

  final String tokenId;
  final String? reason;
}

WaitTokenEffect

Mark a token as waiting for an external event.

dart
class WaitTokenEffect extends WorkflowEffect {
  const WaitTokenEffect({required this.tokenId});

  final String tokenId;
}

ActivateTokenEffect

Activate a waiting token (resume from wait state).

dart
class ActivateTokenEffect extends WorkflowEffect {
  const ActivateTokenEffect({required this.tokenId});

  final String tokenId;
}
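
Taken together, the single-token effects above behave like transitions in a small state machine. The sketch below illustrates that reading under stated assumptions: the TokenStatus names and the applyTokenEffect function are hypothetical, and the stand-in classes keep only tokenId. The library's own token model is not shown in this document.

```dart
// Hypothetical status names; the library's token model is not shown here.
enum TokenStatus { active, waiting, consumed, dropped }

// Stand-ins for four of the token effects, reduced to a shared tokenId.
sealed class TokenEffect {
  const TokenEffect({required this.tokenId});
  final String tokenId;
}

class ConsumeTokenEffect extends TokenEffect {
  const ConsumeTokenEffect({required super.tokenId});
}

class DropTokenEffect extends TokenEffect {
  const DropTokenEffect({required super.tokenId});
}

class WaitTokenEffect extends TokenEffect {
  const WaitTokenEffect({required super.tokenId});
}

class ActivateTokenEffect extends TokenEffect {
  const ActivateTokenEffect({required super.tokenId});
}

// Returns a new status map with the effect's token moved to its next status.
Map<String, TokenStatus> applyTokenEffect(
  Map<String, TokenStatus> tokens,
  TokenEffect effect,
) {
  final next = switch (effect) {
    ConsumeTokenEffect() => TokenStatus.consumed,
    DropTokenEffect() => TokenStatus.dropped,
    WaitTokenEffect() => TokenStatus.waiting,
    ActivateTokenEffect() => TokenStatus.active,
  };
  return {...tokens, effect.tokenId: next};
}

void main() {
  var tokens = {'t1': TokenStatus.active, 't2': TokenStatus.active};
  const effects = <TokenEffect>[
    WaitTokenEffect(tokenId: 't1'),
    DropTokenEffect(tokenId: 't2'),
    ActivateTokenEffect(tokenId: 't1'),
    ConsumeTokenEffect(tokenId: 't1'),
  ];
  for (final effect in effects) {
    tokens = applyTokenEffect(tokens, effect);
    print('${effect.runtimeType} -> $tokens');
  }
}
```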

User Task Effects

CreateUserTaskEffect

Create a user task instance.

dart
class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({
    required this.taskId,
    required this.nodeId,
    required this.signalName,
    required this.title,
    required this.schemaType,
    this.description,
    this.assignedToRoleId,
    this.assignedToUserId,
    this.assignedToGroupId,
    this.priority = UserTaskPriority.normal,
    this.dueAt,
    this.input = const {},
    this.storeAs,
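  });

  // Field declarations omitted here; each constructor parameter above
  // has a matching final field.
}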

CancelUserTasksEffect

Constructor Parameters

Parameter | Type | Required | Description
nodeId | String? | No | Cancel only tasks for this node. If null, cancel all.
reason | String? | No | Optional reason for cancellation

Cancel pending user tasks for a workflow instance.

Usage:

dart
// Cancel all pending user tasks
const CancelUserTasksEffect()

// Cancel only tasks for a specific node
CancelUserTasksEffect(nodeId: 'approvalNode')

Common Use Case: Cancel Before Revision

dart
class RevisionTaskExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'revision_submitted',
      config: UserTaskConfiguration(
        title: 'Revise Document',
        schemaType: 'revision',
        assignedToUserId: context.getRequired<String>('submitterId'),
      ),
      // Cancel any pending approval tasks before creating the revision task
      effects: [const CancelUserTasksEffect()],
    );
  }
}

ReassignUserTaskEffect

Constructor Parameters

Parameter | Type | Required | Description
taskId | String | Yes | The user task ID to reassign
newAssignedToUserId | String? | No | New user ID to assign to
newAssignedToRoleId | String? | No | New role ID to assign to
newAssignedToGroupId | String? | No | New group ID to assign to
reason | String? | No | Reason for reassignment
reassignedBy | String? | No | Who initiated the reassignment

Reassign a user task to a different user, role, or group.

Usage:

dart
// Reassign to a specific user
ReassignUserTaskEffect(
  taskId: 'task-123',
  newAssignedToUserId: 'manager-456',
  reason: 'Original assignee on leave',
  reassignedBy: 'admin-789',
)

// Reassign to a role
ReassignUserTaskEffect(
  taskId: 'task-123',
  newAssignedToRoleId: 'senior-managers',
  reason: 'Escalation due to SLA breach',
)

Notification Effects

SendNotificationEffect

Constructor Parameters

Parameter | Type | Required | Description
notificationType | String | Yes | Notification type identifier (e.g., "task_assigned")
recipientUserIds | List<String> | Yes | User IDs to notify
title | String | Yes | Notification title
recipientRoleIds | List<String>? | No | Role IDs to notify (optional)
recipientGroupIds | List<String>? | No | Group IDs to notify (optional)
body | String? | No | Notification body (optional)
data | Map<String, dynamic> | No | Additional data payload
channels | List<NotificationChannel> | No | Delivery channels
priority | NotificationPriority | No | Notification priority

Send a notification to users, roles, or groups across multiple channels.

NotificationChannel

dart
enum NotificationChannel { inApp, email, push, sms }

NotificationPriority

dart
enum NotificationPriority { low, normal, high, urgent }

Usage:

dart
SendNotificationEffect(
  notificationType: 'approval_required',
  recipientRoleIds: ['managers'],
  recipientUserIds: [],
  title: 'New Approval Request',
  body: 'Order #1234 requires your approval',
  channels: [NotificationChannel.inApp, NotificationChannel.email],
  priority: NotificationPriority.high,
  data: {'orderId': '1234', 'amount': 5000},
)

Event Effects

RecordEventEffect

Record a workflow event to the event log.

dart
class RecordEventEffect extends WorkflowEffect {
  const RecordEventEffect({required this.event});

  final WorkflowEvent event;
}

Usage:

dart
RecordEventEffect(
  event: WorkflowEvent.custom(
    instanceId: context.workflowInstanceId,
    nodeId: context.nodeId,
    eventType: 'approval_completed',
    data: {'decision': 'approved', 'approvedBy': userId},
  ),
)

EmitErrorEffect

Emit an error to the error stream.

dart
class EmitErrorEffect extends WorkflowEffect {
  const EmitErrorEffect({
    required this.instanceId,
    required this.error,
  });

  final String instanceId;
  final WorkflowError error;
}

Scheduling Effects

ScheduleExecutionEffect

Schedule execution of target nodes. Used internally by the orchestrator.

dart
class ScheduleExecutionEffect extends WorkflowEffect {
  const ScheduleExecutionEffect({
    required this.nodeIds,
    required this.tokenIds,
  });

  final List<String> nodeIds;
  final List<String> tokenIds;
}

ScheduleRetryEffect

Schedule a retry with delay.

dart
class ScheduleRetryEffect extends WorkflowEffect {
  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });

  final String nodeId;
  final String tokenId;
  final Duration delay;
  final int attemptNumber;
}
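
A retry effect needs a delay from somewhere. The sketch below computes one with capped exponential backoff and wraps it in the effect. The backoff policy, its constants, and the helper names backoffDelay and retryAfterFailure are assumptions for illustration. ScheduleRetryEffect is redeclared as a stand-in so the example is self-contained, and treating attemptNumber as the number of the upcoming attempt is also an assumption.

```dart
import 'dart:math' as math;

// Hypothetical stand-in mirroring the constructor shown above.
class ScheduleRetryEffect {
  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });

  final String nodeId;
  final String tokenId;
  final Duration delay;
  final int attemptNumber;
}

// Exponential backoff: base * 2^(attempt - 1), capped at `cap`.
// The policy and its constants are assumptions, not library defaults.
Duration backoffDelay(
  int attemptNumber, {
  Duration base = const Duration(seconds: 2),
  Duration cap = const Duration(minutes: 5),
}) {
  // Clamp the exponent to [0, 20] so the shift stays small and non-negative.
  final exponent = math.max(0, math.min(attemptNumber - 1, 20));
  final delay = base * (1 << exponent);
  return delay > cap ? cap : delay;
}

// Builds the effect for the attempt that follows `failedAttempt`.
ScheduleRetryEffect retryAfterFailure({
  required String nodeId,
  required String tokenId,
  required int failedAttempt,
}) {
  return ScheduleRetryEffect(
    nodeId: nodeId,
    tokenId: tokenId,
    delay: backoffDelay(failedAttempt),
    attemptNumber: failedAttempt + 1,
  );
}

void main() {
  for (var failed = 1; failed <= 4; failed++) {
    final retry = retryAfterFailure(
      nodeId: 'callApi',
      tokenId: 'token-1',
      failedAttempt: failed,
    );
    print('attempt ${retry.attemptNumber} in ${retry.delay.inSeconds}s');
  }
}
```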

StoreAttemptNumberEffect

Store the attempt number for retry tracking.

dart
class StoreAttemptNumberEffect extends WorkflowEffect {
  const StoreAttemptNumberEffect({
    required this.nodeId,
    required this.attemptNumber,
  });

  final String nodeId;
  final int attemptNumber;
}

Cleanup Effects

CleanupResourcesEffect

Clean up workflow resources on termination. Deactivates all tokens and cancels pending user tasks.

dart
class CleanupResourcesEffect extends WorkflowEffect {
  const CleanupResourcesEffect();
}

Using Effects in Executors

In TaskExecutor

dart
class MyTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.myTask';

  @override
  String get name => 'My Task';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final result = await processData(context.input);

    return TaskSuccess(
      output: {'processed': true},
      effects: [
        // Set namespaced output
        SetOutputEffect(
          output: {'stats': result.stats},
          path: 'taskResult',
        ),
        // Record audit event
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'task_completed',
            data: {'itemCount': result.count},
          ),
        ),
      ],
    );
  }
}

In NodeProcessor

dart
class MyNodeProcessor extends NodeProcessor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    // Process node...

    return ContinueResult.single(
      'nextNode',
      output: {'processed': true},
      effects: [
        const CancelUserTasksEffect(),
        RecordEventEffect(event: ...),
      ],
    );
  }
}

In UserTaskExecutor

dart
class ApprovalExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'approval_decision',
      config: UserTaskConfiguration(
        title: 'Approve Request',
        schemaType: 'approval',
        assignedToRoleId: 'managers',
      ),
      // Effects run before the user task is created
      effects: [
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_requested',
            data: {},
          ),
        ),
      ],
    );
  }
}

Best Practices

  1. Keep executors pure: Return effects instead of calling mutation methods
  2. Use SetOutputEffect with paths: Namespace output to prevent collisions
  3. Cancel tasks before creating new ones: Use CancelUserTasksEffect when transitioning between user tasks
  4. Record events for audit: Use RecordEventEffect for important state changes
  5. Order effects appropriately: Effects are processed in the order they appear in the list, so place an effect before any effect that depends on it
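
Practices 3 and 5 interact, which a toy in-memory model can show. In the sketch below the classes are hypothetical stand-ins (CreateUserTaskEffect is reduced to a title) and process is an invented function that applies effects strictly in list order. It is not the engine's EffectProcessor.

```dart
// Hypothetical stand-ins; the real effects carry more fields.
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect();
}

class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({required this.title});
  final String title;
}

// Applies effects strictly in list order to an in-memory list of pending tasks.
List<String> process(List<String> pending, List<WorkflowEffect> effects) {
  var tasks = List<String>.of(pending);
  for (final effect in effects) {
    switch (effect) {
      case CancelUserTasksEffect():
        tasks = [];
      case CreateUserTaskEffect(:final title):
        tasks = [...tasks, title];
    }
  }
  return tasks;
}

void main() {
  const cancel = CancelUserTasksEffect();
  const create = CreateUserTaskEffect(title: 'Revise Document');

  // Cancel first, then create: only the new task remains.
  print(process(['Approve Request'], [cancel, create]));

  // Reversed order: the cancellation also removes the task just created.
  print(process(['Approve Request'], [create, cancel]));
}
```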

See Also