zoobzio December 11, 2025

Building Pipelines

Learn to construct increasingly sophisticated pipelines with Flume schemas.

Simple Sequences

The most basic pipeline chains processors sequentially:

type: sequence
children:
  - ref: validate
  - ref: enrich
  - ref: save

Each processor receives the output of the previous one.

Parallel Execution

Process data concurrently with the concurrent type:

type: sequence
children:
  - ref: validate
  - type: concurrent
    children:
      - ref: enrich-from-db
      - ref: enrich-from-api
      - ref: calculate-score
  - ref: save

Each concurrent child receives its own clone of the data, and all children run in parallel. Their results are merged before the next step in the sequence runs.

Conditional Processing

Filter

Run the then branch only when the predicate returns true. When it returns false and there is no else branch, the data passes through unchanged:

type: filter
predicate: is-premium
then:
  ref: apply-discount

With an else branch:

type: filter
predicate: is-premium
then:
  ref: premium-handler
else:
  ref: standard-handler

Switch

Route data to different handlers based on the value a condition returns. The default route handles any value that has no matching route:

type: switch
condition: order-type
routes:
  subscription:
    ref: handle-subscription
  one-time:
    ref: handle-single-purchase
  gift:
    type: sequence
    children:
      - ref: validate-gift
      - ref: handle-gift
default:
  ref: handle-unknown

Register the condition:

factory.AddCondition(flume.Condition[Order]{
    Name: "order-type",
    Condition: func(ctx context.Context, o Order) string {
        return o.Type // "subscription", "one-time", "gift", etc.
    },
})

Error Handling

Retry

Retry failed operations:

type: retry
attempts: 3
child:
  ref: flaky-api

With exponential backoff, where backoff sets the initial delay between attempts and the delay grows after each failure:

type: retry
attempts: 5
backoff: "100ms"
child:
  ref: external-service

Fallback

Use a backup handler on failure:

type: fallback
children:
  - ref: primary-service
  - ref: backup-service

Timeout

Enforce time limits:

type: timeout
duration: "5s"
child:
  ref: slow-operation

Circuit Breaker

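Prevent cascading failures. After failure_threshold failures the breaker opens and rejects calls without running the child. Once recovery_timeout has passed, it lets a trial call through: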

type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: unreliable-service

Rate Limiting

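Control throughput. requests_per_second sets the sustained rate, and burst_size caps how many requests can go through at once: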

type: rate-limit
requests_per_second: 100.0
burst_size: 10
child:
  ref: rate-sensitive-api

Nesting Patterns

Resilient External Calls

type: circuit-breaker
failure_threshold: 3
recovery_timeout: "60s"
child:
  type: timeout
  duration: "10s"
  child:
    type: retry
    attempts: 3
    backoff: "200ms"
    child:
      ref: external-api

Conditional with Fallback

type: filter
predicate: has-cache
then:
  type: fallback
  children:
    - ref: get-from-cache
    - ref: get-from-db
else:
  ref: get-from-db

Parallel with Error Handling

type: concurrent
children:
  - type: retry
    attempts: 2
    child:
      ref: service-a
  - type: fallback
    children:
      - ref: service-b-primary
      - ref: service-b-backup
  - ref: service-c

Complete Example

An order processing pipeline:

version: "2.0.0"
type: sequence
name: order-processing
children:
  # Step 1: Validate
  - ref: validate-order

  # Step 2: Parallel enrichment
  - type: concurrent
    name: enrich
    children:
      - ref: fetch-customer
      - ref: fetch-inventory
      - ref: calculate-tax

  # Step 3: Premium handling
  - type: filter
    predicate: is-premium-customer
    then:
      type: sequence
      children:
        - ref: apply-loyalty-discount
        - ref: priority-queue

  # Step 4: Payment routing
  - type: switch
    condition: payment-method
    routes:
      credit:
        type: circuit-breaker
        failure_threshold: 3
        recovery_timeout: "60s"
        child:
          type: retry
          attempts: 3
          backoff: "500ms"
          child:
            ref: charge-credit-card
      paypal:
        type: timeout
        duration: "30s"
        child:
          ref: paypal-checkout
      crypto:
        ref: crypto-payment
    default:
      ref: manual-payment-review

  # Step 5: Post-payment
  - type: concurrent
    children:
      - ref: update-inventory
      - ref: send-confirmation
      - type: filter
        predicate: requires-shipping
        then:
          ref: create-shipment

  # Step 6: Finalize
  - ref: complete-order

Register the processors, predicates, and conditions the schema references:

// Processors
factory.Add(
    pipz.Apply("validate-order", validateOrder),
    pipz.Apply("fetch-customer", fetchCustomer),
    pipz.Apply("fetch-inventory", fetchInventory),
    // ... more processors
)

// Predicates
factory.AddPredicate(
    flume.Predicate[Order]{Name: "is-premium-customer", Predicate: isPremium},
    flume.Predicate[Order]{Name: "requires-shipping", Predicate: needsShipping},
)

// Conditions
factory.AddCondition(flume.Condition[Order]{
    Name:   "payment-method",
    Values: []string{"credit", "paypal", "crypto"},
    Condition: func(ctx context.Context, o Order) string {
        return o.PaymentMethod
    },
})

Schema Design Tips

1. Name Complex Nodes

type: sequence
name: payment-handling  # Helps with debugging
children:
  - ref: charge
  - ref: confirm

2. Keep Depth Manageable

Break deeply nested schemas into named sub-pipelines:

// Register sub-pipeline
factory.SetSchema("payment-flow", paymentSchema)

3. Version Your Schemas

version: "1.2.0"
type: sequence
# ...

4. Document with Comments

YAML comments aren't preserved once a schema is parsed. To keep documentation with the pipeline, put it in companion files or in the Description field when registering components:

factory.AddPredicate(flume.Predicate[Order]{
    Name:        "high-value",
    Description: "Orders over $1000 for premium handling",
    Predicate:   isHighValue,
})

Next Steps