zoobzio December 11, 2025

Schema Design

Best practices for designing clear, maintainable Flume schemas.

Naming Conventions

Processor Names

Use kebab-case with verb-noun patterns:

// Good
pipz.Apply("validate-order", ...)
pipz.Apply("fetch-customer", ...)
pipz.Apply("send-notification", ...)

// Avoid
pipz.Apply("OrderValidation", ...)
pipz.Apply("customer", ...)
pipz.Apply("doStuff", ...)
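
The convention above can be checked mechanically. The sketch below is one way to do that with the standard library only; `checkProcessorName` is a hypothetical helper, not part of pipz or Flume, and it verifies only the kebab-case shape (it cannot tell whether the first word is a verb).

```go
package main

import (
	"fmt"
	"regexp"
)

// kebabName matches lowercase words joined by single hyphens, with at least
// two words (for example "validate-order").
var kebabName = regexp.MustCompile(`^[a-z][a-z0-9]*(-[a-z0-9]+)+$`)

// checkProcessorName is a hypothetical lint helper for illustration.
// It returns an error when name is not multi-word kebab-case.
func checkProcessorName(name string) error {
	if !kebabName.MatchString(name) {
		return fmt.Errorf("processor name %q is not multi-word kebab-case", name)
	}
	return nil
}

func main() {
	names := []string{"validate-order", "OrderValidation", "customer", "doStuff"}
	for _, name := range names {
		fmt.Printf("%-16s ok=%v\n", name, checkProcessorName(name) == nil)
	}
}
```

A check like this could run in a unit test over every registered name, so that naming drift is caught in review rather than while debugging a schema.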

Predicate Names

Use is-/has-/can- prefixes:

// Good
flume.Predicate[T]{Name: "is-premium", ...}
flume.Predicate[T]{Name: "has-discount-code", ...}
flume.Predicate[T]{Name: "can-ship-internationally", ...}

// Avoid
flume.Predicate[T]{Name: "premium", ...}
flume.Predicate[T]{Name: "check-discount", ...}

Condition Names

Use a get- prefix or a descriptive noun:

// Good
flume.Condition[T]{Name: "get-order-type", ...}
flume.Condition[T]{Name: "payment-method", ...}
flume.Condition[T]{Name: "customer-tier", ...}

Schema Names

Use descriptive kebab-case:

factory.SetSchema("order-processing", ...)
factory.SetSchema("user-onboarding", ...)
factory.SetSchema("payment-v2", ...)

Structure Guidelines

Keep Depth Shallow

Deeply nested schemas are hard to read and debug:

# Hard to follow (5+ levels)
type: sequence
children:
  - type: filter
    predicate: a
    then:
      type: switch
      condition: b
      routes:
        x:
          type: retry
          child:
            type: timeout
            child:
              ref: processor

Break into logical sub-schemas:

# Main schema
type: sequence
children:
  - ref: validate
  - ref: resilient-external-call  # Pre-composed
  - ref: finalize

// Register the composed processor under the name the schema references
resilientCall := pipz.NewCircuitBreaker("resilient-external-call",
    pipz.NewTimeout("timeout", apiCall, 5*time.Second),
    3, 30*time.Second,
)
factory.Add(resilientCall)
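
To make "shallow" measurable, nesting depth can be computed from the schema tree. This is a minimal sketch under stated assumptions: the `node` type is a simplified stand-in whose fields mirror the YAML keys used in this guide, not Flume's actual type, and `depth` and `checkDepth` are hypothetical helpers.

```go
package main

import "fmt"

// node is a simplified stand-in for a schema node (assumption: not Flume's type).
type node struct {
	Type     string
	Ref      string
	Child    *node
	Then     *node
	Children []node
	Routes   map[string]node
}

// depth returns the number of levels in the tree rooted at n.
// A bare ref counts as one level.
func depth(n node) int {
	deepest := 0
	visit := func(c node) {
		if d := depth(c); d > deepest {
			deepest = d
		}
	}
	if n.Child != nil {
		visit(*n.Child)
	}
	if n.Then != nil {
		visit(*n.Then)
	}
	for _, c := range n.Children {
		visit(c)
	}
	for _, c := range n.Routes {
		visit(c)
	}
	return deepest + 1
}

// checkDepth returns an error when the schema nests deeper than limit.
func checkDepth(n node, limit int) error {
	if d := depth(n); d > limit {
		return fmt.Errorf("schema is %d levels deep, limit is %d", d, limit)
	}
	return nil
}

func main() {
	// The hard-to-follow schema from above.
	nested := node{Type: "sequence", Children: []node{{
		Type: "filter", Then: &node{
			Type: "switch", Routes: map[string]node{"x": {
				Type: "retry", Child: &node{
					Type: "timeout", Child: &node{Ref: "processor"},
				},
			}},
		},
	}}}
	// The refactored schema built from pre-composed refs.
	flat := node{Type: "sequence", Children: []node{
		{Ref: "validate"}, {Ref: "resilient-external-call"}, {Ref: "finalize"},
	}}
	fmt.Println(depth(nested), depth(flat)) // prints "6 2"
}
```

The limit itself is a team decision; the point is only that a number in a test is easier to enforce than a guideline in prose.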

Use sequence to group related steps:

type: sequence
name: payment-processing
children:
  - ref: validate-payment
  - ref: authorize-payment
  - ref: capture-payment
  - ref: record-transaction

Name Complex Nodes

Add names to non-trivial nodes for debugging:

type: concurrent
name: parallel-enrichment
children:
  - ref: fetch-customer
  - ref: fetch-inventory

type: filter
name: premium-check
predicate: is-premium
then:
  ref: premium-handler

Version Management

Semantic Versioning

Use semantic versioning for schemas:

version: "1.0.0"  # Initial
version: "1.1.0"  # Added new route
version: "2.0.0"  # Breaking change to structure
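
If schema versions follow this scheme, a deployment step can flag breaking changes by comparing major versions. The sketch below assumes plain `MAJOR.MINOR.PATCH` strings with no pre-release suffix; `parseVersion` and `isBreaking` are hypothetical helpers, not Flume functions.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion splits "MAJOR.MINOR.PATCH" into three non-negative integers.
func parseVersion(v string) ([3]int, error) {
	var out [3]int
	parts := strings.Split(v, ".")
	if len(parts) != 3 {
		return out, fmt.Errorf("version %q: want MAJOR.MINOR.PATCH", v)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil || n < 0 {
			return out, fmt.Errorf("version %q: invalid component %q", v, p)
		}
		out[i] = n
	}
	return out, nil
}

// isBreaking reports whether moving from old to next changes the major version.
func isBreaking(old, next string) (bool, error) {
	o, err := parseVersion(old)
	if err != nil {
		return false, err
	}
	n, err := parseVersion(next)
	if err != nil {
		return false, err
	}
	return o[0] != n[0], nil
}

func main() {
	breaking, err := isBreaking("1.1.0", "2.0.0")
	fmt.Println(breaking, err) // prints "true <nil>"
}
```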

Version in Schema Name

For A/B testing or gradual rollouts:

factory.SetSchema("checkout-v1", schemaV1)
factory.SetSchema("checkout-v2", schemaV2)

// Create bindings for each variant
bindingV1, _ := factory.Bind(v1ID, "checkout-v1", flume.WithAutoSync())
bindingV2, _ := factory.Bind(v2ID, "checkout-v2", flume.WithAutoSync())

// Route traffic
binding := getBindingForUser(user, bindingV1, bindingV2)
result, _ := binding.Process(ctx, order)
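
The example above leaves `getBindingForUser` undefined. One common way to implement that kind of routing is deterministic hash bucketing, sketched here with the standard library only. `inRollout` and `pickVariant` are hypothetical names, and the variants are plain values so the block runs without Flume; in practice they would be the two bindings.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// inRollout places userID into one of 100 buckets and reports whether the
// bucket falls inside the first percent buckets. The same ID always lands
// in the same bucket, so a user does not flip between variants.
func inRollout(userID string, percent uint32) bool {
	h := fnv.New32a()
	h.Write([]byte(userID)) // Write on a hash.Hash never returns an error
	return h.Sum32()%100 < percent
}

// pickVariant returns v2 for users inside the rollout and v1 otherwise.
func pickVariant[B any](userID string, percent uint32, v1, v2 B) (B, error) {
	if percent > 100 {
		var zero B
		return zero, fmt.Errorf("rollout percent %d is out of range 0-100", percent)
	}
	if inRollout(userID, percent) {
		return v2, nil
	}
	return v1, nil
}

func main() {
	for _, id := range []string{"user-1", "user-2", "user-3"} {
		name, err := pickVariant(id, 20, "checkout-v1", "checkout-v2")
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println(id, "->", name)
	}
}
```

Raising the percentage only moves users from v1 to v2, never back, which keeps a gradual rollout stable for each individual user.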

Error Handling Placement

Wrap at Boundaries

Apply error handling at service boundaries:

type: sequence
children:
  - ref: validate  # Internal - no wrap needed
  - type: circuit-breaker
    failure_threshold: 5
    child:
      type: retry
      attempts: 3
      child:
        ref: external-api  # External - wrap
  - ref: transform  # Internal - no wrap

Don't Over-Protect

Avoid redundant error handling:

# Overkill
type: retry
child:
  type: retry
  child:
    type: fallback
    children:
      - type: retry
        child:
          ref: processor
      - ref: fallback

Match Retry to Operation

# Quick operation - few retries, no backoff
type: retry
attempts: 2
child:
  ref: cache-lookup

# Slow external service - more retries with backoff
type: retry
attempts: 5
backoff: "500ms"
child:
  ref: payment-gateway

Performance Considerations

Minimize Cloning

concurrent and race nodes clone the input for each child. Reserve them for work that is slow enough to repay the copy:

# Unnecessary clone
type: concurrent
children:
  - ref: quick-transform  # Could be sequential

# Justified parallelism
type: concurrent
children:
  - ref: slow-api-call
  - ref: another-slow-call
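
The cost is easier to see in code. The sketch below imitates a concurrent node with goroutines: every branch gets its own deep copy so that branches cannot interfere with one another. The `order` type, its `Clone` method, and `fanOut` are hypothetical and only illustrate the idea; they are not Flume's or pipz's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// order is a hypothetical payload with a slice field, so a shallow copy
// would still share memory between branches.
type order struct {
	ID   string
	Tags []string
}

// Clone returns a deep copy of the order, including the Tags slice.
func (o order) Clone() order {
	c := o
	c.Tags = append([]string(nil), o.Tags...)
	return c
}

// String formats the order for printing.
func (o order) String() string {
	return fmt.Sprintf("%s%v", o.ID, o.Tags)
}

// fanOut runs every branch on its own clone of in: one copy per child,
// every time the node runs.
func fanOut(in order, branches ...func(order) order) []order {
	out := make([]order, len(branches))
	var wg sync.WaitGroup
	for i, branch := range branches {
		wg.Add(1)
		go func(i int, branch func(order) order, copyOfIn order) {
			defer wg.Done()
			out[i] = branch(copyOfIn)
		}(i, branch, in.Clone())
	}
	wg.Wait()
	return out
}

func main() {
	in := order{ID: "o-1", Tags: []string{"new"}}
	out := fanOut(in,
		func(o order) order { o.Tags[0] = "customer-fetched"; return o },
		func(o order) order { o.Tags[0] = "inventory-fetched"; return o },
	)
	fmt.Println(in, out[0], out[1])
}
```

For a single fast child, the copy and the goroutine are pure overhead, which is why the first schema above is better written as a plain sequence step.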

Rate Limit at Entry Points

Place rate limiters early:

type: sequence
children:
  - type: rate-limit
    requests_per_second: 100.0
    child:
      ref: validate  # Everything after is protected
  - ref: process
  - ref: external-call

Use Timeouts Wisely

Set realistic timeouts:

type: timeout
duration: "30s"  # Based on actual SLA
child:
  ref: external-service

Schema Organization

Single Responsibility

Each schema should do one thing well:

// Separate concerns
factory.SetSchema("order-validation", validationSchema)
factory.SetSchema("order-payment", paymentSchema)
factory.SetSchema("order-fulfillment", fulfillmentSchema)

Compose Larger Flows

Build complex flows from simple schemas:

// High-level orchestration
factory.SetSchema("order-complete", flume.Schema{
    Node: flume.Node{
        Type: "sequence",
        Children: []flume.Node{
            {Ref: "order-validation"},   // References another pipeline
            {Ref: "order-payment"},
            {Ref: "order-fulfillment"},
        },
    },
})

Documentation Practices

Use Descriptions

Add descriptions when registering:

factory.AddPredicate(flume.Predicate[Order]{
    Name:        "is-high-value",
    Description: "Orders exceeding $1000 threshold for premium handling",
    Predicate:   isHighValue,
})

factory.AddCondition(flume.Condition[Order]{
    Name:        "fulfillment-type",
    Description: "Determines shipping vs. digital delivery",
    Values:      []string{"ship", "digital", "pickup"},
    Condition:   getFulfillmentType,
})

Companion Documentation

Maintain schema documentation alongside files:

schemas/
  order-processing.yaml
  order-processing.md    # Documents the schema

Schema Diagrams

For complex pipelines, create visual diagrams:

validate → [concurrent: enrich] → [filter: premium?] → [switch: payment] → complete
                                        ↓                    ↓
                                  premium-handler     credit|paypal|crypto

Next Steps