Connector Types
Detailed reference for all Flume connector types and their behaviour.
Flow Control
sequence
Processes data through multiple steps in order.
Schema:
type: sequence
children:
- ref: step1
- ref: step2
- ref: step3
Behaviour:
- Process step1 with input data
- Pass step1 output to step2
- Pass step2 output to step3
- Return step3 output
Error Handling: Stops on first error, returns that error.
Use Cases:
- Multi-stage processing
- Pipeline composition
- Sequential transformations
concurrent
Executes multiple steps in parallel.
Schema:
type: concurrent
children:
- ref: task1
- ref: task2
- ref: task3
Behaviour:
- Clone input data for each child
- Execute all children concurrently
- Wait for all to complete
- Merge results
Error Handling: Collects all errors, returns first error encountered.
Requirements: Type T must implement pipz.Cloner[T].
Use Cases:
- Parallel enrichment from multiple sources
- Independent validations
- Concurrent API calls
race
Returns first successful result.
Schema:
type: race
children:
- ref: fast-path
- ref: slow-path
Behaviour:
- Clone input data for each child
- Execute all children concurrently
- Return first successful result
- Cancel other executions
Error Handling: Returns error only if all children fail.
Requirements: Type T must implement pipz.Cloner[T].
Use Cases:
- Redundant data sources
- Fastest-wins scenarios
- Speculative execution
Error Handling
fallback
Provides backup processing on failure.
Schema:
type: fallback
children:
- ref: primary
- ref: backup
Behaviour:
- Execute primary with input
- If primary succeeds, return result
- If primary fails, execute backup
- Return backup result or error
Requirements: Exactly 2 children.
Use Cases:
- Service redundancy
- Graceful degradation
- Default value provision
retry
Retries on transient failures.
Schema:
type: retry
attempts: 3
child:
  ref: operation
With backoff:
type: retry
attempts: 5
backoff: "100ms"
child:
  ref: operation
Behaviour without backoff:
- Execute child
- If error, retry up to attempts times
- Return success or final error
Behaviour with backoff:
- Execute child
- If error, wait for the backoff duration
- Retry with doubled wait time (exponential)
- Continue until success or attempts exhausted
Defaults:
- attempts: 3
- backoff: none (immediate retry)
Use Cases:
- Network timeouts
- Rate limit handling
- Transient service failures
timeout
Enforces execution time limit.
Schema:
type: timeout
duration: "5s"
child:
  ref: operation
Behaviour:
- Start child execution with deadline context
- If completes before deadline, return result
- If deadline exceeded, cancel and return error
Default: 30 seconds
Error: Returns context.DeadlineExceeded
Use Cases:
- External API calls
- Long-running computations
- User-facing requests
circuit-breaker
Prevents repeated calls to failing services.
Schema:
type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: service
States:
| State | Behaviour |
|---|---|
| Closed | Normal operation, count failures |
| Open | Fail immediately, don't call child |
| Half-Open | Allow one test request |
Behaviour:
- Closed: Execute child, count failures
- After failure_threshold failures, open circuit
- Open: Return error immediately
- After recovery_timeout, enter half-open
- Half-Open: Execute one test request
- If test succeeds, close circuit
- If test fails, reopen circuit
Defaults:
- failure_threshold: 5
- recovery_timeout: 60s
Use Cases:
- External service protection
- Cascade failure prevention
- Allowing service recovery
rate-limit
Controls request throughput.
Schema:
type: rate-limit
requests_per_second: 100.0
burst_size: 10
child:
  ref: operation
Behaviour:
- Wait for token from bucket
- Execute child
- Return result
Uses token bucket algorithm:
- Tokens added at requests_per_second rate
- Up to burst_size tokens can accumulate
- Request waits if no tokens available
Defaults:
- requests_per_second: 10.0
- burst_size: 1
Use Cases:
- API rate limit compliance
- Resource protection
- Fair usage enforcement
Routing
filter
Conditional execution based on boolean predicate.
Schema:
type: filter
predicate: is-valid
then:
  ref: process
else:
  ref: handle-invalid # Optional
