zoobzio December 11, 2025 Edit this page

Testing

Strategies for testing Flume pipelines at multiple levels.

Testing Levels

  1. Unit Tests - Test individual processors
  2. Schema Tests - Test schema validation
  3. Integration Tests - Test complete pipelines
  4. Property Tests - Test invariants across schemas

Unit Testing Processors

Test processors independently before registering:

func TestValidateOrder(t *testing.T) {
    processor := pipz.Apply("validate", validateOrder)

    tests := []struct {
        name    string
        input   Order
        wantErr bool
    }{
        {
            name:    "valid order",
            input:   Order{Total: 100},
            wantErr: false,
        },
        {
            name:    "zero total",
            input:   Order{Total: 0},
            wantErr: true,
        },
        {
            name:    "negative total",
            input:   Order{Total: -50},
            wantErr: true,
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            _, err := processor.Process(context.Background(), tt.input)
            if (err != nil) != tt.wantErr {
                t.Errorf("error = %v, wantErr %v", err, tt.wantErr)
            }
        })
    }
}

Testing Predicates

// TestIsPremium checks a predicate that reports true only for orders whose
// CustomerTier is exactly "premium".
func TestIsPremium(t *testing.T) {
    isPremium := func(_ context.Context, o Order) bool {
        return o.CustomerTier == "premium"
    }

    type testCase struct {
        tier string
        want bool
    }

    for _, tc := range []testCase{
        {tier: "premium", want: true},
        {tier: "standard", want: false},
        {tier: "", want: false},
    } {
        order := Order{CustomerTier: tc.tier}
        if got := isPremium(context.Background(), order); got != tc.want {
            t.Errorf("isPremium(%q) = %v, want %v", tc.tier, got, tc.want)
        }
    }
}

Schema Validation Tests

Test that schemas are valid before deployment:

// TestSchemaValidation builds each schema against the factory returned by
// setupFactory and checks whether the build fails as expected. The failing
// cases reference a processor ("nonexistent") and a predicate ("missing")
// that setupFactory does not register.
func TestSchemaValidation(t *testing.T) {
    const (
        validSequence = `
type: sequence
children:
  - ref: validate
  - ref: process`

        missingProcessor = `
type: sequence
children:
  - ref: nonexistent`

        missingPredicate = `
type: filter
predicate: missing
then:
  ref: validate`
    )

    type testCase struct {
        name      string
        schema    string
        wantError bool
    }

    cases := []testCase{
        {name: "valid sequence", schema: validSequence, wantError: false},
        {name: "missing processor", schema: missingProcessor, wantError: true},
        {name: "missing predicate", schema: missingPredicate, wantError: true},
    }

    factory := setupFactory()

    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            _, err := factory.BuildFromYAML(tc.schema)
            failed := err != nil
            if failed != tc.wantError {
                t.Errorf("BuildFromYAML() error = %v, wantError %v", err, tc.wantError)
            }
        })
    }
}

// setupFactory returns a factory for Order pipelines with two pass-through
// processors ("validate" and "process") and an "is-valid" predicate that
// accepts orders whose Total is greater than zero.
func setupFactory() *flume.Factory[Order] {
    passThrough := func(_ context.Context, o Order) Order { return o }
    positiveTotal := func(_ context.Context, o Order) bool { return o.Total > 0 }

    f := flume.New[Order]()
    f.Add(
        pipz.Transform("validate", passThrough),
        pipz.Transform("process", passThrough),
    )
    f.AddPredicate(flume.Predicate[Order]{
        Name:      "is-valid",
        Predicate: positiveTotal,
    })
    return f
}

CI/CD Schema Linting

Use ValidateSchemaStructure for CI/CD pipelines where a configured factory is not available. This validates schema syntax without checking if processors, predicates, or other references exist.

Lint Schema Files

// TestSchemaFilesStructure lints every file matching schemas/*.yaml: each file
// is read, unmarshalled into a flume.Schema, and passed to
// flume.ValidateSchemaStructure. No factory is needed.
//
// The test fails if the glob matches nothing. Otherwise a wrong working
// directory or a moved schemas/ folder would make the lint pass without
// checking a single file.
func TestSchemaFilesStructure(t *testing.T) {
    const pattern = "schemas/*.yaml"

    files, err := filepath.Glob(pattern)
    if err != nil {
        t.Fatalf("failed to glob %q: %v", pattern, err)
    }
    if len(files) == 0 {
        t.Fatalf("no schema files matched %q", pattern)
    }

    for _, file := range files {
        t.Run(filepath.Base(file), func(t *testing.T) {
            data, err := os.ReadFile(file)
            if err != nil {
                t.Fatalf("failed to read file: %v", err)
            }

            var schema flume.Schema
            if err := yaml.Unmarshal(data, &schema); err != nil {
                t.Fatalf("failed to parse YAML: %v", err)
            }

            // Structural validation - no factory needed
            if err := flume.ValidateSchemaStructure(schema); err != nil {
                t.Errorf("invalid schema structure: %v", err)
            }
        })
    }
}

