Building Pipelines
Learn to construct increasingly sophisticated pipelines with Flume schemas.
Simple Sequences
The most basic pipeline chains processors sequentially:
type: sequence
children:
  - ref: validate
  - ref: enrich
  - ref: save
Each processor receives the output of the previous one.
Parallel Execution
Process data concurrently with the concurrent type:
type: sequence
children:
  - ref: validate
  - type: concurrent
    children:
      - ref: enrich-from-db
      - ref: enrich-from-api
      - ref: calculate-score
  - ref: save
Each concurrent child receives its own clone of the data, and the children execute in parallel. Their results are merged before the next step runs.
Conditional Processing
Filter
Execute a branch only when a condition is met:
type: filter
predicate: is-premium
then:
  ref: apply-discount
With an else branch:
type: filter
predicate: is-premium
then:
  ref: premium-handler
else:
  ref: standard-handler
Switch
Route to different handlers based on a condition value:
type: switch
condition: order-type
routes:
  subscription:
    ref: handle-subscription
  one-time:
    ref: handle-single-purchase
  gift:
    type: sequence
    children:
      - ref: validate-gift
      - ref: handle-gift
default:
  ref: handle-unknown
Register the condition:
factory.AddCondition(flume.Condition[Order]{
	Name: "order-type",
	Condition: func(ctx context.Context, o Order) string {
		return o.Type // "subscription", "one-time", "gift", etc.
	},
})
Error Handling
Retry
Retry failed operations:
type: retry
attempts: 3
child:
  ref: flaky-api
With exponential backoff:
type: retry
attempts: 5
backoff: "100ms"
child:
  ref: external-service
Fallback
Use a backup handler on failure:
type: fallback
children:
  - ref: primary-service
  - ref: backup-service
Timeout
Enforce time limits:
type: timeout
duration: "5s"
child:
  ref: slow-operation
Circuit Breaker
Prevent cascading failures:
type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: unreliable-service
Rate Limiting
Control throughput:
type: rate-limit
requests_per_second: 100.0
burst_size: 10
child:
  ref: rate-sensitive-api
Nesting Patterns
Resilient External Calls
type: circuit-breaker
failure_threshold: 3
recovery_timeout: "60s"
child:
  type: timeout
  duration: "10s"
  child:
    type: retry
    attempts: 3
    backoff: "200ms"
    child:
      ref: external-api
Conditional with Fallback
type: filter
predicate: has-cache
then:
  type: fallback
  children:
    - ref: get-from-cache
    - ref: get-from-db
else:
  ref: get-from-db
Parallel with Error Handling
type: concurrent
children:
  - type: retry
    attempts: 2
    child:
      ref: service-a
  - type: fallback
    children:
      - ref: service-b-primary
      - ref: service-b-backup
  - ref: service-c
Complete Example
An order processing pipeline:
version: "2.0.0"
type: sequence
name: order-processing
children:
  # Step 1: Validate
  - ref: validate-order
  # Step 2: Parallel enrichment
  - type: concurrent
    name: enrich
    children:
      - ref: fetch-customer
      - ref: fetch-inventory
      - ref: calculate-tax
  # Step 3: Premium handling
  - type: filter
    predicate: is-premium-customer
    then:
      type: sequence
      children:
        - ref: apply-loyalty-discount
        - ref: priority-queue
  # Step 4: Payment routing
  - type: switch
    condition: payment-method
    routes:
      credit:
        type: circuit-breaker
        failure_threshold: 3
        recovery_timeout: "60s"
        child:
          type: retry
          attempts: 3
          backoff: "500ms"
          child:
            ref: charge-credit-card
      paypal:
        type: timeout
        duration: "30s"
        child:
          ref: paypal-checkout
      crypto:
        ref: crypto-payment
    default:
      ref: manual-payment-review
  # Step 5: Post-payment
  - type: concurrent
    children:
      - ref: update-inventory
      - ref: send-confirmation
      - type: filter
        predicate: requires-shipping
        then:
          ref: create-shipment
  # Step 6: Finalize
  - ref: complete-order
Registrations:
// Processors
factory.Add(
	pipz.Apply("validate-order", validateOrder),
	pipz.Apply("fetch-customer", fetchCustomer),
	pipz.Apply("fetch-inventory", fetchInventory),
	// ... more processors
)

// Predicates
factory.AddPredicate(
	flume.Predicate[Order]{Name: "is-premium-customer", Predicate: isPremium},
	flume.Predicate[Order]{Name: "requires-shipping", Predicate: needsShipping},
)

// Conditions
factory.AddCondition(flume.Condition[Order]{
	Name:   "payment-method",
	Values: []string{"credit", "paypal", "crypto"},
	Condition: func(ctx context.Context, o Order) string {
		return o.PaymentMethod
	},
})
Schema Design Tips
1. Name Complex Nodes
type: sequence
name: payment-handling  # Helps with debugging
children:
  - ref: charge
  - ref: confirm
2. Keep Depth Manageable
Break deeply nested schemas into named sub-pipelines:
// Register sub-pipeline
factory.SetSchema("payment-flow", paymentSchema)
3. Version Your Schemas
version: "1.2.0"
type: sequence
# ...
4. Document with Comments
YAML comments aren't preserved once a schema is parsed, so keep longer documentation in companion files or use the description fields when registering components:
factory.AddPredicate(flume.Predicate[Order]{
	Name:        "high-value",
	Description: "Orders over $1000 for premium handling",
	Predicate:   isHighValue,
})
Next Steps
- Schema Design Guide - Best practices
- Error Handling Guide - Resilience patterns
- Connector Types Reference - All options