Dynamic Pipeline Factory for Go. Logic in Code, Structure in Schema.

Register processing components once, define pipeline structure in YAML, and update behavior at runtime without redeployment. Built on pipz.

Get Started
import "github.com/zoobz-io/flume"

// 1. Register components once
factory := flume.New[Order]()
factory.Add(
    pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
        if o.Total <= 0 { return o, fmt.Errorf("invalid total") }
        return o, nil
    }),
    pipz.Apply("apply-discount", func(ctx context.Context, o Order) (Order, error) {
        o.Total *= 0.9
        return o, nil
    }),
)
factory.AddPredicate(flume.Predicate[Order]{
    Name: "is-premium",
    Predicate: func(_ context.Context, o Order) bool { return o.Tier == "premium" },
})

// 2. Define structure in YAML — not in code
pipeline, _ := factory.BuildFromYAML(`
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: is-premium
    then:
      ref: apply-discount
`)

// 3. Process data
result, _ := pipeline.Process(ctx, order)

// 4. Update behavior at runtime — no restart
factory.SetSchema("order-pipeline", newSchema)
90%Test Coverage
A+Go Report
MITLicense
1.24+Go Version
v1.0.1Latest Release

Why Flume?

Separate what your code does from how it's wired together.

Atomic Hot Reloading

Pipeline structure updates without restarts via atomic pointer swap. In-flight requests finish with old pipeline, new requests get the new one.

14 Connector Types

Sequence, filter, retry, timeout, circuit-breaker, rate-limit, fallback, switch, concurrent, race, contest, handle, scaffold, worker-pool.

Lock-Free Execution

Pipeline reads use atomic loads — zero lock overhead during processing. Updates serialize at low priority.

Comprehensive Validation

Catches reference errors, circular dependencies, and constraint violations before build. Full path context in error messages.

Built-In Observability

Capitan events for factory creation, registration, validation, builds, and schema updates. Metrics and audit out of the box.

Type-Safe Generics

Full compile-time safety from factory to pipeline. Your data type flows through the entire stack with zero reflection.

Capabilities

Schema-driven pipeline construction with hot reloading, validation, and observability.

FeatureDescriptionLink
Schema-Driven ConstructionDefine pipelines in YAML or JSON at runtime. Operators modify config, not Go code.Schema Format
Hot ReloadingSetSchema() triggers automatic rebuild for all auto-sync bindings. Atomic swap, zero downtime.Hot Reloading
Lock-Free BindingsConcurrent-safe pipeline access with atomic pointer swaps. No locks during processing.Architecture
Schema ValidationReference checking, circular detection, constraint satisfaction — all before building. CI/CD linting support.Schema Design
Error Handling PatternsRetry with backoff, fallback chains, circuit breakers, and timeouts — all declarative in schema.Error Handling
Event EmissionCapitan signals for factory lifecycle, schema changes, builds, and pipeline access.Observability

Articles

Browse the full flume documentation.

OverviewSchema-driven pipeline factory for pipz with hot-reloading capabilities

Learn

QuickstartBuild your first Flume pipeline in 5 minutes
Core ConceptsUnderstanding Flume's fundamental building blocks - factories, schemas, and components
ArchitectureHow Flume works under the hood - from schema to pipeline
Building PipelinesFrom simple sequences to complex nested pipelines

Guides

Schema DesignBest practices for designing maintainable Flume schemas
Hot ReloadingUpdate pipeline behaviour at runtime without restarts
Error HandlingBuild resilient pipelines with retry, fallback, circuit breakers, and timeouts
TestingTesting strategies for schema-driven pipelines
ObservabilityMonitor Flume pipelines with Capitan events

Cookbook

Common PatternsRecipes and patterns for building pipelines with flume

Reference

API ReferenceComplete reference for Flume's public API
Schema FormatComplete YAML/JSON schema specification
Connector TypesDetailed reference for all Flume connector types
EventsComplete reference for Flume observability events