Hot Reloading
Update pipeline definitions at runtime without service restarts.
Overview
Flume's hot reloading enables:
- Zero-downtime updates - Swap pipelines while requests continue
- A/B testing - Switch between pipeline variants
- Feature flags - Enable/disable pipeline stages dynamically
- Configuration-driven behaviour - Operators modify pipelines without deploys
Basic Usage
Register Named Schemas
factory := flume.New[Order]()
// Register components
factory.Add(processors...)
factory.AddPredicate(predicates...)
// Register named schema
schema := flume.Schema{
Version: "1.0.0",
Node: flume.Node{
Type: "sequence",
Children: []flume.Node{
{Ref: "validate"},
{Ref: "process"},
},
},
}
err := factory.SetSchema("order-pipeline", schema)
Create a Binding
// Create a binding with auto-sync enabled
pipelineID := factory.Identity("order-processor", "Processes incoming orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())
if err != nil {
return fmt.Errorf("failed to bind: %w", err)
}
// Process requests (lock-free)
result, err := binding.Process(ctx, order)
Update at Runtime
newSchema := flume.Schema{
Version: "1.1.0",
Node: flume.Node{
Type: "sequence",
Children: []flume.Node{
{Ref: "validate"},
{Ref: "enrich"}, // Added step
{Ref: "process"},
},
},
}
err := factory.SetSchema("order-pipeline", newSchema)
// New requests immediately use updated pipeline
How It Works
Atomic Pointer Swap
// Internally, pipelines are stored as atomic pointers
pipelines map[string]*atomic.Pointer[pipz.Chainable[T]]
// Update atomically swaps the pointer
ptr.Store(&newPipeline)
// Retrieval loads atomically
return *ptr.Load(), true
Request Continuity
- Request A starts with pipeline v1
- Schema updates to v2
- Request A completes with v1 (unaffected)
- Request B starts with v2
No locks are held during processing - updates don't block requests.
File-Based Reloading
Watch for Changes
func watchSchemaFile(ctx context.Context, factory *flume.Factory[Order], path string) {
watcher, _ := fsnotify.NewWatcher()
defer watcher.Close()
watcher.Add(path)
for {
select {
case <-ctx.Done():
return
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
reloadSchema(factory, path)
}
}
}
}
func reloadSchema(factory *flume.Factory[Order], path string) {
pipeline, err := factory.BuildFromFile(path)
if err != nil {
log.Printf("failed to reload schema: %v", err)
return
}
// Read file to get schema for SetSchema
data, _ := os.ReadFile(path)
var schema flume.Schema
yaml.Unmarshal(data, &schema)
if err := factory.SetSchema("main", schema); err != nil {
log.Printf("failed to set schema: %v", err)
}
}
Multi-Schema Directory
func watchSchemaDir(ctx context.Context, factory *flume.Factory[Order], dir string) {
watcher, _ := fsnotify.NewWatcher()
watcher.Add(dir)
for event := range watcher.Events {
if event.Op&fsnotify.Write == 0 {
continue
}
name := strings.TrimSuffix(filepath.Base(event.Name), filepath.Ext(event.Name))
data, _ := os.ReadFile(event.Name)
var schema flume.Schema
if err := yaml.Unmarshal(data, &schema); err != nil {
log.Printf("invalid schema %s: %v", name, err)
continue
}
if err := factory.SetSchema(name, schema); err != nil {
log.Printf("failed to update %s: %v", name, err)
}
}
}
Remote Configuration
HTTP Endpoint
http.HandleFunc("/schemas/{name}", func(w http.ResponseWriter, r *http.Request) {
name := r.PathValue("name")
switch r.Method {
case "GET":
schema, ok := factory.GetSchema(name)
if !ok {
http.NotFound(w, r)
return
}
json.NewEncoder(w).Encode(schema)
case "PUT":
var schema flume.Schema
json.NewDecoder(r.Body).Decode(&schema)
if err := factory.SetSchema(name, schema); err != nil {
http.Error(w, err.Error(), 400)
return
}
w.WriteHeader(204)
case "DELETE":
if !factory.RemoveSchema(name) {
http.NotFound(w, r)
return
}
w.WriteHeader(204)
}
})
Polling
func pollSchemas(ctx context.Context, factory *flume.Factory[Order], url string) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
resp, err := http.Get(url)
if err != nil {
continue
}
var schemas map[string]flume.Schema
json.NewDecoder(resp.Body).Decode(&schemas)
resp.Body.Close()
for name, schema := range schemas {
existing, ok := factory.GetSchema(name)
if ok && existing.Version == schema.Version {
continue // No change
}
factory.SetSchema(name, schema)
}
}
}
}
Version Tracking
Compare Versions
existing, ok := factory.GetSchema("pipeline")
if ok && existing.Version == newSchema.Version {
return // Already up to date
}
err := factory.SetSchema("pipeline", newSchema)
Observability
Flume emits events on schema changes:
import "github.com/zoobz-io/capitan"
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
var name, oldVersion, newVersion string
for _, f := range fields {
switch f.Key {
case "name":
name = f.String()
case "old_version":
oldVersion = f.String()
case "new_version":
newVersion = f.String()
}
}
log.Printf("schema %s updated: %s -> %s", name, oldVersion, newVersion)
})
Graceful Updates
Validation First
Always validate before applying:
if err := factory.ValidateSchema(newSchema); err != nil {
return fmt.Errorf("invalid schema: %w", err)
}
// Schema is valid, safe to apply
factory.SetSchema(name, newSchema)
Rollback Pattern
func updateWithRollback(factory *flume.Factory[Order], name string, newSchema flume.Schema, binding *flume.Binding[Order]) error {
// Save current
oldSchema, hadOld := factory.GetSchema(name)
// Apply new (auto-sync bindings rebuild automatically)
if err := factory.SetSchema(name, newSchema); err != nil {
return err
}
// Test with canary request
_, err := binding.Process(context.Background(), canaryOrder)
if err != nil && hadOld {
// Rollback to previous schema
factory.SetSchema(name, oldSchema)
return fmt.Errorf("canary failed, rolled back: %w", err)
}
return nil
}
Best Practices
1. Use Semantic Versioning
version: "1.2.3" # MAJOR.MINOR.PATCH
2. Log All Changes
log.Printf("schema %s updated to version %s", name, schema.Version)
3. Validate Component Dependencies
Before removing a processor, check if schemas reference it:
for _, schemaName := range factory.ListSchemas() {
schema, _ := factory.GetSchema(schemaName)
if schemaReferencesProcessor(schema, "old-processor") {
return fmt.Errorf("cannot remove: schema %s uses it", schemaName)
}
}
4. Consider Circuit Breakers
Protect against bad schema updates:
type GuardedFactory[T pipz.Cloner[T]] struct {
factory *flume.Factory[T]
updateBreaker *circuitbreaker.CircuitBreaker
}
func (g *GuardedFactory[T]) SetSchema(name string, schema flume.Schema) error {
return g.updateBreaker.Execute(func() error {
return g.factory.SetSchema(name, schema)
})
}
Next Steps
- Testing - Test hot reload scenarios
- Observability - Monitor schema changes
- Events Reference - All schema events