Schema Format
Complete specification for Flume schema documents.
Overview
Schemas can be written in YAML or JSON. This reference uses YAML examples; JSON equivalents use standard JSON syntax.
Schema Structure
```yaml
version: "1.0.0" # Optional: schema version
type: sequence # Required: connector type OR ref
name: my-pipeline # Optional: override connector name
children: # Depends on type
  - ref: step1
  - ref: step2
```
Top-Level Fields
| Field | Type | Required | Description |
|---|---|---|---|
| version | string | No | Schema version for tracking changes |
| ref | string | Conditional | Reference to a registered processor |
| type | string | Conditional | Connector type |
| name | string | No | Override the default connector name |
Either ref or type is required, but not both.
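The ref/type rule can be checked mechanically. The Go sketch below decodes a single node from JSON (Go's standard library has no YAML decoder) and rejects nodes that set neither field or both. The node type and checkRefOrType function are hypothetical illustrations, not part of flume's API.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// node holds only the two fields this check needs; real schema nodes carry more.
type node struct {
	Ref  string `json:"ref"`
	Type string `json:"type"`
}

// checkRefOrType enforces "exactly one of ref or type" on a single node.
func checkRefOrType(raw []byte) error {
	var n node
	if err := json.Unmarshal(raw, &n); err != nil {
		return err
	}
	switch {
	case n.Ref == "" && n.Type == "":
		return errors.New("node needs either ref or type")
	case n.Ref != "" && n.Type != "":
		return errors.New("node cannot set both ref and type")
	}
	return nil
}

func main() {
	fmt.Println(checkRefOrType([]byte(`{"ref": "validate"}`)))                     // <nil>
	fmt.Println(checkRefOrType([]byte(`{"ref": "validate", "type": "sequence"}`))) // both set: error
}
```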
Processor Reference
Reference a registered processor by name:
```yaml
ref: validate
```
Equivalent to calling the processor directly.
Connector Types
sequence
Sequential processing through multiple steps.
```yaml
type: sequence
name: my-sequence # Optional
children: # Required: 1+ children
  - ref: step1
  - ref: step2
  - ref: step3
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node | Yes | - |
| name | string | No | "sequence" |
concurrent
Parallel execution with data cloning.
```yaml
type: concurrent
name: parallel-steps # Optional
children: # Required: 1+ children
  - ref: task1
  - ref: task2
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node | Yes | - |
| name | string | No | "concurrent" |
Note: Requires T to implement pipz.Cloner[T].
race
First successful result wins.
```yaml
type: race
children:
  - ref: fast-path
  - ref: slow-path
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node | Yes | - |
| name | string | No | "race" |
Note: Requires T to implement pipz.Cloner[T].
fallback
Try primary, use fallback on error.
```yaml
type: fallback
children:
  - ref: primary # First child is primary
  - ref: backup # Second child is fallback
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node (exactly 2) | Yes | - |
| name | string | No | "fallback" |
retry
Retry operation on failure.
```yaml
type: retry
attempts: 5 # Optional
backoff: "100ms" # Optional: enables exponential backoff
child: # Required
  ref: flaky-operation
```
| Field | Type | Required | Default |
|---|---|---|---|
| child | Node | Yes | - |
| attempts | int | No | 3 |
| backoff | duration | No | (none) |
| name | string | No | "retry", or "backoff" when backoff is set |
timeout
Enforce time limit.
```yaml
type: timeout
duration: "5s" # Optional
child: # Required
  ref: slow-operation
```
| Field | Type | Required | Default |
|---|---|---|---|
| child | Node | Yes | - |
| duration | duration | No | "30s" |
| name | string | No | "timeout" |
filter
Conditional execution based on predicate.
```yaml
type: filter
predicate: is-valid # Required: registered predicate name
then: # Required
  ref: process-valid
else: # Optional
  ref: handle-invalid
```
| Field | Type | Required | Default |
|---|---|---|---|
| predicate | string | Yes | - |
| then | Node | Yes | - |
| else | Node | No | (passthrough) |
| name | string | No | "filter-{predicate}" |
switch
Multi-way routing based on condition.
```yaml
type: switch
condition: order-type # Required: registered condition name
routes: # Required: 1+ routes
  subscription:
    ref: handle-subscription
  one-time:
    ref: handle-single
default: # Optional
  ref: handle-unknown
```
| Field | Type | Required | Default |
|---|---|---|---|
| condition | string | Yes | - |
| routes | map[string]Node | Yes | - |
| default | Node | No | (none) |
| name | string | No | "switch-{condition}" |
Default Routing: The default node is registered as a route with key "default". For default routing to work, your condition function must return the string "default" when no other route matches:
```go
factory.AddCondition(flume.Condition[Order]{
	Name: "order-type",
	Condition: func(ctx context.Context, o Order) string {
		switch o.Type {
		case "subscription":
			return "subscription"
		case "one-time":
			return "one-time"
		default:
			return "default" // Triggers the default route
		}
	},
})
```
circuit-breaker
Circuit breaker pattern.
```yaml
type: circuit-breaker
failure_threshold: 5 # Optional
recovery_timeout: "60s" # Optional
child: # Required
  ref: external-service
```
| Field | Type | Required | Default |
|---|---|---|---|
| child | Node | Yes | - |
| failure_threshold | int | No | 5 |
| recovery_timeout | duration | No | "60s" |
| name | string | No | "circuit-breaker" |
rate-limit
Rate limiting with token bucket.
```yaml
type: rate-limit
requests_per_second: 100.0 # Optional
burst_size: 10 # Optional
child: # Required
  ref: rate-sensitive-api
```
| Field | Type | Required | Default |
|---|---|---|---|
| child | Node | Yes | - |
| requests_per_second | float | No | 10.0 |
| burst_size | int | No | 1 |
| name | string | No | "rate-limit" |
stream
Send the item to a registered channel. As the example shows, a stream node is declared with the stream field rather than type.
```yaml
stream: output-channel # Required: channel name
stream_timeout: "5s" # Optional: timeout for channel write
child: # Optional: continue processing
  ref: next-step
```
| Field | Type | Required | Default |
|---|---|---|---|
| stream | string | Yes | - |
| stream_timeout | duration | No | (blocks indefinitely) |
| child | Node | No | (none) |
| children | []Node | No | (none) |
| name | string | No | "stream:{name}" |
Timeout Behaviour: Without stream_timeout, the stream blocks until the channel accepts the item or the context is cancelled. With stream_timeout, the write returns an error if the channel does not accept the item within that duration.
contest
First result satisfying a predicate.
```yaml
type: contest
predicate: is-valid # Required: registered predicate name
children: # Required: 1+ children
  - ref: source1
  - ref: source2
```
| Field | Type | Required | Default |
|---|---|---|---|
| predicate | string | Yes | - |
| children | []Node | Yes | - |
| name | string | No | "contest-{predicate}" |
Note: Requires T to implement pipz.Cloner[T].
handle
Wrap child with error handling.
```yaml
type: handle
error_handler: recovery # Required: registered error handler name
child: # Required
  ref: risky-operation
```
| Field | Type | Required | Default |
|---|---|---|---|
| error_handler | string | Yes | - |
| child | Node | Yes | - |
| name | string | No | "handle-{handler}" |
scaffold
Structural grouping of children.
```yaml
type: scaffold
name: validation-stage # Optional
children: # Required: 1+ children
  - ref: step1
  - ref: step2
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node | Yes | - |
| name | string | No | "scaffold" |
worker-pool
Bounded parallel execution.
```yaml
type: worker-pool
workers: 8 # Optional
children: # Required: 1+ children
  - ref: task1
  - ref: task2
```
| Field | Type | Required | Default |
|---|---|---|---|
| children | []Node | Yes | - |
| workers | int | No | 4 |
| name | string | No | "worker-pool" |
Note: Requires T to implement pipz.Cloner[T].
Duration Format
Durations use Go's duration format:
| Format | Example |
|---|---|
| Nanoseconds | 100ns |
| Microseconds | 50µs or 50us |
| Milliseconds | 100ms |
| Seconds | 5s |
| Minutes | 2m |
| Hours | 1h |
| Combined | 1h30m, 2m30s |
Nesting
Nodes can be nested arbitrarily:
```yaml
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: is-premium
    then:
      type: retry
      attempts: 3
      child:
        type: timeout
        duration: "5s"
        child:
          ref: premium-process
  - ref: finalize
```
JSON Format
Equivalent JSON for the nested example:
```json
{
  "type": "sequence",
  "children": [
    {"ref": "validate"},
    {
      "type": "filter",
      "predicate": "is-premium",
      "then": {
        "type": "retry",
        "attempts": 3,
        "child": {
          "type": "timeout",
          "duration": "5s",
          "child": {"ref": "premium-process"}
        }
      }
    },
    {"ref": "finalize"}
  ]
}
```
Validation Rules
- Either ref or type: each node must set exactly one
- Required fields: each connector type's required fields must be present
- Registered components: all refs, predicates, and conditions must be registered
- Correct counts: fallback needs exactly 2 children
- Valid durations: duration strings must parse
- Positive values: attempts and failure_threshold must be positive
- No cycles: reference chains cannot loop
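The "no cycles" rule is cycle detection on a reference graph. A minimal Go sketch using depth-first search with three node states, assuming references can be modelled as a map from each name to the names it refers to; both that model and hasCycle are hypothetical, not part of flume.

```go
package main

import "fmt"

// hasCycle reports whether following references from any name ever returns
// to a name that is still on the current search path.
func hasCycle(graph map[string][]string) bool {
	const (
		unvisited = iota // zero value: not yet seen
		inProgress       // on the current path
		done             // fully explored, no cycle through it
	)
	state := make(map[string]int)
	var visit func(string) bool
	visit = func(name string) bool {
		switch state[name] {
		case inProgress:
			return true // back edge: we looped
		case done:
			return false
		}
		state[name] = inProgress
		for _, next := range graph[name] {
			if visit(next) {
				return true
			}
		}
		state[name] = done
		return false
	}
	for name := range graph {
		if visit(name) {
			return true
		}
	}
	return false
}

func main() {
	ok := map[string][]string{"checkout": {"validate", "charge"}, "charge": {"audit"}}
	loop := map[string][]string{"a": {"b"}, "b": {"c"}, "c": {"a"}}
	fmt.Println(hasCycle(ok), hasCycle(loop)) // false true
}
```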
Next Steps
- Connector Types - Detailed connector behaviour
- API Reference - Factory methods