Overview
Building data pipelines in Go typically means hardcoding structure at compile time. Change the flow? Rebuild and redeploy.
Flume offers a different approach: define pipeline structure in configuration, update it at runtime.
```go
factory := flume.New[Order]()

// Register reusable components
factory.Add(pipz.Apply("validate", validateOrder))
factory.Add(pipz.Transform("enrich", enrichOrder))
factory.AddPredicate(flume.Predicate[Order]{
	Name:      "high-value",
	Predicate: func(ctx context.Context, o Order) bool { return o.Total > 1000 },
})

// Build from schema; validation errors surface here
pipeline, err := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: high-value
    then:
      ref: enrich
`)
if err != nil {
	log.Fatal(err)
}

// Update at runtime - no restart required
factory.SetSchema("orders", newSchema)
```
Type-safe, hot-reloadable, zero external dependencies.
Architecture
```text
┌──────────────────────────────────────────────────────────────┐
│                          Factory[T]                          │
│                                                              │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐           │
│  │ Processors  │  │ Predicates  │  │ Conditions  │           │
│  │ (Chainable) │  │ (bool fn)   │  │ (string fn) │           │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘           │
│         │                │                │                  │
│         └────────────────┼────────────────┘                  │
│                          ▼                                   │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  Schema (YAML/JSON)                                   │   │
│  │  type: sequence                                       │   │
│  │  children:                                            │   │
│  │    - ref: validate                                    │   │
│  │    - type: filter ...                                 │   │
│  └───────────────────────────────────────────────────────┘   │
│                          │                                   │
│                    Build │ Validate                          │
│                          ▼                                   │
│  ┌───────────────────────────────────────────────────────┐   │
│  │  pipz.Chainable[T] Pipeline                           │   │
│  └───────────────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────────────┘
```
Register components once, compose them via schemas, build executable pipelines. Schemas validate against registered components before building.
Philosophy
Configuration Over Code
Pipeline structure belongs in configuration. Business logic belongs in code.
```yaml
# Configuration: structure
type: sequence
children:
  - ref: validate
  - ref: process
```

```go
// Code: behaviour
factory.Add(pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
	if o.Total <= 0 {
		return o, errors.New("invalid total")
	}
	return o, nil
}))
```
Zero Magic
- Processors name themselves via `pipz.Name`
- Schemas are plain YAML/JSON
- Validation errors include full paths
- No reflection at runtime
Capabilities
Schema-driven pipelines open up several possibilities:
Hot Reloading - Update pipeline behaviour without restarts. A/B test processing logic, toggle features, respond to changing requirements.
Multi-tenant Processing - Different schemas per tenant. Same registered components, different compositions based on customer tier or configuration.
Channel Integration - Route pipeline outputs to Go channels. Fan-out to multiple streams, integrate with async consumers.
Resilience Patterns - Retry, timeout, circuit breaker and rate limiting can all be declared in the schema. Change resilience strategy without code changes.
Flume provides the composition layer. What pipelines you build is up to you.
Priorities
Type Safety
Full generics from factory to pipeline. The type flows through:
```go
factory := flume.New[Order]()              // Factory[Order]
factory.Add(orderProcessor)                // Only accepts Chainable[Order]
pipeline, _ := factory.Build(schema)
result, _ := pipeline.Process(ctx, order)  // Returns (Order, error)
```
Validation
Schemas are validated before building. Missing references, invalid configurations and structural errors are caught early and reported with clear messages:
```text
3 validation errors:
1. root.children[0]: processor 'missing' not found
2. root.children[1]: predicate 'unknown' not found
3. root.children[2].timeout: invalid duration "bad"
```
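One way to report every problem at once, each with its path, is a recursive walk that accumulates errors instead of returning on the first one. This sketch is an assumption about the approach, not Flume's validator. `Node`, `ValidationErrors` and `Validate` are hypothetical, and the schema is simplified to references, predicates, timeouts and children. Its message format was chosen to match the example above.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Node is a hypothetical, simplified schema node.
type Node struct {
	Ref       string
	Predicate string
	Timeout   string
	Children  []Node
}

// ValidationErrors collects every problem found, each prefixed with its path.
type ValidationErrors []string

func (v ValidationErrors) Error() string {
	noun := "errors"
	if len(v) == 1 {
		noun = "error"
	}
	var b strings.Builder
	fmt.Fprintf(&b, "%d validation %s:", len(v), noun)
	for i, e := range v {
		fmt.Fprintf(&b, "\n%d. %s", i+1, e)
	}
	return b.String()
}

// Validate walks the whole tree instead of stopping at the first problem.
func Validate(root Node, processors, predicates map[string]bool) error {
	var errs ValidationErrors
	var walk func(n Node, path string)
	walk = func(n Node, path string) {
		if n.Ref != "" && !processors[n.Ref] {
			errs = append(errs, fmt.Sprintf("%s: processor '%s' not found", path, n.Ref))
		}
		if n.Predicate != "" && !predicates[n.Predicate] {
			errs = append(errs, fmt.Sprintf("%s: predicate '%s' not found", path, n.Predicate))
		}
		if n.Timeout != "" {
			if _, err := time.ParseDuration(n.Timeout); err != nil {
				errs = append(errs, fmt.Sprintf("%s.timeout: invalid duration %q", path, n.Timeout))
			}
		}
		for i, c := range n.Children {
			walk(c, fmt.Sprintf("%s.children[%d]", path, i))
		}
	}
	walk(root, "root")
	if len(errs) == 0 {
		return nil // return an untyped nil, not a nil ValidationErrors
	}
	return errs
}

func main() {
	schema := Node{Children: []Node{
		{Ref: "missing"},
		{Predicate: "unknown"},
		{Timeout: "bad"},
	}}
	err := Validate(schema,
		map[string]bool{"validate": true},
		map[string]bool{"high-value": true})
	fmt.Println(err)
	// 3 validation errors:
	// 1. root.children[0]: processor 'missing' not found
	// 2. root.children[1]: predicate 'unknown' not found
	// 3. root.children[2].timeout: invalid duration "bad"
}
```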
Performance
Built pipelines add no overhead at execution time, because the result is a plain pipz pipeline. The schema cost is paid only at build time:
| Operation | Time |
|---|---|
| Pipeline execution (3-step sequence) | ~459 ns |
| Schema build (10-step sequence) | ~6.7 µs |
Build once, execute many times.
Documentation
Learn
- Quickstart - Your first pipeline in 5 minutes
- Core Concepts - Factories, schemas, and components
- Architecture - How Flume works under the hood
- Building Pipelines - From simple to complex
Guides
- Schema Design - Best practices for schema structure
- Hot Reloading - Update pipelines without restarts
- Error Handling - Retry, fallback, circuit breakers
- Testing - Testing schema-driven pipelines
- Observability - Monitoring with capitan events
Reference
- API Reference - Factory methods and types
- Schema Format - YAML/JSON specification
- Connector Types - All 14 connectors
- Events - Observability signals