Engine Lifecycle
WorkflowEngine lifecycle is explicit: construct, initialize, load definitions, execute, then dispose.
Construct
final context = RegistryTypeResolver(
descriptors: [
DefaultWorkflowDescriptor(),
myDescriptor,
],
);
final storage = InMemoryStorage();
final engine = WorkflowEngine(
context: context,
storage: storage,
executionMode: ExecutionMode.production,
config: const WorkflowEngineConfig(
enableAutomaticRecovery: true,
staleWorkflowThreshold: Duration(minutes: 5),
),
);Use ExecutionMode.production for services. Use SimulationMode() or ExecutionMode.simulation for editor/test simulation.
Initialize
await engine.initialize();Initialization:
- Calls
storage.initialize(). - Marks the engine initialized.
- Recovers stale running workflows when
enableAutomaticRecoveryis true.
Load Definitions
Load definitions after initialization:
final definition = await engine.storage.workflows.getByCode('approval');
if (definition != null) {
engine.loadWorkflowDefinition(definition);
}Or register a code-built workflow:
engine.registerWorkflow(workflow);Start
final instance = await engine.startWorkflow(
workflowCode: 'approval',
input: input,
correlationId: correlationId,
userId: currentUserId,
);The engine creates a WorkflowInstance, creates the initial token, records lifecycle events, and executes the start node.
Signals
await engine.sendSignal(
workflowInstanceId: instance.id,
node: 'approval_decision',
port: 'approve',
payload: {'output': decisionOutput},
);Signals resume active or waiting tokens. Matching checks node ID, schema type, and signal name.
Recovery
Automatic recovery can run during initialize(). You can also call it manually:
final recovered = await engine.recoverStaleWorkflows();Recovery finds running instances older than staleWorkflowThreshold, increments their run number, records a resume event, and calls resumeWorkflow(instanceId).
Service tasks should be idempotent because recovery may re-enter an active node.
Cancellation And Failure
final cancelled = await engine.cancelWorkflow(
instance.id,
reason: 'User withdrew request',
userId: currentUserId,
);
final failed = await engine.failWorkflow(
instance.id,
message: 'Manual failure injection',
nodeId: 'approval_decision',
userId: currentUserId,
);Both operations reject already-terminal instances, update status, clean up tokens/user tasks, and emit events. Cancellation can invoke lifecycle hooks registered for the workflow code.
Events
final eventSub = engine.onEvent.listen(handleEvent);
final errorSub = engine.onError.listen(handleError);The streams are broadcast streams. Events are also persisted through storage.events.
Dispose
await eventSub.cancel();
await errorSub.cancel();
await engine.dispose();dispose() closes engine streams and disposes storage. Create a new engine for a fresh lifecycle.
Recommended Service Boot
Future<WorkflowEngine> createWorkflowEngine() async {
final template = ApprovalTemplate(domains: [approvalDomain]);
final context = RegistryTypeResolver(
descriptors: [
DefaultWorkflowDescriptor(),
template.buildDescriptor(),
],
);
final engine = WorkflowEngine(
context: context,
storage: SupabaseStorage(
client: supabaseClient,
config: const SupabaseStorageConfig(schema: 'elog'),
),
executionMode: ExecutionMode.production,
);
await engine.initialize();
final definition = await engine.storage.workflows.getByCode(
template.workflowCode,
);
if (definition == null) {
throw StateError('Missing seeded workflow definition.');
}
engine.loadWorkflowDefinition(definition);
return engine;
}