zoobzio December 11, 2025 Edit this page

Observability

Flume publishes its lifecycle events through Capitan, so logging, metrics, and tracing can all be attached as event handlers.

Overview

Flume emits structured events throughout its lifecycle:

  • Factory creation and component registration
  • Schema validation, building, and updates
  • Pipeline retrieval and execution
  • File loading and parsing

Event Categories

Factory Events

flume.FactoryCreated      // Factory instantiated
flume.ProcessorRegistered // Processor added
flume.PredicateRegistered // Predicate added
flume.ConditionRegistered // Condition added
flume.ProcessorRemoved    // Processor removed
flume.PredicateRemoved    // Predicate removed
flume.ConditionRemoved    // Condition removed

Schema Events

flume.SchemaValidationStarted   // Validation began
flume.SchemaValidationCompleted // Validation successful
flume.SchemaValidationFailed    // Validation failed
flume.SchemaBuildStarted        // Build began
flume.SchemaBuildCompleted      // Build successful
flume.SchemaBuildFailed         // Build failed

Dynamic Schema Events

flume.SchemaRegistered   // New schema added
flume.SchemaUpdated      // Existing schema replaced
flume.SchemaRemoved      // Schema deleted
flume.PipelineRetrieved  // Pipeline accessed

File Events

flume.SchemaFileLoaded  // File read successfully
flume.SchemaFileFailed  // File read error
flume.SchemaYAMLParsed  // YAML parsed
flume.SchemaJSONParsed  // JSON parsed
flume.SchemaParseFailed // Parse error

Event Fields

Events include typed fields for structured data:

flume.KeyName       // string: component/schema name
flume.KeyType       // string: data type
flume.KeyVersion    // string: schema version
flume.KeyOldVersion // string: previous version (updates)
flume.KeyNewVersion // string: new version (updates)
flume.KeyPath       // string: file path
flume.KeyError      // string: error message
flume.KeyDuration   // time.Duration: operation duration
flume.KeyErrorCount // int: number of errors
flume.KeySizeBytes  // int: file size
flume.KeyFound      // bool: retrieval success

Setting Up Handlers

Basic Logging

import "github.com/zoobz-io/capitan"

// setupLogging attaches log.Printf handlers to three Flume signals:
// schema registration, schema updates (with old and new version), and
// schema build failures (with the error message).
func setupLogging() {
    capitan.Handle(flume.SchemaRegistered, func(ctx context.Context, fields []capitan.Field) {
        name := extractString(fields, "name")
        log.Printf("Schema registered: %s", name)
    })

    capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
        name := extractString(fields, "name")
        from := extractString(fields, "old_version")
        to := extractString(fields, "new_version")
        log.Printf("Schema updated: %s (%s -> %s)", name, from, to)
    })

    capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
        reason := extractString(fields, "error")
        log.Printf("Build failed: %s", reason)
    })
}

// extractString scans fields in order and returns the String() form of the
// first field whose Key equals key. It returns "" when no field matches.
func extractString(fields []capitan.Field, key string) string {
    for i := range fields {
        if fields[i].Key != key {
            continue
        }
        return fields[i].String()
    }
    return ""
}

Metrics Collection

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/zoobz-io/capitan"
)

// Prometheus collectors that the Capitan handlers in setupMetrics update.
var (
    // schemaUpdates counts SchemaUpdated events, labelled by schema name.
    schemaUpdates = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "flume_schema_updates_total",
        Help: "Total schema updates",
    }, []string{"name"})

    // buildDuration observes SchemaBuildCompleted durations in seconds,
    // labelled by schema version, using Prometheus' default buckets.
    buildDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Name:    "flume_build_duration_seconds",
        Help:    "Schema build duration",
        Buckets: prometheus.DefBuckets,
    }, []string{"version"})

    // buildFailures counts SchemaBuildFailed events (no labels).
    buildFailures = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "flume_build_failures_total",
        Help: "Total build failures",
    })
)

// setupMetrics registers the Flume collectors with Prometheus' default
// registry and wires Capitan handlers that keep them up to date.
// MustRegister panics if registration fails (for example, when this is
// called twice), so call it once during startup.
func setupMetrics() {
    prometheus.MustRegister(schemaUpdates, buildDuration, buildFailures)

    // One increment per schema update, keyed by schema name.
    capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
        schemaUpdates.WithLabelValues(extractString(fields, "name")).Inc()
    })

    // Build latency, converted to seconds and keyed by schema version.
    capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
        version, elapsed := extractString(fields, "version"), extractDuration(fields, "duration")
        buildDuration.WithLabelValues(version).Observe(elapsed.Seconds())
    })

    // Failures are counted without inspecting the event's fields.
    capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
        buildFailures.Inc()
    })
}

