API Reference
Complete reference for Flume's public API.
Factory
New
Creates a new factory for type T.
func New[T pipz.Cloner[T]]() *Factory[T]
Type Constraint: T must implement pipz.Cloner[T].
Example:
factory := flume.New[Order]()
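As an illustration of what satisfying the constraint can look like, here is a minimal stand-alone sketch. It assumes pipz.Cloner[T] requires a single Clone() T method; the local Cloner interface, the Order fields, and the requireCloner helper are hypothetical stand-ins, not Flume or pipz definitions.

```go
package main

import "fmt"

// Cloner mirrors the assumed shape of pipz.Cloner[T]: one Clone method.
type Cloner[T any] interface {
	Clone() T
}

// Order is a hypothetical payload with a slice field that needs a deep copy.
type Order struct {
	ID       string
	Metadata []string
}

// Clone returns a copy that shares no slice storage with the original.
func (o Order) Clone() Order {
	c := o
	c.Metadata = append([]string(nil), o.Metadata...)
	return c
}

// requireCloner compiles only when T satisfies the constraint.
func requireCloner[T Cloner[T]](v T) T { return v.Clone() }

func main() {
	a := Order{ID: "A1", Metadata: []string{"new"}}
	b := requireCloner(a)
	b.Metadata[0] = "changed"
	fmt.Println(a.Metadata[0], b.Metadata[0]) // prints "new changed"
}
```

A plain struct copy would share the Metadata backing array, so the explicit slice copy is what makes the clone independent.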
Processor Registration
Add
Registers one or more processors using their intrinsic names.
func (f *Factory[T]) Add(processors ...pipz.Chainable[T])
Parameters:
processors - Variadic list of pipz chainables
Example:
factory.Add(
pipz.Apply("validate", validateOrder),
pipz.Transform("enrich", enrichOrder),
)
AddWithMeta
Registers processors with metadata for introspection.
func (f *Factory[T]) AddWithMeta(processors ...ProcessorMeta[T])
Parameters:
processors - Variadic list of ProcessorMeta structs
Example:
factory.AddWithMeta(flume.ProcessorMeta[Order]{
Processor: pipz.Apply("validate", validateOrder),
Description: "Validates order fields",
Tags: []string{"validation", "required"},
})
Remove
Removes processors by name.
func (f *Factory[T]) Remove(names ...pipz.Name) int
Parameters:
names - Variadic list of processor names
Returns: Number of processors removed
Example:
removed := factory.Remove("old-processor", "deprecated")
HasProcessor
Checks if a processor is registered.
func (f *Factory[T]) HasProcessor(name pipz.Name) bool
ListProcessors
Returns all registered processor names.
func (f *Factory[T]) ListProcessors() []pipz.Name
Predicate Registration
AddPredicate
Registers predicates for filter conditions.
func (f *Factory[T]) AddPredicate(predicates ...Predicate[T])
Example:
factory.AddPredicate(flume.Predicate[Order]{
Name: "is-premium",
Description: "Customer has premium tier",
Predicate: func(ctx context.Context, o Order) bool {
return o.CustomerTier == "premium"
},
})
RemovePredicate
Removes predicates by name.
func (f *Factory[T]) RemovePredicate(names ...pipz.Name) int
HasPredicate
Checks if a predicate is registered.
func (f *Factory[T]) HasPredicate(name pipz.Name) bool
ListPredicates
Returns all registered predicate names.
func (f *Factory[T]) ListPredicates() []pipz.Name
Condition Registration
AddCondition
Registers conditions for switch routing.
func (f *Factory[T]) AddCondition(conditions ...Condition[T])
Example:
factory.AddCondition(flume.Condition[Order]{
Name: "order-status",
Description: "Returns the order status",
Values: []string{"pending", "approved", "rejected"},
Condition: func(ctx context.Context, o Order) string {
return o.Status
},
})
RemoveCondition
Removes conditions by name.
func (f *Factory[T]) RemoveCondition(names ...pipz.Name) int
HasCondition
Checks if a condition is registered.
func (f *Factory[T]) HasCondition(name pipz.Name) bool
ListConditions
Returns all registered condition names.
func (f *Factory[T]) ListConditions() []pipz.Name
Channel Registration
AddChannel
Registers a channel for stream nodes.
func (f *Factory[T]) AddChannel(name string, channel chan<- T)
Example:
ch := make(chan Order, 100)
factory.AddChannel("orders", ch)
GetChannel
Retrieves a registered channel.
func (f *Factory[T]) GetChannel(name string) (chan<- T, bool)
HasChannel
Checks if a channel is registered.
func (f *Factory[T]) HasChannel(name string) bool
ListChannels
Returns all registered channel names.
func (f *Factory[T]) ListChannels() []string
RemoveChannel
Removes a channel from the factory.
func (f *Factory[T]) RemoveChannel(name string) bool
Building Pipelines
Build
Builds a pipeline from a Schema struct.
func (f *Factory[T]) Build(schema Schema) (pipz.Chainable[T], error)
Example:
schema := flume.Schema{
Version: "1.0.0",
Node: flume.Node{
Type: "sequence",
Children: []flume.Node{
{Ref: "validate"},
{Ref: "process"},
},
},
}
pipeline, err := factory.Build(schema)
BuildFromYAML
Builds a pipeline from a YAML string.
func (f *Factory[T]) BuildFromYAML(yamlStr string) (pipz.Chainable[T], error)
Example:
schema := `
type: sequence
children:
- ref: validate
- ref: process
`
pipeline, err := factory.BuildFromYAML(schema)
BuildFromJSON
Builds a pipeline from a JSON string.
func (f *Factory[T]) BuildFromJSON(jsonStr string) (pipz.Chainable[T], error)
BuildFromFile
Builds a pipeline from a file (YAML or JSON).
func (f *Factory[T]) BuildFromFile(path string) (pipz.Chainable[T], error)
Supported Extensions: .yaml, .yml, .json
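Choosing a parser from the file extension can be sketched as follows. The formatFor helper is hypothetical, and matching extensions case-insensitively is an assumption; Flume's own handling may differ.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// formatFor maps a schema file path to the parser it would need.
// Assumption: extensions are compared case-insensitively.
func formatFor(path string) (string, error) {
	ext := filepath.Ext(path)
	switch strings.ToLower(ext) {
	case ".yaml", ".yml":
		return "yaml", nil
	case ".json":
		return "json", nil
	default:
		return "", fmt.Errorf("unsupported schema file extension %q", ext)
	}
}

func main() {
	for _, p := range []string{"pipelines/orders.yml", "orders.json", "orders.toml"} {
		format, err := formatFor(p)
		fmt.Println(p, format, err)
	}
}
```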
Schema Management
SetSchema
Adds or updates a named schema.
func (f *Factory[T]) SetSchema(name string, schema Schema) error
Validates the schema, builds the pipeline, and stores both. If a schema with the same name already exists, it is replaced atomically.
Example:
err := factory.SetSchema("order-pipeline", schema)
GetSchema
Retrieves a schema by name.
func (f *Factory[T]) GetSchema(name string) (Schema, bool)
RemoveSchema
Removes a named schema.
func (f *Factory[T]) RemoveSchema(name string) bool
Returns: true if removed, false if not found
ListSchemas
Returns all registered schema names.
func (f *Factory[T]) ListSchemas() []string
Bind
Creates or retrieves a binding for a schema.
func (f *Factory[T]) Bind(identity pipz.Identity, schemaID string, opts ...BindingOption[T]) (*Binding[T], error)
Parameters:
identity - Unique identifier for this binding
schemaID - ID of schema in registry (must exist)
opts - Optional configuration (e.g., WithAutoSync())
Returns: *Binding[T] on success, error if schema not found
Example:
pipelineID := factory.Identity("order-processor", "Processes orders")
binding, err := factory.Bind(pipelineID, "order-pipeline", flume.WithAutoSync[Order]())
if err != nil {
return err
}
result, err := binding.Process(ctx, order)
Get
Retrieves an existing binding by identity.
func (f *Factory[T]) Get(identity pipz.Identity) *Binding[T]
Returns: *Binding[T] if found, nil otherwise
Binding
A Binding[T] represents a live pipeline bound to a schema. It provides lock-free execution via atomic.Pointer.
Process
Executes the pipeline with the given data.
func (b *Binding[T]) Process(ctx context.Context, data T) (T, error)
Lock-free operation using atomic pointer access.
WithAutoSync
Option to enable automatic rebuilding when the source schema changes.
flume.WithAutoSync[T]()
When enabled, calls to factory.SetSchema() automatically rebuild this binding.
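One way to picture auto-sync is a registry that refreshes subscribed bindings whenever a schema is stored. The sketch below is a hypothetical model, not Flume's code: registry, binding, setSchema, and bind are invented names, and it assumes bindings created without auto-sync keep the version they were built from.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// binding tracks which schema version it was last built from.
type binding struct {
	schemaID string
	autoSync bool
	version  atomic.Pointer[string]
}

// registry is a hypothetical stand-in for the factory's schema store.
type registry struct {
	mu       sync.Mutex
	schemas  map[string]string
	bindings []*binding
}

func newRegistry() *registry {
	return &registry{schemas: map[string]string{}}
}

// setSchema stores the schema and refreshes every auto-sync binding that uses it.
func (r *registry) setSchema(id, version string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.schemas[id] = version
	for _, b := range r.bindings {
		if b.autoSync && b.schemaID == id {
			v := version
			b.version.Store(&v)
		}
	}
}

// bind creates a binding from the schema's current version.
func (r *registry) bind(id string, autoSync bool) (*binding, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	v, ok := r.schemas[id]
	if !ok {
		return nil, fmt.Errorf("schema %q not found", id)
	}
	b := &binding{schemaID: id, autoSync: autoSync}
	b.version.Store(&v)
	r.bindings = append(r.bindings, b)
	return b, nil
}

func main() {
	r := newRegistry()
	r.setSchema("order-pipeline", "1.0.0")
	live, _ := r.bind("order-pipeline", true)
	pinned, _ := r.bind("order-pipeline", false)

	r.setSchema("order-pipeline", "2.0.0")
	fmt.Println(*live.version.Load(), *pinned.version.Load()) // prints "2.0.0 1.0.0"
}
```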
Binding Accessors
binding.Identity() // Returns pipz.Identity
binding.SchemaID() // Returns string (schema ID)
binding.AutoSync() // Returns bool
binding.Pipeline() // Returns *pipz.Pipeline[T]
Validation
ValidateSchema
Validates a schema against registered components without building it.
func (f *Factory[T]) ValidateSchema(schema Schema) error
Returns: nil if valid, ValidationErrors otherwise
Checks:
- Schema structure (node types, required fields)
- All processor references exist
- All predicate references exist
- All condition references exist
- All reducer references exist
- All error handler references exist
- All channel references exist
- No circular references
Example:
if err := factory.ValidateSchema(schema); err != nil {
fmt.Println(err)
// 3 validation errors:
// 1. root.children[0]: processor 'missing' not found
// ...
}
ValidateSchemaStructure
Validates schema syntax without requiring registered components. Use this for CI/CD schema linting where a configured factory is not available.
func ValidateSchemaStructure(schema Schema) error
Returns: nil if valid, ValidationErrors otherwise
This validates:
- Node structure (ref vs type exclusivity, non-empty nodes)
- Connector constraints (required children, child counts)
- Configuration values (valid durations, positive numbers)
- Unknown node types
This does NOT validate:
- Processor references exist
- Predicate references exist
- Condition references exist
- Reducer references exist
- Error handler references exist
- Channel references exist
Example:
// CI/CD pipeline - validate schema files without a configured factory
var schema flume.Schema
if err := yaml.Unmarshal(data, &schema); err != nil {
	log.Fatalf("Invalid YAML: %v", err)
}
if err := flume.ValidateSchemaStructure(schema); err != nil {
	log.Fatalf("Invalid schema: %v", err)
}
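For intuition, the kinds of structural rules listed above can be checked without any registry. The sketch below is hypothetical and covers only three rules for a single node. It assumes durations use Go's duration syntax (for example "5s") and that a timeout node requires a child; neither is confirmed by this page.

```go
package main

import (
	"fmt"
	"time"
)

// node is a trimmed-down, hypothetical version of flume.Node.
type node struct {
	Ref      string
	Type     string
	Duration string
	Child    *node
}

// checkStructure returns the problems found in one node, without consulting a registry.
func checkStructure(n node) []string {
	var problems []string
	switch {
	case n.Ref == "" && n.Type == "":
		problems = append(problems, "empty node: set ref or type")
	case n.Ref != "" && n.Type != "":
		problems = append(problems, "ref and type are mutually exclusive")
	}
	if n.Type == "timeout" {
		if n.Child == nil {
			problems = append(problems, "timeout requires a child")
		}
		if n.Duration != "" {
			// Assumption: durations follow Go's time.ParseDuration syntax.
			if _, err := time.ParseDuration(n.Duration); err != nil {
				problems = append(problems, fmt.Sprintf("invalid duration %q", n.Duration))
			}
		}
	}
	return problems
}

func main() {
	bad := node{Ref: "validate", Type: "timeout", Duration: "soon"}
	for _, p := range checkStructure(bad) {
		fmt.Println(p)
	}
}
```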
Types
Schema
type Schema struct {
Version string `json:"version,omitempty" yaml:"version,omitempty"`
Node `yaml:",inline"`
}
Node
type Node struct {
Ref string // Processor reference
Type string // Connector type
Name string // Optional name override
Children []Node // For sequence, concurrent, etc.
Child *Node // For retry, timeout, etc.
Predicate string // For filter
Then *Node // For filter
Else *Node // For filter
Condition string // For switch
Routes map[string]Node // For switch
Default *Node // For switch
Attempts int // For retry
Backoff string // For retry
Duration string // For timeout
FailureThreshold int // For circuit-breaker
RecoveryTimeout string // For circuit-breaker
RequestsPerSecond float64 // For rate-limit
BurstSize int // For rate-limit
Stream string // For stream nodes
}
Predicate
type Predicate[T any] struct {
Name pipz.Name
Description string
Predicate func(context.Context, T) bool
}
Condition
type Condition[T any] struct {
Name pipz.Name
Description string
Values []string // Possible return values
Condition func(context.Context, T) string
}
ProcessorMeta
type ProcessorMeta[T any] struct {
Processor pipz.Chainable[T]
Description string
Tags []string
}
Reducer
type Reducer[T any] struct {
Name pipz.Name
Description string
Reducer func(original T, results map[pipz.Name]T, errors map[pipz.Name]error) T
}
Used with the concurrent connector to merge the results of parallel execution.
Example:
factory.AddReducer(flume.Reducer[Order]{
Name: "merge-enrichments",
Description: "Combines data from parallel enrichment sources",
Reducer: func(original Order, results map[pipz.Name]Order, errors map[pipz.Name]error) Order {
for _, result := range results {
original.Metadata = append(original.Metadata, result.Metadata...)
}
return original
},
})
ErrorHandler
type ErrorHandler[T any] struct {
Name pipz.Name
Description string
Handler pipz.Chainable[*pipz.Error[T]]
}
Used with the handle connector to process errors from child execution. The handler receives a *pipz.Error[T] containing both the original data and the error.
Error Recovery Pattern:
Clear the error to indicate successful recovery:
factory.AddErrorHandler(flume.ErrorHandler[Order]{
Name: "order-recovery",
Description: "Recovers failed orders by marking them for retry",
Handler: pipz.Apply("recover", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
e.Data.Status = "pending-retry"
e.Err = nil // Clear error - pipeline continues normally
return e, nil
}),
})
Error Transformation Pattern:
Transform the error while preserving failure state:
factory.AddErrorHandler(flume.ErrorHandler[Order]{
Name: "error-enricher",
Description: "Enriches errors with context before propagating",
Handler: pipz.Apply("enrich-error", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
e.Err = fmt.Errorf("order %s: %w", e.Data.ID, e.Err)
return e, nil // Error still propagates
}),
})
Conditional Recovery Pattern:
Recover from specific errors only:
factory.AddErrorHandler(flume.ErrorHandler[Order]{
Name: "transient-recovery",
Description: "Recovers from transient errors, propagates others",
Handler: pipz.Apply("conditional-recover", func(ctx context.Context, e *pipz.Error[Order]) (*pipz.Error[Order], error) {
if errors.Is(e.Err, ErrTemporaryFailure) {
e.Data.Status = "retrying"
e.Err = nil // Recover
}
// Non-transient errors propagate unchanged
return e, nil
}),
})
Schema Usage:
type: handle
error_handler: order-recovery
child:
  ref: risky-operation
ValidationError
type ValidationError struct {
Path []string // Path to error in schema
Message string
}
func (e ValidationError) Error() string
ValidationErrors
type ValidationErrors []ValidationError
func (e ValidationErrors) Error() string
Constants
Default Values
const (
DefaultRetryAttempts = 3
DefaultTimeoutDuration = 30 * time.Second
DefaultCircuitBreakerThreshold = 5
DefaultRecoveryTimeout = 60 * time.Second
DefaultRequestsPerSecond = 10.0
DefaultBurstSize = 1
)
Next Steps
- Schema Format - YAML/JSON specification
- Connector Types - All connector options
- Events - Observability signals