zoobzio December 11, 2025 Edit this page

Quickstart

Build your first schema-driven pipeline in 5 minutes.

Installation

go get github.com/zoobz-io/flume

Step 1: Define Your Data Type

Your data type must implement pipz.Cloner[T]:

package main

import (
    "context"
    "fmt"

    "github.com/zoobz-io/flume"
    "github.com/zoobz-io/pipz"
)

// Order is the value that flows through the pipeline.
type Order struct {
    ID       string  // order identifier, e.g. "ORD-001"
    Customer string  // customer name
    Total    float64 // order total; the validate step rejects totals <= 0
    Status   string  // set by the enrich step
}

// Clone returns an independent copy of o, which is what pipz.Cloner[Order]
// asks for. Every field is a plain value (string or float64), so copying the
// struct itself already yields a complete copy.
func (o Order) Clone() Order {
    return o
}

Step 2: Create a Factory

// main creates the factory that the following steps register processors on.
func main() {
    factory := flume.New[Order]()
    // The statements shown in Steps 3-5 go here, inside main.
}

Step 3: Register Processors

    // "validate": fail the order when its total is zero or negative.
    factory.Add(pipz.Apply("validate", func(_ context.Context, order Order) (Order, error) {
        if order.Total <= 0 {
            return order, fmt.Errorf("invalid total: %f", order.Total)
        }
        return order, nil
    }))

    // "enrich": stamp the order with its new status.
    factory.Add(pipz.Transform("enrich", func(_ context.Context, order Order) Order {
        order.Status = "validated"
        return order
    }))

    // "log": print a one-line summary of the order.
    factory.Add(pipz.Effect("log", func(_ context.Context, order Order) error {
        fmt.Printf("Processing order: %s for %s ($%.2f)\n", order.ID, order.Customer, order.Total)
        return nil
    }))

Step 4: Define Schema

    // Pipeline layout as YAML. Each ref is the name a processor was
    // registered under in Step 3.
    schema := `
type: sequence
children:
  - ref: validate
  - ref: enrich
  - ref: log
`

Step 5: Build and Use

    // Build the pipeline from the YAML schema defined in Step 4.
    pipeline, err := factory.BuildFromYAML(schema)
    if err != nil {
        panic(err)
    }

    // Run a single order through the pipeline.
    ctx := context.Background()
    order := Order{ID: "ORD-001", Customer: "Alice", Total: 99.99}
    result, err := pipeline.Process(ctx, order)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Result: %+v\n", result)
}

Complete Example

package main

import (
    "context"
    "fmt"

    "github.com/zoobz-io/flume"
    "github.com/zoobz-io/pipz"
)

// Order is the value processed by the example pipeline.
type Order struct {
    ID       string  // order identifier
    Customer string  // customer name
    Total    float64 // order total; must be greater than zero to pass "validate"
    Status   string  // filled in by the "enrich" processor
}

// Clone returns a copy of o. Order holds only value-typed fields, so the
// struct copy made by the value receiver is already independent of the
// original.
func (o Order) Clone() Order {
    return o
}

// main registers three processors, builds a pipeline from an inline YAML
// schema, sends one order through it and prints the outcome.
func main() {
    // schema lists the processors by the names they are registered under.
    const schema = `
type: sequence
children:
  - ref: validate
  - ref: enrich
  - ref: log
`

    factory := flume.New[Order]()

    // Fails the order when its total is zero or negative.
    validate := pipz.Apply("validate", func(_ context.Context, order Order) (Order, error) {
        if order.Total <= 0 {
            return order, fmt.Errorf("invalid total: %f", order.Total)
        }
        return order, nil
    })

    // Marks the order as validated.
    enrich := pipz.Transform("enrich", func(_ context.Context, order Order) Order {
        order.Status = "validated"
        return order
    })

    // Prints a short summary of the order.
    logOrder := pipz.Effect("log", func(_ context.Context, order Order) error {
        fmt.Printf("Order: %s ($%.2f)\n", order.ID, order.Total)
        return nil
    })

    factory.Add(validate, enrich, logOrder)

    pipeline, err := factory.BuildFromYAML(schema)
    if err != nil {
        panic(err)
    }

    order := Order{ID: "ORD-001", Customer: "Alice", Total: 99.99}
    result, err := pipeline.Process(context.Background(), order)
    if err != nil {
        panic(err)
    }

    fmt.Printf("Final: %+v\n", result)
}

