zoobzio December 11, 2025 Edit this page

API Reference

Complete reference for Flume's public API.

Factory

New

Creates a new factory for type T.

func New[T pipz.Cloner[T]]() *Factory[T]

Type Constraint: T must implement pipz.Cloner[T].

Example:

factory := flume.New[Order]()

Processor Registration

Add

Registers one or more processors using their intrinsic names.

func (f *Factory[T]) Add(processors ...pipz.Chainable[T])

Parameters:

  • processors - Variadic list of pipz chainables

Example:

factory.Add(
    pipz.Apply("validate", validateOrder),
    pipz.Transform("enrich", enrichOrder),
)

AddWithMeta

Registers processors with metadata for introspection.

func (f *Factory[T]) AddWithMeta(processors ...ProcessorMeta[T])

Parameters:

  • processors - Variadic list of ProcessorMeta structs

Example:

factory.AddWithMeta(flume.ProcessorMeta[Order]{
    Processor:   pipz.Apply("validate", validateOrder),
    Description: "Validates order fields",
    Tags:        []string{"validation", "required"},
})

Remove

Removes processors by name.

func (f *Factory[T]) Remove(names ...pipz.Name) int

Parameters:

  • names - Variadic list of processor names

Returns: Number of processors removed

Example:

removed := factory.Remove("old-processor", "deprecated")

HasProcessor

Checks if a processor is registered.

func (f *Factory[T]) HasProcessor(name pipz.Name) bool

ListProcessors

Returns all registered processor names.

func (f *Factory[T]) ListProcessors() []pipz.Name

Predicate Registration

AddPredicate

Registers predicates for filter conditions.

func (f *Factory[T]) AddPredicate(predicates ...Predicate[T])

Example:

factory.AddPredicate(flume.Predicate[Order]{
    Name:        "is-premium",
    Description: "Customer has premium tier",
    Predicate: func(ctx context.Context, o Order) bool {
        return o.CustomerTier == "premium"
    },
})

RemovePredicate

Removes predicates by name.

func (f *Factory[T]) RemovePredicate(names ...pipz.Name) int

HasPredicate

Checks if a predicate is registered.

func (f *Factory[T]) HasPredicate(name pipz.Name) bool

ListPredicates

Returns all registered predicate names.

func (f *Factory[T]) ListPredicates() []pipz.Name

Condition Registration

AddCondition

Registers conditions for switch routing.

func (f *Factory[T]) AddCondition(conditions ...Condition[T])

Example:

factory.AddCondition(flume.Condition[Order]{
    Name:        "order-status",
    Description: "Returns the order status",
    Values:      []string{"pending", "approved", "rejected"},
    Condition: func(ctx context.Context, o Order) string {
        return o.Status
    },
})

RemoveCondition

Removes conditions by name.

func (f *Factory[T]) RemoveCondition(names ...pipz.Name) int

HasCondition

Checks if a condition is registered.

func (f *Factory[T]) HasCondition(name pipz.Name) bool

ListConditions

Returns all registered condition names.

func (f *Factory[T]) ListConditions() []pipz.Name

Channel Registration

AddChannel

Registers a channel for stream nodes.

func (f *Factory[T]) AddChannel(name string, channel chan<- T)

Example:

ch := make(chan Order, 100)
factory.AddChannel("orders", ch)

GetChannel

Retrieves a registered channel.

func (f *Factory[T]) GetChannel(name string) (chan<- T, bool)

HasChannel

Checks if a channel is registered.

func (f *Factory[T]) HasChannel(name string) bool

ListChannels

Returns all registered channel names.

func (f *Factory[T]) ListChannels() []string

RemoveChannel

Removes a channel from the factory.

func (f *Factory[T]) RemoveChannel(name string) bool

Building Pipelines

Build

Builds a pipeline from a Schema struct.

func (f *Factory[T]) Build(schema Schema) (pipz.Chainable[T], error)

Example:

schema := flume.Schema{
    Version: "1.0.0",
    Node: flume.Node{
        Type: "sequence",
        Children: []flume.Node{
            {Ref: "validate"},
            {Ref: "process"},
        },
    },
}
pipeline, err := factory.Build(schema)

BuildFromYAML

Builds a pipeline from a YAML string.

