Task Nodes
Task nodes execute automated work in a workflow. They're the workhorses of business process automation.
Definition
Inline Execute Function
For simple tasks, define the logic inline:
dart
builder.task(
  'validateRequest',
  name: 'Validate Request',
  execute: (ExecutionContext ctx) async {
    // getInitial<T> reads the original workflow input rather than the
    // previous node's output.
    final entityId = ctx.getInitial<String>('entityId');
    if (entityId == null) {
      // A missing input is a validation problem: report it as a
      // non-retryable TaskFailure instead of throwing a bare Exception.
      return TaskFailure(
        errorType: ErrorType.validation,
        message: 'entityId is required',
        isRetryable: false,
      );
    }
    return {'validated': true}; // Auto-wrapped in TaskSuccess
  },
);
Executor-Based
For reusable logic, use an executor class:
dart
// Define the executor
/// Task executor that sends a templated email.
///
/// The recipient comes from the previous node's output (`recipientEmail`)
/// and the template from the node configuration (`template`).
class SendEmailTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.email.send';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Send Email';

  /// Sends the email and returns a [TaskSuccess] whose output carries the
  /// `sentAt` timestamp as an ISO-8601 string.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    // Use the required accessors: get<T>/getConfig<T> return nullable
    // values, which would otherwise be handed to the email service as null.
    // getRequired<T> throws if the key is missing.
    final to = context.getRequired<String>('recipientEmail');
    final template = context.getConfigRequired<String>('template');

    await emailService.send(to: to, template: template);
    return TaskSuccess(output: {'sentAt': DateTime.now().toIso8601String()});
  }

  /// Descriptor that maps [_schemaType] to a factory for this executor.
  static final typeDescriptor = TypeDescriptor<TaskExecutor>(
    schemaType: _schemaType,
    fromJson: (_) => SendEmailTaskExecutor(),
    title: 'Send Email',
  );
}
// Register via descriptor
// The descriptor's `tasks` list carries the executor's static
// typeDescriptor, declared on SendEmailTaskExecutor above.
final descriptor = WorkflowDescriptor(
title: 'Email Tasks',
tasks: [SendEmailTaskExecutor.typeDescriptor],
);
// Use in workflow
builder.task(
'notifyApprover',
name: 'Notify Approver',
executor: SendEmailTaskExecutor(),
);
ExecutionContext
The context provided to task executors:
dart
class ExecutionContext {
final Workflow workflow; // The workflow definition
final WorkflowInstance workflowInstance; // Current instance
final WorkflowNode currentNode; // The node being executed
final Map<String, dynamic> input; // Previous node output
final Map<String, dynamic> config; // Node configuration
// Typed accessors for previous node output
T? get<T>(String path); // Get from previous node output
T getRequired<T>(String path); // Throws if missing
// Original workflow input (immutable)
T? getInitial<T>(String path);
// Accumulated output from all nodes
T? getAny<T>(String path);
// Node configuration
T? getConfig<T>(String key);
T getConfigRequired<T>(String key);
}
TaskResult
Tasks return one of two result types:
TaskSuccess
dart
return TaskSuccess(output: {
'processedAt': DateTime.now().toIso8601String(),
'recordId': newRecord.id,
});
TaskFailure
dart
return TaskFailure(
errorType: ErrorType.validation,
message: 'Invalid input: missing required field',
isRetryable: false,
);
return TaskFailure(
errorType: ErrorType.internal,
message: 'Database connection failed',
isRetryable: true, // Engine may retry
);
Output Namespacing
Use storeAs to namespace task output:
dart
builder.task(
  'fetchUser',
  name: 'Fetch User',
  storeAs: 'userData', // Output stored under 'userData' key
  execute: (ctx) async {
    // getRequired<T> reads the previous node's output and throws if the key
    // is missing, so no null assertion (`!`) is needed.
    final userId = ctx.getRequired<String>('userId');
    final user = await userService.get(userId);
    return TaskSuccess(output: user.toJson());
  },
);
// Later, access with dot-notation: ctx.get<String>('userData.name')
Idempotent Tasks
Tasks should be idempotent so that they can be safely re-executed during crash recovery:
dart
/// Task executor that creates an order and tolerates being run again for
/// the same `orderId`.
class CreateOrderTaskExecutor extends TaskExecutor {
  static const _schemaType = 'task.order.create';

