Idempotency
Patterns for crash recovery and safe retries.
Why Idempotency Matters
Workflows can be interrupted at any point:
- Server crashes
- Network failures
- Database errors
- Process restarts
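The engine resumes workflows from their last saved position, which means an executor may run again even though an earlier attempt already completed some or all of its work. An idempotent executor produces the same result, and leaves the system in the same state, whether it is executed once or multiple times.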
Task Executor Idempotency
Check-Before-Create Pattern
dart
/// Task executor that creates an order, reusing the stored order when one
/// with the requested id is already present.
class CreateOrderTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.createOrder';

  @override
  String get name => 'Create Order';

  /// Looks up the order named by the required `orderId` value and creates it
  /// only when the lookup returns nothing.
  ///
  /// The output always carries `orderId`, `status` and `createdAt`; `status`
  /// is `'exists'` when the order was found and `'created'` otherwise.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final requestedId = context.getRequired<String>('orderId');

    // IDEMPOTENT: a re-run finds the order written by the earlier attempt.
    final stored = await orderRepository.findById(requestedId);
    final wasStored = stored != null;

    // Only hit the create path (fed by the previous node's output) when the
    // lookup came back empty.
    final order =
        stored ?? await orderRepository.create(requestedId, context.input);

    return TaskSuccess(output: {
      'orderId': order.id,
      'status': wasStored ? 'exists' : 'created',
      'createdAt': order.createdAt.toIso8601String(),
    });
  }
}
Idempotency Key Pattern
dart
/// Task executor that charges a payment at most once per workflow node by
/// tagging the charge with an idempotency key.
class PaymentTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.payment';

  @override
  String get name => 'Process Payment';

  /// Charges the required `amount` unless a payment with this node's
  /// idempotency key is already recorded.
  ///
  /// The output holds `paymentId` and a `status` of `'processed'` for a new
  /// charge or `'already_processed'` when the earlier payment was found.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // The key joins the workflow instance id and the node id with '_'.
    final instanceId = context.workflowInstance.id;
    final nodeId = context.currentNode.id;
    final dedupeKey = '${instanceId}_$nodeId';

    final earlierPayment = await paymentService.findByIdempotencyKey(dedupeKey);
    if (earlierPayment == null) {
      // Nothing recorded under this key yet: charge, passing the key along.
      final charged = await paymentService.charge(
        amount: context.getRequired<num>('amount'),
        idempotencyKey: dedupeKey,
      );
      return TaskSuccess(output: {
        'paymentId': charged.id,
        'status': 'processed',
      });
    }

    // A payment was already made under this key; report it instead of
    // charging again.
    return TaskSuccess(output: {
      'paymentId': earlierPayment.id,
      'status': 'already_processed',
    });
  }
}
Record Progress Pattern
dart
/// Task executor that processes a list of items, skipping every item whose id
/// is already listed in `processedIds`.
class BulkProcessTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.bulkProcess';

  @override
  String get name => 'Bulk Process Items';

  /// Processes each entry of the required `items` list whose `id` is not yet
  /// in the optional `processedIds` list.
  ///
  /// The output contains the full set of processed ids (`processedIds`) and
  /// its size (`totalProcessed`).
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final pending = context.getRequired<List>('items');
    final alreadyDone = context.get<List>('processedIds') ?? [];
    final doneIds = Set<String>.from(alreadyDone);

    for (final entry in pending) {
      final entryId = entry['id'] as String;
      if (!doneIds.contains(entryId)) {
        await processItem(entry);
        doneIds.add(entryId);
        // Save progress (for long-running tasks): persisting `doneIds` at
        // this point is what allows a resumed run to continue from the last
        // processed item. This example only tracks the ids in memory and
        // reports them in the final output.
      }
    }

    return TaskSuccess(output: {
      'processedIds': doneIds.toList(),
      'totalProcessed': doneIds.length,
    });
  }
}
User Task Idempotency
The engine handles user task idempotency automatically:
dart
// In the engine's user task node handling
/// Makes sure exactly one active user task backs the current node and returns
/// a result that waits on that task's signal.
///
/// An active task found for this instance and node is reused; a new task is
/// created only when none is found.
Future<NodeResult> executeUserTaskNode(WorkflowContext context) async {
  final currentNodeId = context.currentNode.id;
  final workflowInstanceId = context.instance.id;

  // IDEMPOTENT: a re-executed node picks up the task created earlier.
  final activeTask = await storage.userTaskInstances
      .findActive(workflowInstanceId, currentNodeId);

  // Fall back to creating the task when no active one is found.
  final awaitedTask = activeTask ?? await createUserTask(context);
  return WaitForSignalResult(signalName: awaitedTask.signalName);
}
Signal Idempotency
Prevent duplicate signal processing:
dart
// In your webhook handler
/// Forwards a payment webhook event to its workflow as a
/// `payment_completed` signal, de-duplicated by the event id.
///
/// Returns an OK response when the event was already handled or when the
/// signal was delivered. If delivering the signal fails, the error is logged
/// and rethrown so the request is not acknowledged as successful; answering
/// OK in that case would tell the sender the event was handled and it would
/// never be redelivered.
@PostMapping('/webhooks/payment')
Future<Response> handlePaymentWebhook(PaymentEvent event) async {
  // Use event ID as idempotency key
  final eventId = event.id;

  // Check if already processed
  if (await processedEvents.contains(eventId)) {
    return Response.ok(); // Already handled, ignore
  }

  try {
    await engine.sendSignal(
      workflowInstanceId: event.metadata['workflowInstanceId'],
      signalName: 'payment_completed',
      payload: event.data,
    );
  } catch (e) {
    // The event is deliberately NOT marked as processed, and the failure is
    // propagated so a redelivery can be attempted.
    logger.error('Failed to process event: $eventId', e);
    rethrow;
  }

  try {
    // Mark as processed
    await processedEvents.add(eventId);
  } catch (e) {
    // The signal has already been delivered, so failing the request here
    // would invite a redelivery and a duplicate signal. Log and acknowledge.
    logger.error('Failed to process event: $eventId', e);
  }

  return Response.ok();
}
Database Operations
Upsert Pattern
dart
/// Task executor that writes an entity's status with a single upsert
/// statement.
class UpdateStatusTaskExecutor extends TaskExecutor {
  /// Inserts the status row, or overwrites `status` and `updated_at` when a
  /// row for the same `entity_id` already exists.
  static const _upsertStatusSql = '''
INSERT INTO entity_status (entity_id, status, updated_at)
VALUES (@entityId, @status, @now)
ON CONFLICT (entity_id) DO UPDATE
SET status = @status, updated_at = @now
''';

  @override
  String get schemaType => 'task.updateStatus';

  @override
  String get name => 'Update Status';

  /// Upserts the required `status` value for the required `entityId` and
  /// reports `statusUpdated: true`.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final targetId = context.getRequired<String>('entityId');
    final requestedStatus = context.getRequired<String>('status');

    // IDEMPOTENT: the same statement covers both the first run and a re-run.
    await db.execute(_upsertStatusSql, {
      'entityId': targetId,
      'status': requestedStatus,
      'now': DateTime.now(),
    });

    return TaskSuccess(output: {'statusUpdated': true});
  }
}
Conditional Update Pattern
dart
/// Task executor that increments an entity's counter only when the stored
/// version still equals the version supplied in the context.
class IncrementCounterTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.incrementCounter';

  @override
  String get name => 'Increment Counter';

  /// Runs a version-guarded increment for the required `entityId` and
  /// `version` values.
  ///
  /// Returns `{'incremented': true}` when the guarded update changed a row.
  /// When it changed nothing, the current row is read back and returned with
  /// `alreadyIncremented: true`.
  ///
  /// Throws a [StateError] when the update changed nothing and no row with
  /// the given id exists.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final entityId = context.getRequired<String>('entityId');
    final expectedVersion = context.getRequired<int>('version');

    // IDEMPOTENT: Only update if version matches
    final updated = await db.execute('''
UPDATE entities
SET counter = counter + 1, version = version + 1
WHERE id = @id AND version = @version
''', {
      'id': entityId,
      'version': expectedVersion,
    });

    if (updated == 0) {
      final current = await db.query(
        'SELECT * FROM entities WHERE id = @id',
        {'id': entityId},
      );

      // Zero updated rows also happens when the entity does not exist at all.
      // Fail with a descriptive error instead of letting `first` throw on an
      // empty result.
      if (current.isEmpty) {
        throw StateError('Entity not found: $entityId');
      }

      // The row exists but its version differs from the expected one; this
      // example treats that as "already incremented".
      final row = current.first;
      return TaskSuccess(output: {
        'counter': row['counter'],
        'version': row['version'],
        'alreadyIncremented': true,
      });
    }

    return TaskSuccess(output: {'incremented': true});
  }
}
External Service Calls
Pass-Through Idempotency Key
dart
/// Task executor that calls an external API and passes an idempotency key in
/// the request headers.
class ExternalApiTaskExecutor extends TaskExecutor {
  @override
  String get schemaType => 'task.externalApi';

  @override
  String get name => 'External API Call';

  /// Posts the node input to `/api/process` with an `Idempotency-Key` header
  /// and returns the response data as the task output.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // The key joins the workflow instance id and the node id with '_'.
    final instanceId = context.workflowInstance.id;
    final nodeId = context.currentNode.id;
    final requestKey = '${instanceId}_$nodeId';

    // The external service receives the key with every attempt.
    final reply = await httpClient.post(
      '/api/process',
      headers: {'Idempotency-Key': requestKey},
      body: context.input, // Previous node output becomes the request body
    );

    return TaskSuccess(output: reply.data);
  }
}
Testing Idempotency
dart
// Runs the same executor twice with the same context and checks that the
// retry reports the same order and does not create a second one.
test('task executor is idempotent', () async {
  final executor = CreateOrderTaskExecutor();
  final context = createTestContext(input: {'orderId': 'ORD-1'});

  // First execution
  final result1 = await executor.execute(context);
  expect(result1, isA<TaskSuccess>());

  // Second execution (simulating retry)
  final result2 = await executor.execute(context);
  expect(result2, isA<TaskSuccess>());

  // Results should be equivalent: both runs must report the same order.
  final firstOutput = (result1 as TaskSuccess).output;
  final secondOutput = (result2 as TaskSuccess).output;
  expect(firstOutput['orderId'], equals('ORD-1'));
  expect(secondOutput['orderId'], equals('ORD-1'));
  expect(secondOutput['orderId'], equals(firstOutput['orderId']));

  // Only one order should exist
  final orders = await orderRepository.findAll();
  expect(orders.length, equals(1));
});
Best Practices
- Always check before create - Avoid duplicates
- Use idempotency keys - Unique identifiers for operations
- Prefer upserts - Over separate check-then-insert
- Track processed items - For bulk operations
- Test retry scenarios - Verify idempotency works
Next Steps
- Best Practices - Design guidance
- Error Handling - Error patterns
- Task Executors - Handler implementation