func (f *Factory[T]) BuildFromYAML(yamlStr string) (pipz.Chainable[T], error)

Example:

schema := `
type: sequence
children:
  - ref: validate
  - ref: process
`
pipeline, err := factory.BuildFromYAML(schema)

BuildFromJSON

Builds a pipeline from a JSON string.

func (f *Factory[T]) BuildFromJSON(jsonStr string) (pipz.Chainable[T], error)

BuildFromFile

Builds a pipeline from a file (YAML or JSON).

func (f *Factory[T]) BuildFromFile(path string) (pipz.Chainable[T], error)

Supported Extensions: .yaml, .yml, .json


Schema Management

SetSchema

Adds or updates a named schema.

func (f *Factory[T]) SetSchema(name string, schema Schema) error

Validates the schema, builds the pipeline, and stores both. If the schema exists, atomically updates it.

Example:

err := factory.SetSchema("order-pipeline", schema)

GetSchema

Retrieves a schema by name.

func (f *Factory[T]) GetSchema(name string) (Schema, bool)

RemoveSchema

Removes a named schema.

func (f *Factory[T]) RemoveSchema(name string) bool

Returns: true if removed, false if not found


ListSchemas

Returns all registered schema names.

func (f *Factory[T]) ListSchemas() []string

Bind

Creates or retrieves a binding for a schema.

func (f *Factory[T]) Bind(identity pipz.Identity, schemaID string, opts ...BindingOption[T]) (*Binding[T], error)

Parameters:

  • identity - Unique identifier for this binding
  • schemaID - ID of schema in registry (must exist)
  • opts - Optional configuration (e.g., WithAutoSync())

Returns: *Binding[T] on success, error if schema not found

Example:

