zoobzio December 11, 2025 Edit this page

Hot Reloading

Update pipeline definitions at runtime without service restarts.

Overview

Flume's hot reloading enables:

  • Zero-downtime updates - Swap pipelines while requests continue
  • A/B testing - Switch between pipeline variants
  • Feature flags - Enable/disable pipeline stages dynamically
  • Configuration-driven behaviour - Operators modify pipelines without deploys

Basic Usage

Register Named Schemas

factory := flume.New[Order]()

// Register the processors and predicates that schemas refer to by name.
factory.Add(processors...)
factory.AddPredicate(predicates...)

// Children of the root node, listed in the order they appear in the schema.
steps := []flume.Node{
    {Ref: "validate"},
    {Ref: "process"},
}

// Version 1.0.0: a single "sequence" node wrapping the two steps above.
schema := flume.Schema{
    Version: "1.0.0",
    Node:    flume.Node{Type: "sequence", Children: steps},
}

// Store the schema under the name that bindings will use to look it up.
err := factory.SetSchema("order-pipeline", schema)

Create a Binding

// Create a binding with auto-sync enabled.
// Identity takes a name and a description for this pipeline instance; Bind
// then ties that identity to the schema registered as "order-pipeline".
pipelineID := factory.Identity("order-processor", "Processes incoming orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())
if err != nil {
    return fmt.Errorf("failed to bind: %w", err)
}

// Process requests (lock-free)
result, err := binding.Process(ctx, order)

Update at Runtime

newSchema := flume.Schema{
    Version: "1.1.0",
    Node: flume.Node{
        Type: "sequence",
        Children: []flume.Node{
            {Ref: "validate"},
            {Ref: "enrich"},  // Added step
            {Ref: "process"},
        },
    },
}

err := factory.SetSchema("order-pipeline", newSchema)
// New requests immediately use updated pipeline

How It Works

Atomic Pointer Swap

// NOTE: illustrative excerpt (a field declaration followed by two separate
// statements); it is not meant to compile as a single unit.

// Internally, pipelines are stored as atomic pointers
pipelines map[string]*atomic.Pointer[pipz.Chainable[T]]

// Update atomically swaps the pointer
ptr.Store(&newPipeline)

// Retrieval loads atomically
return *ptr.Load(), true

Request Continuity

  1. Request A starts with pipeline v1
  2. Schema updates to v2
  3. Request A completes with v1 (unaffected)
  4. Request B starts with v2

No locks are held during processing, so schema updates never block in-flight requests.

File-Based Reloading

Watch for Changes

// watchSchemaFile watches the schema file at path and calls reloadSchema each
// time fsnotify reports a write to it.
//
// It blocks until ctx is cancelled or the watcher's channels are closed. If
// the watcher cannot be created or the path cannot be watched, the failure is
// logged and the function returns immediately.
func watchSchemaFile(ctx context.Context, factory *flume.Factory[Order], path string) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        // Without this check a nil watcher would panic in the deferred Close.
        log.Printf("failed to create watcher: %v", err)
        return
    }
    defer watcher.Close()

    if err := watcher.Add(path); err != nil {
        log.Printf("failed to watch %s: %v", path, err)
        return
    }

    for {
        select {
        case <-ctx.Done():
            return
        case event, ok := <-watcher.Events:
            if !ok {
                // Channel closed: stop instead of spinning on zero-value events.
                return
            }
            if event.Op&fsnotify.Write == fsnotify.Write {
                reloadSchema(factory, path)
            }
        case err, ok := <-watcher.Errors:
            if !ok {
                return
            }
            // Receive from Errors so watcher failures are reported, not lost.
            log.Printf("watcher error: %v", err)
        }
    }
}

// reloadSchema re-reads the schema file at path and registers it with the
// factory under the name "main".
//
// Every failure (build, read, parse, or SetSchema) is logged and aborts the
// reload, so a bad file never replaces the registered schema with an empty one.
func reloadSchema(factory *flume.Factory[Order], path string) {
    // Build from the file first and stop on error. The built pipeline itself
    // is not needed here, so it is discarded rather than left as an unused
    // variable (which would not compile).
    if _, err := factory.BuildFromFile(path); err != nil {
        log.Printf("failed to reload schema: %v", err)
        return
    }

    // Read file to get schema for SetSchema
    data, err := os.ReadFile(path)
    if err != nil {
        log.Printf("failed to read schema file %s: %v", path, err)
        return
    }

    var schema flume.Schema
    if err := yaml.Unmarshal(data, &schema); err != nil {
        log.Printf("failed to parse schema file %s: %v", path, err)
        return
    }

    if err := factory.SetSchema("main", schema); err != nil {
        log.Printf("failed to set schema: %v", err)
    }
}

