Parallel Tasks Workflow
A complete example demonstrating parallel execution patterns.
Overview
This workflow demonstrates:
- Parallel gateway for concurrent execution
- Multiple independent branches
- Join gateway to synchronize completion
- Aggregating results from parallel branches
Workflow Diagram
[Start]
↓
[Initialize]
↓
[Fork: All Reviews]
↙ ↓ ↘
[Technical] [Legal] [Finance]
[Review] [Review] [Review]
↘ ↓ ↙
[Join: Wait All]
↓
[Evaluate Results]
↓
[Route Decision]
↙ ↘
[Approve] [Reject]
↓ ↓
[End: Approved] [End: Rejected]
Complete Implementation
dart
import 'package:vyuh_workflow_engine/vyuh_workflow_engine.dart';
/// Workflow in which technical, legal, and finance reviewers evaluate one
/// request in parallel, after which the combined result routes the request
/// to approval or rejection.
///
/// Flow: `begin` → `initialize` → `forkReviews` (allOf) → three
/// prepare/review branches → `evaluateResults` → `routeDecision` (oneOf) →
/// `processApproval` or `processRejection` → `approved` or `rejected`.
///
/// Required initial input: `requestId` and `submittedBy` (both strings).
final parallelReviewWorkflow = WorkflowBuilder(
  'PARALLEL-REVIEW',
  'Parallel Review Workflow',
  description: 'Multiple reviewers evaluate a request in parallel',
)
    .start('begin')
    // Step 1: Initialize request
    .task(
      'initialize',
      execute: (ctx) async {
        // Use getInitial<T> for original workflow input
        final requestId = ctx.getInitialRequired<String>('requestId');
        final submittedBy = ctx.getInitialRequired<String>('submittedBy');
        // Fetch request (simulated)
        // final request = await requestService.getById(requestId);
        return {
          'requestId': requestId,
          'requestType': 'Feature Request', // request.type
          'requestTitle': 'Sample Request', // request.title
          'requestDetails': 'Request details here', // request.details
          'submittedBy': submittedBy,
          'submittedAt': DateTime.now().toIso8601String(),
          'requiredReviews': ['technical', 'legal', 'finance'],
        };
      },
    )
    // Step 2: Fork to parallel reviews
    .allOf('forkReviews', [
      'prepareTechnicalReview',
      'prepareLegalReview',
      'prepareFinanceReview',
    ])
    // Branch A: Technical Review
    .task(
      'prepareTechnicalReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'technical',
          'reviewTitle': 'Technical Review: $requestTitle',
          'reviewCriteria': [
            'Technical feasibility',
            'Architecture alignment',
            'Security considerations',
            'Performance impact',
          ],
        };
      },
    )
    .userTask(
      'technicalReview',
      signal: 'technical_review_decision',
      schemaType: 'userTask.review',
      title: 'Technical Review: {{requestTitle}}',
      description: 'Please review the technical aspects of this request.',
      assignToRole: 'technical_reviewers',
      storeAs: 'technicalReviewResult',
    )
    // Branch B: Legal Review
    .task(
      'prepareLegalReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'legal',
          'reviewTitle': 'Legal Review: $requestTitle',
          'reviewCriteria': [
            'Regulatory compliance',
            'Contract implications',
            'Liability assessment',
            'Privacy requirements',
          ],
        };
      },
    )
    .userTask(
      'legalReview',
      signal: 'legal_review_decision',
      schemaType: 'userTask.review',
      title: 'Legal Review: {{requestTitle}}',
      description: 'Please review the legal aspects of this request.',
      assignToRole: 'legal_reviewers',
      storeAs: 'legalReviewResult',
    )
    // Branch C: Finance Review
    .task(
      'prepareFinanceReview',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final requestTitle = ctx.getAny<String>('requestTitle') ?? 'Request';
        return {
          'reviewType': 'finance',
          'reviewTitle': 'Finance Review: $requestTitle',
          'reviewCriteria': [
            'Budget availability',
            'Cost-benefit analysis',
            'ROI projection',
            'Financial risk',
          ],
        };
      },
    )
    .userTask(
      'financeReview',
      signal: 'finance_review_decision',
      schemaType: 'userTask.review',
      title: 'Finance Review: {{requestTitle}}',
      description: 'Please review the financial aspects of this request.',
      assignToRole: 'finance_reviewers',
      storeAs: 'financeReviewResult',
    )
    // Step 3: Join - Wait for all reviews (implicit at evaluateResults)
    // Note: The parallel gateway handles joining automatically
    // Step 4: Evaluate combined results
    .task(
      'evaluateResults',
      execute: (ctx) async {
        // Use getAny<T> for stored user task results from parallel branches
        final technicalResult =
            ctx.getAny<Map<String, dynamic>>('technicalReviewResult');
        final legalResult =
            ctx.getAny<Map<String, dynamic>>('legalReviewResult');
        final financeResult =
            ctx.getAny<Map<String, dynamic>>('financeReviewResult');

        // A missing result, or an `approved` value other than `true`, counts
        // as not approved.
        final technicalApproved = technicalResult?['approved'] == true;
        final legalApproved = legalResult?['approved'] == true;
        final financeApproved = financeResult?['approved'] == true;
        final allApproved =
            technicalApproved && legalApproved && financeApproved;

        // Collect rejection reasons
        final rejectionReasons = <String>[
          if (!technicalApproved)
            'Technical: ${technicalResult?['reason'] ?? 'Not approved'}',
          if (!legalApproved)
            'Legal: ${legalResult?['reason'] ?? 'Not approved'}',
          if (!financeApproved)
            'Finance: ${financeResult?['reason'] ?? 'Not approved'}',
        ];

        // Calculate overall score. Scores are read as `num` so that a
        // fractional or double-typed score does not fail the cast; a missing
        // score counts as 0.
        final scores = <num>[
          (technicalResult?['score'] as num?) ?? 0,
          (legalResult?['score'] as num?) ?? 0,
          (financeResult?['score'] as num?) ?? 0,
        ];
        final averageScore = scores.reduce((a, b) => a + b) / scores.length;

        return {
          'allApproved': allApproved,
          'technicalApproved': technicalApproved,
          'legalApproved': legalApproved,
          'financeApproved': financeApproved,
          'rejectionReasons': rejectionReasons,
          'averageScore': averageScore,
          'reviewSummary': {
            'technical': {
              'approved': technicalApproved,
              'reviewer': technicalResult?['completedBy'],
              'score': technicalResult?['score'],
            },
            'legal': {
              'approved': legalApproved,
              'reviewer': legalResult?['completedBy'],
              'score': legalResult?['score'],
            },
            'finance': {
              'approved': financeApproved,
              'reviewer': financeResult?['completedBy'],
              'score': financeResult?['score'],
            },
          },
        };
      },
    )
    // Step 5: Route based on evaluation
    .oneOf('routeDecision', [
      Branch.whenFn(
        (o) => o['allApproved'] == true,
        then: 'processApproval',
      ),
      Branch.otherwise(then: 'processRejection'),
    ])
    // Step 6a: Process approval
    .task(
      'processApproval',
      execute: (ctx) async {
        // Update status (simulated)
        // await requestService.updateStatus(requestId, 'approved');
        return {
          'finalDecision': 'approved',
          'approvedAt': DateTime.now().toIso8601String(),
        };
      },
    )
    // Step 6b: Process rejection
    .task(
      'processRejection',
      execute: (ctx) async {
        // Update status (simulated)
        // await requestService.updateStatus(requestId, 'rejected');
        return {
          'finalDecision': 'rejected',
          'rejectedAt': DateTime.now().toIso8601String(),
        };
      },
    )
    // End nodes
    .end('approved', name: 'Approved')
    .end('rejected', name: 'Rejected')
    // Flow edges - Main flow
    .connect('begin', 'initialize')
    .connect('initialize', 'forkReviews')
    // Note: allOf() already creates edges to all target nodes
    // Technical branch
    .connect('prepareTechnicalReview', 'technicalReview')
    .connect('technicalReview', 'evaluateResults')
    // Legal branch
    .connect('prepareLegalReview', 'legalReview')
    .connect('legalReview', 'evaluateResults')
    // Finance branch
    .connect('prepareFinanceReview', 'financeReview')
    .connect('financeReview', 'evaluateResults')
    // After all parallel branches join at evaluateResults
    .connect('evaluateResults', 'routeDecision')
    .connect('processApproval', 'approved')
    .connect('processRejection', 'rejected')
    .build();

