zoobzio December 11, 2025

Schema Format

Complete specification for Flume schema documents.

Overview

Schemas can be written in YAML or JSON. This reference uses YAML examples; JSON equivalents use standard JSON syntax.

Schema Structure

version: "1.0.0"    # Optional: schema version
type: sequence      # Required: connector type OR ref
name: my-pipeline   # Optional: override connector name
children:           # Depends on type
  - ref: step1
  - ref: step2

Top-Level Fields

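| Field | Type | Required | Description |
|-------|------|----------|-------------|
| version | string | No | Schema version for tracking changes |
| ref | string | Conditional | Reference to registered processor |
| type | string | Conditional | Connector type |
| name | string | No | Override default connector name |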

Either ref or type is required, but not both.

Processor Reference

Reference a registered processor by name:

ref: validate

A node that contains only ref is equivalent to calling the registered processor directly.

Connector Types

sequence

Sequential processing through multiple steps.

type: sequence
name: my-sequence      # Optional
children:              # Required: 1+ children
  - ref: step1
  - ref: step2
  - ref: step3

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node | Yes | - |
| name | string | No | "sequence" |

concurrent

Parallel execution with data cloning.

type: concurrent
name: parallel-steps   # Optional
children:              # Required: 1+ children
  - ref: task1
  - ref: task2

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node | Yes | - |
| name | string | No | "concurrent" |

Note: Requires T to implement pipz.Cloner[T].
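
As a rough illustration of what that requirement means for a payload type, the sketch below defines a local Cloner interface with a single Clone method. That shape is an assumption about pipz.Cloner[T], and Order is a hypothetical type; check the pipz documentation for the exact definition. The point is that each parallel branch should receive a deep copy so branches cannot mutate shared slices or maps.

```go
package main

import "fmt"

// Cloner mirrors the shape we ASSUME pipz.Cloner[T] has: one Clone method.
type Cloner[T any] interface {
	Clone() T
}

// Order is a hypothetical pipeline payload used only for illustration.
type Order struct {
	ID    string
	Items []string
}

// Clone returns a deep copy so parallel branches never share the Items slice.
func (o Order) Clone() Order {
	items := make([]string, len(o.Items))
	copy(items, o.Items)
	return Order{ID: o.ID, Items: items}
}

// Compile-time check that Order satisfies the assumed interface.
var _ Cloner[Order] = Order{}

func main() {
	original := Order{ID: "A-1", Items: []string{"book"}}
	copied := original.Clone()
	copied.Items[0] = "pen" // mutating the copy leaves the original intact
	fmt.Println(original.Items[0], copied.Items[0]) // prints "book pen"
}
```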


race

First successful result wins.

type: race
children:
  - ref: fast-path
  - ref: slow-path

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node | Yes | - |
| name | string | No | "race" |

Note: Requires T to implement pipz.Cloner[T].


fallback

Try primary, use fallback on error.

type: fallback
children:
  - ref: primary       # First child is primary
  - ref: backup        # Second child is fallback

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node (exactly 2) | Yes | - |
| name | string | No | "fallback" |

retry

Retry operation on failure.

type: retry
attempts: 5            # Optional
backoff: "100ms"       # Optional: enables exponential backoff
child:                 # Required
  ref: flaky-operation
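
| Field | Type | Required | Default |
|-------|------|----------|---------|
| child | Node | Yes | - |
| attempts | int | No | 3 |
| backoff | duration | No | (none) |
| name | string | No | "retry" or "backoff" |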

timeout

Enforce time limit.

type: timeout
duration: "5s"         # Optional
child:                 # Required
  ref: slow-operation

| Field | Type | Required | Default |
|-------|------|----------|---------|
| child | Node | Yes | - |
| duration | duration | No | "30s" |
| name | string | No | "timeout" |

filter

Conditional execution based on predicate.

type: filter
predicate: is-valid    # Required: registered predicate name
then:                  # Required
  ref: process-valid
else:                  # Optional
  ref: handle-invalid
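
| Field | Type | Required | Default |
|-------|------|----------|---------|
| predicate | string | Yes | - |
| then | Node | Yes | - |
| else | Node | No | (passthrough) |
| name | string | No | "filter-{predicate}" |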
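
The then/else semantics, including the passthrough default, can be sketched with plain functions. This is a simplified model, not Flume's implementation: the filter helper is hypothetical and ignores contexts and errors.

```go
package main

import "fmt"

// filter builds a step that runs then when pred reports true. When pred is
// false it runs otherwise, or returns the input untouched if otherwise is nil,
// which models the "(passthrough)" default of else.
func filter[T any](pred func(T) bool, then, otherwise func(T) T) func(T) T {
	return func(v T) T {
		if pred(v) {
			return then(v)
		}
		if otherwise != nil {
			return otherwise(v)
		}
		return v
	}
}

func main() {
	isEven := func(n int) bool { return n%2 == 0 }
	double := func(n int) int { return n * 2 }

	step := filter[int](isEven, double, nil) // no else branch
	fmt.Println(step(4), step(3))            // prints "8 3"
}
```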

switch

Multi-way routing based on condition.

type: switch
condition: order-type  # Required: registered condition name
routes:                # Required: 1+ routes
  subscription:
    ref: handle-subscription
  one-time:
    ref: handle-single
default:               # Optional
  ref: handle-unknown
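
| Field | Type | Required | Default |
|-------|------|----------|---------|
| condition | string | Yes | - |
| routes | map[string]Node | Yes | - |
| default | Node | No | (none) |
| name | string | No | "switch-{condition}" |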

Default Routing: The default node is registered as a route with key "default". For default routing to work, your condition function must return the string "default" when no other route matches:

factory.AddCondition(flume.Condition[Order]{
    Name: "order-type",
    Condition: func(ctx context.Context, o Order) string {
        switch o.Type {
        case "subscription":
            return "subscription"
        case "one-time":
            return "one-time"
        default:
            return "default"  // Triggers the default route
        }
    },
})
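
The reason the condition must return "default" explicitly becomes clearer if routing is pictured as a plain map lookup. The sketch below is a simplified model under that assumption; resolve is a hypothetical helper, and what Flume does with a key that matches no route is not covered here.

```go
package main

import "fmt"

// resolve looks up the processor name registered for key. The default node is
// modelled as an ordinary entry stored under the key "default".
func resolve(routes map[string]string, key string) (string, bool) {
	target, ok := routes[key]
	return target, ok
}

func main() {
	routes := map[string]string{
		"subscription": "handle-subscription",
		"one-time":     "handle-single",
		"default":      "handle-unknown", // from the schema's default node
	}
	// "gift" is not a route key, so it does not reach handle-unknown by itself.
	for _, key := range []string{"subscription", "default", "gift"} {
		target, ok := resolve(routes, key)
		fmt.Printf("%-12s -> %q (matched: %v)\n", key, target, ok)
	}
}
```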

circuit-breaker

Circuit breaker pattern.

type: circuit-breaker
failure_threshold: 5   # Optional
recovery_timeout: "60s"  # Optional
child:                 # Required
  ref: external-service
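
| Field | Type | Required | Default |
|-------|------|----------|---------|
| child | Node | Yes | - |
| failure_threshold | int | No | 5 |
| recovery_timeout | duration | No | "60s" |
| name | string | No | "circuit-breaker" |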

rate-limit

Rate limiting with token bucket.

type: rate-limit
requests_per_second: 100.0  # Optional
burst_size: 10              # Optional
child:                      # Required
  ref: rate-sensitive-api
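
| Field | Type | Required | Default |
|-------|------|----------|---------|
| child | Node | Yes | - |
| requests_per_second | float | No | 10.0 |
| burst_size | int | No | 1 |
| name | string | No | "rate-limit" |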

stream

Send to registered channel.

stream: output-channel  # Required: channel name
stream_timeout: "5s"    # Optional: timeout for channel write
child:                  # Optional: continue processing
  ref: next-step
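
| Field | Type | Required | Default |
|-------|------|----------|---------|
| stream | string | Yes | - |
| stream_timeout | duration | No | (blocks indefinitely) |
| child | Node | No | (none) |
| children | []Node | No | (none) |
| name | string | No | "stream:{name}" |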

Timeout Behaviour: Without stream_timeout, the stream node blocks until the channel accepts the item or the context is cancelled. With stream_timeout set, it returns an error if the channel does not accept the item within that duration.


contest

First result satisfying a predicate.

type: contest
predicate: is-valid      # Required: registered predicate name
children:                # Required: 1+ children
  - ref: source1
  - ref: source2

| Field | Type | Required | Default |
|-------|------|----------|---------|
| predicate | string | Yes | - |
| children | []Node | Yes | - |
| name | string | No | "contest-{predicate}" |

Note: Requires T to implement pipz.Cloner[T].


handle

Wrap child with error handling.

type: handle
error_handler: recovery  # Required: registered error handler name
child:                   # Required
  ref: risky-operation

| Field | Type | Required | Default |
|-------|------|----------|---------|
| error_handler | string | Yes | - |
| child | Node | Yes | - |
| name | string | No | "handle-{handler}" |

scaffold

Structural grouping of children.

type: scaffold
name: validation-stage   # Optional
children:                # Required: 1+ children
  - ref: step1
  - ref: step2

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node | Yes | - |
| name | string | No | "scaffold" |

worker-pool

Bounded parallel execution.

type: worker-pool
workers: 8               # Optional
children:                # Required: 1+ children
  - ref: task1
  - ref: task2

| Field | Type | Required | Default |
|-------|------|----------|---------|
| children | []Node | Yes | - |
| workers | int | No | 4 |
| name | string | No | "worker-pool" |

Note: Requires T to implement pipz.Cloner[T].


Duration Format

Durations use Go's duration format:

| Format | Example |
|--------|---------|
| Nanoseconds | 100ns |
| Microseconds | 50µs or 50us |
| Milliseconds | 100ms |
| Seconds | 5s |
| Minutes | 2m |
| Hours | 1h |
| Combined | 1h30m, 2m30s |

Nesting

Nodes can be nested arbitrarily:

type: sequence
children:
  - ref: validate
  - type: filter
    predicate: is-premium
    then:
      type: retry
      attempts: 3
      child:
        type: timeout
        duration: "5s"
        child:
          ref: premium-process
  - ref: finalize

JSON Format

Equivalent JSON for the nested example:

{
  "type": "sequence",
  "children": [
    {"ref": "validate"},
    {
      "type": "filter",
      "predicate": "is-premium",
      "then": {
        "type": "retry",
        "attempts": 3,
        "child": {
          "type": "timeout",
          "duration": "5s",
          "child": {"ref": "premium-process"}
        }
      }
    },
    {"ref": "finalize"}
  ]
}
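
To show how the nested document above maps onto a recursive data structure, here is a sketch that decodes it with encoding/json. The Node struct is a hypothetical subset of the schema fields chosen for this example; Flume's own type and loading functions may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Node is a HYPOTHETICAL subset of the schema fields used in the example.
type Node struct {
	Ref       string `json:"ref,omitempty"`
	Type      string `json:"type,omitempty"`
	Predicate string `json:"predicate,omitempty"`
	Attempts  int    `json:"attempts,omitempty"`
	Duration  string `json:"duration,omitempty"`
	Children  []Node `json:"children,omitempty"`
	Child     *Node  `json:"child,omitempty"`
	Then      *Node  `json:"then,omitempty"`
}

// doc is the nested example from this section.
const doc = `{
  "type": "sequence",
  "children": [
    {"ref": "validate"},
    {
      "type": "filter",
      "predicate": "is-premium",
      "then": {
        "type": "retry",
        "attempts": 3,
        "child": {
          "type": "timeout",
          "duration": "5s",
          "child": {"ref": "premium-process"}
        }
      }
    },
    {"ref": "finalize"}
  ]
}`

// decode unmarshals a schema document into the sketch Node type.
func decode(data string) (Node, error) {
	var n Node
	err := json.Unmarshal([]byte(data), &n)
	return n, err
}

func main() {
	n, err := decode(doc)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(n.Type, len(n.Children))            // prints "sequence 3"
	fmt.Println(n.Children[1].Then.Child.Child.Ref) // prints "premium-process"
}
```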

Validation Rules

  1. Either ref or type - Node must have exactly one
  2. Required fields - Each type has required fields
  3. Registered components - All refs, predicates, conditions must exist
  4. Correct counts - fallback needs exactly 2 children
  5. Valid durations - Duration strings must parse
  6. Positive values - attempts and failure_threshold must be positive
  7. No cycles - Reference chains cannot loop
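
Several of the rules above are simple structural checks, and a sketch may help when pre-validating documents in your own tooling. The code below covers rules 1, 4, 5, and 6 (for attempts only) on a hypothetical Node type; it is not Flume's validator and does not check registered components or cycles.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Node is a HYPOTHETICAL subset of the schema, enough for the checks below.
type Node struct {
	Ref      string
	Type     string
	Attempts *int // nil means the field was not set
	Duration string
	Children []Node
	Child    *Node
}

// validate applies a few of the listed rules to n and all nested nodes.
func validate(n Node) error {
	// Rule 1: exactly one of ref or type.
	if (n.Ref == "") == (n.Type == "") {
		return errors.New("node must have exactly one of ref or type")
	}
	// Rule 4: fallback needs exactly 2 children.
	if n.Type == "fallback" && len(n.Children) != 2 {
		return fmt.Errorf("fallback needs exactly 2 children, got %d", len(n.Children))
	}
	// Rule 5: duration strings must parse.
	if n.Duration != "" {
		if _, err := time.ParseDuration(n.Duration); err != nil {
			return fmt.Errorf("invalid duration %q: %w", n.Duration, err)
		}
	}
	// Rule 6: attempts must be positive when set.
	if n.Attempts != nil && *n.Attempts <= 0 {
		return fmt.Errorf("attempts must be positive, got %d", *n.Attempts)
	}
	for _, c := range n.Children {
		if err := validate(c); err != nil {
			return err
		}
	}
	if n.Child != nil {
		return validate(*n.Child)
	}
	return nil
}

func main() {
	bad := Node{Type: "fallback", Children: []Node{{Ref: "primary"}}}
	fmt.Println(validate(bad)) // prints "fallback needs exactly 2 children, got 1"

	good := Node{Type: "timeout", Duration: "5s", Child: &Node{Ref: "slow-operation"}}
	fmt.Println(validate(good)) // prints "<nil>"
}
```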

Next Steps