Events
Complete reference for Flume's observability events using Capitan.
Signal Categories
Factory Lifecycle
| Signal | Description |
|---|---|
FactoryCreated | Factory instantiated |
ProcessorRegistered | Processor added to factory |
PredicateRegistered | Predicate added to factory |
ConditionRegistered | Condition added to factory |
ProcessorRemoved | Processor removed from factory |
PredicateRemoved | Predicate removed from factory |
ConditionRemoved | Condition removed from factory |
Schema Validation
| Signal | Description |
|---|---|
SchemaValidationStarted | Validation began |
SchemaValidationCompleted | Validation successful |
SchemaValidationFailed | Validation found errors |
Schema Building
| Signal | Description |
|---|---|
SchemaBuildStarted | Build began |
SchemaBuildCompleted | Build successful |
SchemaBuildFailed | Build failed |
Dynamic Schema Management
| Signal | Description |
|---|---|
SchemaRegistered | New schema added via SetSchema |
SchemaUpdated | Existing schema replaced |
SchemaRemoved | Schema removed |
PipelineRetrieved | Pipeline accessed via Pipeline() |
File Operations
| Signal | Description |
|---|---|
SchemaFileLoaded | File read successfully |
SchemaFileFailed | File read failed |
SchemaYAMLParsed | YAML parsed successfully |
SchemaJSONParsed | JSON parsed successfully |
SchemaParseFailed | Parse failed |
Field Keys
String Fields
| Key | Type | Description |
|---|---|---|
KeyName | string | Component or schema name |
KeyType | string | Data type (e.g., *main.Order) |
KeyVersion | string | Schema version |
KeyOldVersion | string | Previous version (on update) |
KeyNewVersion | string | New version (on update) |
KeyPath | string | File path |
KeyError | string | Error message |
Duration Fields
| Key | Type | Description |
|---|---|---|
KeyDuration | time.Duration | Operation duration |
Integer Fields
| Key | Type | Description |
|---|---|---|
KeyErrorCount | int | Number of validation errors |
KeySizeBytes | int | File size in bytes |
Boolean Fields
| Key | Type | Description |
|---|---|---|
KeyFound | bool | Whether pipeline was found |
Event Details
FactoryCreated
Emitted when flume.New[T]() is called.
Fields:
KeyType: The type parameter T
Example:
capitan.Handle(flume.FactoryCreated, func(ctx context.Context, fields []capitan.Field) {
dataType := extractString(fields, "type")
log.Printf("Factory created for type: %s", dataType)
})
ProcessorRegistered
Emitted when a processor is added.
Fields:
KeyName: Processor name
SchemaValidationStarted
Emitted at the start of validation.
Fields: None
SchemaValidationCompleted
Emitted on successful validation.
Fields:
KeyDuration: Validation duration
SchemaValidationFailed
Emitted when validation finds errors.
Fields:
KeyErrorCount: Number of errorsKeyDuration: Validation duration
Example:
capitan.Handle(flume.SchemaValidationFailed, func(ctx context.Context, fields []capitan.Field) {
count := extractInt(fields, "error_count")
duration := extractDuration(fields, "duration")
log.Printf("Validation failed with %d errors in %v", count, duration)
})
SchemaBuildStarted
Emitted at the start of building.
Fields:
KeyVersion: Schema version (if present)
SchemaBuildCompleted
Emitted on successful build.
Fields:
KeyDuration: Build durationKeyVersion: Schema version (if present)
SchemaBuildFailed
Emitted when build fails.
Fields:
KeyError: Error messageKeyDuration: Duration until failureKeyVersion: Schema version (if present)
SchemaRegistered
Emitted when a new schema is added.
Fields:
KeyName: Schema nameKeyVersion: Schema version (if present)
SchemaUpdated
Emitted when an existing schema is replaced.
Fields:
KeyName: Schema nameKeyOldVersion: Previous version (if present)KeyNewVersion: New version (if present)
Example:
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
name := extractString(fields, "name")
oldV := extractString(fields, "old_version")
newV := extractString(fields, "new_version")
log.Printf("Schema %s: %s -> %s", name, oldV, newV)
})
PipelineRetrieved
Emitted when Pipeline() is called.
Fields:
KeyName: Schema name requestedKeyFound: Whether pipeline exists
Example:
capitan.Handle(flume.PipelineRetrieved, func(ctx context.Context, fields []capitan.Field) {
name := extractString(fields, "name")
found := extractBool(fields, "found")
if !found {
log.Printf("Pipeline not found: %s", name)
}
})
SchemaFileLoaded
Emitted when a file is read.
Fields:
KeyPath: File pathKeySizeBytes: File size
SchemaParseFailed
Emitted when parsing fails.
Fields:
KeyPath: File path (if from file)KeyError: Parse error message
Handler Patterns
Complete Logging Handler
func setupLogging() {
signals := map[capitan.Signal]string{
flume.FactoryCreated: "Factory created",
flume.ProcessorRegistered: "Processor registered",
flume.SchemaValidationStarted: "Validation started",
flume.SchemaValidationCompleted: "Validation completed",
flume.SchemaValidationFailed: "Validation failed",
flume.SchemaBuildStarted: "Build started",
flume.SchemaBuildCompleted: "Build completed",
flume.SchemaBuildFailed: "Build failed",
flume.SchemaRegistered: "Schema registered",
flume.SchemaUpdated: "Schema updated",
flume.SchemaRemoved: "Schema removed",
flume.PipelineRetrieved: "Pipeline retrieved",
}
for signal, message := range signals {
s, m := signal, message
capitan.Handle(s, func(ctx context.Context, fields []capitan.Field) {
log.Printf("[FLUME] %s: %v", m, formatFields(fields))
})
}
}
func formatFields(fields []capitan.Field) string {
parts := make([]string, 0, len(fields))
for _, f := range fields {
parts = append(parts, fmt.Sprintf("%s=%v", f.Key, f.Value()))
}
return strings.Join(parts, ", ")
}
Metrics Handler
func setupMetrics() {
// Build latency histogram
capitan.Handle(flume.SchemaBuildCompleted, func(ctx context.Context, fields []capitan.Field) {
for _, f := range fields {
if f.Key == "duration" {
metrics.RecordBuildLatency(f.Duration())
}
}
})
// Error counter
capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
metrics.IncrBuildErrors()
})
capitan.Handle(flume.SchemaValidationFailed, func(ctx context.Context, fields []capitan.Field) {
metrics.IncrValidationErrors()
})
}
Alerting Handler
func setupAlerting() {
capitan.Handle(flume.SchemaBuildFailed, func(ctx context.Context, fields []capitan.Field) {
errMsg := extractString(fields, "error")
alerting.Send(alerting.Error, fmt.Sprintf("Flume build failed: %s", errMsg))
})
capitan.Handle(flume.SchemaUpdated, func(ctx context.Context, fields []capitan.Field) {
name := extractString(fields, "name")
oldV := extractString(fields, "old_version")
newV := extractString(fields, "new_version")
// Alert on major version changes
if isMajorChange(oldV, newV) {
alerting.Send(alerting.Warning,
fmt.Sprintf("Major version change: %s %s->%s", name, oldV, newV))
}
})
}
Helper Functions
func extractString(fields []capitan.Field, key string) string {
for _, f := range fields {
if f.Key == key {
return f.String()
}
}
return ""
}
func extractInt(fields []capitan.Field, key string) int {
for _, f := range fields {
if f.Key == key {
return f.Int()
}
}
return 0
}
func extractBool(fields []capitan.Field, key string) bool {
for _, f := range fields {
if f.Key == key {
return f.Bool()
}
}
return false
}
func extractDuration(fields []capitan.Field, key string) time.Duration {
for _, f := range fields {
if f.Key == key {
return f.Duration()
}
}
return 0
}
Next Steps
- Observability Guide - Monitoring patterns
- Capitan Documentation - Event system