Token Behavior
With parallel gateways, tokens split and merge:
dart
// At fork: 1 token → 3 tokens
// Token 1 → technicalReview branch
// Token 2 → legalReview branch
// Token 3 → financeReview branch
// At join: 3 tokens → 1 token
// Join waits for all tokens to arrive
// Merges output from all branches
Partial Completion Handling
Handle scenarios where some reviews may not complete:
dart
builder
    // Timeout for reviews: reports whether the 5-day review window that
    // starts at `submittedAt` has elapsed, and how many whole days remain
    // (negative once overdue).
    .task(
      'checkTimeout',
      execute: (ctx) async {
        // Use getAny<T> for accumulated output from earlier nodes
        final submittedAt = ctx.getAny<String>('submittedAt');
        if (submittedAt == null) {
          // Fail with a descriptive error instead of a bare null-check
          // failure when no earlier node produced the timestamp.
          throw StateError(
            'checkTimeout requires "submittedAt" in the workflow output',
          );
        }
        final deadline =
            DateTime.parse(submittedAt).add(const Duration(days: 5));
        final now = DateTime.now();
        return {
          'isOverdue': now.isAfter(deadline),
          'daysRemaining': deadline.difference(now).inDays,
        };
      },
    )
    // Use anyOf gateway for racing parallel branches (first to complete wins)
    .anyOf('raceReviews', [
      'technicalReview',
      'legalReview',
      'financeReview',
    ])