Multi-Schema Directory

// watchSchemaDir watches dir and, for every file fsnotify reports as written,
// parses it as YAML and registers it under the file's base name with the
// extension removed (e.g. "orders.yaml" becomes "orders").
//
// It blocks until ctx is cancelled or the watcher's channels are closed.
// Setup failures are logged and end the function; per-file failures are
// logged and skipped.
func watchSchemaDir(ctx context.Context, factory *flume.Factory[Order], dir string) {
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        log.Printf("failed to create watcher: %v", err)
        return
    }
    defer watcher.Close()

    if err := watcher.Add(dir); err != nil {
        log.Printf("failed to watch %s: %v", dir, err)
        return
    }

    for {
        select {
        case <-ctx.Done():
            // Honour cancellation so callers can stop the watcher.
            return

        case err, ok := <-watcher.Errors:
            if !ok {
                return
            }
            log.Printf("watcher error: %v", err)

        case event, ok := <-watcher.Events:
            if !ok {
                return
            }
            if event.Op&fsnotify.Write == 0 {
                continue
            }

            name := strings.TrimSuffix(filepath.Base(event.Name), filepath.Ext(event.Name))

            data, err := os.ReadFile(event.Name)
            if err != nil {
                // Skip unreadable files rather than parsing empty data.
                log.Printf("failed to read %s: %v", event.Name, err)
                continue
            }

            var schema flume.Schema
            if err := yaml.Unmarshal(data, &schema); err != nil {
                log.Printf("invalid schema %s: %v", name, err)
                continue
            }

            if err := factory.SetSchema(name, schema); err != nil {
                log.Printf("failed to update %s: %v", name, err)
            }
        }
    }
}

Remote Configuration

HTTP Endpoint

// Schema management endpoint:
//   GET    /schemas/{name}  returns the named schema as JSON, or 404
//   PUT    /schemas/{name}  replaces the named schema from a JSON body
//   DELETE /schemas/{name}  removes the named schema, or 404
// Any other method gets 405 Method Not Allowed.
//
// NOTE(review): this endpoint changes live pipelines and performs no
// authentication here; put it behind access control before exposing it.
http.HandleFunc("/schemas/{name}", func(w http.ResponseWriter, r *http.Request) {
    name := r.PathValue("name")

    switch r.Method {
    case http.MethodGet:
        schema, ok := factory.GetSchema(name)
        if !ok {
            http.NotFound(w, r)
            return
        }
        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(schema); err != nil {
            // The response may already be partly written, so only log.
            log.Printf("failed to encode schema %s: %v", name, err)
        }

    case http.MethodPut:
        var schema flume.Schema
        if err := json.NewDecoder(r.Body).Decode(&schema); err != nil {
            // Reject malformed bodies instead of storing a zero-value schema.
            http.Error(w, "invalid schema: "+err.Error(), http.StatusBadRequest)
            return
        }

        if err := factory.SetSchema(name, schema); err != nil {
            http.Error(w, err.Error(), http.StatusBadRequest)
            return
        }
        w.WriteHeader(http.StatusNoContent)

    case http.MethodDelete:
        if !factory.RemoveSchema(name) {
            http.NotFound(w, r)
            return
        }
        w.WriteHeader(http.StatusNoContent)

    default:
        // Without this branch unsupported methods returned an empty 200.
        w.Header().Set("Allow", "GET, PUT, DELETE")
        http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
    }
})

Polling

// pollSchemas fetches a JSON map of schema name to schema from url every 30
// seconds and applies each entry whose Version differs from the registered
// one (or that is not registered yet). It blocks until ctx is cancelled.
//
// A failed poll or a failed SetSchema is logged and polling continues on the
// next tick.
func pollSchemas(ctx context.Context, factory *flume.Factory[Order], url string) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            schemas, err := fetchSchemas(ctx, url)
            if err != nil {
                log.Printf("failed to poll schemas: %v", err)
                continue
            }

            for name, schema := range schemas {
                existing, ok := factory.GetSchema(name)
                if ok && existing.Version == schema.Version {
                    continue // No change
                }
                if err := factory.SetSchema(name, schema); err != nil {
                    log.Printf("failed to update %s: %v", name, err)
                }
            }
        }
    }
}

