Observability
Flume integrates with Capitan: it emits events through Capitan that you can route to logging, metrics, and tracing handlers.
Overview
Flume emits structured events throughout its lifecycle:
- Factory creation and component registration
- Schema validation, building, and updates
- Pipeline retrieval and execution
- File loading and parsing
Event Categories
Factory Events
flume.FactoryCreated // Factory instantiated
flume.ProcessorRegistered // Processor added
flume.PredicateRegistered // Predicate added
flume.ConditionRegistered // Condition added
flume.ProcessorRemoved // Processor removed
flume.PredicateRemoved // Predicate removed
flume.ConditionRemoved // Condition removed
Schema Events
flume.SchemaValidationStarted // Validation began
flume.SchemaValidationCompleted // Validation successful
flume.SchemaValidationFailed // Validation failed
flume.SchemaBuildStarted // Build began
flume.SchemaBuildCompleted // Build successful
flume.SchemaBuildFailed // Build failed
Dynamic Schema Events
flume.SchemaRegistered // New schema added
flume.SchemaUpdated // Existing schema replaced
flume.SchemaRemoved // Schema deleted
flume.PipelineRetrieved // Pipeline accessed
File Events
flume.SchemaFileLoaded // File read successfully
flume.SchemaFileFailed // File read error
flume.SchemaYAMLParsed // YAML parsed
flume.SchemaJSONParsed // JSON parsed
flume.SchemaParseFailed // Parse error
Event Fields
Events include typed fields for structured data:
flume.KeyName // string: component/schema name
flume.KeyType // string: data type
flume.KeyVersion // string: schema version
flume.KeyOldVersion // string: previous version (updates)
flume.KeyNewVersion // string: new version (updates)
flume.KeyPath // string: file path
flume.KeyError // string: error message
flume.KeyDuration // time.Duration: operation duration
flume.KeyErrorCount // int: number of errors
flume.KeySizeBytes // int: file size
flume.KeyFound // bool: retrieval success
Setting Up Handlers
Basic Logging
import "github.com/zoobz-io/capitan"
// setupLogging registers log handlers for three schema lifecycle signals:
// schema registration, schema updates, and failed builds. Other signals are
// not logged here.
func setupLogging() {
	// Registration: report which schema was added.
	capitan.Handle(flume.SchemaRegistered, func(_ context.Context, fields []capitan.Field) {
		name := extractString(fields, "name")
		log.Printf("Schema registered: %s", name)
	})

	// Update: report the schema together with its version transition.
	capitan.Handle(flume.SchemaUpdated, func(_ context.Context, fields []capitan.Field) {
		name := extractString(fields, "name")
		from := extractString(fields, "old_version")
		to := extractString(fields, "new_version")
		log.Printf("Schema updated: %s (%s -> %s)", name, from, to)
	})

	// Build failure: surface the error message carried by the event.
	capitan.Handle(flume.SchemaBuildFailed, func(_ context.Context, fields []capitan.Field) {
		reason := extractString(fields, "error")
		log.Printf("Build failed: %s", reason)
	})
}
// extractString scans fields in order and returns the String() form of the
// first field whose Key equals key. It returns "" when no field matches.
func extractString(fields []capitan.Field, key string) string {
	for i := range fields {
		if fields[i].Key != key {
			continue
		}
		return fields[i].String()
	}
	return ""
}
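Metrics Collection
This and later examples also call extractDuration, extractInt, extractBool, and formatFields. These helpers are not defined on this page; write them in the same way as extractString above, returning the field's value in the corresponding type.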
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/zoobz-io/capitan"
)
var (
	// schemaUpdates counts schema updates, partitioned by schema name.
	schemaUpdates = prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "flume_schema_updates_total",
		Help: "Total schema updates",
	}, []string{"name"})

	// buildDuration records schema build times in seconds, partitioned by
	// schema version, using the client library's default buckets.
	buildDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "flume_build_duration_seconds",
		Help:    "Schema build duration",
		Buckets: prometheus.DefBuckets,
	}, []string{"version"})

	// buildFailures counts failed schema builds across all schemas.
	buildFailures = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "flume_build_failures_total",
		Help: "Total build failures",
	})
)
// setupMetrics registers the Flume collectors with the default Prometheus
// registry and wires Capitan handlers that feed them. MustRegister panics if
// registration fails (for example when called twice), so call this once at
// startup.
func setupMetrics() {
	prometheus.MustRegister(schemaUpdates, buildDuration, buildFailures)

	// One increment per schema update, labelled by schema name.
	capitan.Handle(flume.SchemaUpdated, func(_ context.Context, fields []capitan.Field) {
		schemaUpdates.WithLabelValues(extractString(fields, "name")).Inc()
	})

	// Observe each completed build's duration, labelled by schema version.
	capitan.Handle(flume.SchemaBuildCompleted, func(_ context.Context, fields []capitan.Field) {
		ver := extractString(fields, "version")
		elapsed := extractDuration(fields, "duration")
		buildDuration.WithLabelValues(ver).Observe(elapsed.Seconds())
	})

	// Failures are counted globally; the event's fields are not needed.
	capitan.Handle(flume.SchemaBuildFailed, func(_ context.Context, _ []capitan.Field) {
		buildFailures.Inc()
	})
}
Distributed Tracing
import (
	"time"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)
// setupTracing emits one "flume.build" span per completed schema build.
//
// SchemaBuildStarted and SchemaBuildCompleted are delivered to separate
// handlers, and the documented event fields include no build identifier to
// correlate them. A span started in one handler therefore cannot be reliably
// ended in the other, and a span that is started but never ended is leaked.
// Instead, the span is built entirely in the completion handler. The event's
// duration field fixes the start time, and the handler's invocation time is
// used as the end time. That end time is an approximation if handlers run
// some time after the build finishes.
func setupTracing(tracer trace.Tracer) {
	capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
		end := time.Now()
		start := end.Add(-extractDuration(fields, "duration"))

		_, span := tracer.Start(ctx, "flume.build", trace.WithTimestamp(start))
		span.SetAttributes(attribute.String("schema.version", extractString(fields, "version")))
		// Always end the span so the exporter receives it.
		span.End(trace.WithTimestamp(end))
	})
}
Common Monitoring Patterns
Schema Change Alerting
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
	name := extractString(fields, "name")
	from := extractString(fields, "old_version")
	to := extractString(fields, "new_version")

	// Only major version changes raise an alert.
	if !isMajorVersionChange(from, to) {
		return
	}
	msg := fmt.Sprintf("Major schema version change: %s from %s to %s", name, from, to)
	alerting.SendAlert(alerting.Warning, msg)
})
Build Performance Tracking
capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
duration := extractDuration(fields, "duration")
if duration > 100*time.Millisecond {
log.Printf("Slow schema build: %v", duration)
}
})
Validation Error Analysis
capitan.Handle(flume.SchemaValidationFailed, func(ctx context.Context, fields []capitan.Field) {
	// Record both how many validation errors occurred and how long
	// validation took.
	metrics.RecordValidationFailure(
		extractInt(fields, "error_count"),
		extractDuration(fields, "duration"),
	)
})
Debugging with Events
Development Logging
// setupDevLogging logs every Flume signal documented in the Event Categories
// section of this page, together with its fields. It is intended for
// development. The Events Reference remains the authoritative list; add any
// signal it names that is missing here.
func setupDevLogging() {
	signals := []capitan.Signal{
		// Factory
		flume.FactoryCreated,
		flume.ProcessorRegistered,
		flume.PredicateRegistered,
		flume.ConditionRegistered,
		flume.ProcessorRemoved,
		flume.PredicateRemoved,
		flume.ConditionRemoved,
		// Schema validation and build
		flume.SchemaValidationStarted,
		flume.SchemaValidationCompleted,
		flume.SchemaValidationFailed,
		flume.SchemaBuildStarted,
		flume.SchemaBuildCompleted,
		flume.SchemaBuildFailed,
		// Dynamic schemas
		flume.SchemaRegistered,
		flume.SchemaUpdated,
		flume.SchemaRemoved,
		flume.PipelineRetrieved,
		// Files
		flume.SchemaFileLoaded,
		flume.SchemaFileFailed,
		flume.SchemaYAMLParsed,
		flume.SchemaJSONParsed,
		flume.SchemaParseFailed,
	}
	for _, signal := range signals {
		// Per-iteration copy so each closure sees its own signal
		// (required before Go 1.22, harmless after).
		s := signal
		capitan.Handle(s, func(ctx context.Context, fields []capitan.Field) {
			log.Printf("[%s] %v", s.Name, formatFields(fields))
		})
	}
}
Pipeline Access Tracking
capitan.Handle(flume.PipelineRetrieved, func(ctx context.Context, fields []capitan.Field) {
	name := extractString(fields, "name")
	if extractBool(fields, "found") {
		metrics.PipelineHit(name)
		return
	}
	// Misses are logged as well as counted, so the requested name is visible.
	log.Printf("Pipeline not found: %s", name)
	metrics.PipelineMiss(name)
})
Production Recommendations
1. Monitor Build Failures
Build failures usually indicate configuration problems, so page someone when one occurs:
capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
	// Named errMsg rather than error: a local named error would shadow the
	// predeclared error type for the rest of this function.
	errMsg := extractString(fields, "error")
	alerting.Page(fmt.Sprintf("Flume build failed: %s", errMsg))
})
2. Track Schema Versions
Maintain an audit trail of schema version changes:
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
	// Build the audit record first, then hand it off in a single call.
	entry := audit.Entry{
		Action:     "schema_update",
		Name:       extractString(fields, "name"),
		OldVersion: extractString(fields, "old_version"),
		NewVersion: extractString(fields, "new_version"),
		Timestamp:  time.Now(),
	}
	audit.Log(entry)
})
3. Set Up Dashboards
Key metrics to display:
- Schema update frequency
- Build success/failure rate
- Build duration percentiles
- Pipeline retrieval hit rate
- Validation error trends
4. Alert on Anomalies
- Sudden increase in build failures
- Unusually long build times
- High pipeline miss rate
- Validation errors after deployment
Next Steps
- Events Reference - Complete event list
- Capitan Documentation - Event system details