Alternative: Sequential Reviews with Priority
dart
builder
    // Process reviews in priority order
    .task(
      'determineReviewOrder',
      execute: (ctx) async {
        // Use get<T> for previous node output
        final requestType = ctx.get<String>('requestType');
        // Review sequence per request type; any type not listed here falls
        // back to the default technical → legal → finance order.
        final orderByType = <String, List<String>>{
          'LEGAL_CONTRACT': ['legal', 'finance', 'technical'],
          'BUDGET_REQUEST': ['finance', 'technical', 'legal'],
        };
        final order =
            orderByType[requestType] ?? ['technical', 'legal', 'finance'];
        return {'reviewOrder': order, 'currentReviewIndex': 0};
      },
    )
    // Loop through reviews sequentially: continue while the index is below
    // the three configured reviews, then evaluate.
    .oneOf('checkMoreReviews', [
      Branch.whenFn(
        (o) => o['currentReviewIndex'] < 3,
        then: 'nextReview',
      ),
      Branch.otherwise(then: 'evaluateResults'),
    ]);

Testing
dart
group('Parallel Review Workflow', () {
  // Starts a new workflow instance for [requestId] with the shared submitter.
  // NOTE(review): the id used here is 'parallel-review' while the workflow
  // definition is built with code 'PARALLEL-REVIEW' — confirm how the engine
  // resolves `workflowId`.
  startReview(String requestId) => engine.startWorkflow(
        workflowId: 'parallel-review',
        input: {
          'requestId': requestId,
          'submittedBy': 'user@company.com',
        },
      );

  test('approves when all reviews pass', () async {
    final instance = await startReview('REQ-001');

    // Every reviewer group approves.
    await completeReview(instance.id, 'technical_reviewers',
        approved: true, score: 85);
    await completeReview(instance.id, 'legal_reviewers',
        approved: true, score: 90);
    await completeReview(instance.id, 'finance_reviewers',
        approved: true, score: 80);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['finalDecision'], 'approved');
    expect(completed.output['averageScore'], 85);
  });

  test('rejects when any review fails', () async {
    final instance = await startReview('REQ-002');

    // Legal is the only group that rejects.
    await completeReview(instance.id, 'technical_reviewers',
        approved: true, score: 85);
    await completeReview(instance.id, 'legal_reviewers',
        approved: false,
        score: 40,
        reason: 'Regulatory compliance concerns');
    await completeReview(instance.id, 'finance_reviewers',
        approved: true, score: 80);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['finalDecision'], 'rejected');
    expect(
      completed.output['rejectionReasons'],
      contains(startsWith('Legal:')),
    );
  });

  test('reviews can complete in any order', () async {
    final instance = await startReview('REQ-003');

    // Completion order deliberately differs from branch definition order.
    await completeReview(instance.id, 'finance_reviewers',
        approved: true, score: 88);
    await completeReview(instance.id, 'technical_reviewers',
        approved: true, score: 92);
    await completeReview(instance.id, 'legal_reviewers',
        approved: true, score: 85);

    final completed = await engine.getWorkflowInstance(instance.id);
    expect(completed.status, WorkflowStatus.completed);
    expect(completed.output['allApproved'], true);
  });
});
/// Completes the first pending user task assigned to [role] in the workflow
/// instance [instanceId].
///
/// The task response carries [approved] and [score], plus [reason] when one
/// is given. The task is recorded as completed by `<role>@company.com`.
///
/// Throws a [StateError] if the instance has no pending task for [role].
Future<void> completeReview(
  String instanceId,
  String role, {
  required bool approved,
  required int score,
  String? reason,
}) async {
  final tasks = await engine.getUserTasks(
    workflowInstanceId: instanceId,
    assignedToRole: role,
    status: UserTaskStatus.pending,
  );
  if (tasks.isEmpty) {
    // `tasks.first` would throw an uninformative "No element"; name the
    // missing task so a failing test points at the cause.
    throw StateError(
      'No pending user task for role "$role" in workflow instance '
      '$instanceId',
    );
  }
  await engine.completeUserTask(
    taskId: tasks.first.id,
    completedBy: '$role@company.com',
    response: {
      'approved': approved,
      'score': score,
      if (reason != null) 'reason': reason,
    },
  );
}

Performance Considerations
Parallel execution benefits:
- Faster completion - Reviews happen simultaneously
- Better resource utilization - Multiple reviewers work concurrently
- Reduced wait time - No sequential bottlenecks
Considerations:
- All branches must complete before join
- Output from all branches is merged
- Memory usage scales with branch count
Next Steps
- Simple Approval - Single approval pattern
- Gateways - Gateway types reference
- Tokens - Token behavior in parallel flows