Distributed Tracing

import "go.opentelemetry.io/otel/trace"

func setupTracing(tracer trace.Tracer) {
    capitan.Handle(flume.SchemaBuildStarted, func(ctx context.Context, fields []capitan.Field) {
        _, span := tracer.Start(ctx, "flume.build")
        version := extractString(fields, "version")
        span.SetAttributes(attribute.String("schema.version", version))
        // Store span for completion handler
    })

    capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
        // End span
    })
}

Common Monitoring Patterns

Schema Change Alerting

capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
    name := extractString(fields, "name")
    oldVersion := extractString(fields, "old_version")
    newVersion := extractString(fields, "new_version")

    if isMajorVersionChange(oldVersion, newVersion) {
        alerting.SendAlert(alerting.Warning, fmt.Sprintf(
            "Major schema version change: %s from %s to %s",
            name, oldVersion, newVersion))
    }
})

Build Performance Tracking

capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
    duration := extractDuration(fields, "duration")

    // Builds at or under 100ms are considered normal and are not logged.
    if duration <= 100*time.Millisecond {
        return
    }
    log.Printf("Slow schema build: %v", duration)
})

Validation Error Analysis

capitan.Handle(flume.SchemaValidationFailed, func(ctx context.Context, fields []capitan.Field) {
    // Forward the number of validation errors and the time validation took.
    metrics.RecordValidationFailure(
        extractInt(fields, "error_count"),
        extractDuration(fields, "duration"),
    )
})

Debugging with Events

Development Logging

// setupDevLogging logs every Flume signal listed under Event Categories,
// printing the signal name and its formatted fields. Intended for
// development, where seeing every event outweighs the log volume.
func setupDevLogging() {
    signals := []capitan.Signal{
        // Factory events
        flume.FactoryCreated,
        flume.ProcessorRegistered,
        flume.PredicateRegistered,
        flume.ConditionRegistered,
        flume.ProcessorRemoved,
        flume.PredicateRemoved,
        flume.ConditionRemoved,

        // Schema validation and build events
        flume.SchemaValidationStarted,
        flume.SchemaValidationCompleted,
        flume.SchemaValidationFailed,
        flume.SchemaBuildStarted,
        flume.SchemaBuildCompleted,
        flume.SchemaBuildFailed,

        // Dynamic schema events
        flume.SchemaRegistered,
        flume.SchemaUpdated,
        flume.SchemaRemoved,
        flume.PipelineRetrieved,

        // File events
        flume.SchemaFileLoaded,
        flume.SchemaFileFailed,
        flume.SchemaYAMLParsed,
        flume.SchemaJSONParsed,
        flume.SchemaParseFailed,
    }

    for _, signal := range signals {
        s := signal // per-iteration copy for the closure; required before Go 1.22
        capitan.Handle(s, func(ctx context.Context, fields []capitan.Field) {
            log.Printf("[%s] %v", s.Name, formatFields(fields))
        })
    }
}

Pipeline Access Tracking

capitan.Handle(flume.PipelineRetrieved, func(ctx context.Context, fields []capitan.Field) {
    name := extractString(fields, "name")

    // Hits are only counted; misses are both logged and counted.
    if extractBool(fields, "found") {
        metrics.PipelineHit(name)
        return
    }
    log.Printf("Pipeline not found: %s", name)
    metrics.PipelineMiss(name)
})

Production Recommendations

1. Monitor Build Failures

Build failures usually indicate a configuration problem, so treat them as page-worthy:

capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
    // Named errMsg (not error) so the predeclared error type is not shadowed.
    errMsg := extractString(fields, "error")
    alerting.Page(fmt.Sprintf("Flume build failed: %s", errMsg))
})

2. Track Schema Versions

Record every schema update in an audit trail:

capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
    name := extractString(fields, "name")
    from := extractString(fields, "old_version")
    to := extractString(fields, "new_version")

    // Timestamp is taken when the handler runs, after the fields are read.
    entry := audit.Entry{
        Action:     "schema_update",
        Name:       name,
        OldVersion: from,
        NewVersion: to,
        Timestamp:  time.Now(),
    }
    audit.Log(entry)
})

3. Set Up Dashboards

Key metrics to display:

  • Schema update frequency
  • Build success/failure rate
  • Build duration percentiles
  • Pipeline retrieval hit rate
  • Validation error trends

4. Alert on Anomalies

  • Sudden increase in build failures
  • Unusually long build times
  • High pipeline miss rate
  • Validation errors after deployment

Next Steps