zoobzio December 11, 2025 Edit this page

Core Concepts

Flume is built around three core concepts: the Factory, Schemas, and Components.

The Factory

The factory is Flume's central registry. It holds all registered components and builds pipelines from schemas.

factory := flume.New[Order]()

The factory is generic over your data type T. This type must implement pipz.Cloner[T] to support parallel processing:

type Order struct {
    ID    string
    Total float64
}

func (o Order) Clone() Order {
    return Order{ID: o.ID, Total: o.Total}
}

Factory Responsibilities

  1. Component Registry - Store processors, predicates, and conditions
  2. Schema Storage - Manage named schemas with hot-reload support
  3. Pipeline Building - Construct pipz pipelines from schemas
  4. Validation - Verify schemas before building

Components

Components are the building blocks registered with the factory. There are three types:

Processors

Processors are pipz Chainable[T] implementations - the actual processing logic:

// Using pipz constructors
factory.Add(
    pipz.Apply("validate", validateOrder),      // T -> (T, error)
    pipz.Transform("normalize", normalizeOrder), // T -> T
    pipz.Effect("log", logOrder),               // T -> error (no modification)
)

Reference in schemas:

ref: validate

Predicates

Predicates are boolean functions for conditional processing:

factory.AddPredicate(flume.Predicate[Order]{
    Name:        "is-premium",
    Description: "Checks if customer has premium tier",
    Predicate: func(ctx context.Context, o Order) bool {
        return o.CustomerTier == "premium"
    },
})

Used in filter nodes:

type: filter
predicate: is-premium
then:
  ref: premium-handler

Conditions

Conditions return strings for multi-way routing:

factory.AddCondition(flume.Condition[Order]{
    Name:        "order-status",
    Description: "Returns the current order status",
    Values:      []string{"pending", "approved", "rejected"},
    Condition: func(ctx context.Context, o Order) string {
        return o.Status
    },
})

Used in switch nodes:

type: switch
condition: order-status
routes:
  pending:
    ref: handle-pending
  approved:
    ref: handle-approved
default:
  ref: handle-unknown

Schemas

Schemas define pipeline structure as YAML or JSON documents.

Schema Structure

version: "1.0.0"    # Optional version tracking
type: sequence      # Connector type
name: my-pipeline   # Optional name override
children:           # Child nodes
  - ref: step1
  - ref: step2

Node Types

Nodes are either references or connectors:

# Reference: points to registered processor
ref: validate

# Connector: defines structure
type: sequence
children:
  - ref: step1
  - ref: step2

Building Pipelines

Build from strings or files:

// From YAML string
pipeline, err := factory.BuildFromYAML(yamlStr)

// From JSON string
pipeline, err := factory.BuildFromJSON(jsonStr)

// From file (auto-detects format)
pipeline, err := factory.BuildFromFile("pipeline.yaml")

// From Schema struct
pipeline, err := factory.Build(schema)

Dynamic Schema Management

Flume supports named schemas that can be updated at runtime:

// Register a named schema
err := factory.SetSchema("order-pipeline", schema)

// Create a binding with auto-sync enabled
pipelineID := factory.Identity("order-processor", "Processes orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())

// Process requests (lock-free)
result, err := binding.Process(ctx, order)

// Update the schema - auto-sync bindings rebuild automatically
err = factory.SetSchema("order-pipeline", newSchema)

// Remove a schema
removed := factory.RemoveSchema("order-pipeline")

// List all schemas
names := factory.ListSchemas()

Hot Reloading

When you update a schema:

  1. New pipeline is built and validated
  2. All auto-sync bindings rebuild atomically
  3. In-flight requests complete with old pipeline
  4. New requests use updated pipeline
// Watcher pattern
for schemaChange := range watcher.Changes() {
    if err := factory.SetSchema(schemaChange.Name, schemaChange.Schema); err != nil {
        log.Printf("failed to update schema: %v", err)
        continue
    }
    log.Printf("schema %s updated to version %s",
        schemaChange.Name, schemaChange.Schema.Version)
}

Validation

Flume validates schemas before building, catching errors early:

err := factory.ValidateSchema(schema)
if err != nil {
    // ValidationErrors with detailed paths
    fmt.Println(err)
    // Output:
    // 3 validation errors:
    //   1. root.children[0]: processor 'missing' not found
    //   2. root.children[1]: predicate 'unknown' not found
    //   3. root.children[1].then: processor 'also-missing' not found
}

What Gets Validated

  • All processor/predicate/condition references exist
  • Required fields are present
  • Connector constraints are met (e.g., fallback needs 2 children)
  • Configuration values are valid (e.g., positive retry attempts)
  • Duration strings are parseable
  • No circular references

Component Introspection

Query the factory's registrations:

// Check existence
factory.HasProcessor("validate")
factory.HasPredicate("is-premium")
factory.HasCondition("order-status")

// List all
processors := factory.ListProcessors()  // []pipz.Name
predicates := factory.ListPredicates()
conditions := factory.ListConditions()

Component Removal

Remove components when no longer needed:

// Remove processors (returns count removed)
removed := factory.Remove("old-processor", "deprecated")

// Remove predicates
factory.RemovePredicate("old-predicate")

// Remove conditions
factory.RemoveCondition("old-condition")

Note: Removing a component doesn't affect already-built pipelines, but will cause validation errors if you try to build schemas that reference them.

Next Steps