Behaviour with else:
- Evaluate predicate
- If true, execute then branch
- If false, execute else branch
- Return result
Behaviour without else:
- Evaluate predicate
- If true, execute then branch
- If false, pass data through unchanged
Requirements: Predicate must be registered with factory.
Use Cases:
- Conditional processing
- Feature flags
- Validation gates
switch
Multi-way routing based on condition value.
Schema:
type: switch
condition: category
routes:
  electronics:
    ref: handle-electronics
  clothing:
    ref: handle-clothing
  food:
    ref: handle-food
  default:
    ref: handle-other
Behaviour:
- Evaluate condition function
- Match return value to route key
- Execute matched route
- If no match and default exists, execute default
- If no match and no default, pass through unchanged
Default Route: The condition function should return "default" for default routing.
Requirements: Condition must be registered with factory.
Use Cases:
- Multi-tenant routing
- Category handling
- Status-based processing
Streaming
stream
Sends data to a registered channel.
Schema (terminal):
stream: output-channel
Schema (with timeout):
stream: output-channel
stream_timeout: "5s"
Schema (with continuation):
stream: audit-channel
child:
  ref: continue-processing
Behaviour (terminal):
- Send data to channel
- Return data (unchanged)
Behaviour (with child):
- Send data to channel
- Execute child
- Return child result
Timeout Behaviour:
- Without stream_timeout: Blocks until channel accepts or context cancelled
- With stream_timeout: Returns error if write doesn't complete within duration
Requirements: Channel must be registered with factory.
Use Cases:
- Stream processing integration
- Async event publishing
- Audit logging
- Fan-out patterns
Advanced
contest
Runs children concurrently and returns the first result satisfying a predicate.
Schema:
type: contest
predicate: is-valid
children:
- ref: source1
- ref: source2
- ref: source3
Behaviour:
- Clone input data for each child
- Execute all children concurrently
- Evaluate predicate against each result as it completes
- Return first result where predicate returns true
- Cancel remaining executions
Error Handling: Returns error if no child produces a result satisfying the predicate.
Requirements:
- Type T must implement pipz.Cloner[T]
- Predicate must be registered with factory
Use Cases:
- Find first valid response from multiple sources
- Speculative execution with validation
- Best-effort data retrieval
handle
Wraps a child with custom error handling logic.
Schema:
type: handle
error_handler: recovery-handler
child:
  ref: risky-operation
Behaviour:
- Execute child
- If child succeeds, return result
- If child fails, pass error to handler
- Handler receives *pipz.Error[T] with original data and error
- Return handler result
Error Handling: Handler can recover, transform, or propagate the error.
Requirements: Error handler must be registered with factory.
Use Cases:
- Custom error recovery
- Error transformation
- Contextual error handling
- Compensating transactions
scaffold
Structural wrapper that groups children for organisational purposes.
Schema:
type: scaffold
name: validation-stage
children:
- ref: validate-format
- ref: validate-business-rules
- ref: validate-permissions
Behaviour:
- Execute children in sequence
- Pass output of each child to the next
- Return final child's output
Error Handling: Stops on first error, returns that error.
Use Cases:
- Logical grouping of related processors
- Pipeline organisation
- Named stages for observability
worker-pool
Distributes work across a pool of workers.
Schema:
type: worker-pool
workers: 8
children:
- ref: task1
- ref: task2
- ref: task3
Behaviour:
- Create pool of workers goroutines
- Distribute children across workers
- Execute concurrently within worker limit
- Collect and merge results
Defaults:
workers: 4
Requirements: Type T must implement pipz.Cloner[T].
Use Cases:
- Bounded concurrency
- Resource-constrained parallel processing
- Batch processing with limits
Comparison Table
| Type | Purpose | Error Behaviour | Cloning |
|---|---|---|---|
| sequence | Chain steps | Stop on error | No |
| concurrent | Parallel execution | Return first error | Yes |
| race | First success | Error if all fail | Yes |
| fallback | Backup processing | Try backup | No |
| retry | Transient failures | Retry n times | No |
| timeout | Time limit | Cancel on timeout | No |
| circuit-breaker | Failure protection | Fail fast when open | No |
| rate-limit | Throughput control | Wait for token | No |
| filter | Conditional routing | From branch | No |
| switch | Multi-way routing | From route | No |
| stream | Channel output | From send | No |
| contest | First valid result | Error if none valid | Yes |
| handle | Error handling | From handler | No |
| scaffold | Logical grouping | Stop on error | No |
| worker-pool | Bounded parallelism | Return first error | Yes |
Next Steps
- Schema Format - YAML/JSON syntax
- Events - Observability signals