Core Concepts
Flume is built around three core concepts: the Factory, Schemas, and Components.
The Factory
The factory is Flume's central registry. It holds all registered components and builds pipelines from schemas.
```go
factory := flume.New[Order]()
```
The factory is generic over your data type T. T must implement pipz.Cloner[T] (a single Clone() T method) so that parallel connectors can give each branch its own copy of the data:
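```go
type Order struct {
	ID           string
	CustomerTier string // used by the is-premium predicate below
	Status       string // used by the order-status condition below
	Total        float64
}

// Clone returns an independent copy of the order for parallel branches.
func (o Order) Clone() Order {
	return Order{ID: o.ID, CustomerTier: o.CustomerTier, Status: o.Status, Total: o.Total}
}
```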
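Copying each field is enough only while every field is a value type. If the type later gains slices or maps, a plain struct copy would share their backing storage between parallel branches. The following is a minimal sketch of a deep Clone. It assumes a hypothetical OrderWithItems variant with an Items slice and a Meta map; neither field is part of the example above.

```go
package main

import "fmt"

// LineItem is a hypothetical nested type used only for this illustration.
type LineItem struct {
	SKU string
	Qty int
}

// OrderWithItems is a hypothetical variant of Order with reference-typed fields.
type OrderWithItems struct {
	ID    string
	Total float64
	Items []LineItem        // a naive copy would share the backing array
	Meta  map[string]string // a naive copy would share the map
}

// Clone returns a deep copy so parallel branches cannot see each other's writes.
func (o OrderWithItems) Clone() OrderWithItems {
	c := o // copies the value fields (ID, Total)
	if o.Items != nil {
		c.Items = make([]LineItem, len(o.Items))
		copy(c.Items, o.Items)
	}
	if o.Meta != nil {
		c.Meta = make(map[string]string, len(o.Meta))
		for k, v := range o.Meta {
			c.Meta[k] = v
		}
	}
	return c
}

func main() {
	orig := OrderWithItems{ID: "o-1", Items: []LineItem{{SKU: "a", Qty: 1}}, Meta: map[string]string{"src": "web"}}
	cp := orig.Clone()
	cp.Items[0].Qty = 5
	cp.Meta["src"] = "api"
	fmt.Println(orig.Items[0].Qty, orig.Meta["src"]) // 1 web
}
```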
Factory Responsibilities
- Component Registry - Store processors, predicates, and conditions
- Schema Storage - Manage named schemas with hot-reload support
- Pipeline Building - Construct pipz pipelines from schemas
- Validation - Verify schemas before building
Components
Components are the building blocks registered with the factory. There are three types:
Processors
Processors are pipz Chainable[T] implementations that hold the actual processing logic. Register them with factory.Add:
```go
// Using pipz constructors
factory.Add(
	pipz.Apply("validate", validateOrder),       // func(ctx, T) (T, error): may modify or fail
	pipz.Transform("normalize", normalizeOrder), // func(ctx, T) T: modifies, cannot fail
	pipz.Effect("log", logOrder),                // func(ctx, T) error: side effect, no modification
)
```
Reference a processor in a schema by its registered name:
```yaml
ref: validate
```
Predicates
Predicates are boolean functions for conditional processing:
```go
factory.AddPredicate(flume.Predicate[Order]{
	Name:        "is-premium",
	Description: "Checks if customer has premium tier",
	Predicate: func(ctx context.Context, o Order) bool {
		return o.CustomerTier == "premium"
	},
})
```
Used in filter nodes:
```yaml
type: filter
predicate: is-premium
then:
  ref: premium-handler
```
Conditions
Conditions return strings for multi-way routing:
```go
factory.AddCondition(flume.Condition[Order]{
	Name:        "order-status",
	Description: "Returns the current order status",
	Values:      []string{"pending", "approved", "rejected"},
	Condition: func(ctx context.Context, o Order) string {
		return o.Status
	},
})
```
Used in switch nodes:
```yaml
type: switch
condition: order-status
routes:
  pending:
    ref: handle-pending
  approved:
    ref: handle-approved
default:  # taken when no route matches, e.g. "rejected"
  ref: handle-unknown
```
Schemas
Schemas define pipeline structure as YAML or JSON documents.
Schema Structure
```yaml
version: "1.0.0"   # Optional version tracking
type: sequence     # Connector type
name: my-pipeline  # Optional name override
children:          # Child nodes
  - ref: step1
  - ref: step2
```
Node Types
Nodes are either references or connectors:
```yaml
# Reference: points to a registered processor
ref: validate
```
```yaml
# Connector: defines structure
type: sequence
children:
  - ref: step1
  - ref: step2
```
Building Pipelines
Build from strings, files, or a Schema value:
```go
// Each call below is an alternative way to build a pipeline.

// From YAML string
pipeline, err := factory.BuildFromYAML(yamlStr)

// From JSON string
pipeline, err = factory.BuildFromJSON(jsonStr)

// From file (auto-detects format)
pipeline, err = factory.BuildFromFile("pipeline.yaml")

// From Schema struct
pipeline, err = factory.Build(schema)
```
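The exact detection rule behind BuildFromFile is not spelled out here. One plausible approach is to choose the parser from the file extension. The sketch below shows that assumed strategy with a hypothetical detectFormat helper. It is an illustration, not Flume's implementation.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// detectFormat guesses a schema format from the file extension.
// This is an assumed strategy for illustration only.
func detectFormat(path string) (string, error) {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".yaml", ".yml":
		return "yaml", nil
	case ".json":
		return "json", nil
	default:
		return "", fmt.Errorf("unsupported schema file extension %q", filepath.Ext(path))
	}
}

func main() {
	for _, p := range []string{"pipeline.yaml", "pipeline.yml", "pipeline.JSON", "pipeline.toml"} {
		f, err := detectFormat(p)
		fmt.Println(p, f, err)
	}
}
```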
Dynamic Schema Management
Flume supports named schemas that can be updated at runtime:
```go
// Register a named schema
err := factory.SetSchema("order-pipeline", schema)

// Create a binding with auto-sync enabled
pipelineID := factory.Identity("order-processor", "Processes orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())

// Process requests (lock-free)
result, err := binding.Process(ctx, order)

// Update the schema - auto-sync bindings rebuild automatically
err = factory.SetSchema("order-pipeline", newSchema)

// Remove a schema
removed := factory.RemoveSchema("order-pipeline")

// List all schemas
names := factory.ListSchemas()
```
Hot Reloading
When you update a schema:
- New pipeline is built and validated
- All auto-sync bindings rebuild atomically
- In-flight requests complete with old pipeline
- New requests use updated pipeline
```go
// Watcher pattern: watcher stands for whatever source of schema changes
// you use (for example a file watcher or a config service).
for schemaChange := range watcher.Changes() {
	if err := factory.SetSchema(schemaChange.Name, schemaChange.Schema); err != nil {
		log.Printf("failed to update schema: %v", err)
		continue
	}
	log.Printf("schema %s updated to version %s",
		schemaChange.Name, schemaChange.Schema.Version)
}
```
Validation
Flume validates schemas before building, catching errors early:
```go
err := factory.ValidateSchema(schema)
if err != nil {
	// ValidationErrors with detailed paths
	fmt.Println(err)
	// Output:
	// 3 validation errors:
	//   1. root.children[0]: processor 'missing' not found
	//   2. root.children[1]: predicate 'unknown' not found
	//   3. root.children[1].then: processor 'also-missing' not found
}
```
What Gets Validated
- All processor/predicate/condition references exist
- Required fields are present
- Connector constraints are met (e.g., fallback needs 2 children)
- Configuration values are valid (e.g., positive retry attempts)
- Duration strings are parseable
- No circular references
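Two of these checks are positive retry attempts and parseable duration strings. Both can be expressed with the standard library alone. The sketch below uses a hypothetical retryConfig type whose timeout is still a raw string, as it would be right after decoding YAML. It relies on time.ParseDuration and errors.Join (Go 1.20+). Flume's own field names and messages may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryConfig is a hypothetical connector configuration as decoded from a
// schema: the timeout has not been parsed yet.
type retryConfig struct {
	Attempts int
	Timeout  string
}

// validateConfig reports every problem it finds, joined into one error.
func validateConfig(c retryConfig) error {
	var errs []error
	if c.Attempts <= 0 {
		errs = append(errs, fmt.Errorf("attempts must be positive, got %d", c.Attempts))
	}
	if c.Timeout != "" {
		if _, err := time.ParseDuration(c.Timeout); err != nil {
			errs = append(errs, fmt.Errorf("invalid timeout %q: %w", c.Timeout, err))
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	fmt.Println(validateConfig(retryConfig{Attempts: 3, Timeout: "5s"})) // <nil>
	fmt.Println(validateConfig(retryConfig{Attempts: 0, Timeout: "fast"}))
}
```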
Component Introspection
Query the factory's registrations:
```go
// Check existence
hasValidate := factory.HasProcessor("validate")
hasPremium := factory.HasPredicate("is-premium")
hasStatus := factory.HasCondition("order-status")

// List all
processors := factory.ListProcessors() // []pipz.Name
predicates := factory.ListPredicates()
conditions := factory.ListConditions()
```
Component Removal
Remove components when no longer needed:
```go
// Remove processors (returns count removed)
removed := factory.Remove("old-processor", "deprecated")

// Remove predicates
factory.RemovePredicate("old-predicate")

// Remove conditions
factory.RemoveCondition("old-condition")
```
Note: Removing a component doesn't affect pipelines that were already built. However, any schema that still references the removed component will fail validation the next time you build it.
Next Steps
- Architecture - How Flume works under the hood
- Quickstart - Build your first pipeline
- API Reference - Complete method documentation