  @override
  String get schemaType => _schemaType;

  @override
  String get name => 'Create Order';

  /// Looks the order up by `orderId` first and only creates it when no
  /// record is found; a repeated run reports `already_exists` instead.
  @override
  Future<TaskResult> execute(ExecutionContext context) async {
    final requestedId = context.getRequired<String>('orderId');

    // IDEMPOTENT: only create when no order with this id exists yet.
    final found = await orderRepository.findById(requestedId);
    if (found == null) {
      final created = await orderRepository.create(requestedId, context.input);
      return TaskSuccess(
        output: {'orderId': created.id, 'status': 'created'},
      );
    }

    return TaskSuccess(
      output: {'orderId': requestedId, 'status': 'already_exists'},
    );
  }
}
Error Handling
Retryable Errors
dart
try {
  await externalService.call();
  return TaskSuccess(output: {...});
} on TimeoutException {
  // Only timeouts are turned into a retryable failure; any other error is
  // not caught here and propagates to the caller unchanged.
  return TaskFailure(
    errorType: ErrorType.timeout,
    message: 'Service timeout',
    isRetryable: true,
  );
}
Non-Retryable Errors
dart
if (!isValid(input)) {
return TaskFailure(
errorType: ErrorType.validation,
message: 'Invalid input',
isRetryable: false, // Don't retry validation failures
);
}
Common Task Patterns
Data Transformation
dart
builder.task('transformData', execute: (ctx) async {
  // getRequired<T> reads the previous node's output and throws if the key
  // is missing, so no null assertion (`!`) is needed.
  final input = ctx.getRequired<Map<String, dynamic>>('rawData');
  final transformed = transformService.process(input);
  return {'transformedData': transformed}; // Auto-wrapped in TaskSuccess
});
External API Call
dart
builder.task('fetchExternalData', execute: (ctx) async {
  // getRequired<T> reads the previous node's output and throws if the key
  // is missing, so no null assertion (`!`) is needed.
  final id = ctx.getRequired<String>('id');
  // Percent-encode the id so characters such as '/' or '?' cannot change
  // the shape of the request path.
  final response = await apiClient.get('/data/${Uri.encodeComponent(id)}');
  return response.data; // Auto-wrapped in TaskSuccess
});
Database Operation
dart
builder.task('updateStatus', execute: (ctx) async {
  // getRequired<T> reads the previous node's output and throws if the key
  // is missing, so no null assertions (`!`) are needed.
  final entityId = ctx.getRequired<String>('entityId');
  final newStatus = ctx.getRequired<String>('newStatus');
  await repository.updateStatus(entityId, newStatus);
  return {'updatedAt': DateTime.now().toIso8601String()}; // Auto-wrapped in TaskSuccess
});
Notification
dart
builder.task('sendNotification', execute: (ctx) async {
  // getRequired<T> reads the previous node's output and throws if the key
  // is missing, so no null assertion (`!`) is needed.
  final userId = ctx.getRequired<String>('userId');
  await notificationService.send(
    userId: userId,
    message: 'Your request has been processed',
  );
  return {'notified': true}; // Auto-wrapped in TaskSuccess
});
Best Practices
- Keep tasks focused - One task, one responsibility
- Make tasks idempotent - Safe to retry
- Use storeAs - Avoid output key collisions
- Handle errors gracefully - Return appropriate TaskFailure
- Log important actions - For debugging and audit
Next Steps
- User Tasks - Human interaction
- Task Executors - Executor implementation
- Idempotency - Crash recovery patterns