Three Patterns That Made Prodigy's Functional Migration Worth It

Glen Baker
Building open source tooling

The Migration Story

Over the past few weeks, I’ve been migrating Prodigy—my Rust-based AI workflow orchestration tool—to use functional programming patterns from Stillwater, a library I built for applicative validation and effect handling in Rust.

The migration touched variable aggregation, environment access, and workflow execution. Not every change was revolutionary, but three patterns produced outsized benefits in testability, safety, and code clarity. This post breaks down each one with concrete before/after comparisons.

Pattern 1: Semigroup-Based Variable Aggregation

The Problem: Prodigy’s MapReduce workflows aggregate results from parallel AI agents. Before the migration, aggregation logic was scattered across custom merge implementations—each aggregate type (count, sum, average, etc.) had its own ad-hoc combination logic with no consistency guarantees.

The Solution: Implement the Semigroup trait from Stillwater, which provides a single combine operation with a mathematical guarantee: associativity.

(a.combine(b)).combine(c) == a.combine(b.combine(c))

This property means results can be combined in any order—essential for safe parallel aggregation.
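
For reference, the trait boils down to a single method. Here is a minimal sketch of that shape (an assumption; Stillwater's actual definition may differ in details), with a toy type to make the law concrete:

// Minimal sketch of a Semigroup trait; not Stillwater's exact definition.
pub trait Semigroup {
    /// Combine two values into one. Implementations must be associative.
    fn combine(self, other: Self) -> Self;
}

#[derive(Debug, Clone, PartialEq)]
struct Tally(usize);

impl Semigroup for Tally {
    fn combine(self, other: Self) -> Self {
        // Saturating addition is associative, so the law holds.
        Tally(self.0.saturating_add(other.0))
    }
}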

Before: Ad-Hoc Merge Logic

// Each aggregate type had custom merge logic
fn merge_counts(a: usize, b: usize) -> usize {
    a + b
}

fn merge_averages(sum_a: f64, count_a: usize, sum_b: f64, count_b: usize) -> (f64, usize) {
    (sum_a + sum_b, count_a + count_b)
}

fn merge_maps(mut a: HashMap<String, Value>, b: HashMap<String, Value>) -> HashMap<String, Value> {
    for (k, v) in b {
        a.entry(k).or_insert(v);
    }
    a
}

// No guarantee these operations are associative
// No property tests verifying correctness
// 15 different aggregate types, 15 different implementations

After: Unified Semigroup Implementation

impl Semigroup for AggregateResult {
    fn combine(self, other: Self) -> Self {
        use AggregateResult::*;

        match (self, other) {
            // All 15 types follow the same pattern
            (Count(a), Count(b)) => Count(a.saturating_add(b)),
            (Sum(a), Sum(b)) => Sum(a + b),
            (Average(sum_a, count_a), Average(sum_b, count_b)) => {
                Average(sum_a + sum_b, count_a + count_b)
            }
            (Merge(mut a), Merge(b)) => {
                for (k, v) in b {
                    a.entry(k).or_insert(v);
                }
                Merge(a)
            }
            // ... other types follow same structure

            // Type mismatches handled at validation boundary
            _ => unreachable!("Use combine_homogeneous for type safety")
        }
    }
}

The Key Insight: Validation at Boundaries

The Semigroup combine only handles matching types. Type validation happens at the boundary using Stillwater’s homogeneous validation:

pub fn aggregate_map_results(
    results: Vec<AggregateResult>,
) -> Validation<AggregateResult, Vec<TypeMismatchError>> {
    if results.is_empty() {
        return Validation::success(AggregateResult::Count(0));
    }

    combine_homogeneous(results, std::mem::discriminant, TypeMismatchError::new)
}

This accumulates all type mismatches rather than failing on the first one—a core principle of applicative validation.
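
As a hypothetical illustration of that behavior (assuming combine_homogeneous treats the first element's discriminant as the expected type):

let mixed = vec![
    AggregateResult::Count(1),
    AggregateResult::Average(3.0, 2), // mismatch #1 against the leading Count
    AggregateResult::Count(4),
    AggregateResult::Average(1.0, 1), // mismatch #2, still collected
];

// The failure case carries both TypeMismatchError values rather than
// short-circuiting on the first one.
let validated = aggregate_map_results(mixed);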

Property Tests Verify Associativity

proptest! {
    #[test]
    fn prop_count_associativity(a in 0usize..1000, b in 0usize..1000, c in 0usize..1000) {
        let x = AggregateResult::Count(a);
        let y = AggregateResult::Count(b);
        let z = AggregateResult::Count(c);

        // (x combine y) combine z == x combine (y combine z)
        let left = x.clone().combine(y.clone()).combine(z.clone());
        let right = x.combine(y.combine(z));

        prop_assert_eq!(left, right);
    }
}

All 15 aggregate types have property tests verifying associativity. This isn’t ceremony—it’s proof that parallel aggregation is safe.

Parallel Aggregation: Now Trivial

With associativity guaranteed, parallel aggregation becomes a one-liner:

pub fn parallel_aggregate(results: Vec<AggregateResult>) -> Option<AggregateResult> {
    results.into_par_iter().reduce_with(|a, b| a.combine(b))
}

Rayon’s reduce_with can split work arbitrarily because associativity guarantees the same result regardless of combination order.
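
A quick usage sketch (assuming AggregateResult derives PartialEq, which the property tests above imply):

let results = vec![
    AggregateResult::Count(3),
    AggregateResult::Count(7),
    AggregateResult::Count(5),
];

// However Rayon decides to split the reduction, associativity guarantees
// the same total.
assert_eq!(parallel_aggregate(results), Some(AggregateResult::Count(15)));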

The Gains

Metric                    | Before    | After
Lines of code             | ~450      | ~200
Custom merge functions    | 15        | 1
Property tests            | 0         | 33
Panics in production      | Possible  | None (validation at boundaries)
Parallel-safe             | Unknown   | Proven via associativity

Pattern 2: Reader Pattern for Environment Access

The Problem: MapReduce workflow functions needed access to configuration, storage, executors, and other dependencies. The traditional approach threads these through every function signature:

async fn execute_agent(
    item: &Value,
    config: &MapConfig,
    worktree_manager: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<AgentResult> { ... }

Six parameters just for dependencies. Adding a new dependency meant updating every function in the call chain.

The Solution: The Reader pattern via Stillwater’s Effect::asks, which extracts dependencies from an environment at runtime.

Before: Parameter Threading

async fn execute_map_phase(
    items: Vec<Value>,
    config: &MapConfig,
    worktree_mgr: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<Vec<AgentResult>> {
    let mut results = Vec::new();
    for item in items {
        let result = execute_agent(
            &item, config, worktree_mgr, executor, storage, job_id, max_parallel
        ).await?;
        results.push(result);
    }
    Ok(results)
}

Every function explicitly passes dependencies to its callees. Tests require constructing all dependencies even when testing logic that doesn’t use them.

After: Reader Pattern with Effect::asks

/// Get the worktree manager from the environment
pub fn get_worktree_manager() -> Effect<Arc<WorktreeManager>, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.worktree_manager.clone())
}

/// Get max parallel setting
pub fn get_max_parallel() -> Effect<usize, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.max_parallel)
}

/// Workflow code extracts what it needs
fn execute_agent(item: Value) -> Effect<AgentResult, MapReduceError, MapEnv> {
    get_worktree_manager()
        .and_then(|wt_mgr| create_worktree_effect(&item.id))
        .and_then(|worktree| execute_commands_effect(&item, &worktree))
}

Dependencies are extracted from the environment when needed, not threaded through signatures.
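
The environment itself is an ordinary struct holding the shared dependencies. A rough sketch (only worktree_manager and max_parallel appear in the snippets above; the remaining fields are assumptions):

#[derive(Clone)]
pub struct MapEnv {
    pub worktree_manager: Arc<WorktreeManager>,
    pub max_parallel: usize,
    // ... config, storage, executor, job_id, and other shared dependencies
}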

Local Overrides: Temporarily Modify Context

The Reader pattern also enables local modifications via Effect::local:

/// Run an effect with reduced concurrency
pub fn with_max_parallel<T: Send + 'static>(
    max_parallel: usize,
    effect: Effect<T, MapReduceError, MapEnv>,
) -> Effect<T, MapReduceError, MapEnv> {
    Effect::local(
        move |env: &MapEnv| MapEnv {
            max_parallel,
            ..env.clone()
        },
        effect,
    )
}

// Usage: run risky operations with limited concurrency
let effect = with_max_parallel(2, execute_agents(work_items));

The override only affects the wrapped effect—outer code sees the original environment.

Testing with Mock Environments

#[tokio::test]
async fn test_get_max_parallel() {
    let env = MockMapEnvBuilder::new()
        .with_max_parallel(10)
        .build();

    let effect = get_max_parallel();
    let result = effect.run(&env).await;

    assert_eq!(result.unwrap(), 10);
}

#[tokio::test]
async fn test_local_changes_dont_leak() {
    let env = MockMapEnvBuilder::new().with_max_parallel(5).build();

    // Override inside effect
    let inner = with_max_parallel(100, get_max_parallel());
    assert_eq!(inner.run(&env).await.unwrap(), 100);

    // Original environment unchanged
    let outer = get_max_parallel();
    assert_eq!(outer.run(&env).await.unwrap(), 5);
}

Tests use MockMapEnvBuilder to construct exactly the environment they need. No complex setup, no unused dependencies.

The Gains

Metric                      | Before               | After
Average function parameters | 6-8                  | 1-2
Test setup boilerplate      | High                 | Minimal (builder pattern)
Adding new dependencies     | Change N signatures  | Add one getter
Local overrides             | Manual save/restore  | Effect::local
Composition                 | Manual threading     | and_then/map

Pattern 3: Pure Core, Imperative Shell

The Problem: Workflow execution mixed pure logic (variable expansion, command building, output parsing) with I/O operations (file access, shell commands, API calls). Testing required mocking I/O even for purely computational tests.

The Solution: Separate pure transformations into a pure/ module and I/O operations into an effects/ module. Pure functions are trivially testable; effects are testable with mock environments.

The Architecture

src/cook/workflow/
├── pure/                           # No I/O, no side effects
│   ├── command_builder.rs          # Build commands from templates
│   ├── output_parser.rs            # Parse command output
│   └── variable_expansion.rs       # Expand ${VAR} patterns
└── effects/                        # I/O encapsulated in Effects
    ├── claude.rs                   # Claude API interactions
    ├── shell.rs                    # Shell command execution
    ├── handler.rs                  # Custom handler execution
    └── environment.rs              # Effect environment

Pure Module: No Mocks Needed

//! Pure workflow transformations module
//!
//! All functions in this module are:
//! - Pure: No I/O operations, no side effects
//! - Deterministic: Same inputs always produce same outputs
//! - Testable: No mocking required for unit tests

/// Expand variables in template string
pub fn expand_variables(template: &str, variables: &HashMap<String, String>) -> String {
    let mut result = template.to_string();

    // Expand ${VAR} first (more specific pattern)
    for (key, value) in variables {
        let placeholder = format!("${{{}}}", key);
        result = result.replace(&placeholder, value);
    }

    // Expand $VAR with word boundaries
    for (key, value) in variables {
        result = expand_simple_var(&result, key, value);
    }

    result
}

Testing is straightforward:

#[test]
fn test_expand_variables_mixed() {
    let template = "echo ${name} $value";
    let vars: HashMap<String, String> = [
        ("name".into(), "test".into()),
        ("value".into(), "123".into()),
    ].iter().cloned().collect();

    let result = expand_variables(template, &vars);
    assert_eq!(result, "echo test 123");
}

No mock setup, no async runtime, no I/O stubs. Just input → output.
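
The expand_variables body above calls an expand_simple_var helper that isn't shown. A purely illustrative sketch of word-boundary expansion might look like this:

/// Replace `$KEY` only when the next character cannot extend the variable name,
/// so expanding `value` leaves `$value2` untouched. (Hypothetical sketch;
/// Prodigy's actual helper may differ.)
fn expand_simple_var(input: &str, key: &str, value: &str) -> String {
    let pattern = format!("${}", key);
    let mut result = String::with_capacity(input.len());
    let mut rest = input;

    while let Some(pos) = rest.find(&pattern) {
        let after = &rest[pos + pattern.len()..];
        let at_boundary = after
            .chars()
            .next()
            .map_or(true, |c| !(c.is_alphanumeric() || c == '_'));

        result.push_str(&rest[..pos]);
        result.push_str(if at_boundary { value } else { pattern.as_str() });
        rest = after;
    }

    result.push_str(rest);
    result
}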

Property Tests for Pure Functions

proptest! {
    #[test]
    fn prop_variable_expansion_is_deterministic(
        template in ".*",
        vars in prop::collection::hash_map(valid_var_name(), safe_value(), 0..5),
    ) {
        let result1 = expand_variables(&template, &vars);
        let result2 = expand_variables(&template, &vars);

        prop_assert_eq!(result1, result2);
    }

    #[test]
    fn prop_variable_expansion_idempotent_for_safe_values(
        template in r"[a-zA-Z0-9 ${}_.,-]*",
        vars in prop::collection::hash_map(valid_var_name(), r"[a-zA-Z0-9 _-]*", 0..3),
    ) {
        let safe_vars: HashMap<String, String> = vars
            .into_iter()
            .filter(|(_, v)| !v.contains('$'))
            .collect();

        let result1 = expand_variables(&template, &safe_vars);
        let result2 = expand_variables(&result1, &safe_vars);

        // Idempotent when values don't contain variable references
        prop_assert_eq!(result1, result2);
    }
}

Property tests verify behavioral guarantees that unit tests might miss.
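
The valid_var_name() and safe_value() strategies above aren't shown either. Since proptest accepts regex literals as string strategies (as these tests already do), hypothetical versions could be as simple as:

use proptest::prelude::*;

// Hypothetical strategy helpers; the real ones live in Prodigy's test code.
fn valid_var_name() -> impl Strategy<Value = String> {
    // A &'static str acts as a regex-backed string strategy in proptest.
    "[A-Za-z_][A-Za-z0-9_]{0,15}"
}

fn safe_value() -> impl Strategy<Value = String> {
    r"[a-zA-Z0-9 _.-]{0,20}"
}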

Effects Module: I/O in Composable Units

//! Effect-based I/O operations for workflow execution
//!
//! The effects module separates concerns:
//! - **Pure logic** lives in `pure/` module
//! - **I/O effects** live here
//! - **Environment** provides dependencies via injection

pub fn execute_shell_command_effect(
    command: &str,
    vars: &HashMap<String, String>,
) -> Effect<CommandOutput, CommandError, WorkflowEnv> {
    // 1. Pure: expand variables in command
    let expanded = expand_variables(command, vars);

    // 2. Effect: execute shell command
    Effect::from_async(move |env: &WorkflowEnv| {
        let shell = env.shell_executor.clone();
        async move {
            shell.execute(&expanded).await
                .map_err(|e| CommandError::ExecutionFailed {
                    message: e.to_string(),
                    exit_code: None,
                })
        }
    })
}

Effects compose via and_then:

let workflow_effect = execute_shell_command_effect("cargo build", &vars)
    .and_then(|_| execute_shell_command_effect("cargo test", &vars))
    .map(|result| process_output(result));

// Execute with environment
let output = workflow_effect.run(&env).await?;

The Gains

Metric                      | Before          | After
Pure functions              | Mixed with I/O  | Isolated in pure/
Unit tests requiring mocks  | Most            | Only effects/
Property tests              | Difficult       | Natural
Code reuse                  | Limited         | Pure functions composable
Testability                 | Mock-heavy      | Mock-minimal

Lessons Learned

1. Mathematical Guarantees Pay Off

The Semigroup trait’s associativity guarantee isn’t academic—it’s what makes parallel aggregation provably correct. Property tests verify the guarantee; production code depends on it.

2. Reader Pattern Scales

Threading 6 parameters through 10 functions is manageable. Threading 12 parameters through 50 functions isn’t. The Reader pattern’s overhead is constant; parameter threading’s overhead scales with codebase size.

3. Pure Functions Are Easier to Test, Period

Every function in pure/ can be tested with zero setup. No mocks, no async runtime, no environment construction. This isn’t ideology—it’s pragmatic reduction of test complexity.

4. Separation Creates Options

With pure logic separated from effects, I can:

  • Test pure logic exhaustively with property tests
  • Test effects with minimal mock environments
  • Swap effect implementations (real vs mock) without touching business logic
  • Reason about pure and effectful code independently

5. The Migration Was Incremental

None of this happened in one big refactor. Each spec (171, 174a-h, 175) was a focused change:

  • Spec 171: Semigroup for aggregation
  • Spec 174b: Pure workflow transformations
  • Spec 174d: Effect modules
  • Spec 175: Reader pattern helpers

Small, reviewable changes that compiled and passed tests at each step.

Conclusion

Migrating Prodigy to Stillwater wasn't about adopting functional programming for its own sake. It was about solving real problems:

  1. Parallel aggregation needed to be provably correct → Semigroup’s associativity
  2. Dependency threading was becoming unmanageable → Reader pattern
  3. Testing required too much mock infrastructure → Pure core, imperative shell

Each pattern addressed a specific pain point with a proven solution. The result is code that’s more testable, more composable, and easier to reason about.

If you’re building Rust applications that deal with aggregation, dependency injection, or I/O-heavy workflows, consider whether these patterns might help. They’re not silver bullets, but they’re sharp tools for the right problems.


Resources

Projects:

  • Prodigy - AI workflow orchestration
  • Stillwater - Applicative validation and effects for Rust
