Dynamic Pipeline Factory for Go. Logic in Code, Structure in Schema.
Register processing components once, define pipeline structure in YAML, and update behavior at runtime without redeployment. Built on pipz.
Get Started
import "github.com/zoobz-io/flume"
// 1. Register components once
factory := flume.New[Order]()
factory.Add(
	pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
		if o.Total <= 0 {
			return o, fmt.Errorf("invalid total")
		}
		return o, nil
	}),
	pipz.Apply("apply-discount", func(ctx context.Context, o Order) (Order, error) {
		o.Total *= 0.9
		return o, nil
	}),
)
factory.AddPredicate(flume.Predicate[Order]{
	Name:      "is-premium",
	Predicate: func(_ context.Context, o Order) bool { return o.Tier == "premium" },
})
})
// 2. Define structure in YAML — not in code
pipeline, _ := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: is-premium
    then:
      ref: apply-discount
`)
// 3. Process data
result, _ := pipeline.Process(ctx, order)
// 4. Update behavior at runtime — no restart
factory.SetSchema("order-pipeline", newSchema)

Why Flume?
Separate what your code does from how it's wired together.
Atomic Hot Reloading
Pipeline structure updates without restarts via an atomic pointer swap. In-flight requests finish on the old pipeline; new requests get the new one.
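The swap described above can be sketched with the standard library alone. This is a minimal illustration of the technique, not flume's implementation: `Pipeline`, `Binding`, `NewBinding`, and `Swap` are hypothetical names, and the pipeline is reduced to a single function over `T`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Pipeline is a hypothetical stand-in for a built pipeline.
type Pipeline[T any] func(T) (T, error)

// Binding holds the current pipeline behind an atomic pointer.
type Binding[T any] struct {
	current atomic.Pointer[Pipeline[T]]
}

// NewBinding creates a binding that starts with pipeline p.
func NewBinding[T any](p Pipeline[T]) *Binding[T] {
	b := &Binding[T]{}
	b.current.Store(&p)
	return b
}

// Process loads the pipeline pointer once. A call that has already loaded
// the old pipeline finishes on it even if Swap runs concurrently.
func (b *Binding[T]) Process(v T) (T, error) {
	p := *b.current.Load()
	return p(v)
}

// Swap publishes a new pipeline; Process calls that start later use it.
func (b *Binding[T]) Swap(p Pipeline[T]) {
	b.current.Store(&p)
}

func main() {
	// Totals are in cents to keep the arithmetic exact.
	b := NewBinding(Pipeline[int](func(cents int) (int, error) {
		return cents * 90 / 100, nil // 10% discount
	}))
	before, _ := b.Process(1000)

	b.Swap(func(cents int) (int, error) {
		return cents * 80 / 100, nil // 20% discount
	})
	after, _ := b.Process(1000)

	fmt.Println(before, after)
}
```

Because `Process` dereferences the pointer exactly once, no call ever sees a half-updated pipeline.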
14 Connector Types
Sequence, filter, retry, timeout, circuit-breaker, rate-limit, fallback, switch, concurrent, race, contest, handle, scaffold, worker-pool.
Lock-Free Execution
Pipeline reads use atomic loads — zero lock overhead during processing. Updates serialize at low priority.
Comprehensive Validation
Catches reference errors, circular dependencies, and constraint violations before the pipeline is built. Error messages include the full path to the offending node.
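To make the checks concrete, here is a rough sketch of pre-build validation over a simplified schema tree. The `Node` struct, the `Validate` and `HasCycle` functions, and the error wording are assumptions made for illustration; flume's actual schema types and messages may differ.

```go
package main

import "fmt"

// Node is a hypothetical, simplified schema node.
type Node struct {
	Type      string
	Ref       string
	Predicate string
	Children  []Node
	Then      *Node
}

// Validate walks the tree and returns one message per problem, each
// prefixed with the path to the offending node.
func Validate(n Node, path string, processors, predicates map[string]bool) []string {
	var errs []string
	if n.Ref != "" && !processors[n.Ref] {
		errs = append(errs, fmt.Sprintf("%s: unknown processor %q", path, n.Ref))
	}
	if n.Predicate != "" && !predicates[n.Predicate] {
		errs = append(errs, fmt.Sprintf("%s: unknown predicate %q", path, n.Predicate))
	}
	if n.Type == "filter" && n.Then == nil {
		errs = append(errs, fmt.Sprintf("%s: filter requires a then branch", path))
	}
	for i, c := range n.Children {
		childPath := fmt.Sprintf("%s.children[%d]", path, i)
		errs = append(errs, Validate(c, childPath, processors, predicates)...)
	}
	if n.Then != nil {
		errs = append(errs, Validate(*n.Then, path+".then", processors, predicates)...)
	}
	return errs
}

// HasCycle reports whether a schema-to-schema reference graph contains a
// cycle, using depth-first search with three visit states.
func HasCycle(deps map[string][]string) bool {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[string]int)
	var visit func(name string) bool
	visit = func(name string) bool {
		switch state[name] {
		case inProgress:
			return true // reached a node that is still on the DFS stack
		case done:
			return false
		}
		state[name] = inProgress
		for _, d := range deps[name] {
			if visit(d) {
				return true
			}
		}
		state[name] = done
		return false
	}
	for name := range deps {
		if visit(name) {
			return true
		}
	}
	return false
}

func main() {
	schema := Node{Type: "sequence", Children: []Node{
		{Ref: "validate"},
		// The predicate name is misspelled on purpose.
		{Type: "filter", Predicate: "is-premum", Then: &Node{Ref: "apply-discount"}},
	}}
	processors := map[string]bool{"validate": true, "apply-discount": true}
	predicates := map[string]bool{"is-premium": true}

	for _, e := range Validate(schema, "root", processors, predicates) {
		fmt.Println(e)
	}
	fmt.Println(HasCycle(map[string][]string{"a": {"b"}, "b": {"a"}}))
}
```

Running all checks before any pipeline is constructed means a bad schema can be rejected in CI without touching a live binding.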
Built-In Observability
Capitan events for factory creation, registration, validation, builds, and schema updates. Metrics and audit out of the box.
Type-Safe Generics
Full compile-time safety from factory to pipeline. Your data type flows through the entire stack with zero reflection.
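As an illustration of how a data type can flow through a registry without reflection, the sketch below uses only Go generics and the standard library. `Processor`, `Registry`, `Run`, and `runOrder` are hypothetical names, and the `Order` fields are inferred from the Get Started snippet; none of this is flume's API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Order mirrors the fields used in the Get Started snippet (assumed shape).
type Order struct {
	Total float64
	Tier  string
}

// Processor is a hypothetical name for a typed processing step.
type Processor[T any] func(context.Context, T) (T, error)

// Registry maps names to processors of a single data type T.
type Registry[T any] struct {
	processors map[string]Processor[T]
}

// NewRegistry returns an empty registry for T.
func NewRegistry[T any]() *Registry[T] {
	return &Registry[T]{processors: make(map[string]Processor[T])}
}

// Add registers p under name. Passing a processor for another type is a
// compile-time error, not a runtime one.
func (r *Registry[T]) Add(name string, p Processor[T]) {
	r.processors[name] = p
}

// Run applies the named processors in order, stopping at the first error.
func (r *Registry[T]) Run(ctx context.Context, v T, names ...string) (T, error) {
	for _, name := range names {
		p, ok := r.processors[name]
		if !ok {
			return v, fmt.Errorf("unknown processor %q", name)
		}
		var err error
		if v, err = p(ctx, v); err != nil {
			return v, fmt.Errorf("%s: %w", name, err)
		}
	}
	return v, nil
}

// runOrder registers two Order processors and runs them by name.
func runOrder(o Order) (Order, error) {
	r := NewRegistry[Order]()
	r.Add("validate", func(_ context.Context, o Order) (Order, error) {
		if o.Total <= 0 {
			return o, errors.New("invalid total")
		}
		return o, nil
	})
	r.Add("mark-premium", func(_ context.Context, o Order) (Order, error) {
		o.Tier = "premium"
		return o, nil
	})
	return r.Run(context.Background(), o, "validate", "mark-premium")
}

func main() {
	fmt.Println(runOrder(Order{Total: 120}))
	fmt.Println(runOrder(Order{Total: 0}))
}
```

The only dynamic part is the lookup by name; every value passed between steps stays a concrete `Order`, so no type assertions are needed.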
Capabilities
Schema-driven pipeline construction with hot reloading, validation, and observability.
| Feature | Description | Link |
|---|---|---|
| Schema-Driven Construction | Define pipelines in YAML or JSON at runtime. Operators modify config, not Go code. | Schema Format |
| Hot Reloading | SetSchema() triggers an automatic rebuild of every auto-sync binding. Atomic swap, zero downtime. | Hot Reloading |
| Lock-Free Bindings | Concurrent-safe pipeline access with atomic pointer swaps. No locks during processing. | Architecture |
| Schema Validation | Reference checking, circular detection, constraint satisfaction — all before building. CI/CD linting support. | Schema Design |
| Error Handling Patterns | Retry with backoff, fallback chains, circuit breakers, and timeouts — all declarative in schema. | Error Handling |
| Event Emission | Capitan signals for factory lifecycle, schema changes, builds, and pipeline access. | Observability |
Articles
Browse the full flume documentation.