zoobzio December 11, 2025

Overview

Building data pipelines in Go typically means hardcoding structure at compile time. Change the flow? Rebuild and redeploy.

Flume offers a different approach: define pipeline structure in configuration, update it at runtime.

factory := flume.New[Order]()

// Register reusable components
factory.Add(pipz.Apply("validate", validateOrder))
factory.Add(pipz.Transform("enrich", enrichOrder))
factory.AddPredicate(flume.Predicate[Order]{
    Name:      "high-value",
    Predicate: func(ctx context.Context, o Order) bool { return o.Total > 1000 },
})

// Build from schema
pipeline, _ := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: high-value
    then:
      ref: enrich
`)

// Update at runtime - no restart required
factory.SetSchema("orders", newSchema)

Type-safe, hot-reloadable, zero external dependencies.

Architecture

┌──────────────────────────────────────────────────────────────┐
│                         Factory[T]                           │
│                                                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│  │ Processors  │  │ Predicates  │  │ Conditions  │          │
│  │ (Chainable) │  │  (bool fn)  │  │ (string fn) │          │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘          │
│         │                │                │                  │
│         └────────────────┼────────────────┘                  │
│                          ▼                                   │
│  ┌───────────────────────────────────────────────────────┐  │
│  │                   Schema (YAML/JSON)                  │  │
│  │   type: sequence                                      │  │
│  │   children:                                           │  │
│  │     - ref: validate                                   │  │
│  │     - type: filter ...                                │  │
│  └───────────────────────────────────────────────────────┘  │
│                          │                                   │
│                    Build │ Validate                          │
│                          ▼                                   │
│  ┌───────────────────────────────────────────────────────┐  │
│  │              pipz.Chainable[T] Pipeline               │  │
│  └───────────────────────────────────────────────────────┘  │
└──────────────────────────────────────────────────────────────┘

Register components once, compose them via schemas, build executable pipelines. Schemas validate against registered components before building.
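The register, compose, build flow described above can be illustrated with a small standard-library sketch. This is not Flume's implementation: `Step`, `Node`, `Registry`, and `BuildFromJSON` are hypothetical names chosen for the example, only `ref` and `sequence` nodes are handled, and JSON stands in for YAML to avoid dependencies.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Step is a hypothetical stand-in for a registered processor (not a Flume type).
type Step[T any] func(T) (T, error)

// Node mirrors the schema shape shown above: a ref, or a sequence of children.
type Node struct {
	Type     string `json:"type,omitempty"`
	Ref      string `json:"ref,omitempty"`
	Children []Node `json:"children,omitempty"`
}

// Registry maps component names to their implementations.
type Registry[T any] struct {
	steps map[string]Step[T]
}

func NewRegistry[T any]() *Registry[T] {
	return &Registry[T]{steps: make(map[string]Step[T])}
}

// Add registers a component under a name that schemas can reference.
func (r *Registry[T]) Add(name string, s Step[T]) { r.steps[name] = s }

// Build resolves a schema node into one executable Step,
// failing if the node references an unregistered component.
func (r *Registry[T]) Build(n Node) (Step[T], error) {
	switch {
	case n.Ref != "":
		s, ok := r.steps[n.Ref]
		if !ok {
			return nil, fmt.Errorf("processor %q not found", n.Ref)
		}
		return s, nil
	case n.Type == "sequence":
		built := make([]Step[T], 0, len(n.Children))
		for i, c := range n.Children {
			s, err := r.Build(c)
			if err != nil {
				return nil, fmt.Errorf("children[%d]: %w", i, err)
			}
			built = append(built, s)
		}
		// Run each child in order, stopping at the first error.
		return func(v T) (T, error) {
			var err error
			for _, s := range built {
				if v, err = s(v); err != nil {
					return v, err
				}
			}
			return v, nil
		}, nil
	default:
		return nil, fmt.Errorf("unknown node type %q", n.Type)
	}
}

// BuildFromJSON parses a JSON schema and builds it.
func (r *Registry[T]) BuildFromJSON(src string) (Step[T], error) {
	var n Node
	if err := json.Unmarshal([]byte(src), &n); err != nil {
		return nil, err
	}
	return r.Build(n)
}

func main() {
	reg := NewRegistry[string]()
	reg.Add("trim", func(s string) (string, error) { return strings.TrimSpace(s), nil })
	reg.Add("upper", func(s string) (string, error) { return strings.ToUpper(s), nil })

	pipeline, err := reg.BuildFromJSON(`{"type":"sequence","children":[{"ref":"trim"},{"ref":"upper"}]}`)
	if err != nil {
		panic(err)
	}
	out, err := pipeline("  order-42 ")
	fmt.Println(out, err) // prints "ORDER-42 <nil>"
}
```

Because the schema refers to components only by name, the same registry can serve any number of schemas, and an unknown reference surfaces as a build error instead of a runtime failure.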

Philosophy

Configuration Over Code

Pipeline structure belongs in configuration. Business logic belongs in code.

# Configuration: structure
type: sequence
children:
  - ref: validate
  - ref: process

// Code: behaviour
factory.Add(pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
    if o.Total <= 0 {
        return o, errors.New("invalid total")
    }
    return o, nil
}))

Zero Magic

  • Processors name themselves via pipz.Name
  • Schemas are plain YAML/JSON
  • Validation errors include full paths
  • No reflection at runtime

Capabilities

Schema-driven pipelines enable several patterns:

Hot Reloading - Update pipeline behaviour without restarts. A/B test processing logic, toggle features, respond to changing requirements.

Multi-tenant Processing - Different schemas per tenant. Same registered components, different compositions based on customer tier or configuration.

Channel Integration - Route pipeline outputs to Go channels. Fan-out to multiple streams, integrate with async consumers.

Resilience Patterns - Retry, timeout, circuit breaker, rate limiting - all declarative in schema. Change resilience strategy without code changes.

Flume provides the composition layer. What pipelines you build is up to you.
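To make the hot-reloading idea concrete, here is one way a rebuilt pipeline can be swapped in while callers keep processing. It is a sketch under stated assumptions, not a description of how Flume does it internally: `Pipeline` and `Slot` are hypothetical names, and the swap relies on `atomic.Pointer` from the standard library (Go 1.19 or later).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Pipeline is a hypothetical built pipeline (not a Flume or pipz type).
type Pipeline[T any] func(T) T

// Slot holds the currently active pipeline. Readers load it atomically,
// so a swap never blocks in-flight callers.
type Slot[T any] struct {
	current atomic.Pointer[Pipeline[T]]
}

// Set installs a newly built pipeline; subsequent Process calls use it.
func (s *Slot[T]) Set(p Pipeline[T]) { s.current.Store(&p) }

// Process runs the value through whichever pipeline is installed right now.
func (s *Slot[T]) Process(v T) T {
	p := s.current.Load()
	if p == nil {
		return v // nothing installed yet: pass the value through unchanged
	}
	return (*p)(v)
}

func main() {
	var slot Slot[int]

	slot.Set(func(n int) int { return n + 1 })
	fmt.Println(slot.Process(10)) // prints 11

	// "Reload": swap in a different pipeline without restarting anything.
	slot.Set(func(n int) int { return n * 2 })
	fmt.Println(slot.Process(10)) // prints 20
}
```

The same shape extends to the multi-tenant case by keeping one slot per tenant, for example in a map keyed by tenant ID that is guarded by a mutex.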

Priorities

Type Safety

Full generics from factory to pipeline. The type flows through:

factory := flume.New[Order]()      // Factory[Order]
factory.Add(orderProcessor)         // Only accepts Chainable[Order]
pipeline, _ := factory.Build(schema)
result, _ := pipeline.Process(ctx, order) // Returns (Order, error)

Validation

Schemas validate before building. Missing references, invalid configurations, and structural errors are caught early, with messages that name the full path to the offending node:

3 validation errors:
  1. root.children[0]: processor 'missing' not found
  2. root.children[1]: predicate 'unknown' not found
  3. root.children[2].timeout: invalid duration "bad"
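An error list like the one above can be produced by walking the schema tree and collecting every problem, not stopping at the first. The following is a minimal sketch of that technique; `SchemaNode`, `Known`, and `Validate` are hypothetical names, and the checks cover only the three error kinds shown.

```go
package main

import (
	"fmt"
	"time"
)

// SchemaNode is a hypothetical, simplified schema node (not Flume's type).
type SchemaNode struct {
	Type      string
	Ref       string
	Predicate string
	Timeout   string
	Children  []SchemaNode
}

// Known lists the component names registered ahead of time.
type Known struct {
	Processors map[string]bool
	Predicates map[string]bool
}

// Validate walks the tree depth-first and returns every problem found,
// each prefixed with the path of the node that caused it.
func Validate(n SchemaNode, path string, k Known) []string {
	var errs []string
	if n.Ref != "" && !k.Processors[n.Ref] {
		errs = append(errs, fmt.Sprintf("%s: processor '%s' not found", path, n.Ref))
	}
	if n.Predicate != "" && !k.Predicates[n.Predicate] {
		errs = append(errs, fmt.Sprintf("%s: predicate '%s' not found", path, n.Predicate))
	}
	if n.Timeout != "" {
		if _, err := time.ParseDuration(n.Timeout); err != nil {
			errs = append(errs, fmt.Sprintf("%s.timeout: invalid duration %q", path, n.Timeout))
		}
	}
	for i, c := range n.Children {
		childPath := fmt.Sprintf("%s.children[%d]", path, i)
		errs = append(errs, Validate(c, childPath, k)...)
	}
	return errs
}

func main() {
	schema := SchemaNode{Type: "sequence", Children: []SchemaNode{
		{Ref: "missing"},
		{Type: "filter", Predicate: "unknown"},
		{Type: "timeout", Timeout: "bad"},
	}}

	// Nothing is registered, so every reference fails validation.
	errs := Validate(schema, "root", Known{})
	fmt.Printf("%d validation errors:\n", len(errs))
	for i, e := range errs {
		fmt.Printf("  %d. %s\n", i+1, e)
	}
}
```

Collecting all errors in one pass lets a schema author fix everything at once, which matters when schemas are edited and reloaded at runtime.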

Performance

Built pipelines execute with no Flume-specific overhead, because the result is a plain pipz pipeline. Flume's cost is paid only at build time:

Operation                               Time
Pipeline execution (3-step sequence)    ~459 ns
Schema build (10-step sequence)         ~6.7 µs

Build once, execute many times.
