zoobzio January 7, 2025 Edit this page

Common Patterns

Practical recipes for building pipelines with flume.

Validation Pipeline

A common pattern for validating input before processing:

factory := flume.New[Order]()

validateID := factory.Identity("validate", "Validates order data")
processID := factory.Identity("process", "Processes valid orders")
rejectID := factory.Identity("reject", "Handles invalid orders")
isValidID := factory.Identity("is-valid", "Checks order validity")

factory.Add(
    pipz.Apply(validateID, validateOrder),
    pipz.Apply(processID, processOrder),
    pipz.Apply(rejectID, rejectOrder),
)
factory.AddPredicate(flume.Predicate[Order]{
    Identity:  isValidID,
    Predicate: func(ctx context.Context, o Order) bool { return o.Total > 0 },
})

pipeline, _ := factory.BuildFromYAML(`
type: filter
predicate: is-valid
then:
  ref: process
else:
  ref: reject
`)

Retry with Fallback

Retry an operation with a fallback on exhaustion:

type: fallback
children:
  - type: retry
    attempts: 3
    backoff: "100ms"
    child:
      ref: primary-service
  - ref: fallback-service

Fan-Out Processing

Process data through multiple paths concurrently:

type: concurrent
children:
  - ref: save-to-database
  - ref: send-notification
  - ref: update-cache

Conditional Routing

Route data based on type or category:

type: switch
condition: order-type
routes:
  standard:
    ref: process-standard
  express:
    ref: process-express
  priority:
    ref: process-priority
default:
  ref: process-unknown

Stream Output

Send processed data to a channel for async consumption:

output := make(chan Order, 100)
factory.AddChannel("processed-orders", output)

pipeline, _ := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - ref: enrich
  - stream: processed-orders
`)

// Consume from channel
go func() {
    for order := range output {
        // Handle processed order
    }
}()

Circuit Breaker Protection

Protect against cascading failures:

type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: external-api-call

Rate-Limited Processing

Throttle requests to downstream services:

type: rate-limit
requests_per_second: 100
burst_size: 10
child:
  type: sequence
  children:
    - ref: transform
    - ref: send-to-api