Engine Lifecycle
Understanding the WorkflowEngine lifecycle is essential for proper integration.
Initialization
Creating the Engine
dart
// 1. Create deserialization context with descriptors
final context = RegistryDeserializationContext(
descriptors: [DefaultWorkflowDescriptor()],
);
// 2. Create storage (in-memory or custom implementation)
final storage = InMemoryStorage(context: context);
// Or for production: PostgresWorkflowStorage(connectionString)
// 3. Create engine with context and storage
final engine = WorkflowEngine(
context: context,
storage: storage,
);
// 4. Initialize engine (connects to storage, recovers stale workflows)
await engine.initialize();Configuration Options
dart
final engine = WorkflowEngine(
context: context,
storage: storage,
config: WorkflowEngineConfig(
enableDebugLogging: true,
maxConcurrentWorkflows: 100,
staleWorkflowThreshold: Duration(minutes: 5),
enableAutomaticRecovery: true,
),
);Registration Phase
Register executors via descriptors and workflows before starting instances:
dart
// 1. Create descriptors with executors
final descriptor = WorkflowDescriptor(
title: 'My Executors',
tasks: [
SendEmailTaskExecutor.typeDescriptor,
ValidateDataTaskExecutor.typeDescriptor,
],
userTasks: [ApprovalTaskExecutor.typeDescriptor],
conditions: [ApprovedCondition.typeDescriptor],
);
// 2. Create context with all descriptors
final context = RegistryDeserializationContext(
descriptors: [DefaultWorkflowDescriptor(), descriptor],
);
// 3. Create engine with context
final engine = WorkflowEngine(
context: context,
storage: InMemoryStorage(context: context),
);
await engine.initialize();
// 4. Register workflows
engine.registerWorkflow(approvalWorkflow);
engine.registerWorkflow(orderProcessingWorkflow);Execution Phase
Starting Workflows
dart
final instance = await engine.startWorkflow(
workflowCode: 'approval-workflow', // Use workflowCode for semantic lookup
input: {
'entityId': 'REQ-001',
'submittedBy': 'alice@example.com',
},
);Processing Signals
dart
await engine.sendSignal(
workflowInstanceId: instance.id,
node: 'awaitApproval', // The signal wait or user task node ID
payload: {'decision': 'approved'},
);Resuming Workflows
After server restart, the engine can automatically recover stale workflows:
dart
// Enable automatic recovery (default is true)
final context = RegistryDeserializationContext(
descriptors: [DefaultWorkflowDescriptor()],
);
final engine = WorkflowEngine(
context: context,
storage: InMemoryStorage(context: context),
config: WorkflowEngineConfig(
enableAutomaticRecovery: true,
staleWorkflowThreshold: Duration(minutes: 5),
),
);
await engine.initialize(); // Automatically recovers stale workflows
// Or manually resume a specific workflow
await engine.resumeWorkflow(instanceId);
// Or manually recover all stale workflows
final recoveredCount = await engine.recoverStaleWorkflows();Execution Flow
Shutdown
Graceful shutdown ensures no data loss:
dart
// Dispose engine and storage connections
await engine.dispose();Error Handling
Engine-Level Errors
dart
try {
await engine.startWorkflow(workflowCode: 'my-workflow');
} on WorkflowNotFoundException catch (e) {
print('Workflow not found: $e');
} on WorkflowNotActiveException catch (e) {
print('Workflow not active: $e');
} on WorkflowValidationException catch (e) {
print('Validation errors: ${e.errors}');
} on WorkflowAlreadyTerminatedException catch (e) {
print('Workflow already terminated: $e');
}Task-Level Errors
Task failures are handled by returning a FailWorkflowResult:
dart
// In task executor
return FailWorkflowResult.validation('Invalid input');
// Or with more details
return FailWorkflowResult(
errorType: WorkflowErrorType.taskError,
message: 'Task failed',
details: {'field': 'value'},
);
// Engine marks workflow as failed and emits error eventEvent Streams
The engine provides real-time event streams for monitoring and integration:
All Events Stream
dart
// Subscribe to all workflow events
engine.onEvent.listen((event) {
print('${event.eventType}: ${event.workflowInstanceId}');
});
// Filter for specific event types
engine.onEvent
.where((e) => e.eventType == WorkflowEventType.userTaskCreated)
.listen((event) {
// Handle user task creation - e.g., send notification
print('New task: ${event.data}');
});
// Filter by workflow instance
engine.onEvent
.where((e) => e.workflowInstanceId == instanceId)
.listen((event) {
// Track specific workflow progress
});Error Stream
dart
// Subscribe to error events specifically
engine.onError.listen((event) {
print('Workflow ${event.workflowInstanceId} failed: ${event.error.message}');
// Send alert, log to monitoring system, etc.
});Event Types
Events are categorized by WorkflowEventType:
| Category | Events |
|---|---|
| Lifecycle | workflowStarted, workflowCompleted, workflowFailed, workflowCancelled, workflowSuspended, workflowResumed |
| Tasks | taskStarted, taskCompleted, taskFailed, taskRetried |
| User Tasks | userTaskCreated, userTaskClaimed, userTaskCompleted, userTaskCancelled, userTaskExpired |
| Signals | signalWaitStarted, signalReceived, signalTimeout |
| Gateways | gatewayEntered, gatewayEvaluated |
| Tokens | tokenCreated, tokenMoved, tokenMerged, tokenCompleted |
Querying Historical Events
Events are also persisted for audit trails:
dart
// Query events for an instance
final events = await storage.events.getByWorkflowInstanceId(instanceId);
// Filter by event type
final taskEvents = await storage.events.getByWorkflowInstanceId(
instanceId,
eventTypes: [WorkflowEventType.taskCompleted],
);
// Get high-level summary
final summary = await storage.events.getSummary(instanceId);Next Steps
- Node Executors - Node executor implementation
- Task Executors - Task execution
- Storage Adapters - Persistence