Common Patterns
Practical recipes for building pipelines with flume.
Validation Pipeline
A common pattern for validating input before processing:
factory := flume.New[Order]()

// Identities name each component so the schema can reference it.
validateID := factory.Identity("validate", "Validates order data")
processID := factory.Identity("process", "Processes valid orders")
rejectID := factory.Identity("reject", "Handles invalid orders")
isValidID := factory.Identity("is-valid", "Checks order validity")

// Register the processors.
factory.Add(
	pipz.Apply(validateID, validateOrder),
	pipz.Apply(processID, processOrder),
	pipz.Apply(rejectID, rejectOrder),
)

// Register the predicate used by the filter node.
factory.AddPredicate(flume.Predicate[Order]{
	Identity:  isValidID,
	Predicate: func(ctx context.Context, o Order) bool { return o.Total > 0 },
})

// Orders that pass is-valid go to "process"; the rest go to "reject".
pipeline, err := factory.BuildFromYAML(`
type: filter
predicate: is-valid
then:
  ref: process
else:
  ref: reject
`)
if err != nil {
	// Handle the build error instead of discarding it.
	log.Fatal(err)
}
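
The branching that the filter schema describes can be sketched in plain Go. This is a standard-library illustration of the then/else behavior only, not flume's implementation; the `Order` fields and the `route`, `process`, and `reject` helpers are hypothetical names introduced for the example.

```go
package main

import "fmt"

// Order is a hypothetical stand-in for the document's Order type.
type Order struct {
	ID     string
	Total  float64
	Status string
}

// isValid mirrors the "is-valid" predicate: an order needs a positive total.
func isValid(o Order) bool { return o.Total > 0 }

// route applies thenFn when pred holds and elseFn otherwise.
func route(o Order, pred func(Order) bool, thenFn, elseFn func(Order) Order) Order {
	if pred(o) {
		return thenFn(o)
	}
	return elseFn(o)
}

// process stands in for the "process" processor.
func process(o Order) Order {
	o.Status = "processed"
	return o
}

// reject stands in for the "reject" processor.
func reject(o Order) Order {
	o.Status = "rejected"
	return o
}

func main() {
	orders := []Order{{ID: "a", Total: 25}, {ID: "b", Total: 0}}
	for _, o := range orders {
		// Prints "processed" for order a, then "rejected" for order b.
		fmt.Println(route(o, isValid, process, reject).Status)
	}
}
```

Keeping the predicate separate from both branches is what lets the schema swap either side without touching the validity check.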
Retry with Fallback
Retry an operation with a fallback on exhaustion:
type: fallback
children:
  - type: retry
    attempts: 3
    backoff: "100ms"
    child:
      ref: primary-service
  - ref: fallback-service
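
To make the control flow concrete, here is a minimal standard-library sketch of retry wrapped in fallback. It is an illustration under assumptions, not flume's or pipz's code: the `step` type, the `retry` and `fallback` helpers, and the doubling of the wait between attempts are all choices made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errUnavailable = errors.New("primary unavailable")

// step is a hypothetical stand-in for a registered processor such as
// primary-service or fallback-service.
type step func(in string) (string, error)

// retry calls fn up to attempts times, sleeping between failed tries.
// Doubling the wait after each failure is an assumption of this sketch.
func retry(fn step, attempts int, backoff time.Duration) step {
	return func(in string) (string, error) {
		wait := backoff
		var err error
		for i := 0; i < attempts; i++ {
			var out string
			if out, err = fn(in); err == nil {
				return out, nil
			}
			if i < attempts-1 {
				time.Sleep(wait)
				wait *= 2
			}
		}
		return "", fmt.Errorf("after %d attempts: %w", attempts, err)
	}
}

// fallback runs primary and, only if it fails, runs secondary on the same input.
func fallback(primary, secondary step) step {
	return func(in string) (string, error) {
		if out, err := primary(in); err == nil {
			return out, nil
		}
		return secondary(in)
	}
}

func main() {
	calls := 0
	primary := func(in string) (string, error) {
		calls++
		return "", errUnavailable
	}
	secondary := func(in string) (string, error) { return "fallback:" + in, nil }

	pipeline := fallback(retry(primary, 3, time.Millisecond), secondary)
	out, err := pipeline("order-1")
	fmt.Println(out, err, calls) // prints "fallback:order-1 <nil> 3"
}
```

The nesting order matters: the fallback only runs once all three attempts against the primary have failed.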
Fan-Out Processing
Process data through multiple paths concurrently:
type: concurrent
children:
  - ref: save-to-database
  - ref: send-notification
  - ref: update-cache
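
A rough picture of what fan-out involves, written with only the standard library. This sketch assumes each branch receives its own copy of the data so branches cannot race on shared slices; the `Order` fields, `clone`, and `fanOut` are hypothetical names for the example and do not describe flume's internals.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errCache = errors.New("cache unavailable")

// Order is a hypothetical stand-in for the document's Order type.
type Order struct {
	ID    string
	Items []string
}

// clone deep-copies the order so a branch can mutate it safely.
func (o Order) clone() Order {
	c := o
	c.Items = append([]string(nil), o.Items...)
	return c
}

// fanOut runs every branch in its own goroutine on a private copy,
// waits for all of them, and returns the errors it collected.
func fanOut(o Order, branches ...func(Order) error) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for _, b := range branches {
		wg.Add(1)
		go func(b func(Order) error) {
			defer wg.Done()
			if err := b(o.clone()); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}(b)
	}
	wg.Wait()
	return errs
}

func main() {
	done := make(chan string, 3)
	branch := func(name string, err error) func(Order) error {
		return func(o Order) error {
			o.Items[0] = name // touches only this branch's copy
			done <- name
			return err
		}
	}
	order := Order{ID: "o-1", Items: []string{"widget"}}
	errs := fanOut(order,
		branch("save-to-database", nil),
		branch("send-notification", nil),
		branch("update-cache", errCache),
	)
	fmt.Println(len(done), len(errs), order.Items[0]) // prints "3 1 widget"
}
```

One failing branch does not stop the others here; whether that is the right policy depends on how independent the side effects are.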
Conditional Routing
Route data based on type or category:
type: switch
condition: order-type
routes:
  standard:
    ref: process-standard
  express:
    ref: process-express
  priority:
    ref: process-priority
default:
  ref: process-unknown
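
The routing above amounts to a keyed lookup with a default. The following standard-library sketch shows that idea; the `router` type, the `label` helper, and the `Kind` field are hypothetical and stand in for the registered condition and processors.

```go
package main

import "fmt"

// Order is a hypothetical stand-in for the document's Order type.
type Order struct {
	ID   string
	Kind string
}

type handler func(Order) string

// router mirrors the switch node: condition yields a route key, routes maps
// keys to handlers, and fallback handles any key without an entry.
type router struct {
	condition func(Order) string
	routes    map[string]handler
	fallback  handler
}

// handle picks the handler for the order's route key, or the fallback.
func (r router) handle(o Order) string {
	if h, ok := r.routes[r.condition(o)]; ok {
		return h(o)
	}
	return r.fallback(o)
}

// label returns a handler that tags the order ID with a processor name.
func label(name string) handler {
	return func(o Order) string { return name + ":" + o.ID }
}

// newOrderRouter wires up the same routes as the YAML example.
func newOrderRouter() router {
	return router{
		condition: func(o Order) string { return o.Kind },
		routes: map[string]handler{
			"standard": label("process-standard"),
			"express":  label("process-express"),
			"priority": label("process-priority"),
		},
		fallback: label("process-unknown"),
	}
}

func main() {
	r := newOrderRouter()
	// Prints "process-express:o-1".
	fmt.Println(r.handle(Order{ID: "o-1", Kind: "express"}))
	// Prints "process-unknown:o-2" because "wholesale" has no route.
	fmt.Println(r.handle(Order{ID: "o-2", Kind: "wholesale"}))
}
```

Always supplying a default keeps unexpected categories from silently falling through.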
Stream Output
Send processed data to a channel for async consumption:
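output := make(chan Order, 100)
factory.AddChannel("processed-orders", output)

pipeline, err := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - ref: enrich
  - stream: processed-orders
`)
if err != nil {
	// Handle the build error instead of discarding it.
	log.Fatal(err)
}

// Consume from the channel in the background.
go func() {
	for order := range output {
		// Handle the processed order; logging is a placeholder.
		log.Printf("processed order: %+v", order)
	}
}()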
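
One detail worth spelling out is shutdown: a `range` loop over a channel only ends when the channel is closed. The standard-library sketch below shows a producer closing the channel and waiting for the consumer to drain it. The `consume` helper and the `Order` field are hypothetical, and the plain send stands in for the pipeline's stream step.

```go
package main

import "fmt"

// Order is a hypothetical stand-in for the document's Order type.
type Order struct{ ID string }

// consume drains the channel until it is closed and returns the IDs it saw.
func consume(in <-chan Order) []string {
	var ids []string
	for o := range in {
		ids = append(ids, o.ID)
	}
	return ids
}

func main() {
	output := make(chan Order, 100)
	result := make(chan []string)

	// The consumer runs in the background, as in the example above.
	go func() { result <- consume(output) }()

	// Each send stands in for the pipeline streaming a processed order.
	for _, id := range []string{"a", "b", "c"} {
		output <- Order{ID: id}
	}

	// Closing the channel lets the consumer's range loop finish.
	close(output)

	// Prints "[a b c]".
	fmt.Println(<-result)
}
```

The buffer size bounds how far the producer can run ahead of the consumer before a send blocks.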
Circuit Breaker Protection
Protect against cascading failures:
type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: external-api-call
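
For readers new to the pattern, this is a minimal standard-library sketch of a circuit breaker state machine. It assumes the breaker opens after `failure_threshold` consecutive failures and permits one trial call once `recovery_timeout` has passed; flume's actual semantics may differ. The `breaker`, `fakeClock`, and `mustDuration` names are hypothetical, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	errOpen     = errors.New("circuit open")
	errUpstream = errors.New("upstream error")
)

// fakeClock is a hand-advanced clock that keeps the example deterministic.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

// mustDuration parses strings such as "30s", matching the YAML field format.
func mustDuration(s string) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil {
		panic(err)
	}
	return d
}

// breaker opens after threshold consecutive failures and allows a trial
// call once recovery has elapsed since it opened.
type breaker struct {
	threshold int
	recovery  time.Duration
	now       func() time.Time
	failures  int
	open      bool
	openedAt  time.Time
}

func (b *breaker) call(fn func() error) error {
	if b.open && b.now().Sub(b.openedAt) < b.recovery {
		return errOpen // fail fast without touching the dependency
	}
	if err := fn(); err != nil {
		b.failures++
		// A failed trial call re-opens the circuit and restarts the timer.
		if b.open || b.failures >= b.threshold {
			b.open = true
			b.openedAt = b.now()
		}
		return err
	}
	// Any success closes the circuit and clears the failure count.
	b.failures = 0
	b.open = false
	return nil
}

func main() {
	clock := &fakeClock{}
	b := &breaker{threshold: 5, recovery: mustDuration("30s"), now: clock.now}
	fail := func() error { return errUpstream }
	succeed := func() error { return nil }

	for i := 0; i < 5; i++ {
		_ = b.call(fail)
	}
	// Prints "true": the circuit is open, so the call is rejected.
	fmt.Println(b.call(succeed) == errOpen)

	clock.advance(mustDuration("31s"))
	// Prints "<nil>": the trial call succeeds and closes the circuit.
	fmt.Println(b.call(succeed))
}
```

Failing fast while the circuit is open is what protects the struggling dependency from further load.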
Rate-Limited Processing
Throttle requests to downstream services:
type: rate-limit
requests_per_second: 100
burst_size: 10
child:
  type: sequence
  children:
    - ref: transform
    - ref: send-to-api
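
How `requests_per_second` and `burst_size` interact is easiest to see in a token bucket. The sketch below uses only the standard library and assumes token-bucket semantics; it reports an allow/deny decision and does not model whether a throttled request waits or fails, which this document does not specify. The `bucket`, `fakeClock`, `mustDuration`, and `countAllowed` names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// fakeClock is a hand-advanced clock that keeps the example deterministic.
type fakeClock struct{ t time.Time }

func (c *fakeClock) now() time.Time          { return c.t }
func (c *fakeClock) advance(d time.Duration) { c.t = c.t.Add(d) }

// mustDuration parses strings such as "1s" and panics on bad input.
func mustDuration(s string) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil {
		panic(err)
	}
	return d
}

// bucket is a token bucket: tokens refill at rate per second up to burst.
type bucket struct {
	rate   float64 // corresponds to requests_per_second
	burst  float64 // corresponds to burst_size
	tokens float64
	last   time.Time
	now    func() time.Time
}

// newBucket starts with a full bucket so an initial burst is permitted.
func newBucket(rate float64, burst int, now func() time.Time) *bucket {
	full := float64(burst)
	return &bucket{rate: rate, burst: full, tokens: full, last: now(), now: now}
}

// allow refills tokens for the elapsed time, then spends one if available.
func (b *bucket) allow() bool {
	t := b.now()
	b.tokens = math.Min(b.burst, b.tokens+t.Sub(b.last).Seconds()*b.rate)
	b.last = t
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// countAllowed makes n back-to-back requests and counts how many pass.
func countAllowed(b *bucket, n int) int {
	passed := 0
	for i := 0; i < n; i++ {
		if b.allow() {
			passed++
		}
	}
	return passed
}

func main() {
	clock := &fakeClock{}
	b := newBucket(100, 10, clock.now)

	// Prints "10": the burst is spent and the other 5 requests are throttled.
	fmt.Println(countAllowed(b, 15))

	clock.advance(mustDuration("1s"))
	// Prints "10": the refill is capped at the burst size.
	fmt.Println(countAllowed(b, 15))
}
```

With these settings the sustained rate is 100 requests per second, but no more than 10 can pass in a single instant.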