zoobzio December 11, 2025

Architecture

Understanding Flume's internal architecture helps you use it effectively and debug issues.

Overview

┌─────────────────────────────────────────────────────────────┐
│                         Factory[T]                          │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │ Processors  │  │ Predicates  │  │ Conditions  │         │
│  │  map[Name]  │  │  map[Name]  │  │  map[Name]  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Schemas    │  │  Pipelines  │  │  Channels   │         │
│  │ map[string] │  │ atomic.Ptr  │  │ map[string] │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└─────────────────────────────────────────────────────────────┘
                           │
                           ▼
              ┌────────────────────────┐
              │    Schema Document     │
              │  (YAML/JSON/Struct)    │
              └────────────────────────┘
                           │
            ┌──────────────┼──────────────┐
            ▼              ▼              ▼
      ┌──────────┐  ┌──────────┐  ┌──────────┐
      │  Parse   │  │ Validate │  │  Build   │
      └──────────┘  └──────────┘  └──────────┘
                           │
                           ▼
              ┌────────────────────────┐
              │   pipz.Chainable[T]    │
              │    (ready to use)      │
              └────────────────────────┘

Factory Structure

The factory maintains six internal maps protected by a read-write mutex:

type Factory[T pipz.Cloner[T]] struct {
    processors map[pipz.Name]processorMeta[T]
    predicates map[pipz.Name]predicateMeta[T]
    conditions map[pipz.Name]conditionMeta[T]
    schemas    map[string]*Schema
    pipelines  map[string]*atomic.Pointer[pipz.Chainable[T]]
    channels   map[string]chan<- T
    mu         sync.RWMutex
}

Thread Safety

  • Read operations (lookups, list, build) acquire read locks
  • Write operations (add, remove, set schema) acquire write locks
  • Pipeline access uses atomic pointers for lock-free retrieval

Schema Processing Pipeline

1. Parsing

Schemas can be loaded from multiple sources:

// String parsing
factory.BuildFromYAML(yamlStr)
factory.BuildFromJSON(jsonStr)

// File loading (detects format from extension)
factory.BuildFromFile("pipeline.yaml")

// Direct struct
factory.Build(schema)

The parser converts YAML/JSON into the Schema struct:

type Schema struct {
    Version string
    Node    // embedded
}

type Node struct {
    Ref       string
    Type      string
    Name      string
    Children  []Node
    Child     *Node
    // ... connector-specific fields
}
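
As an illustration of how a document might map onto these structs, here is a minimal, self-contained sketch using encoding/json. The types are trimmed stand-ins for the ones above, and the json tags and key names are assumptions made for this example, not Flume's confirmed wire format. The point it shows is that embedding Node lets connector fields sit at the top level of the document next to the version key.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node and Schema are stand-ins mirroring the structs above.
// The json tags are assumptions made for this sketch.
type Node struct {
	Ref      string `json:"ref,omitempty"`
	Type     string `json:"type,omitempty"`
	Name     string `json:"name,omitempty"`
	Children []Node `json:"children,omitempty"`
	Child    *Node  `json:"child,omitempty"`
}

type Schema struct {
	Version string `json:"version"`
	Node           // embedded: its fields decode from the top level
}

// parseSchema decodes a JSON document into a Schema.
func parseSchema(doc string) (Schema, error) {
	var s Schema
	err := json.Unmarshal([]byte(doc), &s)
	return s, err
}

func main() {
	doc := `{
	  "version": "1.0",
	  "type": "sequence",
	  "children": [{"ref": "validate"}, {"ref": "enrich"}]
	}`
	s, err := parseSchema(doc)
	if err != nil {
		panic(err)
	}
	fmt.Println(s.Version, s.Type, len(s.Children)) // 1.0 sequence 2
}
```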

2. Validation

Before building, the factory validates the entire schema tree:

func (f *Factory[T]) ValidateSchema(schema Schema) error {
    var errors ValidationErrors
    f.validateNode(&schema.Node, []string{"root"}, &errors)
    if len(errors) > 0 {
        return errors
    }
    return nil
}

Validation traverses the tree recursively, checking:

  • References exist - All ref: values point to registered processors
  • Predicates exist - All predicate: values are registered
  • Conditions exist - All condition: values are registered
  • Required fields - Each node type has its required fields
  • Constraints - Type-specific rules (e.g., fallback needs exactly 2 children)
  • No cycles - Reference chains don't loop back
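
The recursive traversal can be sketched roughly as follows. This is a simplified stand-in rather than Flume's validator: the Node type is trimmed, the registry is a plain map, and only three of the checks are shown (unknown reference, the fallback child count, and a node with neither ref nor type). It collects every problem along with its path instead of stopping at the first one, which is the behaviour the ValidationErrors return type suggests.

```go
package main

import (
	"fmt"
	"strings"
)

// Node is a trimmed stand-in for the schema node above.
type Node struct {
	Ref      string
	Type     string
	Children []Node
}

// validate walks the tree and records each problem with the path to the
// offending node. known is a hypothetical registry of processor names.
func validate(n *Node, path []string, known map[string]bool, errs *[]string) {
	at := strings.Join(path, ".")
	switch {
	case n.Ref != "":
		if !known[n.Ref] {
			*errs = append(*errs, fmt.Sprintf("%s: unknown ref %q", at, n.Ref))
		}
	case n.Type == "fallback":
		if len(n.Children) != 2 {
			*errs = append(*errs, at+": fallback needs exactly 2 children")
		}
	case n.Type == "":
		*errs = append(*errs, at+": node needs a ref or a type")
	}
	for i := range n.Children {
		// Copy the path so sibling branches do not share a backing array.
		child := append(append([]string{}, path...), fmt.Sprintf("children[%d]", i))
		validate(&n.Children[i], child, known, errs)
	}
}

func main() {
	root := Node{Type: "fallback", Children: []Node{{Ref: "primary"}}}
	var errs []string
	validate(&root, []string{"root"}, map[string]bool{"backup": true}, &errs)
	for _, e := range errs {
		fmt.Println(e)
	}
	// root: fallback needs exactly 2 children
	// root.children[0]: unknown ref "primary"
}
```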

3. Building

The builder recursively constructs pipz chainables:

func (f *Factory[T]) buildNode(node *Node) (pipz.Chainable[T], error) {
    // Handle processor reference
    if node.Ref != "" {
        return f.processors[pipz.Name(node.Ref)].processor, nil
    }

    // Handle connector types
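    switch node.Type {
    case "sequence":
        return f.buildSequence(node)
    case "concurrent":
        return f.buildConcurrent(node)
    // ... other types
    default:
        // Without a final return the function does not compile;
        // unknown types are reported as errors.
        return nil, fmt.Errorf("unknown node type: %s", node.Type)
    }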
}

Each connector type has a dedicated builder that:

  1. Validates connector-specific requirements
  2. Recursively builds child nodes
  3. Constructs the pipz connector with appropriate options
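
The three steps above can be sketched with a toy sequence builder. To keep the example free of external dependencies, Stage is a plain function type standing in for pipz.Chainable[T], the registry is an ordinary map, and the "at least one child" rule is an assumed requirement chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Stage stands in for pipz.Chainable[T]; Node is a trimmed schema node.
type Stage func(string) string

type Node struct {
	Ref      string
	Type     string
	Children []Node
}

// build resolves refs from the registry and composes connectors from
// recursively built children.
func build(n *Node, registry map[string]Stage) (Stage, error) {
	if n.Ref != "" {
		s, ok := registry[n.Ref]
		if !ok {
			return nil, fmt.Errorf("unknown ref %q", n.Ref)
		}
		return s, nil
	}
	switch n.Type {
	case "sequence":
		// Step 1: connector-specific requirement (assumed for this sketch).
		if len(n.Children) == 0 {
			return nil, fmt.Errorf("sequence needs at least one child")
		}
		// Step 2: build each child recursively.
		stages := make([]Stage, 0, len(n.Children))
		for i := range n.Children {
			s, err := build(&n.Children[i], registry)
			if err != nil {
				return nil, err
			}
			stages = append(stages, s)
		}
		// Step 3: construct the connector; here it runs the stages in order.
		return func(in string) string {
			for _, s := range stages {
				in = s(in)
			}
			return in
		}, nil
	default:
		return nil, fmt.Errorf("unknown node type %q", n.Type)
	}
}

func main() {
	registry := map[string]Stage{
		"trim":  strings.TrimSpace,
		"upper": strings.ToUpper,
	}
	root := Node{Type: "sequence", Children: []Node{{Ref: "trim"}, {Ref: "upper"}}}
	pipeline, err := build(&root, registry)
	if err != nil {
		panic(err)
	}
	fmt.Println(pipeline("  hello  ")) // HELLO
}
```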

Connector Mapping

Flume schema types map directly to pipz constructors:

Schema Type       pipz Constructor
sequence          pipz.NewSequence
concurrent        pipz.NewConcurrent
race              pipz.NewRace
fallback          pipz.NewFallback
retry             pipz.NewRetry or pipz.NewBackoff
timeout           pipz.NewTimeout
filter            pipz.NewFilter or custom routing
switch            pipz.NewSwitch
circuit-breaker   pipz.NewCircuitBreaker
rate-limit        pipz.NewRateLimiter + pipz.NewSequence

Hot Reload Mechanism

Named schemas support atomic updates:

func (f *Factory[T]) SetSchema(name string, schema Schema) error {
    // 1. Validate first
    if err := f.ValidateSchema(schema); err != nil {
        return err
    }

    // 2. Build pipeline
    pipeline, err := f.Build(schema)
    if err != nil {
        return err
    }

    // 3. Atomic update
    f.mu.Lock()
    defer f.mu.Unlock()

    f.schemas[name] = &schema
    if ptr, exists := f.pipelines[name]; exists {
        ptr.Store(&pipeline)  // Atomic swap
    } else {
        ptr := &atomic.Pointer[pipz.Chainable[T]]{}
        ptr.Store(&pipeline)
        f.pipelines[name] = ptr
    }
    return nil
}

Retrieval is lock-free after the initial lookup:

func (f *Factory[T]) Pipeline(name string) (pipz.Chainable[T], bool) {
    f.mu.RLock()
    ptr := f.pipelines[name]
    f.mu.RUnlock()

    if ptr == nil {
        return nil, false
    }
    return *ptr.Load(), true  // Atomic load
}
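
One consequence of this design is worth spelling out: a caller that already holds a pipeline keeps running the old version after a reload, while the next retrieval sees the new one. The sketch below demonstrates that with the standard library's atomic.Pointer (Go 1.19+). Pipeline and Slot are hypothetical stand-ins for pipz.Chainable[T] and a single entry in the factory's pipelines map.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Pipeline stands in for pipz.Chainable[T].
type Pipeline func(int) int

// Slot is a hypothetical holder for one named pipeline.
type Slot struct {
	p atomic.Pointer[Pipeline]
}

// Set publishes a new pipeline with a single atomic store.
func (s *Slot) Set(p Pipeline) { s.p.Store(&p) }

// Get loads the current pipeline without taking a lock.
func (s *Slot) Get() (Pipeline, bool) {
	ptr := s.p.Load()
	if ptr == nil {
		return nil, false
	}
	return *ptr, true
}

func main() {
	var slot Slot
	slot.Set(func(n int) int { return n + 1 })

	// A caller retrieves the pipeline before the reload happens.
	held, _ := slot.Get()

	// Hot reload: swap in a new pipeline.
	slot.Set(func(n int) int { return n * 10 })
	current, _ := slot.Get()

	fmt.Println(held(5))    // 6: the held value still runs the old pipeline
	fmt.Println(current(5)) // 50: a fresh retrieval sees the new one
}
```

Callers that need to pick up reloads promptly would therefore retrieve the pipeline per request rather than caching it for the life of the process.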

Observability Integration

Flume emits Capitan events throughout its lifecycle:

// Factory events
FactoryCreated
ProcessorRegistered
PredicateRegistered
ConditionRegistered

// Schema events
SchemaValidationStarted
SchemaValidationCompleted
SchemaValidationFailed
SchemaBuildStarted
SchemaBuildCompleted
SchemaBuildFailed

// Dynamic schema events
SchemaRegistered
SchemaUpdated
SchemaRemoved
PipelineRetrieved

Events include typed fields:

capitan.Emit(ctx, SchemaRegistered,
    KeyName.Field(name),
    KeyVersion.Field(schema.Version))

Channel Integration

Channels enable stream processing patterns:

factory.AddChannel("output", outputChan)

Stream nodes wrap channels in pipz effects:

func (f *Factory[T]) buildStream(node *Node) (pipz.Chainable[T], error) {
    channel := f.channels[node.Stream]

    return pipz.Effect(
        pipz.Name(fmt.Sprintf("stream:%s", node.Stream)),
        func(_ context.Context, item T) error {
            channel <- item
            return nil
        },
    ), nil
}
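
Note that a plain send such as the one above blocks when the channel is full and nobody is receiving. If that matters for your consumers, one possible variation is to honour context cancellation in the send. The following sketch is standalone and does not use pipz; sendEffect and demo are hypothetical helpers that only share the callback signature shown above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendEffect returns a side-effect function that forwards items to ch
// but gives up when ctx is done instead of blocking indefinitely.
func sendEffect[T any](ch chan<- T) func(context.Context, T) error {
	return func(ctx context.Context, item T) error {
		select {
		case ch <- item:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demo sends once into a buffer with room, then once into a full buffer
// under a short deadline. It reports whether the second send timed out.
func demo() (bool, error) {
	out := make(chan int, 1)
	send := sendEffect[int](out)

	if err := send(context.Background(), 1); err != nil {
		return false, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	err := send(ctx, 2) // buffer is full and nothing is receiving
	return errors.Is(err, context.DeadlineExceeded), nil
}

func main() {
	timedOut, err := demo()
	fmt.Println(timedOut, err) // true <nil>
}
```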

Memory Model

Component Storage

Components are stored by value in maps:

type processorMeta[T any] struct {
    processor   pipz.Chainable[T]  // Interface value
    description string
    tags        []string
}

Pipeline Lifecycle

  1. Registration - Components stored in factory maps
  2. Build - New pipeline created from current registrations
  3. Storage - Pipeline stored via atomic pointer (for named schemas)
  4. Usage - Pipeline used directly, no factory reference needed
  5. Update - New pipeline built, pointer atomically swapped
  6. Removal - Schema removed, pipeline garbage collected when unreferenced

Concurrency

  • Multiple goroutines can safely call Pipeline() concurrently
  • Schema updates (SetSchema) are serialized through the write lock; the build step itself only needs read access to the registries
  • Built pipelines are independent of the factory

Next Steps