CI Pipeline Example

# .github/workflows/lint-schemas.yml
# Runs the schema structure lint test on every push and pull request.
name: Lint Schemas
on: [push, pull_request]
jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-go@v5
        with:
          go-version: '1.23'
      # -run limits the run to tests whose names match TestSchemaFilesStructure.
      - run: go test -v -run TestSchemaFilesStructure ./...

What Each Validation Checks

| Check | ValidateSchemaStructure | ValidateSchema |
|---|---|---|
| Valid node types | Yes | Yes |
| Required children/child | Yes | Yes |
| Valid durations | Yes | Yes |
| Positive numbers | Yes | Yes |
| Processor refs exist | No | Yes |
| Predicate refs exist | No | Yes |
| Condition refs exist | No | Yes |
| Channel refs exist | No | Yes |
| Circular references | No | Yes |

Use ValidateSchemaStructure in CI to catch syntax errors early, then ValidateSchema at runtime with a configured factory to catch reference errors.

Integration Testing

Test complete pipelines with realistic scenarios:

// TestOrderPipeline runs orders through a complete pipeline built from YAML:
// validate, then a "high-value" filter that routes to premium-handler, then
// finalize. The processors and predicate come from setupProductionFactory.
// Each case compares the whole resulting Order against the expected value.
func TestOrderPipeline(t *testing.T) {
    const schema = `
type: sequence
children:
  - ref: validate
  - type: filter
    predicate: high-value
    then:
      ref: premium-handler
  - ref: finalize`

    pipeline, err := setupProductionFactory().BuildFromYAML(schema)
    if err != nil {
        t.Fatalf("failed to build: %v", err)
    }

    type testCase struct {
        name  string
        order Order
        want  Order
    }

    cases := []testCase{
        {
            name:  "standard order unchanged",
            order: Order{Total: 50},
            want:  Order{Total: 50, Status: "finalized"},
        },
        {
            name:  "high value gets premium treatment",
            order: Order{Total: 1000},
            want:  Order{Total: 1000, Discount: 0.1, Status: "finalized"},
        },
    }

    for _, tc := range cases {
        t.Run(tc.name, func(t *testing.T) {
            got, err := pipeline.Process(context.Background(), tc.order)
            if err != nil {
                t.Fatalf("unexpected error: %v", err)
            }
            if got == tc.want {
                return
            }
            t.Errorf("got %+v, want %+v", got, tc.want)
        })
    }
}

Testing Error Handling

Retry Behaviour

// TestRetryBehaviour verifies that a retry node re-invokes a failing child
// until it succeeds. The "flaky" processor fails on its first two calls and
// succeeds on the third, so a pipeline allowed 5 attempts must succeed after
// exactly 3 invocations.
func TestRetryBehaviour(t *testing.T) {
    attempts := 0
    processor := pipz.Apply("flaky", func(_ context.Context, o Order) (Order, error) {
        attempts++
        if attempts < 3 {
            return o, errors.New("transient failure")
        }
        return o, nil
    })

    factory := flume.New[Order]()
    factory.Add(processor)

    schema := `
type: retry
attempts: 5
child:
  ref: flaky`

    // Stop on a build failure instead of calling Process on a pipeline that
    // was never built.
    pipeline, err := factory.BuildFromYAML(schema)
    if err != nil {
        t.Fatalf("failed to build: %v", err)
    }

    if _, err := pipeline.Process(context.Background(), Order{}); err != nil {
        t.Errorf("expected success after retries, got: %v", err)
    }
    if attempts != 3 {
        t.Errorf("expected 3 attempts, got %d", attempts)
    }
}

Fallback Behaviour

// TestFallbackBehaviour verifies that a fallback node uses its second child
// when the first one fails. The "failing" processor always returns an error;
// the "backup" processor sets Source to "backup", which the test uses to
// confirm that the fallback path produced the result.
func TestFallbackBehaviour(t *testing.T) {
    factory := flume.New[Order]()

    factory.Add(
        pipz.Apply("failing", func(_ context.Context, o Order) (Order, error) {
            return o, errors.New("primary failed")
        }),
        pipz.Transform("backup", func(_ context.Context, o Order) Order {
            o.Source = "backup"
            return o
        }),
    )

    schema := `
type: fallback
children:
  - ref: failing
  - ref: backup`

    // Stop on a build failure instead of calling Process on a pipeline that
    // was never built.
    pipeline, err := factory.BuildFromYAML(schema)
    if err != nil {
        t.Fatalf("failed to build: %v", err)
    }

    result, err := pipeline.Process(context.Background(), Order{})
    if err != nil {
        t.Errorf("expected success from fallback: %v", err)
    }
    if result.Source != "backup" {
        t.Error("expected fallback to be used")
    }
}

Timeout Behaviour