Output:

Order: ORD-001 ($99.99)
Final: {ID:ORD-001 Customer:Alice Total:99.99 Status:validated}

Adding Conditional Logic

Add a predicate for conditional processing:

    factory.AddPredicate(flume.Predicate[Order]{
        Name: "high-value",
        Predicate: func(ctx context.Context, o Order) bool {
            return o.Total > 100
        },
    })

    factory.Add(pipz.Effect("premium-notify", func(ctx context.Context, o Order) error {
        fmt.Println("High-value order detected!")
        return nil
    }))

    schema := `
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: high-value
    then:
      ref: premium-notify
  - ref: log
`

Adding Error Handling

Wrap processors with retry:

    // The retry node wraps a single child. Note that external-api-call is not
    // registered in any snippet shown here; register a processor under that
    // name before building this schema.
    schema := `
type: sequence
children:
  - ref: validate
  - type: retry
    attempts: 3
    child:
      ref: external-api-call
  - ref: log
`

Merging Concurrent Results

When running processors concurrently, use a reducer to merge results:

    // Two enrichment processors, each changing a different field of the order.
    factory.Add(
        // Rewrites the customer name.
        pipz.Transform("fetch-customer", func(_ context.Context, order Order) Order {
            order.Customer = "Alice (Premium)"
            return order
        }),
        // Sets the stock status.
        pipz.Transform("fetch-inventory", func(_ context.Context, order Order) Order {
            order.Status = "in-stock"
            return order
        }),
    )

    // Register a reducer to merge results.
    //
    // fetch-customer and fetch-inventory each return the whole Order they
    // were given with one field changed, and Go visits map entries in no
    // fixed order. Copying every non-empty field would therefore let the
    // untouched Customer from one result overwrite the enriched Customer from
    // the other, depending on iteration order. Only fields that differ from
    // the original input are taken, which gives the same answer on every run
    // as long as each field is changed by at most one processor.
    factory.AddReducer(flume.Reducer[Order]{
        Name: "merge-enrichments",
        Reducer: func(original Order, results map[pipz.Name]Order, errors map[pipz.Name]error) Order {
            merged := original
            for _, result := range results {
                if result.Customer != original.Customer {
                    merged.Customer = result.Customer
                }
                if result.Status != original.Status {
                    merged.Status = result.Status
                }
            }
            return merged
        },
    })

    // The concurrent node lists the two enrichment processors as children and
    // names the reducer that merges their results.
    schema := `
type: sequence
children:
  - ref: validate
  - type: concurrent
    reducer: merge-enrichments
    children:
      - ref: fetch-customer
      - ref: fetch-inventory
  - ref: log
`

Loading from Files

    // Build the pipeline from a schema stored in a file on disk.
    pipeline, err := factory.BuildFromFile("pipeline.yaml")
    if err != nil {
        // Handle the error the same way as the other build calls in this guide.
        panic(err)
    }

Using Named Schemas

Register the schema under a name and bind to it so the pipeline can be hot-reloaded:

    // ctx, order and schema are expected to come from the surrounding code.

    // Register named schema
    err := factory.SetSchema("order-processing", schema)
    if err != nil {
        panic(err)
    }

    // Create binding with auto-sync
    pipelineID := factory.Identity("order-processor", "Processes orders")
    binding, err := factory.Bind(pipelineID, "order-processing", flume.WithAutoSync())
    if err != nil {
        panic(err)
    }

    // Process requests
    result, err := binding.Process(ctx, order)
    if err != nil {
        panic(err)
    }

    // Update schema at runtime - binding rebuilds automatically.
    // Replace {...} with the new schema definition.
    newSchema := flume.Schema{...}
    if err := factory.SetSchema("order-processing", newSchema); err != nil {
        panic(err)
    }

Next Steps