Schema Design
Best practices for designing clear, maintainable Flume schemas.
Naming Conventions
Processor Names
Use kebab-case with verb-noun patterns:
// Good
pipz.Apply("validate-order", ...)
pipz.Apply("fetch-customer", ...)
pipz.Apply("send-notification", ...)
// Avoid
pipz.Apply("OrderValidation", ...)
pipz.Apply("customer", ...)
pipz.Apply("doStuff", ...)
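Because the convention is mechanical, it can be checked automatically. The sketch below is a hypothetical helper (`isKebabCase` is not part of Flume or pipz) that a unit test or CI step could run over registered names. It only checks the kebab-case shape. Whether the first word is a verb still needs a human eye.

```go
package main

import (
	"fmt"
	"regexp"
)

// kebabCase matches lowercase words separated by single hyphens,
// e.g. "validate-order" or "fetch-customer".
var kebabCase = regexp.MustCompile(`^[a-z0-9]+(-[a-z0-9]+)*$`)

// isKebabCase reports whether name follows the kebab-case convention.
// It cannot tell whether the first word is a verb; "customer" passes
// the shape check even though it breaks the verb-noun rule.
func isKebabCase(name string) bool {
	return kebabCase.MatchString(name)
}

func main() {
	for _, name := range []string{"validate-order", "OrderValidation", "doStuff", "customer"} {
		fmt.Printf("%-16s kebab-case=%v\n", name, isKebabCase(name))
	}
}
```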
Predicate Names
Use is-/has-/can- prefixes:
// Good
flume.Predicate[T]{Name: "is-premium", ...}
flume.Predicate[T]{Name: "has-discount-code", ...}
flume.Predicate[T]{Name: "can-ship-internationally", ...}
// Avoid
flume.Predicate[T]{Name: "premium", ...}
flume.Predicate[T]{Name: "check-discount", ...}
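The prefix rule can be checked the same way. `isPredicateName` below is a hypothetical helper, not a Flume API. It accepts names that start with `is-`, `has-`, or `can-` followed by at least one more character.

```go
package main

import (
	"fmt"
	"strings"
)

// predicatePrefixes are the boolean-style prefixes recommended above.
var predicatePrefixes = []string{"is-", "has-", "can-"}

// isPredicateName reports whether name starts with a recommended prefix
// and has something after it.
func isPredicateName(name string) bool {
	for _, p := range predicatePrefixes {
		if strings.HasPrefix(name, p) && len(name) > len(p) {
			return true
		}
	}
	return false
}

func main() {
	for _, name := range []string{"is-premium", "has-discount-code", "premium", "check-discount"} {
		fmt.Printf("%-20s ok=%v\n", name, isPredicateName(name))
	}
}
```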
Condition Names
Use get- prefix or descriptive nouns:
// Good
flume.Condition[T]{Name: "get-order-type", ...}
flume.Condition[T]{Name: "payment-method", ...}
flume.Condition[T]{Name: "customer-tier", ...}
Schema Names
Use descriptive kebab-case:
factory.SetSchema("order-processing", ...)
factory.SetSchema("user-onboarding", ...)
factory.SetSchema("payment-v2", ...)
Structure Guidelines
Keep Depth Shallow
Deeply nested schemas are hard to read and debug:
# Hard to follow (5+ levels)
type: sequence
children:
- type: filter
  predicate: a
  then:
    type: switch
    condition: b
    routes:
      x:
        type: retry
        child:
          type: timeout
          child:
            ref: processor
Instead, break the flow into named building blocks (sub-schemas or pre-composed processors) and reference them:
# Main schema
type: sequence
children:
- ref: validate
- ref: resilient-external-call # Pre-composed
- ref: finalize
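// Register the composed processor under the name the schema refers to
// (apiCall is the processor that performs the external call)
resilientCall := pipz.NewCircuitBreaker("resilient-external-call",
	pipz.NewTimeout("timeout", apiCall, 5*time.Second),
	3, 30*time.Second, // failure threshold, reset timeout
)
factory.Add(resilientCall)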
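To keep nesting visible in review, a depth check can flag schemas that grow too deep. This is a sketch under assumptions: `node` is a simplified, hypothetical stand-in for a schema node (it is not Flume's node type) with just enough fields to model `children`, `child`, `then`, and `routes`. A bare `ref` counts as one level.

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a schema node. It is
// NOT Flume's type; it only models the fields needed to measure nesting.
type node struct {
	Type     string
	Ref      string
	Children []node          // sequence, concurrent, fallback, ...
	Child    *node           // retry, timeout, circuit-breaker, ...
	Then     *node           // filter
	Routes   map[string]node // switch
}

// depth returns the number of levels from n down to its deepest leaf.
func depth(n node) int {
	deepest := 0
	consider := func(c node) {
		if d := depth(c); d > deepest {
			deepest = d
		}
	}
	for _, c := range n.Children {
		consider(c)
	}
	if n.Child != nil {
		consider(*n.Child)
	}
	if n.Then != nil {
		consider(*n.Then)
	}
	for _, c := range n.Routes {
		consider(c)
	}
	return 1 + deepest
}

func main() {
	// The "hard to follow" schema from above.
	deep := node{Type: "sequence", Children: []node{{
		Type: "filter",
		Then: &node{Type: "switch", Routes: map[string]node{
			"x": {Type: "retry", Child: &node{
				Type:  "timeout",
				Child: &node{Ref: "processor"},
			}},
		}},
	}}}

	// The same flow flattened into pre-composed refs.
	flat := node{Type: "sequence", Children: []node{
		{Ref: "validate"}, {Ref: "resilient-external-call"}, {Ref: "finalize"},
	}}

	fmt.Println(depth(deep), depth(flat)) // 6 2
}
```

A team might fail a CI check above some agreed limit (for example 4); the limit itself is a team choice.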
Group Related Operations
Use sequence to group related steps:
type: sequence
name: payment-processing
children:
- ref: validate-payment
- ref: authorize-payment
- ref: capture-payment
- ref: record-transaction
Name Complex Nodes
Add names to non-trivial nodes for debugging:
type: concurrent
name: parallel-enrichment
children:
- ref: fetch-customer
- ref: fetch-inventory
---
type: filter
name: premium-check
predicate: is-premium
then:
  ref: premium-handler
Version Management
Semantic Versioning
Use semantic versioning for schemas:
version: "1.0.0" # Initial
version: "1.1.0" # Added new route
version: "2.0.0" # Breaking change to structure
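If schemas are versioned this way, a release check can decide whether a change needs a new schema name (such as `checkout-v2`) rather than an in-place update. A minimal sketch, assuming plain `MAJOR.MINOR.PATCH` strings with no pre-release suffixes; `parseVersion` and `isBreaking` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseVersion splits a "MAJOR.MINOR.PATCH" string into integers.
// Pre-release and build suffixes are not handled in this sketch.
func parseVersion(v string) ([3]int, error) {
	var out [3]int
	parts := strings.Split(v, ".")
	if len(parts) != 3 {
		return out, fmt.Errorf("version %q: want MAJOR.MINOR.PATCH", v)
	}
	for i, p := range parts {
		n, err := strconv.Atoi(p)
		if err != nil || n < 0 {
			return out, fmt.Errorf("version %q: bad component %q", v, p)
		}
		out[i] = n
	}
	return out, nil
}

// isBreaking reports whether moving from one version to another
// crosses a major version boundary.
func isBreaking(from, to string) (bool, error) {
	a, err := parseVersion(from)
	if err != nil {
		return false, err
	}
	b, err := parseVersion(to)
	if err != nil {
		return false, err
	}
	return a[0] != b[0], nil
}

func main() {
	for _, pair := range [][2]string{{"1.0.0", "1.1.0"}, {"1.1.0", "2.0.0"}} {
		breaking, err := isBreaking(pair[0], pair[1])
		fmt.Println(pair[0], "->", pair[1], "breaking:", breaking, err)
	}
}
```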
Version in Schema Name
For A/B testing or gradual rollouts, register each variant under its own versioned name:
factory.SetSchema("checkout-v1", schemaV1)
factory.SetSchema("checkout-v2", schemaV2)
// Create a binding for each variant
bindingV1, err := factory.Bind(v1ID, "checkout-v1", flume.WithAutoSync())
if err != nil {
	return err
}
bindingV2, err := factory.Bind(v2ID, "checkout-v2", flume.WithAutoSync())
if err != nil {
	return err
}
// Route traffic (getBindingForUser is your own routing logic)
binding := getBindingForUser(user, bindingV1, bindingV2)
result, err := binding.Process(ctx, order)
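One way to implement the hypothetical `getBindingForUser` is deterministic bucketing: hash the user ID and compare it to a rollout percentage, so each user consistently sees the same variant. `variantFor` below is an illustrative sketch that returns the schema name; the caller would then pick the matching binding.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// variantFor deterministically assigns a user to "checkout-v1" or
// "checkout-v2". percentV2 (0-100) is the share of users sent to v2.
// The same user ID always lands in the same bucket, so users don't
// flip between variants across requests.
func variantFor(userID string, percentV2 uint32) string {
	h := fnv.New32a()
	h.Write([]byte(userID)) // hash.Hash's Write never returns an error
	if h.Sum32()%100 < percentV2 {
		return "checkout-v2"
	}
	return "checkout-v1"
}

func main() {
	for _, id := range []string{"user-1", "user-2", "user-3"} {
		fmt.Println(id, variantFor(id, 10))
	}
}
```

Raising `percentV2` gradually moves more users to v2, and users already bucketed into v2 stay there because their hash does not change.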
Error Handling Placement
Wrap at Boundaries
Apply error handling at service boundaries:
type: sequence
children:
- ref: validate            # Internal: no wrapping needed
- type: circuit-breaker
  failure_threshold: 5
  child:
    type: retry
    attempts: 3
    child:
      ref: external-api    # External: wrap it
- ref: transform           # Internal: no wrapping needed
Don't Over-Protect
Avoid stacking redundant error handling. Nested retries multiply the number of calls made to the wrapped processor:
# Overkill
type: retry
child:
  type: retry
  child:
    type: fallback
    children:
    - type: retry
      child:
        ref: processor
    - ref: fallback
Match Retry to Operation
# Quick operation: few retries, no backoff
type: retry
attempts: 2
child:
  ref: cache-lookup
# Slow external service: more retries, with backoff
type: retry
attempts: 5
backoff: "500ms"
child:
  ref: payment-gateway
Performance Considerations
Minimize Cloning
Concurrent and race nodes clone the input data for each child, so reserve them for work that actually benefits from parallelism:
# Unnecessary clone
type: concurrent
children:
- ref: quick-transform # Could be sequential
# Justified parallelism
type: concurrent
children:
- ref: slow-api-call
- ref: another-slow-call
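The cost comes from the copy each child receives. This sketch mirrors the clone-per-child behavior described above using a payload with a `Clone` method; `Order`, `Clone`, and `runConcurrent` are hypothetical, written from scratch rather than taken from Flume or pipz. Each goroutine gets a deep copy, so children cannot corrupt the original or each other, and that copy is pure overhead when there is only one quick child.

```go
package main

import (
	"fmt"
	"sync"
)

// Order is a hypothetical payload; Items is a slice, so a shallow
// copy would share its backing array between goroutines.
type Order struct {
	ID    string
	Items []string
}

// Clone returns a deep copy so concurrent children cannot race on Items.
func (o Order) Clone() Order {
	items := make([]string, len(o.Items))
	copy(items, o.Items)
	return Order{ID: o.ID, Items: items}
}

// runConcurrent gives every child its own clone and waits for all of them.
// Children's results are discarded in this sketch.
func runConcurrent(o Order, children ...func(Order)) {
	var wg sync.WaitGroup
	for _, child := range children {
		wg.Add(1)
		go func(c func(Order), in Order) {
			defer wg.Done()
			c(in)
		}(child, o.Clone())
	}
	wg.Wait()
}

func main() {
	o := Order{ID: "o-1", Items: []string{"book"}}
	runConcurrent(o,
		func(c Order) { c.Items[0] = "changed-by-a" },
		func(c Order) { c.Items = append(c.Items, "added-by-b") },
	)
	fmt.Println(o.Items) // [book]
}
```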
Rate Limit at Entry Points
Place rate limiters at the entry point, so requests are throttled before any downstream work runs:
type: sequence
children:
- type: rate-limit
  requests_per_second: 100.0
  child:
    ref: validate  # Everything after is protected
- ref: process
- ref: external-call
Use Timeouts Wisely
Set timeouts from the downstream service's actual SLA or observed latency rather than arbitrary values:
type: timeout
duration: "30s"  # Based on actual SLA
child:
  ref: external-service
Schema Organization
Single Responsibility
Each schema should do one thing well:
// Separate concerns
factory.SetSchema("order-validation", validationSchema)
factory.SetSchema("order-payment", paymentSchema)
factory.SetSchema("order-fulfillment", fulfillmentSchema)
Compose Larger Flows
Build complex flows from simple schemas:
// High-level orchestration
factory.SetSchema("order-complete", flume.Schema{
	Node: flume.Node{
		Type: "sequence",
		Children: []flume.Node{
			{Ref: "order-validation"}, // References another pipeline
			{Ref: "order-payment"},
			{Ref: "order-fulfillment"},
		},
	},
})
Documentation Practices
Use Descriptions
Add descriptions when registering:
factory.AddPredicate(flume.Predicate[Order]{
Name: "is-high-value",
Description: "Orders over $1000, which receive premium handling",
Predicate: isHighValue,
})
factory.AddCondition(flume.Condition[Order]{
Name: "fulfillment-type",
Description: "Determines shipping vs. digital delivery",
Values: []string{"ship", "digital", "pickup"},
Condition: getFulfillmentType,
})
Companion Documentation
Keep schema documentation next to the schema files:
schemas/
  order-processing.yaml
  order-processing.md    # Documents the schema
Schema Diagrams
For complex pipelines, create visual diagrams:
validate → [concurrent: enrich] → [filter: premium?] → [switch: payment] → complete
                                          ↓                    ↓
                                   premium-handler   credit|paypal|crypto
Next Steps
- Hot Reloading - Dynamic schema updates
- Testing - Testing schema-driven pipelines
- Schema Format Reference - Complete specification