Quickstart
Build your first schema-driven pipeline in 5 minutes.
Installation
go get github.com/zoobz-io/flume
Step 1: Define Your Data Type
Your data type must implement pipz.Cloner[T]:
package main
import (
"context"
"fmt"
"github.com/zoobz-io/flume"
"github.com/zoobz-io/pipz"
)
// Order is the value that flows through the pipeline.
type Order struct {
	ID       string
	Customer string
	Total    float64
	Status   string
}

// Clone returns an independent copy of the order, providing the Clone
// method that pipz.Cloner[T] requires. Every field is a plain value
// (string or float64), so copying the struct copies all of its data.
func (o Order) Clone() Order {
	clone := o
	return clone
}
Step 2: Create a Factory
func main() {
	// Create a factory whose pipelines operate on Order values.
	factory := flume.New[Order]()
}
Step 3: Register Processors
// Validate order
factory.Add(pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
if o.Total <= 0 {
return o, fmt.Errorf("invalid total: %f", o.Total)
}
return o, nil
}))
// Enrich with status
factory.Add(pipz.Transform("enrich", func(ctx context.Context, o Order) Order {
o.Status = "validated"
return o
}))
// Log order
factory.Add(pipz.Effect("log", func(ctx context.Context, o Order) error {
fmt.Printf("Processing order: %s for %s ($%.2f)\n", o.ID, o.Customer, o.Total)
return nil
}))
Step 4: Define Schema
// A sequence node whose children refer to the processors by the names they
// were registered under: "validate", "enrich" and "log".
schema := `
type: sequence
children:
- ref: validate
- ref: enrich
- ref: log
`
Step 5: Build and Use
// Build a pipeline from the YAML schema; abort if it cannot be built.
pipeline, err := factory.BuildFromYAML(schema)
if err != nil {
	panic(err)
}

// Status is left empty here; the "enrich" processor fills it in.
order := Order{
	ID:       "ORD-001",
	Customer: "Alice",
	Total:    99.99,
}

// Run the order through the pipeline and abort on any processing error.
result, err := pipeline.Process(context.Background(), order)
if err != nil {
	panic(err)
}

// %+v prints the resulting struct together with its field names.
fmt.Printf("Result: %+v\n", result)
}
Complete Example
package main
import (
"context"
"fmt"
"github.com/zoobz-io/flume"
"github.com/zoobz-io/pipz"
)
// Order is the data type processed by the example pipeline.
type Order struct {
	ID       string
	Customer string
	Total    float64
	Status   string
}

// Clone returns a copy of o. Order holds only strings and a float64, so the
// by-value receiver is already a complete, independent copy.
func (o Order) Clone() Order {
	return o
}
func main() {
factory := flume.New[Order]()
// Register processors
factory.Add(
pipz.Apply("validate", func(ctx context.Context, o Order) (Order, error) {
if o.Total <= 0 {
return o, fmt.Errorf("invalid total: %f", o.Total)
}
return o, nil
}),
pipz.Transform("enrich", func(ctx context.Context, o Order) Order {
o.Status = "validated"
return o
}),
pipz.Effect("log", func(ctx context.Context, o Order) error {
fmt.Printf("Order: %s ($%.2f)\n", o.ID, o.Total)
return nil
}),
)
// Define and build pipeline
schema := `
type: sequence
children:
- ref: validate
- ref: enrich
- ref: log
`
pipeline, err := factory.BuildFromYAML(schema)
if err != nil {
panic(err)
}
// Process
result, err := pipeline.Process(context.Background(), Order{
ID: "ORD-001",
Customer: "Alice",
Total: 99.99,
})
if err != nil {
panic(err)
}
fmt.Printf("Final: %+v\n", result)
}
Output:
Order: ORD-001 ($99.99)
Final: {ID:ORD-001 Customer:Alice Total:99.99 Status:validated}
Adding Conditional Logic
Add a predicate for conditional processing:
factory.AddPredicate(flume.Predicate[Order]{
Name: "high-value",
Predicate: func(ctx context.Context, o Order) bool {
return o.Total > 100
},
})
factory.Add(pipz.Effect("premium-notify", func(ctx context.Context, o Order) error {
fmt.Println("High-value order detected!")
return nil
}))
// The filter node pairs the "high-value" predicate with the "premium-notify"
// processor. Its predicate/then keys are indented under the filter's list
// item so that they belong to the filter node, and the ref under then is
// indented once more so that it is the value of then.
schema := `
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: high-value
    then:
      ref: premium-notify
  - ref: log
`
Adding Error Handling
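Wrap a processor in a retry node. This example assumes a processor named "external-api-call" has already been registered with the factory: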
// The retry node wraps the "external-api-call" processor with attempts set
// to 3. Its attempts/child keys are indented under the retry's list item so
// that they belong to the retry node, and the ref under child is indented
// once more so that it is the value of child.
schema := `
type: sequence
children:
  - ref: validate
  - type: retry
    attempts: 3
    child:
      ref: external-api-call
  - ref: log
`
Merging Concurrent Results
When running processors concurrently, use a reducer to merge results:
// Register parallel enrichment processors
factory.Add(
pipz.Transform("fetch-customer", func(ctx context.Context, o Order) Order {
o.Customer = "Alice (Premium)"
return o
}),
pipz.Transform("fetch-inventory", func(ctx context.Context, o Order) Order {
o.Status = "in-stock"
return o
}),
)
// Register a reducer to merge results.
//
// Every entry in results is a complete Order, so it carries a Customer and a
// Status whether or not its processor touched that field. Copying any
// non-empty field would therefore let an untouched value (for example the
// input's own Customer) overwrite another processor's change, and because Go
// randomizes map iteration order the outcome would vary from run to run.
// The reducer instead takes a field only when it is non-empty AND differs
// from the original input, which is deterministic as long as the processors
// modify different fields.
factory.AddReducer(flume.Reducer[Order]{
	Name: "merge-enrichments",
	Reducer: func(original Order, results map[pipz.Name]Order, errors map[pipz.Name]error) Order {
		// Compare against the untouched input while accumulating into a copy.
		merged := original
		for _, result := range results {
			if result.Customer != "" && result.Customer != original.Customer {
				merged.Customer = result.Customer
			}
			if result.Status != "" && result.Status != original.Status {
				merged.Status = result.Status
			}
		}
		return merged
	},
})
// The concurrent node lists "fetch-customer" and "fetch-inventory" as its
// children and names "merge-enrichments" as its reducer. The reducer and
// children keys are indented under the concurrent list item so that they
// belong to that node, and the two fetch refs are indented once more so that
// they are children of the concurrent node rather than of the outer sequence.
schema := `
type: sequence
children:
  - ref: validate
  - type: concurrent
    reducer: merge-enrichments
    children:
      - ref: fetch-customer
      - ref: fetch-inventory
  - ref: log
`
Loading from Files
// Build the pipeline from the file "pipeline.yaml" instead of an inline
// string, and abort if the build fails, as the other build steps do.
pipeline, err := factory.BuildFromFile("pipeline.yaml")
if err != nil {
	panic(err)
}
Using Named Schemas
To support hot reloading, register the schema under a name and bind a pipeline to that name:
// Register named schema
// NOTE(review): schema must have the type SetSchema accepts (the update
// below passes a flume.Schema) — confirm it is not the YAML string used in
// the earlier steps.
err := factory.SetSchema("order-processing", schema)
if err != nil {
	panic(err)
}

// Create binding with auto-sync
pipelineID := factory.Identity("order-processor", "Processes orders")
binding, err := factory.Bind(pipelineID, "order-processing", flume.WithAutoSync())
if err != nil {
	panic(err)
}

// Process requests (ctx and order are supplied by the surrounding code)
result, err := binding.Process(ctx, order)
if err != nil {
	panic(err)
}

// Update schema at runtime - binding rebuilds automatically.
// The {...} is a placeholder for the new schema's contents. The type is
// qualified with its package because this code lives in package main.
newSchema := flume.Schema{...}
if err := factory.SetSchema("order-processing", newSchema); err != nil {
	panic(err)
}
Next Steps
- Building Pipelines - Complex schema patterns
- Core Concepts - Deeper understanding
- Schema Format - Complete reference