// fetchSchemas performs one GET against url and decodes the body as a JSON
// object mapping schema names to schemas. It returns an error if the request
// fails, the status is not 200 OK, or the body does not decode.
func fetchSchemas(ctx context.Context, url string) (map[string]flume.Schema, error) {
    // Bound each request so a hung server cannot stall polling; the limit is
    // kept below the 30-second poll interval.
    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return nil, fmt.Errorf("building request for %s: %w", url, err)
    }

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, fmt.Errorf("fetching %s: %w", url, err)
    }
    defer resp.Body.Close()

    // An error page must not be decoded as if it were a schema map.
    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("fetching %s: unexpected status %s", url, resp.Status)
    }

    var schemas map[string]flume.Schema
    if err := json.NewDecoder(resp.Body).Decode(&schemas); err != nil {
        return nil, fmt.Errorf("decoding schemas from %s: %w", url, err)
    }
    return schemas, nil
}

Version Tracking

Compare Versions

existing, ok := factory.GetSchema("pipeline")
if ok && existing.Version == newSchema.Version {
    return // Already up to date
}

err := factory.SetSchema("pipeline", newSchema)

Observability

Flume emits events on schema changes:

import "github.com/zoobz-io/capitan"

// Log every SchemaUpdated event, picking the schema name and the old and new
// versions out of the event's fields by key.
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
    var (
        schemaName string
        from       string
        to         string
    )

    for _, field := range fields {
        switch field.Key {
        case "name":
            schemaName = field.String()
        case "old_version":
            from = field.String()
        case "new_version":
            to = field.String()
        }
    }

    log.Printf("schema %s updated: %s -> %s", schemaName, from, to)
})

Graceful Updates

Validation First

Always validate before applying:

// Reject the schema up front if it does not validate.
if err := factory.ValidateSchema(newSchema); err != nil {
    return fmt.Errorf("invalid schema: %w", err)
}

// SetSchema also returns an error, so it is checked rather than discarded.
if err := factory.SetSchema(name, newSchema); err != nil {
    return fmt.Errorf("failed to set schema: %w", err)
}

Rollback Pattern

// updateWithRollback applies newSchema under name, sends one canary request
// through binding, and restores the previous schema if the canary fails.
//
// It returns nil only when the schema was applied and the canary succeeded.
// A failed SetSchema is returned as-is. A failed canary is always returned as
// an error; the message says whether the rollback succeeded, failed, or was
// impossible because no schema was registered under name beforehand.
func updateWithRollback(factory *flume.Factory[Order], name string, newSchema flume.Schema, binding *flume.Binding[Order]) error {
    // Save current
    oldSchema, hadOld := factory.GetSchema(name)

    // Apply new (auto-sync bindings rebuild automatically)
    if err := factory.SetSchema(name, newSchema); err != nil {
        return err
    }

    // Test with canary request
    _, err := binding.Process(context.Background(), canaryOrder)
    if err == nil {
        return nil
    }

    if !hadOld {
        // Nothing to restore, but the failure must still reach the caller.
        return fmt.Errorf("canary failed, no previous schema to roll back to: %w", err)
    }

    // Rollback to previous schema
    if rbErr := factory.SetSchema(name, oldSchema); rbErr != nil {
        return fmt.Errorf("canary failed: %w; rollback failed: %w", err, rbErr)
    }
    return fmt.Errorf("canary failed, rolled back: %w", err)
}

Best Practices

1. Use Semantic Versioning

version: "1.2.3"  # MAJOR.MINOR.PATCH

2. Log All Changes

// Log the schema name and the version it was updated to.
log.Printf("schema %s updated to version %s", name, schema.Version)

3. Validate Component Dependencies

Before removing a processor, check if schemas reference it:

// Refuse removal if any registered schema still references the processor.
for _, schemaName := range factory.ListSchemas() {
    schema, ok := factory.GetSchema(schemaName)
    if !ok {
        // No schema is registered under this name any more, so there is
        // nothing to inspect; skip it rather than check a zero-value schema.
        continue
    }
    if schemaReferencesProcessor(schema, "old-processor") {
        return fmt.Errorf("cannot remove: schema %s uses it", schemaName)
    }
}

4. Consider Circuit Breakers

Protect against bad schema updates:

// GuardedFactory wraps a flume factory together with a circuit breaker that
// schema updates are executed through.
type GuardedFactory[T pipz.Cloner[T]] struct {
    factory       *flume.Factory[T]
    updateBreaker *circuitbreaker.CircuitBreaker
}

// SetSchema forwards to the wrapped factory's SetSchema, running the call
// through updateBreaker.Execute and returning whatever Execute returns.
func (g *GuardedFactory[T]) SetSchema(name string, schema flume.Schema) error {
    apply := func() error {
        return g.factory.SetSchema(name, schema)
    }
    return g.updateBreaker.Execute(apply)
}

Next Steps