Architecture
Understanding Flume's internal architecture helps you use it effectively and debug issues.
Overview
┌─────────────────────────────────────────────────────────────┐
│                         Factory[T]                          │
├─────────────────────────────────────────────────────────────┤
│     ┌─────────────┐   ┌─────────────┐   ┌─────────────┐     │
│     │ Processors  │   │ Predicates  │   │ Conditions  │     │
│     │  map[Name]  │   │  map[Name]  │   │  map[Name]  │     │
│     └─────────────┘   └─────────────┘   └─────────────┘     │
│                                                             │
│     ┌─────────────┐   ┌─────────────┐   ┌─────────────┐     │
│     │   Schemas   │   │  Pipelines  │   │  Channels   │     │
│     │ map[string] │   │ atomic.Ptr  │   │ map[string] │     │
│     └─────────────┘   └─────────────┘   └─────────────┘     │
└──────────────────────────────┬──────────────────────────────┘
                               │
                               ▼
                  ┌────────────────────────┐
                  │    Schema Document     │
                  │   (YAML/JSON/Struct)   │
                  └────────────────────────┘
                               │
                               ▼
                         ┌──────────┐
                         │  Parse   │
                         └──────────┘
                               │
                               ▼
                         ┌──────────┐
                         │ Validate │
                         └──────────┘
                               │
                               ▼
                         ┌──────────┐
                         │  Build   │
                         └──────────┘
                               │
                               ▼
                  ┌────────────────────────┐
                  │   pipz.Chainable[T]    │
                  │     (ready to use)     │
                  └────────────────────────┘
Factory Structure
The factory maintains six internal maps protected by a read-write mutex:
type Factory[T pipz.Cloner[T]] struct {
	processors map[pipz.Name]processorMeta[T]
	predicates map[pipz.Name]predicateMeta[T]
	conditions map[pipz.Name]conditionMeta[T]
	schemas    map[string]*Schema
	pipelines  map[string]*atomic.Pointer[pipz.Chainable[T]]
	channels   map[string]chan<- T
	mu         sync.RWMutex
}
Thread Safety
- Read operations (lookups, list, build) acquire read locks
- Write operations (add, remove, set schema) acquire write locks
- Pipeline retrieval holds a read lock only for the map lookup, then loads the current pipeline through an atomic pointer without locking
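The split between read and write locks can be sketched with the standard library alone. The registry type and its Add/Get methods are hypothetical stand-ins for one of the factory's maps, not Flume's API. The sketch only shows that lookups share a read lock while writes take the exclusive lock.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for one of the factory's maps.
type registry struct {
	mu    sync.RWMutex
	items map[string]string
}

func newRegistry() *registry {
	return &registry{items: make(map[string]string)}
}

// Add takes the write lock: no readers or other writers run meanwhile.
func (r *registry) Add(name, value string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.items[name] = value
}

// Get takes the read lock: any number of lookups may proceed together.
func (r *registry) Get(name string) (string, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.items[name]
	return v, ok
}

func main() {
	r := newRegistry()
	r.Add("validate", "validator")

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.Get("validate") // concurrent readers share the read lock
		}()
	}
	wg.Wait()

	v, ok := r.Get("validate")
	fmt.Println(v, ok)
}
```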
Schema Processing Pipeline
1. Parsing
Schemas can be loaded from multiple sources:
// String parsing
factory.BuildFromYAML(yamlStr)
factory.BuildFromJSON(jsonStr)
// File loading (detects format from extension)
factory.BuildFromFile("pipeline.yaml")
// Direct struct
factory.Build(schema)
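Format detection from the file extension, as described for BuildFromFile, might look roughly like the following. detectFormat is a hypothetical helper. Treating .yml as YAML is an assumption made here for illustration.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// detectFormat is a hypothetical helper that picks a parser from the file
// extension. Accepting ".yml" is an assumption of this sketch.
func detectFormat(path string) (string, error) {
	switch strings.ToLower(filepath.Ext(path)) {
	case ".yaml", ".yml":
		return "yaml", nil
	case ".json":
		return "json", nil
	default:
		return "", fmt.Errorf("unsupported schema file extension: %q", filepath.Ext(path))
	}
}

func main() {
	for _, p := range []string{"pipeline.yaml", "pipeline.JSON", "pipeline.toml"} {
		format, err := detectFormat(p)
		fmt.Println(p, format, err)
	}
}
```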
The parser converts YAML/JSON into the Schema struct:
type Schema struct {
	Version string
	Node // embedded
}
type Node struct {
	Ref      string
	Type     string
	Name     string
	Children []Node
	Child    *Node
	// ... connector-specific fields
}
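Because Node is embedded, a single flat document can fill both structs. Here is a standard-library sketch of that using encoding/json. The json tag names and the parseSchema helper are assumptions for illustration, and Flume's real tags and YAML handling may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node mirrors the struct above; the json tags are assumptions for this sketch.
type Node struct {
	Ref      string `json:"ref,omitempty"`
	Type     string `json:"type,omitempty"`
	Name     string `json:"name,omitempty"`
	Children []Node `json:"children,omitempty"`
	Child    *Node  `json:"child,omitempty"`
}

// Schema embeds Node without a tag, so encoding/json reads the node's
// fields from the top level of the document, next to "version".
type Schema struct {
	Version string `json:"version"`
	Node
}

// parseSchema is a hypothetical helper, not Flume's parser.
func parseSchema(data []byte) (Schema, error) {
	var s Schema
	err := json.Unmarshal(data, &s)
	return s, err
}

func main() {
	s, err := parseSchema([]byte(`{
		"version": "1.0.0",
		"type": "sequence",
		"children": [{"ref": "validate"}, {"ref": "enrich"}]
	}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Version, s.Type, len(s.Children), s.Children[0].Ref)
}
```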
2. Validation
Before building, the factory validates the entire schema tree:
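func (f *Factory[T]) ValidateSchema(schema Schema) error {
	// Collect every problem rather than stopping at the first one
	var errs ValidationErrors
	f.validateNode(&schema.Node, []string{"root"}, &errs)
	if len(errs) > 0 {
		return errs
	}
	return nil // return untyped nil, not an empty ValidationErrors
}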
Validation traverses the tree recursively, checking:
- References exist - all ref values point to registered processors
- Predicates exist - all predicate values are registered
- Conditions exist - all condition values are registered
- Required fields - each node type has its required fields
- Constraints - type-specific rules (e.g., fallback needs exactly 2 children)
- No cycles - reference chains don't loop back
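Here is a reduced version of that recursive walk. It collects every error along with its path instead of stopping at the first. Only two of the checks are sketched: registered references and the fallback arity rule. The "sequence needs at least one child" rule is an assumed example, and all names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// Node is a trimmed, hypothetical version of the schema node.
type Node struct {
	Ref      string
	Type     string
	Children []Node
}

// ValidationErrors collects every problem instead of stopping at the first.
type ValidationErrors []string

func (e ValidationErrors) Error() string { return strings.Join(e, "; ") }

// validateNode walks the tree depth-first and records the path to each node,
// so that error messages point at the offending location.
func validateNode(n *Node, path []string, processors map[string]bool, errs *ValidationErrors) {
	where := strings.Join(path, ".")
	if n.Ref != "" {
		if !processors[n.Ref] {
			*errs = append(*errs, fmt.Sprintf("%s: processor %q not found", where, n.Ref))
		}
		return
	}
	switch n.Type {
	case "sequence": // assumed rule for illustration
		if len(n.Children) == 0 {
			*errs = append(*errs, fmt.Sprintf("%s: sequence requires at least one child", where))
		}
	case "fallback":
		if len(n.Children) != 2 {
			*errs = append(*errs, fmt.Sprintf("%s: fallback requires exactly 2 children, got %d", where, len(n.Children)))
		}
	default:
		*errs = append(*errs, fmt.Sprintf("%s: unknown node type %q", where, n.Type))
	}
	for i := range n.Children {
		// Copy the path so sibling branches never share a backing array
		childPath := append(append([]string{}, path...), fmt.Sprintf("children[%d]", i))
		validateNode(&n.Children[i], childPath, processors, errs)
	}
}

func validate(root Node, processors map[string]bool) error {
	var errs ValidationErrors
	validateNode(&root, []string{"root"}, processors, &errs)
	if len(errs) > 0 {
		return errs
	}
	return nil
}

func main() {
	processors := map[string]bool{"validate": true}
	schema := Node{Type: "fallback", Children: []Node{{Ref: "validate"}, {Ref: "missing"}, {Ref: "validate"}}}
	fmt.Println(validate(schema, processors))
}
```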
3. Building
The builder recursively constructs pipz chainables:
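func (f *Factory[T]) buildNode(node *Node) (pipz.Chainable[T], error) {
	// Handle processor reference: resolve it from the registry
	if node.Ref != "" {
		meta, ok := f.processors[pipz.Name(node.Ref)]
		if !ok {
			return nil, fmt.Errorf("processor not found: %s", node.Ref)
		}
		return meta.processor, nil
	}
	// Handle connector types
	switch node.Type {
	case "sequence":
		return f.buildSequence(node)
	case "concurrent":
		return f.buildConcurrent(node)
	// ... other types
	default:
		return nil, fmt.Errorf("unknown node type: %s", node.Type)
	}
}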
Each connector type has a dedicated builder that:
- Validates connector-specific requirements
- Recursively builds child nodes
- Constructs the pipz connector with appropriate options
Connector Mapping
Flume schema types map directly to pipz constructors:
| Schema Type | pipz Constructor |
|---|---|
| sequence | pipz.NewSequence |
| concurrent | pipz.NewConcurrent |
| race | pipz.NewRace |
| fallback | pipz.NewFallback |
| retry | pipz.NewRetry or pipz.NewBackoff |
| timeout | pipz.NewTimeout |
| filter | pipz.NewFilter or custom routing |
| switch | pipz.NewSwitch |
| circuit-breaker | pipz.NewCircuitBreaker |
| rate-limit | pipz.NewRateLimiter + pipz.NewSequence |
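Since each schema type maps to one constructor, the dispatch can also be written as a lookup table instead of a switch. This is a hypothetical sketch. The Stage type and the two connectors below are simplified illustrations, not pipz.NewSequence or pipz.NewFallback. The fallback here tries the first child and uses the second only when the first fails.

```go
package main

import (
	"errors"
	"fmt"
)

// Stage is a hypothetical simplified processor: string in, string out.
type Stage func(string) (string, error)

// constructor builds a connector from already-built children.
type constructor func(children []Stage) (Stage, error)

// connectors maps schema type names to constructors (two entries sketched).
var connectors = map[string]constructor{
	"sequence": func(children []Stage) (Stage, error) {
		return func(s string) (string, error) {
			var err error
			for _, c := range children {
				if s, err = c(s); err != nil {
					return s, err
				}
			}
			return s, nil
		}, nil
	},
	"fallback": func(children []Stage) (Stage, error) {
		if len(children) != 2 {
			return nil, fmt.Errorf("fallback needs exactly 2 children, got %d", len(children))
		}
		return func(s string) (string, error) {
			if out, err := children[0](s); err == nil {
				return out, nil
			}
			return children[1](s)
		}, nil
	},
}

// construct looks up the constructor for a schema type and applies it.
func construct(kind string, children []Stage) (Stage, error) {
	c, ok := connectors[kind]
	if !ok {
		return nil, fmt.Errorf("unknown connector type: %q", kind)
	}
	return c(children)
}

func main() {
	fail := func(string) (string, error) { return "", errors.New("primary down") }
	backup := func(s string) (string, error) { return s + " (backup)", nil }
	fb, err := construct("fallback", []Stage{fail, backup})
	if err != nil {
		panic(err)
	}
	out, _ := fb("order-42")
	fmt.Println(out)
}
```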
Hot Reload Mechanism
Named schemas support atomic updates:
func (f *Factory[T]) SetSchema(name string, schema Schema) error {
	// 1. Validate first
	if err := f.ValidateSchema(schema); err != nil {
		return err
	}
	// 2. Build pipeline (outside the write lock)
	pipeline, err := f.Build(schema)
	if err != nil {
		return err
	}
	// 3. Atomic update
	f.mu.Lock()
	defer f.mu.Unlock()
	f.schemas[name] = &schema
	if ptr, exists := f.pipelines[name]; exists {
		ptr.Store(&pipeline) // Atomic swap
	} else {
		ptr := &atomic.Pointer[pipz.Chainable[T]]{}
		ptr.Store(&pipeline)
		f.pipelines[name] = ptr
	}
	return nil
}
Retrieval is lock-free after the initial lookup:
func (f *Factory[T]) Pipeline(name string) (pipz.Chainable[T], bool) {
	f.mu.RLock()
	ptr := f.pipelines[name]
	f.mu.RUnlock()
	if ptr == nil {
		return nil, false
	}
	return *ptr.Load(), true // Atomic load
}
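Two properties matter for hot reload. A rejected update leaves the running pipeline untouched, and a reference obtained before a swap keeps pointing at the old pipeline. Both can be checked with a small standalone model. Pipeline, registry, Set, and Get are hypothetical names, and "validation" here is just a non-empty check. sync/atomic.Pointer requires Go 1.19 or later.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Pipeline is a hypothetical stand-in for a built pipz.Chainable[T].
type Pipeline struct{ Version string }

type registry struct {
	mu        sync.RWMutex
	pipelines map[string]*atomic.Pointer[Pipeline]
}

func newRegistry() *registry {
	return &registry{pipelines: make(map[string]*atomic.Pointer[Pipeline])}
}

// Set "validates" and "builds" before taking the write lock, so a rejected
// update never replaces the pipeline that is already serving traffic.
func (r *registry) Set(name, version string) error {
	if version == "" {
		return errors.New("validation failed: empty version")
	}
	p := &Pipeline{Version: version}

	r.mu.Lock()
	defer r.mu.Unlock()
	ptr, ok := r.pipelines[name]
	if !ok {
		ptr = &atomic.Pointer[Pipeline]{}
		r.pipelines[name] = ptr
	}
	ptr.Store(p) // atomic swap: readers see the old or the new value, never a mix
	return nil
}

// Get holds the read lock only for the map lookup; the load itself is atomic.
func (r *registry) Get(name string) (*Pipeline, bool) {
	r.mu.RLock()
	ptr := r.pipelines[name]
	r.mu.RUnlock()
	if ptr == nil {
		return nil, false
	}
	return ptr.Load(), true
}

func main() {
	r := newRegistry()
	_ = r.Set("orders", "v1")
	held, _ := r.Get("orders") // a caller keeps this reference

	_ = r.Set("orders", "v2")        // hot reload
	fmt.Println(r.Set("orders", "")) // rejected: "orders" stays at v2

	current, _ := r.Get("orders")
	fmt.Println(held.Version, current.Version)
}
```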
Observability Integration
Flume emits Capitan events throughout its lifecycle:
// Factory events
FactoryCreated
ProcessorRegistered
PredicateRegistered
ConditionRegistered
// Schema events
SchemaValidationStarted
SchemaValidationCompleted
SchemaValidationFailed
SchemaBuildStarted
SchemaBuildCompleted
SchemaBuildFailed
// Dynamic schema events
SchemaRegistered
SchemaUpdated
SchemaRemoved
PipelineRetrieved
Events include typed fields:
capitan.Emit(ctx, SchemaRegistered,
	KeyName.Field(name),
	KeyVersion.Field(schema.Version))
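Typed fields let the compiler reject a mismatched value at the call site. The sketch below models that idea with a generic key type. Key, Field, Signal, and emit are hypothetical names and are not Capitan's actual API.

```go
package main

import "fmt"

// Key is a hypothetical typed field key; its Field method only accepts V.
type Key[V any] struct{ Name string }

// Field pairs a key name with a value whose type was checked at compile time.
type Field struct {
	Key   string
	Value any
}

func (k Key[V]) Field(v V) Field { return Field{Key: k.Name, Value: v} }

// Signal names an event, similar in spirit to SchemaRegistered above.
type Signal string

var (
	SchemaRegistered Signal = "schema.registered"
	KeyName                 = Key[string]{Name: "name"}
	KeyVersion              = Key[string]{Name: "version"}
)

// emit is a hypothetical sink that just formats the event as a string.
func emit(sig Signal, fields ...Field) string {
	s := string(sig)
	for _, f := range fields {
		s += fmt.Sprintf(" %s=%v", f.Key, f.Value)
	}
	return s
}

func main() {
	fmt.Println(emit(SchemaRegistered, KeyName.Field("orders"), KeyVersion.Field("1.0.0")))
	// KeyName.Field(42) would not compile: this key only accepts strings.
}
```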
Channel Integration
Channels enable stream processing patterns:
factory.AddChannel("output", outputChan)
Stream nodes wrap channels in pipz effects:
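func (f *Factory[T]) buildStream(node *Node) (pipz.Chainable[T], error) {
	// A missing entry would be a nil channel, and sending on nil blocks forever
	channel, exists := f.channels[node.Stream]
	if !exists {
		return nil, fmt.Errorf("channel not found: %s", node.Stream)
	}
	return pipz.Effect(
		pipz.Name(fmt.Sprintf("stream:%s", node.Stream)),
		func(_ context.Context, item T) error {
			channel <- item // blocks until the channel can accept the item
			return nil
		},
	), nil
}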
Memory Model
Component Storage
Components are stored by value in maps:
type processorMeta[T any] struct {
	processor   pipz.Chainable[T] // Interface value
	description string
	tags        []string
}
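The builder returns the stored processor value directly, so a struct copied out of the map still refers to the same processor instance. Editing the copy's plain fields does not touch the map entry. Here is a minimal illustration with hypothetical Processor, named, and meta types.

```go
package main

import "fmt"

// Processor is a hypothetical stand-in for pipz.Chainable[T].
type Processor interface{ Name() string }

type named struct{ name string }

func (n *named) Name() string { return n.name }

// meta mirrors processorMeta: a plain struct holding an interface value.
type meta struct {
	processor   Processor
	description string
}

func main() {
	registry := map[string]meta{}
	p := &named{name: "validate"}
	registry["validate"] = meta{processor: p, description: "checks input"}

	m := registry["validate"] // m is a copy of the stored struct
	m.description = "changed"

	fmt.Println(registry["validate"].description)              // checks input
	fmt.Println(registry["validate"].processor == Processor(p)) // true
}
```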
Pipeline Lifecycle
- Registration - Components stored in factory maps
- Build - New pipeline created from current registrations
- Storage - Pipeline stored via atomic pointer (for named schemas)
- Usage - Pipeline used directly, no factory reference needed
- Update - New pipeline built, pointer atomically swapped
- Removal - Schema removed, pipeline garbage collected when unreferenced
Concurrency
- Multiple goroutines can safely call Pipeline() concurrently
- Updates to named schemas (the store step in SetSchema) are serialized through the write lock; validation and building run before that lock is taken
- Built pipelines are independent of the factory
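One way to exercise the claim that concurrent Pipeline() calls are safe during updates is a small stress test around an atomic pointer. hammer and Pipeline are hypothetical names. Running it with go run -race should report no data races. Because Go's atomic operations are sequentially consistent, a reader should never see a version go backwards.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Pipeline is a hypothetical stand-in for a built pipeline.
type Pipeline struct{ Version int }

// hammer starts several readers that repeatedly load the current pipeline
// while one writer swaps in new versions. It returns how many loads saw nil
// or a version older than one the same reader had already observed.
func hammer(readers, swaps int) int64 {
	var current atomic.Pointer[Pipeline]
	current.Store(&Pipeline{Version: 0})

	var bad atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < readers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			last := -1
			for j := 0; j < swaps; j++ {
				p := current.Load()
				if p == nil || p.Version < last {
					bad.Add(1)
					continue
				}
				last = p.Version
			}
		}()
	}
	for v := 1; v <= swaps; v++ {
		current.Store(&Pipeline{Version: v}) // the writer: one atomic swap per update
	}
	wg.Wait()
	return bad.Load()
}

func main() {
	fmt.Println("bad loads:", hammer(8, 10000))
}
```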
Next Steps
- Quickstart - Build your first pipeline
- Schema Design - Best practices
- Events Reference - All observability signals