zoobzio December 11, 2025

Connector Types

Detailed reference for all Flume connector types and their behaviour.

Flow Control

sequence

Processes data through multiple steps in order.

Schema:

type: sequence
children:
  - ref: step1
  - ref: step2
  - ref: step3

Behaviour:

  1. Process step1 with input data
  2. Pass step1 output to step2
  3. Pass step2 output to step3
  4. Return step3 output

Error Handling: Stops on first error, returns that error.
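The chaining and stop-on-first-error behaviour above can be sketched in plain Go. This is a minimal illustration, not Flume's implementation: Step, errInvalid and runSequence are hypothetical names, and string stands in for the pipeline's data type.

```go
package main

import "errors"

// Step is a hypothetical stand-in for a registered processor.
type Step func(in string) (string, error)

// errInvalid is a sample failure used to show the stop-on-first-error rule.
var errInvalid = errors.New("invalid input")

// runSequence passes each step's output to the next step and stops at the
// first error, returning that error unchanged.
func runSequence(in string, steps ...Step) (string, error) {
	out := in
	for _, step := range steps {
		next, err := step(out)
		if err != nil {
			// Later steps are never executed once a step fails.
			return out, err
		}
		out = next
	}
	return out, nil
}
```

Calling runSequence(data, step1, step2, step3) mirrors the schema: step3 only runs if step1 and step2 both succeed.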

Use Cases:

  • Multi-stage processing
  • Pipeline composition
  • Sequential transformations

concurrent

Executes multiple steps in parallel.

Schema:

type: concurrent
children:
  - ref: task1
  - ref: task2
  - ref: task3

Behaviour:

  1. Clone input data for each child
  2. Execute all children concurrently
  3. Wait for all to complete
  4. Merge results

Error Handling: Waits for every child to finish and collects their errors; if any child failed, returns the first error encountered.

Requirements: Type T must implement pipz.Cloner[T].
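A rough Go sketch of why cloning is required: each child gets its own copy, so concurrent writes never touch shared state. The Cloner interface below is an assumed shape (a single Clone method), not quoted from pipz. Doc, Task, runConcurrent and the merge callback are hypothetical, and "first error" here means first in child order, which may differ from the library's rule.

```go
package main

import "sync"

// Cloner is an assumed shape for the cloning requirement described above.
type Cloner[T any] interface {
	Clone() T
}

// Doc is a sample payload with a slice that must be deep-copied.
type Doc struct{ Tags []string }

// Clone copies the slice so children never share a backing array.
func (d Doc) Clone() Doc {
	return Doc{Tags: append([]string(nil), d.Tags...)}
}

// Task is a hypothetical child processor.
type Task[T any] func(in T) (T, error)

// runConcurrent clones the input per child, runs all children in parallel,
// waits for all of them, then either reports an error or merges the results.
func runConcurrent[T Cloner[T]](in T, merge func(base T, results []T) T, tasks ...Task[T]) (T, error) {
	results := make([]T, len(tasks))
	errs := make([]error, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task Task[T], data T) {
			defer wg.Done()
			results[i], errs[i] = task(data) // each goroutine writes its own slot
		}(i, task, in.Clone())
	}
	wg.Wait()
	for _, err := range errs {
		if err != nil {
			return in, err // first error in child order
		}
	}
	return merge(in, results), nil
}
```

How results are merged is left to the caller in this sketch, because the merge strategy is not specified above.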

Use Cases:

  • Parallel enrichment from multiple sources
  • Independent validations
  • Concurrent API calls

race

Returns first successful result.

Schema:

type: race
children:
  - ref: fast-path
  - ref: slow-path

Behaviour:

  1. Clone input data for each child
  2. Execute all children concurrently
  3. Return first successful result
  4. Cancel other executions

Error Handling: Returns error only if all children fail.

Requirements: Type T must implement pipz.Cloner[T].

Use Cases:

  • Redundant data sources
  • Fastest-wins scenarios
  • Speculative execution

Error Handling

fallback

Provides backup processing on failure.

Schema:

type: fallback
children:
  - ref: primary
  - ref: backup

Behaviour:

  1. Execute primary with input
  2. If primary succeeds, return result
  3. If primary fails, execute backup
  4. Return backup result or error

Requirements: Exactly 2 children.

Use Cases:

  • Service redundancy
  • Graceful degradation
  • Default value provision

retry

Retries on transient failures.

Schema:

type: retry
attempts: 3
child:
  ref: operation

With backoff:

type: retry
attempts: 5
backoff: "100ms"
child:
  ref: operation

Behaviour without backoff:

  1. Execute child
  2. If error, retry up to attempts times
  3. Return success or final error

Behaviour with backoff:

  1. Execute child
  2. If error, wait backoff duration
  3. Retry with doubled wait time (exponential)
  4. Continue until success or attempts exhausted

Defaults:

  • attempts: 3
  • backoff: none (immediate retry)

Use Cases:

  • Network timeouts
  • Rate limit handling
  • Transient service failures

timeout

Enforces execution time limit.

Schema:

type: timeout
duration: "5s"
child:
  ref: operation

Behaviour:

  1. Start child execution with deadline context
  2. If completes before deadline, return result
  3. If deadline exceeded, cancel and return error

Default: 30 seconds

Error: Returns context.DeadlineExceeded

Use Cases:

  • External API calls
  • Long-running computations
  • User-facing requests

circuit-breaker

Prevents repeated calls to failing services.

Schema:

type: circuit-breaker
failure_threshold: 5
recovery_timeout: "30s"
child:
  ref: service

States:

State      Behaviour
Closed     Normal operation, count failures
Open       Fail immediately, don't call child
Half-Open  Allow one test request

Behaviour:

  1. Closed: Execute child, count failures
  2. After failure_threshold failures, open circuit
  3. Open: Return error immediately
  4. After recovery_timeout, enter half-open
  5. Half-Open: Execute one test request
  6. If test succeeds, close circuit
  7. If test fails, reopen circuit

Defaults:

  • failure_threshold: 5
  • recovery_timeout: 60s

Use Cases:

  • External service protection
  • Cascade failure prevention
  • Allowing service recovery

rate-limit

Controls request throughput.

Schema:

type: rate-limit
requests_per_second: 100.0
burst_size: 10
child:
  ref: operation

Behaviour:

  1. Wait for token from bucket
  2. Execute child
  3. Return result

Uses token bucket algorithm:

  • Tokens added at requests_per_second rate
  • Up to burst_size tokens can accumulate
  • Request waits if no tokens available

Defaults:

  • requests_per_second: 10.0
  • burst_size: 1

Use Cases:

  • API rate limit compliance
  • Resource protection
  • Fair usage enforcement

Routing

filter

Conditional execution based on boolean predicate.

Schema:

type: filter
predicate: is-valid
then:
  ref: process
else:
  ref: handle-invalid  # Optional

Behaviour with else:

  1. Evaluate predicate
  2. If true, execute then branch
  3. If false, execute else branch
  4. Return result

Behaviour without else:

  1. Evaluate predicate
  2. If true, execute then branch
  3. If false, pass data through unchanged

Requirements: Predicate must be registered with factory.

Use Cases:

  • Conditional processing
  • Feature flags
  • Validation gates

switch

Multi-way routing based on condition value.

Schema:

type: switch
condition: category
routes:
  electronics:
    ref: handle-electronics
  clothing:
    ref: handle-clothing
  food:
    ref: handle-food
default:
  ref: handle-other

Behaviour:

  1. Evaluate condition function
  2. Match return value to route key
  3. Execute matched route
  4. If no match and default exists, execute default
  5. If no match and no default, pass through unchanged

Default Route: The condition function should return "default" for default routing.

Requirements: Condition must be registered with factory.
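The routing rules can be sketched as a map lookup with an optional fallback. Handler and routeSwitch are hypothetical names for illustration; in this sketch any unmatched key, including "default", falls through to the default route.

```go
package main

// Handler is a hypothetical stand-in for a route's processor.
type Handler func(in string) (string, error)

// routeSwitch evaluates the condition, runs the matching route, falls back to
// def when no route matches, and passes the input through unchanged when
// there is no match and no default.
func routeSwitch(in string, condition func(string) string, routes map[string]Handler, def Handler) (string, error) {
	if h, ok := routes[condition(in)]; ok {
		return h(in)
	}
	if def != nil {
		return def(in)
	}
	return in, nil
}
```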

Use Cases:

  • Multi-tenant routing
  • Category handling
  • Status-based processing

Streaming

stream

Sends data to a registered channel.

Schema (terminal):

stream: output-channel

Schema (with timeout):

stream: output-channel
stream_timeout: "5s"

Schema (with continuation):

stream: audit-channel
child:
  ref: continue-processing

Behaviour (terminal):

  1. Send data to channel
  2. Return data (unchanged)

Behaviour (with child):

  1. Send data to channel
  2. Execute child
  3. Return child result

Timeout Behaviour:

  • Without stream_timeout: Blocks until channel accepts or context cancelled
  • With stream_timeout: Returns error if write doesn't complete within duration

Requirements: Channel must be registered with factory.

Use Cases:

  • Stream processing integration
  • Async event publishing
  • Audit logging
  • Fan-out patterns

Advanced

contest

Runs children concurrently and returns the first result satisfying a predicate.

Schema:

type: contest
predicate: is-valid
children:
  - ref: source1
  - ref: source2
  - ref: source3

Behaviour:

  1. Clone input data for each child
  2. Execute all children concurrently
  3. Evaluate predicate against each result as it completes
  4. Return first result where predicate returns true
  5. Cancel remaining executions

Error Handling: Returns error if no child produces a result satisfying the predicate.

Requirements:

  • Type T must implement pipz.Cloner[T]
  • Predicate must be registered with factory

Use Cases:

  • Find first valid response from multiple sources
  • Speculative execution with validation
  • Best-effort data retrieval

handle

Wraps a child with custom error handling logic.

Schema:

type: handle
error_handler: recovery-handler
child:
  ref: risky-operation

Behaviour:

  1. Execute child
  2. If child succeeds, return result
  3. If child fails, pass error to handler
  4. Handler receives *pipz.Error[T] with original data and error
  5. Return handler result

Error Handling: Handler can recover, transform, or propagate the error.

Requirements: Error handler must be registered with factory.

Use Cases:

  • Custom error recovery
  • Error transformation
  • Contextual error handling
  • Compensating transactions

scaffold

Structural wrapper that groups children for organisational purposes.

Schema:

type: scaffold
name: validation-stage
children:
  - ref: validate-format
  - ref: validate-business-rules
  - ref: validate-permissions

Behaviour:

  1. Execute children in sequence
  2. Pass output of each child to the next
  3. Return final child's output

Error Handling: Stops on first error, returns that error.

Use Cases:

  • Logical grouping of related processors
  • Pipeline organisation
  • Named stages for observability

worker-pool

Distributes work across a pool of workers.

Schema:

type: worker-pool
workers: 8
children:
  - ref: task1
  - ref: task2
  - ref: task3

Behaviour:

  1. Create a pool of goroutines, sized by the workers setting
  2. Distribute children across workers
  3. Execute concurrently within worker limit
  4. Collect and merge results

Defaults:

  • workers: 4

Requirements: Type T must implement pipz.Cloner[T].

Use Cases:

  • Bounded concurrency
  • Resource-constrained parallel processing
  • Batch processing with limits

Comparison Table

Type             Purpose              Error Behaviour      Cloning
sequence         Chain steps          Stop on error        No
concurrent       Parallel execution   Return first error   Yes
race             First success        Error if all fail    Yes
fallback         Backup processing    Try backup           No
retry            Transient failures   Retry n times        No
timeout          Time limit           Cancel on timeout    No
circuit-breaker  Failure protection   Fail fast when open  No
rate-limit       Throughput control   Wait for token       No
filter           Conditional routing  From branch          No
switch           Multi-way routing    From route           No
stream           Channel output       From send            No
contest          First valid result   Error if none valid  Yes
handle           Error handling       From handler         No
scaffold         Logical grouping     Stop on error        No
worker-pool      Bounded parallelism  Return first error   Yes

Next Steps