// TestTimeoutBehaviour verifies that a timeout node cancels a slow child.
// The "slow" processor waits up to 5 seconds but returns ctx.Err() as soon as
// its context is done; with a 100ms timeout the pipeline is expected to fail
// with context.DeadlineExceeded.
func TestTimeoutBehaviour(t *testing.T) {
    factory := flume.New[Order]()

    factory.Add(pipz.Apply("slow", func(ctx context.Context, o Order) (Order, error) {
        select {
        case <-time.After(5 * time.Second):
            return o, nil
        case <-ctx.Done():
            return o, ctx.Err()
        }
    }))

    schema := `
type: timeout
duration: "100ms"
child:
  ref: slow`

    // Stop on a build failure instead of calling Process on a pipeline that
    // was never built.
    pipeline, err := factory.BuildFromYAML(schema)
    if err != nil {
        t.Fatalf("failed to build: %v", err)
    }

    _, err = pipeline.Process(context.Background(), Order{})
    if !errors.Is(err, context.DeadlineExceeded) {
        t.Errorf("expected deadline exceeded, got: %v", err)
    }
}

Testing Hot Reload

// TestHotReload verifies that a binding created with flume.WithAutoSync picks
// up a schema replaced via SetSchema. Two processors stamp Version with "v1"
// and "v2"; the "test" schema first references v1 and is then replaced by one
// referencing v2, and the same binding is expected to reflect the change.
//
// Errors from Bind and Process are checked explicitly so that a failure is
// reported as such rather than as a misleading "expected v1/v2".
func TestHotReload(t *testing.T) {
    factory := flume.New[Order]()
    factory.Add(
        pipz.Transform("v1", func(_ context.Context, o Order) Order {
            o.Version = "v1"
            return o
        }),
        pipz.Transform("v2", func(_ context.Context, o Order) Order {
            o.Version = "v2"
            return o
        }),
    )

    // Set initial schema
    factory.SetSchema("test", flume.Schema{
        Version: "1.0.0",
        Node:    flume.Node{Ref: "v1"},
    })

    // Create binding with auto-sync
    pipelineID := factory.Identity("test-pipeline", "Test pipeline")
    binding, err := factory.Bind(pipelineID, "test", flume.WithAutoSync())
    if err != nil {
        t.Fatalf("failed to bind: %v", err)
    }

    // Process with v1
    result, err := binding.Process(context.Background(), Order{})
    if err != nil {
        t.Fatalf("unexpected error processing with v1: %v", err)
    }
    if result.Version != "v1" {
        t.Error("expected v1")
    }

    // Update schema - binding auto-syncs
    factory.SetSchema("test", flume.Schema{
        Version: "2.0.0",
        Node:    flume.Node{Ref: "v2"},
    })

    // Process with v2
    result, err = binding.Process(context.Background(), Order{})
    if err != nil {
        t.Fatalf("unexpected error processing with v2: %v", err)
    }
    if result.Version != "v2" {
        t.Error("expected v2")
    }
}

Concurrent Testing

// TestConcurrentPipelineAccess calls Process on a single auto-sync binding
// from 100 goroutines at once and reports every error returned. Run it with
// -race to surface data races.
func TestConcurrentPipelineAccess(t *testing.T) {
    const workers = 100

    factory := setupFactory()
    factory.SetSchema("test", testSchema)

    // Create binding with auto-sync
    pipelineID := factory.Identity("concurrent-pipeline", "Concurrent test")
    binding, err := factory.Bind(pipelineID, "test", flume.WithAutoSync())
    if err != nil {
        t.Fatalf("failed to bind: %v", err)
    }

    var wg sync.WaitGroup
    // Buffered to the worker count so no goroutine blocks on send. Named
    // errCh rather than errors to avoid shadowing the standard errors package.
    errCh := make(chan error, workers)

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            _, err := binding.Process(context.Background(), Order{ID: fmt.Sprint(id)})
            if err != nil {
                errCh <- err
            }
        }(i)
    }

    wg.Wait()
    close(errCh)

    for err := range errCh {
        t.Error(err)
    }
}

Test Helpers

The flume/testing package provides utilities:

import flumetesting "github.com/zoobz-io/flume/testing"

// TestWithHelpers shows the flumetesting helpers: a test factory, bulk
// registration of no-op processors, and error assertions. Only the absence of
// errors is checked, so the processed value is discarded.
func TestWithHelpers(t *testing.T) {
    tf := flumetesting.NewTestFactory(t)

    // Register test processors
    tf.RegisterNoOpProcessors("validate", "process", "finalize")

    // Build and test
    pipeline, err := tf.Factory.BuildFromYAML(schema)
    tf.AssertNoError(err)

    // The result is not inspected; binding it to a name would fail to
    // compile with "declared and not used".
    _, err = pipeline.Process(context.Background(), flumetesting.TestData{ID: 1})
    tf.AssertNoError(err)
}

Best Practices

  1. Test processors in isolation before integration
  2. Use table-driven tests for schema variations
  3. Test error paths explicitly - don't just test happy paths
  4. Test concurrent access if using hot reload
  5. Keep test factories minimal - only register what's needed

Next Steps