pipelineID := factory.Identity("order-processor", "Processes orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync())
if err != nil {
    return err
}
result, err := binding.Process(ctx, order)

Get

Retrieves an existing binding by identity.

func (f *Factory[T]) Get(identity pipz.Identity) *Binding[T]

Returns: *Binding[T] if found, nil otherwise


Binding

A Binding[T] represents a live pipeline bound to a schema. It provides lock-free execution via atomic.Pointer.

Process

Executes the pipeline with the given data.

func (b *Binding[T]) Process(ctx context.Context, data T) (T, error)

Lock-free operation using atomic pointer access.

WithAutoSync

Option to enable automatic rebuilding when the source schema changes.

flume.WithAutoSync[T]()

When enabled, calls to factory.SetSchema() automatically rebuild this binding.

Binding Accessors

binding.Identity()  // Returns pipz.Identity
binding.SchemaID()  // Returns string (schema ID)
binding.AutoSync()  // Returns bool
binding.Pipeline()  // Returns *pipz.Pipeline[T]

Validation

ValidateSchema

Validates a schema against registered components without building it.

func (f *Factory[T]) ValidateSchema(schema Schema) error

Returns: nil if valid, ValidationErrors otherwise

Checks:

  • Schema structure (node types, required fields)
  • All processor references exist
  • All predicate references exist
  • All condition references exist
  • All reducer references exist
  • All error handler references exist
  • All channel references exist
  • Circular reference detection

Example:

if err := factory.ValidateSchema(schema); err != nil {
    fmt.Println(err)
    // 3 validation errors:
    //   1. root.children[0]: processor 'missing' not found
    //   ...
}

ValidateSchemaStructure

Validates schema syntax without requiring registered components. Use this for CI/CD schema linting where a configured factory is not available.

func ValidateSchemaStructure(schema Schema) error

Returns: nil if valid, ValidationErrors otherwise

This validates:

  • Node structure (ref vs type exclusivity, non-empty nodes)
  • Connector constraints (required children, child counts)
  • Configuration values (valid durations, positive numbers)
  • Unknown node types

This does NOT validate:

  • Processor references exist
  • Predicate references exist
  • Condition references exist
  • Reducer references exist
  • Error handler references exist
  • Channel references exist

Example:

// CI/CD pipeline - validate schema files without a configured factory
schema, _ := yaml.Unmarshal(data, &flume.Schema{})
if err := flume.ValidateSchemaStructure(schema); err != nil {
    log.Fatalf("Invalid schema: %v", err)
}

Types

Schema

type Schema struct {
    Version string `json:"version,omitempty" yaml:"version,omitempty"`
    Node    `yaml:",inline"`
}

Node

type Node struct {
    Ref               string          // Processor reference
    Type              string          // Connector type
    Name              string          // Optional name override
    Children          []Node          // For sequence, concurrent, etc.
    Child             *Node           // For retry, timeout, etc.
    Predicate         string          // For filter
    Then              *Node           // For filter
    Else              *Node           // For filter
    Condition         string          // For switch
    Routes            map[string]Node // For switch
    Default           *Node           // For switch
    Attempts          int             // For retry
    Backoff           string          // For retry
    Duration          string          // For timeout
    FailureThreshold  int             // For circuit-breaker
    RecoveryTimeout   string          // For circuit-breaker
    RequestsPerSecond float64         // For rate-limit
    BurstSize         int             // For rate-limit
    Stream            string          // For stream nodes
}

Predicate

type Predicate[T any] struct {
    Name        pipz.Name
    Description string
    Predicate   func(context.Context, T) bool
}

Condition

type Condition[T any] struct {
    Name        pipz.Name
    Description string
    Values      []string  // Possible return values
    Condition   func(context.Context, T) string
}

ProcessorMeta

type ProcessorMeta[T any] struct {
    Processor   pipz.Chainable[T]
    Description string
    Tags        []string
}

Reducer

type Reducer[T any] struct {
    Name        pipz.Name
    Description string
    Reducer     func(original T, results map[pipz.Name]T, errors map[pipz.Name]error) T
}

Used with concurrent connector to merge results from parallel execution.

Example:

factory.AddReducer(flume.Reducer[Order]{
    Name:        "merge-enrichments",
    Description: "Combines data from parallel enrichment sources",
    Reducer: func(original Order, results map[pipz.Name]Order, errors map[pipz.Name]error) Order {
        for _, result := range results {
            original.Metadata = append(original.Metadata, result.Metadata...)
        }
        return original
    },
})

ErrorHandler

type ErrorHandler[T any] struct {
    Name        pipz.Name
    Description string
    Handler     pipz.Chainable[*pipz.Error[T]]
}

Used with handle connector to process errors from child execution. The handler receives a *pipz.Error[T] containing both the original data and the error.

Error Recovery Pattern:

Clear the error to indicate successful recovery:

factory.AddErrorHandler(flume.ErrorHandler[Order]{
    Name:        "order-recovery",
    Description: "Recovers failed orders by marking them for retry",
    Handler: pipz.Apply("recover", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
        e.Data.Status = "pending-retry"
        e.Err = nil  // Clear error - pipeline continues normally
        return e, nil
    }),
})

Error Transformation Pattern:

Transform the error while preserving failure state:

factory.AddErrorHandler(flume.ErrorHandler[Order]{
    Name:        "error-enricher",
    Description: "Enriches errors with context before propagating",
    Handler: pipz.Apply("enrich-error", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
        e.Err = fmt.Errorf("order %s: %w", e.Data.ID, e.Err)
        return e, nil  // Error still propagates
    }),
})

Conditional Recovery Pattern:

Recover from specific errors only:

factory.AddErrorHandler(flume.ErrorHandler[Order]{
    Name:        "transient-recovery",
    Description: "Recovers from transient errors, propagates others",
    Handler: pipz.Apply("conditional-recover", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
        if errors.Is(e.Err, ErrTemporaryFailure) {
            e.Data.Status = "retrying"
            e.Err = nil  // Recover
        }
        // Non-transient errors propagate unchanged
        return e, nil
    }),
})

Schema Usage:

type: handle
error_handler: order-recovery
child:
  ref: risky-operation

ValidationError

type ValidationError struct {
    Path    []string  // Path to error in schema
    Message string
}

func (e ValidationError) Error() string

ValidationErrors

type ValidationErrors []ValidationError

func (e ValidationErrors) Error() string

Constants

Default Values

const (
    DefaultRetryAttempts           = 3
    DefaultTimeoutDuration         = 30 * time.Second
    DefaultCircuitBreakerThreshold = 5
    DefaultRecoveryTimeout         = 60 * time.Second
    DefaultRequestsPerSecond       = 10.0
    DefaultBurstSize               = 1
)

Next Steps