zoobzio December 11, 2025 Edit this page

Events

Complete reference for Flume's observability events using Capitan.

Signal Categories

Factory Lifecycle

| Signal | Description |
|--------|-------------|
| `FactoryCreated` | Factory instantiated |
| `ProcessorRegistered` | Processor added to factory |
| `PredicateRegistered` | Predicate added to factory |
| `ConditionRegistered` | Condition added to factory |
| `ProcessorRemoved` | Processor removed from factory |
| `PredicateRemoved` | Predicate removed from factory |
| `ConditionRemoved` | Condition removed from factory |

Schema Validation

| Signal | Description |
|--------|-------------|
| `SchemaValidationStarted` | Validation began |
| `SchemaValidationCompleted` | Validation successful |
| `SchemaValidationFailed` | Validation found errors |

Schema Building

| Signal | Description |
|--------|-------------|
| `SchemaBuildStarted` | Build began |
| `SchemaBuildCompleted` | Build successful |
| `SchemaBuildFailed` | Build failed |

Dynamic Schema Management

| Signal | Description |
|--------|-------------|
| `SchemaRegistered` | New schema added via `SetSchema` |
| `SchemaUpdated` | Existing schema replaced |
| `SchemaRemoved` | Schema removed |
| `PipelineRetrieved` | Pipeline accessed via `Pipeline()` |

File Operations

| Signal | Description |
|--------|-------------|
| `SchemaFileLoaded` | File read successfully |
| `SchemaFileFailed` | File read failed |
| `SchemaYAMLParsed` | YAML parsed successfully |
| `SchemaJSONParsed` | JSON parsed successfully |
| `SchemaParseFailed` | Parse failed |

Field Keys

String Fields

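| Key | Type | Description |
|-----|------|-------------|
| `KeyName` | `string` | Component or schema name |
| `KeyType` | `string` | Data type (e.g., `*main.Order`) |
| `KeyVersion` | `string` | Schema version |
| `KeyOldVersion` | `string` | Previous version (on update) |
| `KeyNewVersion` | `string` | New version (on update) |
| `KeyPath` | `string` | File path |
| `KeyError` | `string` | Error message |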

Duration Fields

| Key | Type | Description |
|-----|------|-------------|
| `KeyDuration` | `time.Duration` | Operation duration |

Integer Fields

| Key | Type | Description |
|-----|------|-------------|
| `KeyErrorCount` | `int` | Number of validation errors |
| `KeySizeBytes` | `int` | File size in bytes |

Boolean Fields

| Key | Type | Description |
|-----|------|-------------|
| `KeyFound` | `bool` | Whether pipeline was found |

Event Details

FactoryCreated

Emitted when flume.New[T]() is called.

Fields:

  • KeyType: The type parameter T

Example:

// Log the "type" field carried by each FactoryCreated event.
capitan.Handle(flume.FactoryCreated, func(_ context.Context, fields []capitan.Field) {
    log.Printf("Factory created for type: %s", extractString(fields, "type"))
})

ProcessorRegistered

Emitted when a processor is added.

Fields:

  • KeyName: Processor name

SchemaValidationStarted

Emitted at the start of validation.

Fields: None


SchemaValidationCompleted

Emitted on successful validation.

Fields:

  • KeyDuration: Validation duration

SchemaValidationFailed

Emitted when validation finds errors.

Fields:

  • KeyErrorCount: Number of errors
  • KeyDuration: Validation duration

Example:

capitan.Handle(flume.SchemaValidationFailed, func(ctx context.Context, fields []capitan.Field) {
    count := extractInt(fields, "error_count")
    duration := extractDuration(fields, "duration")
    log.Printf("Validation failed with %d errors in %v", count, duration)
})

SchemaBuildStarted

Emitted at the start of building.

Fields:

  • KeyVersion: Schema version (if present)

SchemaBuildCompleted

Emitted on successful build.

Fields:

  • KeyDuration: Build duration
  • KeyVersion: Schema version (if present)

SchemaBuildFailed

Emitted when build fails.

Fields:

  • KeyError: Error message
  • KeyDuration: Duration until failure
  • KeyVersion: Schema version (if present)

SchemaRegistered

Emitted when a new schema is added.

Fields:

  • KeyName: Schema name
  • KeyVersion: Schema version (if present)

SchemaUpdated

Emitted when an existing schema is replaced.

Fields:

  • KeyName: Schema name
  • KeyOldVersion: Previous version (if present)
  • KeyNewVersion: New version (if present)

Example:

// Log the schema name together with its old and new version on every update.
capitan.Handle(flume.SchemaUpdated, func(_ context.Context, fields []capitan.Field) {
    log.Printf("Schema %s: %s -> %s",
        extractString(fields, "name"),
        extractString(fields, "old_version"),
        extractString(fields, "new_version"),
    )
})

PipelineRetrieved

Emitted when Pipeline() is called.

Fields:

  • KeyName: Schema name requested
  • KeyFound: Whether pipeline exists

Example:

// Log only the lookups whose "found" field is false.
capitan.Handle(flume.PipelineRetrieved, func(_ context.Context, fields []capitan.Field) {
    schema := extractString(fields, "name")
    if extractBool(fields, "found") {
        // Nothing to report when the pipeline exists.
        return
    }
    log.Printf("Pipeline not found: %s", schema)
})

SchemaFileLoaded

Emitted when a file is read.

Fields:

  • KeyPath: File path
  • KeySizeBytes: File size

SchemaParseFailed

Emitted when parsing fails.

Fields:

  • KeyPath: File path (if from file)
  • KeyError: Parse error message

Handler Patterns

Complete Logging Handler

// setupLogging registers one logging handler per signal in the table below.
// Each handler prints the signal's label followed by the event's fields as
// rendered by formatFields.
func setupLogging() {
    // register attaches the logging handler for a single signal. Receiving
    // label as a parameter gives every handler its own copy of the text.
    register := func(sig capitan.Signal, label string) {
        capitan.Handle(sig, func(_ context.Context, fields []capitan.Field) {
            log.Printf("[FLUME] %s: %v", label, formatFields(fields))
        })
    }

    labels := map[capitan.Signal]string{
        flume.FactoryCreated:            "Factory created",
        flume.ProcessorRegistered:       "Processor registered",
        flume.SchemaValidationStarted:   "Validation started",
        flume.SchemaValidationCompleted: "Validation completed",
        flume.SchemaValidationFailed:    "Validation failed",
        flume.SchemaBuildStarted:        "Build started",
        flume.SchemaBuildCompleted:      "Build completed",
        flume.SchemaBuildFailed:         "Build failed",
        flume.SchemaRegistered:          "Schema registered",
        flume.SchemaUpdated:             "Schema updated",
        flume.SchemaRemoved:             "Schema removed",
        flume.PipelineRetrieved:         "Pipeline retrieved",
    }

    for sig, label := range labels {
        register(sig, label)
    }
}

// formatFields renders each field as "key=value" and joins the results with
// ", ". An empty or nil slice yields the empty string.
func formatFields(fields []capitan.Field) string {
    rendered := make([]string, len(fields))
    for i, field := range fields {
        rendered[i] = fmt.Sprintf("%s=%v", field.Key, field.Value())
    }
    return strings.Join(rendered, ", ")
}

Metrics Handler

// setupMetrics registers three handlers: build latency is recorded on
// SchemaBuildCompleted, and error counters are incremented on
// SchemaBuildFailed and SchemaValidationFailed.
func setupMetrics() {
    // recordBuildLatency feeds every "duration" field of the event into the
    // build latency metric.
    recordBuildLatency := func(_ context.Context, fields []capitan.Field) {
        for _, field := range fields {
            if field.Key != "duration" {
                continue
            }
            metrics.RecordBuildLatency(field.Duration())
        }
    }

    // The error counters ignore the event payload entirely.
    countBuildError := func(context.Context, []capitan.Field) {
        metrics.IncrBuildErrors()
    }
    countValidationError := func(context.Context, []capitan.Field) {
        metrics.IncrValidationErrors()
    }

    capitan.Handle(flume.SchemaBuildCompleted, recordBuildLatency)
    capitan.Handle(flume.SchemaBuildFailed, countBuildError)
    capitan.Handle(flume.SchemaValidationFailed, countValidationError)
}

Alerting Handler

// setupAlerting registers two handlers: every SchemaBuildFailed event sends
// an error-level alert, and a SchemaUpdated event sends a warning-level alert
// when isMajorChange reports true for the old and new versions.
func setupAlerting() {
    // onBuildFailed forwards the event's "error" field as an alert.
    onBuildFailed := func(_ context.Context, fields []capitan.Field) {
        reason := extractString(fields, "error")
        alerting.Send(alerting.Error, fmt.Sprintf("Flume build failed: %s", reason))
    }

    // onSchemaUpdated alerts only when the version change is a major one.
    onSchemaUpdated := func(_ context.Context, fields []capitan.Field) {
        schema := extractString(fields, "name")
        from := extractString(fields, "old_version")
        to := extractString(fields, "new_version")

        if !isMajorChange(from, to) {
            return
        }
        alerting.Send(alerting.Warning,
            fmt.Sprintf("Major version change: %s %s->%s", schema, from, to))
    }

    capitan.Handle(flume.SchemaBuildFailed, onBuildFailed)
    capitan.Handle(flume.SchemaUpdated, onSchemaUpdated)
}

Helper Functions

// extractString returns String() of the first field whose Key equals key.
// It returns "" when no field matches.
func extractString(fields []capitan.Field, key string) string {
    for _, field := range fields {
        if field.Key != key {
            continue
        }
        return field.String()
    }
    return ""
}

// extractInt returns Int() of the first field whose Key equals key.
// It returns 0 when no field matches.
func extractInt(fields []capitan.Field, key string) int {
    for _, field := range fields {
        if field.Key != key {
            continue
        }
        return field.Int()
    }
    return 0
}

// extractBool returns Bool() of the first field whose Key equals key.
// It returns false when no field matches.
func extractBool(fields []capitan.Field, key string) bool {
    for _, field := range fields {
        if field.Key != key {
            continue
        }
        return field.Bool()
    }
    return false
}

// extractDuration returns Duration() of the first field whose Key equals key.
// It returns 0 when no field matches.
func extractDuration(fields []capitan.Field, key string) time.Duration {
    for _, field := range fields {
        if field.Key != key {
            continue
        }
        return field.Duration()
    }
    return 0
}

Next Steps