
Workflow Effects

Effects capture side effects declaratively, enabling pure executors that are easy to test and reason about.

Overview

Instead of calling mutation methods directly, executors return WorkflowEffect objects that describe what should happen. The engine's EffectProcessor interprets these effects and applies them to storage.

Benefits

  • Pure executors: Node executors return effects instead of mutating state
  • Testability: Effects can be asserted without mocking storage
  • Batching: Multiple effects can be processed together
  • Replay: Effects form an audit trail that can be replayed
dart
// Instead of calling mutation methods directly:
await storage.userTaskInstances.cancel(taskId);
await storage.events.record(event);

// ...return effects that describe the same changes:
return TaskSuccess(
  output: {'processed': true},
  effects: [
    const CancelUserTasksEffect(),
    RecordEventEffect(event: event),
  ],
);
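
The testability benefit can be illustrated with a minimal sketch. The classes below are simplified stand-ins for the engine's types, and `reviewOrder` and `cancelsPendingTasks` are hypothetical names introduced only for this example. The point is that a pure function returning effects can be checked by inspecting its return value.

```dart
// Simplified stand-ins for the engine's types (assumptions for illustration).
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect({this.nodeId, this.reason});
  final String? nodeId;
  final String? reason;
}

class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({required this.output, this.path});
  final Map<String, dynamic> output;
  final String? path;
}

class TaskSuccess {
  const TaskSuccess({this.output = const {}, this.effects = const []});
  final Map<String, dynamic> output;
  final List<WorkflowEffect> effects;
}

/// Hypothetical pure executor body: it inspects its input and returns a
/// description of what should happen, without touching storage.
TaskSuccess reviewOrder(Map<String, dynamic> input) {
  final amount = input['amount'] as num;
  return TaskSuccess(
    output: {'processed': true},
    effects: [
      if (amount > 1000) const CancelUserTasksEffect(reason: 'superseded'),
      SetOutputEffect(output: {'amount': amount}, path: 'review'),
    ],
  );
}

/// A plain function can verify the result; no storage mock is involved.
bool cancelsPendingTasks(TaskSuccess result) =>
    result.effects.whereType<CancelUserTasksEffect>().isNotEmpty;
```

A test would call `reviewOrder` with a sample input and assert on the returned list, for example that `cancelsPendingTasks(reviewOrder({'amount': 5000}))` is true.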

Effect Hierarchy

WorkflowEffect (sealed)
├── Output Effects
│   ├── SetOutputEffect
│   ├── SetVariableEffect
│   └── RouteToPortEffect
├── Status Effects
│   ├── UpdateStatusEffect
│   ├── CompleteWorkflowEffect
│   └── FailWorkflowEffect
├── Token Effects
│   ├── CreateTokensEffect
│   ├── ConsumeTokenEffect
│   ├── ConsumeJoinTokensEffect
│   ├── DropTokenEffect
│   ├── WaitTokenEffect
│   └── ActivateTokenEffect
├── User Task Effects
│   ├── CreateUserTaskEffect
│   ├── CancelUserTasksEffect
│   └── ReassignUserTaskEffect
├── Notification Effects
│   └── SendNotificationEffect
├── Event Effects
│   ├── RecordEventEffect
│   └── EmitErrorEffect
├── Scheduling Effects
│   ├── ScheduleExecutionEffect
│   ├── ScheduleRetryEffect
│   └── StoreAttemptNumberEffect
└── Cleanup Effects
    └── CleanupResourcesEffect
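
Because `WorkflowEffect` is sealed, Dart 3 can check a `switch` over effects for exhaustiveness. The sketch below shows how an interpreter might dispatch on the effect type. It uses a stand-in hierarchy with only three effects, and `describe` is a hypothetical function, not part of the engine's `EffectProcessor`.

```dart
// Stand-in hierarchy with only three of the effects (an assumption made to
// keep the sketch short; the real hierarchy is much larger).
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class RouteToPortEffect extends WorkflowEffect {
  const RouteToPortEffect({required this.portId});
  final String portId;
}

class ConsumeTokenEffect extends WorkflowEffect {
  const ConsumeTokenEffect({required this.tokenId});
  final String tokenId;
}

class CleanupResourcesEffect extends WorkflowEffect {
  const CleanupResourcesEffect();
}

/// Hypothetical interpreter step. Because the base class is sealed, the
/// compiler reports an error if a subtype is missing from this switch.
String describe(WorkflowEffect effect) => switch (effect) {
      RouteToPortEffect(:final portId) => 'route to port $portId',
      ConsumeTokenEffect(:final tokenId) => 'consume token $tokenId',
      CleanupResourcesEffect() => 'clean up resources',
    };
```

Adding a fourth subclass without a matching case would make `describe` fail to compile, which is one reason a sealed hierarchy suits this kind of interpreter.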

Output Effects

SetOutputEffect

Set or merge output data into the workflow instance.

dart
class SetOutputEffect extends WorkflowEffect {
  const SetOutputEffect({
    required this.output,
    this.path,
  });

  /// The output data to merge into workflow output
  final Map<String, dynamic> output;

  /// Optional path to namespace the output
  final String? path;
}

Usage:

dart
// Merge at root level
SetOutputEffect(output: {'validated': true})

// Namespace under a path to prevent collisions
SetOutputEffect(
  output: {'count': 10, 'total': 500},
  path: 'processingStats',
)
// Result: { processingStats: { count: 10, total: 500 } }
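
The effect of `path` can be sketched as a small merge function. This is an illustration only: `mergeOutput` is a hypothetical helper, and it assumes a shallow merge and a single-segment path. The engine's actual merge rules may differ.

```dart
/// Hypothetical helper: returns a new map with [output] merged into
/// [current], optionally nested under [path]. Assumes a shallow merge and a
/// single-segment path; the engine's actual rules may differ.
Map<String, dynamic> mergeOutput(
  Map<String, dynamic> current,
  Map<String, dynamic> output, {
  String? path,
}) {
  if (path == null) {
    // Root-level merge: later keys overwrite earlier ones.
    return {...current, ...output};
  }
  final existing = current[path];
  return {
    ...current,
    // Nest the new values under the path, keeping any values already there.
    path: {
      if (existing is Map<String, dynamic>) ...existing,
      ...output,
    },
  };
}
```

Under these assumptions, two executors that both write a `count` key collide at root level but stay separate when each uses its own path.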

SetVariableEffect

Set or update a workflow variable with explicit scoping.

dart
class SetVariableEffect extends WorkflowEffect {
  const SetVariableEffect({
    required this.name,
    required this.value,
    required this.scope,
  });

  /// Variable name
  final String name;

  /// Variable value
  final dynamic value;

  /// Variable scope (local, inherited, shared)
  final VariableScope scope;
}

Usage:

dart
// Set a local variable
SetVariableEffect(name: 'retryCount', value: 3, scope: VariableScope.local)

// Convenience factories
SetVariableEffect.local('retryCount', 3)
SetVariableEffect.inherited('approvalChain', chain)

RouteToPortEffect

Route execution to a specific output port. Used by executors that support multiple outcome paths (e.g., subflow nodes with success/failure/timeout ports).

dart
class RouteToPortEffect extends WorkflowEffect {
  const RouteToPortEffect({required this.portId});

  /// The output port ID to route to
  final String portId;
}

Usage:

dart
// Route to the success port
RouteToPortEffect(portId: 'success')

// Route to a custom port
RouteToPortEffect(portId: 'needsReview')

Status Effects

UpdateStatusEffect

Update the workflow instance status.

dart
class UpdateStatusEffect extends WorkflowEffect {
  const UpdateStatusEffect({required this.status});

  final WorkflowStatus status;
}

CompleteWorkflowEffect

Complete the workflow with final output.

dart
class CompleteWorkflowEffect extends WorkflowEffect {
  const CompleteWorkflowEffect({this.output = const {}});

  /// Final output to merge before completing
  final Map<String, dynamic> output;
}

FailWorkflowEffect

Fail the workflow with an error.

dart
class FailWorkflowEffect extends WorkflowEffect {
  const FailWorkflowEffect({
    required this.error,
    this.nodeId,
  });

  /// The error that caused the failure
  final WorkflowError error;

  /// The node where the failure occurred
  final String? nodeId;
}

Token Effects

Tokens track execution position in the workflow. These effects manage token lifecycle.

CreateTokensEffect

Create new tokens for target nodes. Used when continuing to next nodes or splitting into parallel branches.

dart
class CreateTokensEffect extends WorkflowEffect {
  const CreateTokensEffect({
    required this.targetNodeIds,
    required this.fromNodeId,
    this.fromTokenId,
    this.isParallelSplit = false,
    this.output = const {},
    this.splitContext,
    this.splitType,
  });

  /// Target node IDs to create tokens for
  final List<String> targetNodeIds;

  /// The node ID that is the source of these tokens
  final String fromNodeId;

  /// The token ID that triggered this creation
  final String? fromTokenId;

  /// Whether this is a parallel split (creates SplitTokens)
  final bool isParallelSplit;

  /// Output to carry on the new tokens
  final Map<String, dynamic> output;

  /// Split context for parallel/inclusive splits
  final SplitContext? splitContext;

  /// Type of split (exclusive, parallel, inclusive)
  final SplitType? splitType;
}

ConsumeTokenEffect

Consume a token (mark as terminal). Used when a token reaches an end node or is merged at a join.

dart
class ConsumeTokenEffect extends WorkflowEffect {
  const ConsumeTokenEffect({required this.tokenId});

  final String tokenId;
}

ConsumeJoinTokensEffect

Consume all tokens at a join node when all branches have arrived.

dart
class ConsumeJoinTokensEffect extends WorkflowEffect {
  const ConsumeJoinTokensEffect({
    required this.joinNodeId,
    required this.tokenIds,
  });

  final String joinNodeId;
  final List<String> tokenIds;
}

DropTokenEffect

Drop a token, for example one that arrives late at a join or is dropped explicitly.

dart
class DropTokenEffect extends WorkflowEffect {
  const DropTokenEffect({
    required this.tokenId,
    this.reason,
  });

  final String tokenId;
  final String? reason;
}

WaitTokenEffect

Mark a token as waiting for an external event.

dart
class WaitTokenEffect extends WorkflowEffect {
  const WaitTokenEffect({required this.tokenId});

  final String tokenId;
}

ActivateTokenEffect

Activate a waiting token (resume from wait state).

dart
class ActivateTokenEffect extends WorkflowEffect {
  const ActivateTokenEffect({required this.tokenId});

  final String tokenId;
}
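
Taken together, the token effects describe a small lifecycle. The sketch below models it as a transition table. The state and action names are assumptions chosen to mirror the effect names, and the set of allowed transitions is a guess at reasonable behaviour, not the engine's specification.

```dart
/// Assumed token states; the names are illustrative, not the engine's own.
enum TokenState { active, waiting, consumed, dropped }

/// Hypothetical actions, each named after the effect it mirrors.
enum TokenAction { wait, activate, consume, drop }

/// Returns the next state, or null when the transition is not allowed.
/// Consumed and dropped are treated as terminal.
TokenState? transition(TokenState state, TokenAction action) =>
    switch ((state, action)) {
      (TokenState.active, TokenAction.wait) => TokenState.waiting,
      (TokenState.waiting, TokenAction.activate) => TokenState.active,
      (TokenState.active, TokenAction.consume) => TokenState.consumed,
      (TokenState.active || TokenState.waiting, TokenAction.drop) =>
        TokenState.dropped,
      _ => null,
    };
```

In this model a waiting token must be activated before it can be consumed, and nothing leaves the consumed or dropped states.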

User Task Effects

CreateUserTaskEffect

Create a user task instance.

dart
class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({
    required this.taskId,
    required this.nodeId,
    required this.signalName,
    required this.title,
    required this.schemaType,
    this.description,
    this.assignedToRoleId,
    this.assignedToUserId,
    this.assignedToGroupId,
    this.priority = UserTaskPriority.normal,
    this.dueAt,
    this.input = const {},
    this.storeAs,
  });

  // Field declarations omitted for brevity.
}

CancelUserTasksEffect

Cancel pending user tasks for a workflow instance.

dart
class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect({this.nodeId, this.reason});

  /// Optional: Cancel only tasks for this node. If null, cancel all.
  final String? nodeId;

  /// Optional reason for cancellation
  final String? reason;
}

Usage:

dart
// Cancel all pending user tasks
const CancelUserTasksEffect()

// Cancel only tasks for a specific node
CancelUserTasksEffect(nodeId: 'approvalNode')

Common Use Case: Cancel Before Revision

dart
class RevisionTaskExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'revision_submitted',
      config: UserTaskConfiguration(
        title: 'Revise Document',
        schemaType: 'revision',
        assignedToUserId: context.getRequired<String>('submitterId'),
      ),
      // Cancel any pending approval tasks before creating the revision task
      effects: [const CancelUserTasksEffect()],
    );
  }
}

ReassignUserTaskEffect

Reassign a user task to a different user, role, or group.

dart
class ReassignUserTaskEffect extends WorkflowEffect {
  const ReassignUserTaskEffect({
    required this.taskId,
    this.newAssignedToUserId,
    this.newAssignedToRoleId,
    this.newAssignedToGroupId,
    this.reason,
    this.reassignedBy,
  });

  /// The user task ID to reassign
  final String taskId;

  /// New user ID to assign to
  final String? newAssignedToUserId;

  /// New role ID to assign to
  final String? newAssignedToRoleId;

  /// New group ID to assign to
  final String? newAssignedToGroupId;

  /// Reason for reassignment
  final String? reason;

  /// Who initiated the reassignment
  final String? reassignedBy;
}

Usage:

dart
// Reassign to a specific user
ReassignUserTaskEffect(
  taskId: 'task-123',
  newAssignedToUserId: 'manager-456',
  reason: 'Original assignee on leave',
  reassignedBy: 'admin-789',
)

// Reassign to a role
ReassignUserTaskEffect(
  taskId: 'task-123',
  newAssignedToRoleId: 'senior-managers',
  reason: 'Escalation due to SLA breach',
)

Notification Effects

SendNotificationEffect

Send a notification to users, roles, or groups across multiple channels.

dart
class SendNotificationEffect extends WorkflowEffect {
  const SendNotificationEffect({
    required this.notificationType,
    required this.recipientUserIds,
    this.recipientRoleIds,
    this.recipientGroupIds,
    required this.title,
    this.body,
    this.data = const {},
    this.channels = const [NotificationChannel.inApp],
    this.priority = NotificationPriority.normal,
  });

  /// Notification type identifier (e.g., 'task_assigned', 'deadline_warning')
  final String notificationType;

  /// User IDs to notify
  final List<String> recipientUserIds;

  /// Role IDs to notify (optional)
  final List<String>? recipientRoleIds;

  /// Group IDs to notify (optional)
  final List<String>? recipientGroupIds;

  /// Notification title
  final String title;

  /// Notification body (optional)
  final String? body;

  /// Additional data payload
  final Map<String, dynamic> data;

  /// Delivery channels
  final List<NotificationChannel> channels;

  /// Notification priority
  final NotificationPriority priority;
}

NotificationChannel

dart
enum NotificationChannel { inApp, email, push, sms }

NotificationPriority

dart
enum NotificationPriority { low, normal, high, urgent }

Usage:

dart
SendNotificationEffect(
  notificationType: 'approval_required',
  recipientRoleIds: ['managers'],
  recipientUserIds: [],
  title: 'New Approval Request',
  body: 'Order #1234 requires your approval',
  channels: [NotificationChannel.inApp, NotificationChannel.email],
  priority: NotificationPriority.high,
  data: {'orderId': '1234', 'amount': 5000},
)

Event Effects

RecordEventEffect

Record a workflow event to the event log.

dart
class RecordEventEffect extends WorkflowEffect {
  const RecordEventEffect({required this.event});

  final WorkflowEvent event;
}

Usage:

dart
RecordEventEffect(
  event: WorkflowEvent.custom(
    instanceId: context.workflowInstanceId,
    nodeId: context.nodeId,
    eventType: 'approval_completed',
    data: {'decision': 'approved', 'approvedBy': userId},
  ),
)

EmitErrorEffect

Emit an error to the error stream.

dart
class EmitErrorEffect extends WorkflowEffect {
  const EmitErrorEffect({
    required this.instanceId,
    required this.error,
  });

  final String instanceId;
  final WorkflowError error;
}

Scheduling Effects

ScheduleExecutionEffect

Schedule execution of target nodes. Used internally by the orchestrator.

dart
class ScheduleExecutionEffect extends WorkflowEffect {
  const ScheduleExecutionEffect({
    required this.nodeIds,
    required this.tokenIds,
  });

  final List<String> nodeIds;
  final List<String> tokenIds;
}

ScheduleRetryEffect

Schedule a retry with delay.

dart
class ScheduleRetryEffect extends WorkflowEffect {
  const ScheduleRetryEffect({
    required this.nodeId,
    required this.tokenId,
    required this.delay,
    required this.attemptNumber,
  });

  final String nodeId;
  final String tokenId;
  final Duration delay;
  final int attemptNumber;
}
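
The `delay` passed to `ScheduleRetryEffect` has to come from somewhere. One common choice is capped exponential backoff, sketched below. `retryDelay` and its defaults are hypothetical and not part of the engine; attempt numbers are assumed to start at 1.

```dart
/// Hypothetical backoff policy: doubles [base] on each attempt and caps the
/// result at [max]. Attempt numbers are assumed to start at 1.
Duration retryDelay(
  int attemptNumber, {
  Duration base = const Duration(seconds: 1),
  Duration max = const Duration(minutes: 5),
}) {
  if (attemptNumber < 1) {
    throw ArgumentError.value(attemptNumber, 'attemptNumber', 'must be >= 1');
  }
  // Clamp the exponent so the shift stays small for large attempt numbers.
  final exponent = attemptNumber - 1 > 30 ? 30 : attemptNumber - 1;
  final delay = base * (1 << exponent);
  return delay > max ? max : delay;
}
```

With the defaults shown, attempts 1, 2, and 3 wait 1, 2, and 4 seconds, and later attempts level off at 5 minutes.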

StoreAttemptNumberEffect

Store the attempt number for retry tracking.

dart
class StoreAttemptNumberEffect extends WorkflowEffect {
  const StoreAttemptNumberEffect({
    required this.nodeId,
    required this.attemptNumber,
  });

  final String nodeId;
  final int attemptNumber;
}

Cleanup Effects

CleanupResourcesEffect

Clean up workflow resources on termination. Deactivates all tokens and cancels pending user tasks.

dart
class CleanupResourcesEffect extends WorkflowEffect {
  const CleanupResourcesEffect();
}

Using Effects in Executors

In TaskExecutor

dart
class MyTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.myTask';

  @override
  String get name => 'My Task';

  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final result = await processData(context.input);

    return TaskSuccess(
      output: {'processed': true},
      effects: [
        // Set namespaced output
        SetOutputEffect(
          output: {'stats': result.stats},
          path: 'taskResult',
        ),
        // Record audit event
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'task_completed',
            data: {'itemCount': result.count},
          ),
        ),
      ],
    );
  }
}

In NodeProcessor

dart
class MyNodeProcessor extends NodeProcessor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    // Process node...

    return ContinueResult.single(
      'nextNode',
      output: {'processed': true},
      effects: [
        const CancelUserTasksEffect(),
        RecordEventEffect(event: ...),
      ],
    );
  }
}

In UserTaskExecutor

dart
class ApprovalExecutor extends UserTaskExecutor {
  @override
  Future<NodeResult> execute(ExecutionContext context) async {
    return WaitForUserTaskResult(
      signalName: 'approval_decision',
      config: UserTaskConfiguration(
        title: 'Approve Request',
        schemaType: 'approval',
        assignedToRoleId: 'managers',
      ),
      // Effects run before the user task is created
      effects: [
        RecordEventEffect(
          event: WorkflowEvent.custom(
            instanceId: context.workflowInstanceId,
            nodeId: context.nodeId,
            eventType: 'approval_requested',
            data: {},
          ),
        ),
      ],
    );
  }
}

Best Practices

  1. Keep executors pure: Return effects instead of calling mutation methods
  2. Use SetOutputEffect with paths: Namespace output to prevent collisions
  3. Cancel tasks before creating new ones: Use CancelUserTasksEffect when transitioning between user tasks
  4. Record events for audit: Use RecordEventEffect for important state changes
  5. Order effects appropriately: Effects are processed in list order, so place an effect before any effect that depends on it
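
The last point can be illustrated with a minimal sketch. The types and the `applyInOrder` function below are stand-ins written for this example, with an in-memory list in place of real storage. The same two effects give different results depending on their order.

```dart
// Stand-in effects and an in-memory task list (assumptions for illustration).
sealed class WorkflowEffect {
  const WorkflowEffect();
}

class CancelUserTasksEffect extends WorkflowEffect {
  const CancelUserTasksEffect();
}

class CreateUserTaskEffect extends WorkflowEffect {
  const CreateUserTaskEffect({required this.taskId});
  final String taskId;
}

/// Hypothetical processor loop: applies each effect in list order and
/// returns the task IDs that are still pending afterwards.
List<String> applyInOrder(List<String> pending, List<WorkflowEffect> effects) {
  final tasks = [...pending];
  for (final effect in effects) {
    switch (effect) {
      case CancelUserTasksEffect():
        tasks.clear();
      case CreateUserTaskEffect(:final taskId):
        tasks.add(taskId);
    }
  }
  return tasks;
}
```

In this model, cancelling first and then creating leaves only the new task pending, while creating first and then cancelling removes the new task along with the old ones.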

See Also