Engine Lifecycle

The WorkflowEngine lifecycle is explicit: construct, initialize, load definitions, execute, and then dispose.

Construct

dart
final context = RegistryTypeResolver(
  descriptors: [
    DefaultWorkflowDescriptor(),
    myDescriptor,
  ],
);

final storage = InMemoryStorage();

final engine = WorkflowEngine(
  context: context,
  storage: storage,
  executionMode: ExecutionMode.production,
  config: const WorkflowEngineConfig(
    enableAutomaticRecovery: true,
    staleWorkflowThreshold: Duration(minutes: 5),
  ),
);

Use ExecutionMode.production for services. For simulation in an editor or in tests, use SimulationMode() or ExecutionMode.simulation.

Initialize

dart
await engine.initialize();

Initialization performs these steps in order:

  1. Calls storage.initialize().
  2. Marks the engine initialized.
  3. Recovers stale running workflows when enableAutomaticRecovery is true.

Load Definitions

Load definitions after initialization:

dart
final definition = await engine.storage.workflows.getByCode('approval');
if (definition != null) {
  engine.loadWorkflowDefinition(definition);
}

Or register a code-built workflow:

dart
engine.registerWorkflow(workflow);

Start

dart
final instance = await engine.startWorkflow(
  workflowCode: 'approval',
  input: input,
  correlationId: correlationId,
  userId: currentUserId,
);

The engine creates a WorkflowInstance, creates the initial token, records lifecycle events, and executes the start node.

Signals

dart
await engine.sendSignal(
  workflowInstanceId: instance.id,
  node: 'approval_decision',
  port: 'approve',
  payload: {'output': decisionOutput},
);

Signals resume active or waiting tokens. Matching checks node ID, schema type, and signal name.

Recovery

Automatic recovery runs during initialize() when enableAutomaticRecovery is true. You can also call it manually:

dart
final recovered = await engine.recoverStaleWorkflows();

Recovery finds running instances older than staleWorkflowThreshold, increments their run number, records a resume event, and calls resumeWorkflow(instanceId).
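The selection step described above can be sketched as follows. SketchInstance, its updatedAt field, and the string statuses are assumptions made for illustration; the engine's own instance type, and the timestamp it uses to measure age, may differ.

```dart
// Hypothetical record; field names are assumptions for illustration only.
class SketchInstance {
  SketchInstance(this.id, this.status, this.updatedAt, {this.runNumber = 1});

  final String id;
  final String status; // for example 'running' or 'completed'
  final DateTime updatedAt;
  int runNumber;
}

/// Returns running instances whose last update is older than [threshold].
List<SketchInstance> findStale(
  List<SketchInstance> instances,
  Duration threshold,
  DateTime now,
) {
  final cutoff = now.subtract(threshold);
  return instances
      .where((i) => i.status == 'running' && i.updatedAt.isBefore(cutoff))
      .toList();
}

/// Bumps the run number of each stale instance and returns the recovered IDs.
/// A real engine would also record a resume event and resume execution here.
List<String> recoverSketch(
  List<SketchInstance> instances,
  Duration threshold,
  DateTime now,
) {
  final recovered = <String>[];
  for (final instance in findStale(instances, threshold, now)) {
    instance.runNumber += 1; // a new run begins on resume
    recovered.add(instance.id);
  }
  return recovered;
}
```

With a five-minute threshold, a running instance last updated ten minutes ago is selected, while one updated a minute ago, or one that has already completed, is left alone.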

Service tasks should be idempotent because recovery may re-enter an active node.
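One common way to get idempotency is to key each side effect by the instance and node that triggered it, and skip the work when the key has been seen. The sketch below is not part of the engine: IdempotentNotifier and notifyOnce are hypothetical names, and the in-memory set stands in for a durable store, which a real service would need so the keys survive a crash.

```dart
// Hypothetical helper that guards a side effect with an idempotency key.
class IdempotentNotifier {
  final Set<String> _sent = <String>{};

  /// Counts how many times the side effect actually ran.
  int deliveries = 0;

  /// Runs the side effect at most once per (instanceId, nodeId) pair.
  /// Returns true when the side effect ran, false when it was skipped.
  bool notifyOnce(String instanceId, String nodeId) {
    final key = '$instanceId:$nodeId';
    if (!_sent.add(key)) {
      return false; // already handled in an earlier run; skip on re-entry
    }
    deliveries += 1; // stand-in for the real side effect, such as an email
    return true;
  }
}
```

If recovery re-enters the same node for the same instance, the second call returns false and the side effect is not repeated.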

Cancellation And Failure

dart
final cancelled = await engine.cancelWorkflow(
  instance.id,
  reason: 'User withdrew request',
  userId: currentUserId,
);

final failed = await engine.failWorkflow(
  instance.id,
  message: 'Manual failure injection',
  nodeId: 'approval_decision',
  userId: currentUserId,
);

Both operations reject instances that are already in a terminal state, update the status, clean up tokens and user tasks, and emit events. Cancellation can also invoke lifecycle hooks registered for the workflow code.
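The terminal-state check can be pictured with a small guard. This is a sketch under assumptions: the SketchStatus values are hypothetical, and signalling rejection by throwing StateError is one possible choice; the engine may report rejection differently.

```dart
// Hypothetical status set; the engine's actual statuses may differ.
enum SketchStatus { running, waiting, completed, cancelled, failed }

/// True for statuses that no longer accept transitions.
bool isTerminal(SketchStatus status) =>
    status == SketchStatus.completed ||
    status == SketchStatus.cancelled ||
    status == SketchStatus.failed;

/// Returns the status after cancellation, or throws if [current] is terminal.
SketchStatus cancelSketch(SketchStatus current) {
  if (isTerminal(current)) {
    throw StateError(
      'Cannot cancel an instance that is already ${current.name}.',
    );
  }
  return SketchStatus.cancelled;
}
```

Guarding the transition in one place keeps cancel and fail from overwriting a final outcome, for example when a user cancels a request just after it completes.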

Events

dart
final eventSub = engine.onEvent.listen(handleEvent);
final errorSub = engine.onError.listen(handleError);

The streams are broadcast streams. Events are also persisted through storage.events.
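Broadcast streams in Dart deliver each event to every listener subscribed at the time, and do not replay earlier events to listeners that subscribe later. The sketch below shows this with a plain StreamController from dart:async; it does not use the engine, and the event names are made up.

```dart
import 'dart:async';

/// Shows that a broadcast stream fans out to all current listeners and
/// does not replay earlier events to a listener that subscribes late.
Future<Map<String, List<String>>> broadcastDemo() async {
  final controller = StreamController<String>.broadcast();
  final earlyEvents = <String>[];
  final lateEvents = <String>[];

  final earlySub = controller.stream.listen(earlyEvents.add);
  controller.add('workflow.started');
  await Future<void>.delayed(Duration.zero); // let the event be delivered

  // This listener subscribes after the first event and never sees it.
  final lateSub = controller.stream.listen(lateEvents.add);
  controller.add('workflow.completed');
  await Future<void>.delayed(Duration.zero);

  await earlySub.cancel();
  await lateSub.cancel();
  await controller.close();

  return {'early': earlyEvents, 'late': lateEvents};
}
```

In practice this means subscribing before starting workflows if in-process listeners must see every event; anything missed live would have to be read back from the persisted events.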

Dispose

dart
await eventSub.cancel();
await errorSub.cancel();
await engine.dispose();

dispose() closes engine streams and disposes storage. Create a new engine for a fresh lifecycle.
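To make sure cleanup runs even when work fails, the cancel-then-dispose sequence can be wrapped in try/finally. The sketch below uses a hypothetical stand-in, SketchEngine, that exposes only an event stream and dispose(); withEngine is likewise an illustrative helper, not an engine API.

```dart
import 'dart:async';

// Hypothetical stand-in exposing only what this sketch needs.
class SketchEngine {
  final StreamController<String> _events =
      StreamController<String>.broadcast();
  bool disposed = false;

  Stream<String> get onEvent => _events.stream;

  Future<void> dispose() async {
    disposed = true;
    await _events.close();
  }
}

/// Runs [body], then always cancels the subscription and disposes the
/// engine, even when [body] throws.
Future<void> withEngine(
  SketchEngine engine,
  Future<void> Function(SketchEngine engine) body,
) async {
  final sub = engine.onEvent.listen((_) {});
  try {
    await body(engine);
  } finally {
    await sub.cancel(); // cancel listeners first
    await engine.dispose(); // then release the engine
  }
}
```

An error thrown by the body still propagates to the caller, but only after the subscription is cancelled and the engine is disposed.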

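Putting the construct, initialize, and load steps together, a factory for a production engine might look like this:

dart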
Future<WorkflowEngine> createWorkflowEngine() async {
  final template = ApprovalTemplate(domains: [approvalDomain]);

  final context = RegistryTypeResolver(
    descriptors: [
      DefaultWorkflowDescriptor(),
      template.buildDescriptor(),
    ],
  );

  final engine = WorkflowEngine(
    context: context,
    storage: SupabaseStorage(
      client: supabaseClient,
      config: const SupabaseStorageConfig(schema: 'elog'),
    ),
    executionMode: ExecutionMode.production,
  );

  await engine.initialize();

  // Definitions are read from storage, so this runs after initialize().
  final definition = await engine.storage.workflows.getByCode(
    template.workflowCode,
  );
  if (definition == null) {
    throw StateError(
      'Missing seeded workflow definition for "${template.workflowCode}".',
    );
  }
  engine.loadWorkflowDefinition(definition);

  return engine;
}

See Also