Testing
Strategies for testing Flume pipelines at multiple levels.
Testing Levels
- Unit Tests - Test individual processors
- Schema Tests - Test schema validation
- Integration Tests - Test complete pipelines
- Property Tests - Test invariants across schemas
Unit Testing Processors
Test processors independently before registering:
func TestValidateOrder(t *testing.T) {
processor := pipz.Apply("validate", validateOrder)
tests := []struct {
name string
input Order
wantErr bool
}{
{
name: "valid order",
input: Order{Total: 100},
wantErr: false,
},
{
name: "zero total",
input: Order{Total: 0},
wantErr: true,
},
{
name: "negative total",
input: Order{Total: -50},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := processor.Process(context.Background(), tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
Testing Predicates
// TestIsPremium checks a premium-tier predicate against a table of customer
// tiers. The predicate is a plain function, so it is called directly without
// building a pipeline.
func TestIsPremium(t *testing.T) {
	isPremium := func(_ context.Context, order Order) bool {
		return order.CustomerTier == "premium"
	}

	cases := []struct {
		tier string
		want bool
	}{
		{tier: "premium", want: true},
		{tier: "standard", want: false},
		{tier: "", want: false},
	}

	ctx := context.Background()
	for _, tc := range cases {
		order := Order{CustomerTier: tc.tier}
		if got := isPremium(ctx, order); got != tc.want {
			t.Errorf("isPremium(%q) = %v, want %v", tc.tier, got, tc.want)
		}
	}
}
Schema Validation Tests
Test that schemas are valid before deployment:
// TestSchemaValidation builds each schema against the factory returned by
// setupFactory and asserts only whether BuildFromYAML reports an error.
//
// The "missing predicate" schema nests ref under then so that the unknown
// predicate name is the only thing wrong with it; with ref at the top level
// the schema would be malformed for a second, unrelated reason.
func TestSchemaValidation(t *testing.T) {
	factory := setupFactory()

	tests := []struct {
		name      string
		schema    string
		wantError bool
	}{
		{
			name: "valid sequence",
			schema: `
type: sequence
children:
- ref: validate
- ref: process`,
			wantError: false,
		},
		{
			name: "missing processor",
			schema: `
type: sequence
children:
- ref: nonexistent`,
			wantError: true,
		},
		{
			name: "missing predicate",
			schema: `
type: filter
predicate: missing
then:
  ref: validate`,
			wantError: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			_, err := factory.BuildFromYAML(tt.schema)
			if (err != nil) != tt.wantError {
				t.Errorf("BuildFromYAML() error = %v, wantError %v", err, tt.wantError)
			}
		})
	}
}
// setupFactory returns a new Order factory with two pass-through processors,
// "validate" and "process", and one predicate, "is-valid", which is true
// when the order total is greater than zero.
func setupFactory() *flume.Factory[Order] {
	// Both processors return the order untouched; only their names differ.
	passThrough := func(_ context.Context, o Order) Order { return o }
	positiveTotal := func(_ context.Context, o Order) bool { return o.Total > 0 }

	f := flume.New[Order]()
	f.Add(
		pipz.Transform("validate", passThrough),
		pipz.Transform("process", passThrough),
	)
	f.AddPredicate(flume.Predicate[Order]{
		Name:      "is-valid",
		Predicate: positiveTotal,
	})
	return f
}
CI/CD Schema Linting
Use ValidateSchemaStructure in CI/CD pipelines where a configured factory is not available. It validates a schema's structure without checking whether the processors, predicates, or other references it names actually exist.
Lint Schema Files
// TestSchemaFilesStructure lints every YAML file matched by schemas/*.yaml.
// Each file is read, unmarshalled into a flume.Schema, and passed to
// flume.ValidateSchemaStructure, which needs no configured factory.
//
// The test fails when the glob errors or matches nothing, so that a wrong
// working directory or a moved schemas folder cannot produce a green run
// that validated no files.
func TestSchemaFilesStructure(t *testing.T) {
	const pattern = "schemas/*.yaml"

	files, err := filepath.Glob(pattern)
	if err != nil {
		t.Fatalf("failed to glob %q: %v", pattern, err)
	}
	if len(files) == 0 {
		t.Fatalf("no schema files matched %q", pattern)
	}

	for _, file := range files {
		t.Run(filepath.Base(file), func(t *testing.T) {
			data, err := os.ReadFile(file)
			if err != nil {
				t.Fatalf("failed to read file: %v", err)
			}

			var schema flume.Schema
			if err := yaml.Unmarshal(data, &schema); err != nil {
				t.Fatalf("failed to parse YAML: %v", err)
			}

			// Structural validation - no factory needed
			if err := flume.ValidateSchemaStructure(schema); err != nil {
				t.Errorf("invalid schema structure: %v", err)
			}
		})
	}
}
CI Pipeline Example
# .github/workflows/lint-schemas.yml
# Runs the structural schema lint test on every push and pull request.
name: Lint Schemas
on: [push, pull_request]
jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.23'
      # Only the structural lint test is selected; it needs no configured factory.
      - run: go test -v -run TestSchemaFilesStructure ./...
What Each Validation Checks
| Check | ValidateSchemaStructure | ValidateSchema |
|---|---|---|
| Valid node types | Yes | Yes |
| Required children/child | Yes | Yes |
| Valid durations | Yes | Yes |
| Positive numbers | Yes | Yes |
| Processor refs exist | No | Yes |
| Predicate refs exist | No | Yes |
| Condition refs exist | No | Yes |
| Channel refs exist | No | Yes |
| Circular references | No | Yes |
Use ValidateSchemaStructure in CI to catch syntax errors early, then ValidateSchema at runtime with a configured factory to catch reference errors.
Integration Testing
Test complete pipelines with realistic scenarios:
// TestOrderPipeline builds a complete pipeline from YAML and compares the
// processed order with the expected one for each case.
//
// The schema is a sequence of three steps: the validate processor, a filter
// that runs premium-handler when the high-value predicate matches, and the
// finalize processor. Those names are not registered here; presumably
// setupProductionFactory provides them - verify against that helper.
//
// The filter node's predicate and then keys are indented under the list item,
// and ref under then, so the YAML describes that three-step tree.
func TestOrderPipeline(t *testing.T) {
	factory := setupProductionFactory()

	schema := `
type: sequence
children:
- ref: validate
- type: filter
  predicate: high-value
  then:
    ref: premium-handler
- ref: finalize`

	pipeline, err := factory.BuildFromYAML(schema)
	if err != nil {
		t.Fatalf("failed to build: %v", err)
	}

	tests := []struct {
		name  string
		order Order
		want  Order
	}{
		{
			name:  "standard order unchanged",
			order: Order{Total: 50},
			want:  Order{Total: 50, Status: "finalized"},
		},
		{
			name:  "high value gets premium treatment",
			order: Order{Total: 1000},
			want:  Order{Total: 1000, Discount: 0.1, Status: "finalized"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result, err := pipeline.Process(context.Background(), tt.order)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}
			// Whole-struct comparison: every field of the result must match.
			if result != tt.want {
				t.Errorf("got %+v, want %+v", result, tt.want)
			}
		})
	}
}
Testing Error Handling
Retry Behaviour
// TestRetryBehaviour wraps a processor that fails on its first two calls in a
// retry node allowing five attempts. It asserts that processing succeeds and
// that the processor was called exactly three times.
//
// The build error is checked before the pipeline is used, so a rejected
// schema is reported as such instead of surfacing as a failure on a pipeline
// that was never built. In the schema, ref is nested under child so the retry
// node actually wraps the flaky processor.
func TestRetryBehaviour(t *testing.T) {
	// attempts counts calls to the flaky processor; it is only touched from
	// within this test's single Process call.
	attempts := 0
	processor := pipz.Apply("flaky", func(_ context.Context, o Order) (Order, error) {
		attempts++
		if attempts < 3 {
			return o, errors.New("transient failure")
		}
		return o, nil
	})

	factory := flume.New[Order]()
	factory.Add(processor)

	schema := `
type: retry
attempts: 5
child:
  ref: flaky`

	pipeline, err := factory.BuildFromYAML(schema)
	if err != nil {
		t.Fatalf("failed to build: %v", err)
	}

	_, err = pipeline.Process(context.Background(), Order{})
	if err != nil {
		t.Errorf("expected success after retries, got: %v", err)
	}
	if attempts != 3 {
		t.Errorf("expected 3 attempts, got %d", attempts)
	}
}
Fallback Behaviour
// TestFallbackBehaviour pairs a processor that always fails with a backup
// that sets Source to "backup", inside a fallback node. It asserts that
// processing succeeds and that the result carries the backup's marker.
//
// The build error is checked before the pipeline is used, so a rejected
// schema fails the test with the build error rather than at the Process call.
func TestFallbackBehaviour(t *testing.T) {
	factory := flume.New[Order]()
	factory.Add(
		pipz.Apply("failing", func(_ context.Context, o Order) (Order, error) {
			return o, errors.New("primary failed")
		}),
		pipz.Transform("backup", func(_ context.Context, o Order) Order {
			o.Source = "backup"
			return o
		}),
	)

	schema := `
type: fallback
children:
- ref: failing
- ref: backup`

	pipeline, err := factory.BuildFromYAML(schema)
	if err != nil {
		t.Fatalf("failed to build: %v", err)
	}

	result, err := pipeline.Process(context.Background(), Order{})
	if err != nil {
		t.Errorf("expected success from fallback: %v", err)
	}
	if result.Source != "backup" {
		t.Error("expected fallback to be used")
	}
}
Timeout Behaviour
// TestTimeoutBehaviour wraps a processor that waits up to five seconds in a
// timeout node of 100ms and asserts that processing fails with
// context.DeadlineExceeded.
//
// The slow processor selects on ctx.Done(), so it returns as soon as its
// context is cancelled instead of holding the test for the full five seconds.
// The build error is checked before use, and ref is nested under child so the
// timeout node wraps the slow processor.
func TestTimeoutBehaviour(t *testing.T) {
	factory := flume.New[Order]()
	factory.Add(pipz.Apply("slow", func(ctx context.Context, o Order) (Order, error) {
		select {
		case <-time.After(5 * time.Second):
			return o, nil
		case <-ctx.Done():
			return o, ctx.Err()
		}
	}))

	schema := `
type: timeout
duration: "100ms"
child:
  ref: slow`

	pipeline, err := factory.BuildFromYAML(schema)
	if err != nil {
		t.Fatalf("failed to build: %v", err)
	}

	_, err = pipeline.Process(context.Background(), Order{})
	if !errors.Is(err, context.DeadlineExceeded) {
		t.Errorf("expected deadline exceeded, got: %v", err)
	}
}
Testing Hot Reload
// TestHotReload checks that a binding created with flume.WithAutoSync picks
// up a replaced schema. It registers two processors that stamp Version with
// "v1" and "v2", binds to a schema that references v1, processes once, swaps
// the schema for one that references v2, and processes again through the same
// binding.
//
// Errors from Bind and Process are checked so that a failed bind or a failed
// run is reported directly rather than as a misleading version mismatch.
func TestHotReload(t *testing.T) {
	factory := flume.New[Order]()
	factory.Add(
		pipz.Transform("v1", func(_ context.Context, o Order) Order {
			o.Version = "v1"
			return o
		}),
		pipz.Transform("v2", func(_ context.Context, o Order) Order {
			o.Version = "v2"
			return o
		}),
	)

	// Set initial schema
	factory.SetSchema("test", flume.Schema{
		Version: "1.0.0",
		Node:    flume.Node{Ref: "v1"},
	})

	// Create binding with auto-sync
	pipelineID := factory.Identity("test-pipeline", "Test pipeline")
	binding, err := factory.Bind(pipelineID, "test", flume.WithAutoSync())
	if err != nil {
		t.Fatalf("failed to bind: %v", err)
	}

	// Process with v1
	result, err := binding.Process(context.Background(), Order{})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.Version != "v1" {
		t.Error("expected v1")
	}

	// Update schema - binding auto-syncs
	factory.SetSchema("test", flume.Schema{
		Version: "2.0.0",
		Node:    flume.Node{Ref: "v2"},
	})

	// Process with v2
	result, err = binding.Process(context.Background(), Order{})
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if result.Version != "v2" {
		t.Error("expected v2")
	}
}
Concurrent Testing
// TestConcurrentPipelineAccess calls Process on one shared binding from 100
// goroutines at once and reports every error any of them returned. Run it
// with -race to surface data races as well as returned errors.
//
// testSchema is not declared in this function; it is assumed to be defined
// elsewhere in the test package.
func TestConcurrentPipelineAccess(t *testing.T) {
	// workers sizes both the goroutine count and the error buffer, so no
	// goroutine can block on send even if every call fails.
	const workers = 100

	factory := setupFactory()
	factory.SetSchema("test", testSchema)

	// Create binding with auto-sync
	pipelineID := factory.Identity("concurrent-pipeline", "Concurrent test")
	binding, err := factory.Bind(pipelineID, "test", flume.WithAutoSync())
	if err != nil {
		t.Fatalf("failed to bind: %v", err)
	}

	var wg sync.WaitGroup
	// Named errCh rather than errors to avoid shadowing the errors package.
	errCh := make(chan error, workers)

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			_, err := binding.Process(context.Background(), Order{ID: fmt.Sprint(id)})
			if err != nil {
				errCh <- err
			}
		}(i)
	}

	// All senders have finished once Wait returns, so closing is safe and
	// lets the range below terminate.
	wg.Wait()
	close(errCh)

	for err := range errCh {
		t.Error(err)
	}
}
Test Helpers
The flume/testing package provides utilities:
import flumetesting "github.com/zoobz-io/flume/testing"
// TestWithHelpers shows the flumetesting helpers: it creates a test factory,
// registers no-op processors named validate, process, and finalize, builds a
// pipeline from schema, and processes one TestData value, asserting that
// neither the build nor the run returns an error.
//
// schema is not declared in this function; it is assumed to be a YAML schema
// defined elsewhere in the test package that references the processors
// registered here.
func TestWithHelpers(t *testing.T) {
	tf := flumetesting.NewTestFactory(t)

	// Register test processors
	tf.RegisterNoOpProcessors("validate", "process", "finalize")

	// Build and test
	pipeline, err := tf.Factory.BuildFromYAML(schema)
	tf.AssertNoError(err)

	// The processed value is discarded: only the error is asserted, and an
	// unused local would not compile.
	_, err = pipeline.Process(context.Background(), flumetesting.TestData{ID: 1})
	tf.AssertNoError(err)
}
Best Practices
- Test processors in isolation before integration
- Use table-driven tests for schema variations
- Test error paths explicitly - don't just test happy paths
- Test concurrent access if using hot reload
- Keep test factories minimal - only register what's needed
Next Steps
- Observability - Monitor in production
- Testing Package Reference - Test utilities