Storage Adapters
WorkflowStorage is the persistence boundary for the workflow engine. The storage layer persists data models and repository state; the engine owns type resolution, execution, effect processing, and event emission.
Interface
abstract class WorkflowStorage {
  WorkflowRepository get workflows;
  WorkflowInstanceRepository get instances;
  UserTaskInstanceRepository get userTaskInstances;
  WorkflowEventRepository get events;
  Future<T> transaction<T>(Future<T> Function() operation);
  Future<void> initialize();
  Future<void> dispose();
  Future<bool> healthCheck();
  Future<StorageStats> getStats();
}
Stored Models
| Repository | Model | Purpose |
|---|---|---|
| workflows | WorkflowDefinition | Design-time/stored workflow definition. No executors attached. |
| instances | WorkflowInstance | Runtime state, input, output, variables, tokens, status, and metadata. |
| userTaskInstances | UserTaskInstance | Human task inbox records created by user-task nodes. |
| events | WorkflowEvent | Append-only audit and timeline events. |
The engine converts WorkflowDefinition into executable Workflow through the configured WorkflowTypeResolver.
In-Memory Storage
Use InMemoryStorage for tests, demos, and editor simulation:
final storage = InMemoryStorage(
  workflowLoader: () async => [
    approvalTemplate.buildDefinition(),
  ],
);
await storage.initialize();
workflowLoader is optional and runs once on first initialization. dispose() clears all in-memory data.
Supabase Storage
vyuh_workflow_storage_supabase provides SupabaseStorage, a Supabase/PostgreSQL implementation of WorkflowStorage.
final storage = SupabaseStorage(
  client: supabaseClient,
  config: const SupabaseStorageConfig(
    schema: 'elog',
    definitionsTable: 'workflows',
    instancesTable: 'workflow_instances',
    userTasksTable: 'workflow_user_tasks',
    eventsTable: 'workflow_events',
  ),
);
await storage.initialize();
Default table configuration:
| Config field | Default |
|---|---|
| schema | elog |
| definitionsTable | workflows |
| instancesTable | workflow_instances |
| userTasksTable | workflow_user_tasks |
| eventsTable | workflow_events |
SupabaseStorage.initialize() creates repository objects and verifies that all four tables are queryable. It does not create schema objects. Migrations must create and seed the tables before runtime boot.
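Because initialize() only verifies the tables, a boot sequence may want to fail fast with an actionable message when migrations have not been applied. The following is a minimal sketch of such a guard, not part of the package. BootableStorage is a hypothetical stand-in exposing only the initialize() and healthCheck() members from the interface above; StorageBootException, bootStorage, and FakeStorage are illustrative names. It also assumes that initialize() throws when a table cannot be queried, which this page does not state.

```dart
// Hypothetical stand-in: only the two WorkflowStorage members used here.
abstract class BootableStorage {
  Future<void> initialize();
  Future<bool> healthCheck();
}

// Illustrative exception type, not part of the package.
class StorageBootException implements Exception {
  StorageBootException(this.message);
  final String message;

  @override
  String toString() => 'StorageBootException: $message';
}

// Initializes storage and fails fast if it is not usable.
// Assumption: initialize() throws when a required table is not queryable.
Future<void> bootStorage(BootableStorage storage) async {
  try {
    await storage.initialize();
  } catch (error) {
    throw StorageBootException(
      'Storage initialization failed; have the migrations been applied? '
      '($error)',
    );
  }
  if (!await storage.healthCheck()) {
    throw StorageBootException(
      'Storage initialized but the health check failed.',
    );
  }
}

// Test double that exercises bootStorage without a database.
class FakeStorage implements BootableStorage {
  FakeStorage({this.failInitialize = false, this.healthy = true});
  final bool failInitialize;
  final bool healthy;

  @override
  Future<void> initialize() async {
    if (failInitialize) {
      throw StateError('simulated missing table');
    }
  }

  @override
  Future<bool> healthCheck() async => healthy;
}
```

Calling bootStorage before constructing the engine keeps schema problems separate from workflow errors: a missing table surfaces as one exception at startup rather than as a failed query during execution.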
Supabase Responsibilities
The Supabase adapter:
- Reads and writes workflow definitions.
- Persists instances, tokens, variables, output, and status.
- Persists user task inbox records.
- Appends and queries workflow events.
- Provides health checks and basic stats.
The Supabase adapter does not:
- Resolve task/user-task/condition types.
- Create workflow tables.
- Enforce a global transaction through the Dart client. transaction() currently executes the operation directly; use a database RPC for true multi-statement transactions.
- Decide workflow routing.
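Since transaction() currently runs the operation directly, a failure partway through leaves earlier writes in place. The sketch below illustrates that consequence with a plain in-memory map. passThroughTransaction, writeInstanceThenEvent, and demoPartialWrite are hypothetical names written for this example; they are not the adapter's code.

```dart
// Hypothetical pass-through: runs the operation with no begin, commit,
// or rollback around it.
Future<T> passThroughTransaction<T>(Future<T> Function() operation) {
  return operation();
}

// Two dependent writes against a stand-in "table"; the second one fails.
Future<void> writeInstanceThenEvent(Map<String, String> table) async {
  table['instance'] = 'completed'; // first write succeeds
  throw StateError('event insert failed'); // second write never lands
}

// Returns the table contents after the failed "transaction".
Future<Map<String, String>> demoPartialWrite() async {
  final table = <String, String>{};
  try {
    await passThroughTransaction(() => writeInstanceThenEvent(table));
  } on StateError {
    // Nothing is rolled back: the first write is still visible.
  }
  return table;
}
```

The map returned by demoPartialWrite() still contains the instance entry even though the operation failed. For all-or-nothing behavior the statements would need to run inside the database, for example in a Postgres function invoked through the Supabase client's rpc call.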
Repository Highlights
WorkflowRepository
Future<WorkflowDefinition> create(WorkflowDefinition workflow);
Future<WorkflowDefinition> update(WorkflowDefinition workflow);
Future<WorkflowDefinition?> getById(String id);
Future<WorkflowDefinition?> getByCode(String code, {int? version});
Future<List<WorkflowDefinition>> getAllVersions(String code);
Future<List<WorkflowDefinition>> list({
  int limit = 100,
  int offset = 0,
  bool activeOnly = true,
});
Future<List<WorkflowDefinition>> search(String query, {int limit = 100});
Future<bool> delete(String code, {int? version});
Future<bool> activate(String code, {int? version});
Future<bool> deactivate(String code, {int? version});
Future<bool> codeExists(String code);
Future<int> getNextVersion(String code);
WorkflowInstanceRepository
Future<WorkflowInstance> create(WorkflowInstance instance);
Future<WorkflowInstance> update(WorkflowInstance instance);
Future<WorkflowInstance?> getById(String id);
Future<List<WorkflowInstance>> query(WorkflowInstanceQuery query);
Future<List<WorkflowInstance>> listActive({int limit = 100, int offset = 0});
Future<bool> updateStatus(String id, WorkflowStatus status);
Future<bool> mergeOutput(String id, Map<String, dynamic> output);
Future<bool> setVariable(String id, WorkflowVariable variable);
Future<bool> updateTokens(String id, List<WorkflowToken> tokens);
Future<List<WorkflowInstance>> getWaitingForSignal(String signalName);
Future<List<WorkflowInstance>> getStaleRunningInstances({
  required Duration staleThreshold,
});
UserTaskInstanceRepository
Future<UserTaskInstance> create(UserTaskInstance task);
Future<UserTaskInstance> update(UserTaskInstance task);
Future<UserTaskInstance?> getById(String id);
Future<List<UserTaskInstance>> getByWorkflowInstanceId(String workflowInstanceId);
Future<UserTaskInstance> complete(
  String taskId,
  String userId,
  Map<String, dynamic> output,
);
Future<UserTaskInstance> cancel(String taskId);
Future<int> cancelByWorkflowInstanceId(String workflowInstanceId);
Future<UserTaskInstance?> findActive({
  required String workflowInstanceId,
  required String nodeId,
});
Future<List<UserTaskInstance>> search({
  String? workflowInstanceId,
  String? assignedToUserId,
  String? assignedToRoleId,
  String? assignedToGroupId,
  TaskStatus? status,
  int limit = 100,
  int offset = 0,
});
WorkflowEventRepository
Future<WorkflowEvent> append(WorkflowEvent event);
Future<List<WorkflowEvent>> appendAll(List<WorkflowEvent> events);
Future<WorkflowEvent?> getById(String id);
Future<List<WorkflowEvent>> getByWorkflowInstanceId(
  String workflowInstanceId, {
  List<WorkflowEventType>? eventTypes,
  int? limit,
  int? offset,
});
Future<List<WorkflowEvent>> query(WorkflowEventQuery query);
Future<List<WorkflowEvent>> getByNodeId(String workflowInstanceId, String nodeId);
Future<List<WorkflowEvent>> getByUserTaskId(String userTaskId);
Future<List<WorkflowEvent>> getSummary(String workflowInstanceId);
Future<int> pruneOlderThan(DateTime cutoff);
Engine Wiring
final context = RegistryTypeResolver(
  descriptors: [
    DefaultWorkflowDescriptor(),
    approvalTemplate.buildDescriptor(),
  ],
);
final engine = WorkflowEngine(
  context: context,
  storage: storage,
  executionMode: ExecutionMode.production,
);
await engine.initialize();
final definition = await engine.storage.workflows.getByCode('approval');
if (definition != null) {
  engine.loadWorkflowDefinition(definition);
}
Custom Storage Checklist
When implementing your own adapter:
- Keep stored workflow definitions as WorkflowDefinition, not executable Workflow.
- Preserve token state exactly; recovery depends on active/waiting tokens.
- Make event append idempotent or collision-safe.
- Keep user task completion and workflow signal handling coordinated at the application boundary.
- Implement stale-running queries for crash recovery.
- Return UTC timestamps consistently.
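Three of the checklist items (idempotent append, stale-running queries, and UTC timestamps) can be sketched in memory as follows. This is an illustration under stated assumptions rather than the package's implementation. EventRecord and InstanceRecord are hypothetical stand-ins for WorkflowEvent and WorkflowInstance, IdempotentEventLog and staleRunning are illustrative names, and the sketch assumes that event ids are unique and that instances carry a last-updated timestamp.

```dart
// Hypothetical stand-in for WorkflowEvent; only the fields this sketch needs.
class EventRecord {
  EventRecord({required this.id, required DateTime timestamp})
      : timestamp = timestamp.toUtc(); // store UTC regardless of input zone

  final String id;
  final DateTime timestamp;
}

// In-memory log whose append is idempotent on the event id (assumed unique).
class IdempotentEventLog {
  final Map<String, EventRecord> _byId = {};

  // A repeated id returns the record stored first and changes nothing.
  Future<EventRecord> append(EventRecord event) async {
    return _byId.putIfAbsent(event.id, () => event);
  }

  int get length => _byId.length;
}

// Hypothetical stand-in for WorkflowInstance.
class InstanceRecord {
  InstanceRecord({
    required this.id,
    required this.isRunning,
    required DateTime updatedAt,
  }) : updatedAt = updatedAt.toUtc();

  final String id;
  final bool isRunning;
  final DateTime updatedAt;
}

// Running instances that have not been updated within staleThreshold,
// measured back from `now`.
List<InstanceRecord> staleRunning(
  Iterable<InstanceRecord> instances, {
  required Duration staleThreshold,
  required DateTime now,
}) {
  final cutoff = now.toUtc().subtract(staleThreshold);
  return instances
      .where((i) => i.isRunning && i.updatedAt.isBefore(cutoff))
      .toList();
}
```

Passing now explicitly keeps the stale check deterministic in tests. A database-backed adapter would more likely express the same cutoff as a query filter and rely on a unique constraint for collision-safe appends.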