The Migration Story
Over the past few weeks, I’ve been migrating Prodigy—my Rust-based AI workflow orchestration tool—to use functional programming patterns from Stillwater, a library I built for applicative validation and effect handling in Rust.
The migration touched variable aggregation, environment access, and workflow execution. Not every change was revolutionary, but three patterns produced outsized benefits in testability, safety, and code clarity. This post breaks down each one with concrete before/after comparisons.
Pattern 1: Semigroup-Based Variable Aggregation
The Problem: Prodigy’s MapReduce workflows aggregate results from parallel AI agents. Before the migration, aggregation logic was scattered across custom merge implementations—each aggregate type (count, sum, average, etc.) had its own ad-hoc combination logic with no consistency guarantees.
The Solution: Implement the Semigroup trait from Stillwater, which provides a single combine operation with a mathematical guarantee: associativity.
(a.combine(b)).combine(c) == a.combine((b.combine(c)))
This property means partial results can be regrouped freely, as long as their relative order is preserved. That is exactly the freedom a parallel reducer needs.
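To make the law concrete, here is a minimal sketch using a local stand-in trait. The `Semigroup` trait below is defined inline for illustration and may not match Stillwater's actual definition; `Count`, `FirstLabel`, `group_left`, and `group_right` are hypothetical names. It shows one instance that is associative and commutative, and one that is associative only, which is why grouping may change but order should not.

```rust
/// Local stand-in for a semigroup trait (an assumption for this sketch;
/// Stillwater's real trait may be declared differently).
pub trait Semigroup {
    fn combine(self, other: Self) -> Self;
}

/// Hypothetical counter. Saturating addition stays associative even at
/// the overflow boundary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Count(pub usize);

impl Semigroup for Count {
    fn combine(self, other: Self) -> Self {
        Count(self.0.saturating_add(other.0))
    }
}

/// Hypothetical "first value wins" aggregate: associative, but NOT
/// commutative, because swapping the operands changes the winner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FirstLabel(pub Option<String>);

impl Semigroup for FirstLabel {
    fn combine(self, other: Self) -> Self {
        FirstLabel(self.0.or(other.0))
    }
}

/// Left grouping: (a . b) . c
pub fn group_left<T: Semigroup>(a: T, b: T, c: T) -> T {
    a.combine(b).combine(c)
}

/// Right grouping: a . (b . c)
pub fn group_right<T: Semigroup>(a: T, b: T, c: T) -> T {
    a.combine(b.combine(c))
}
```

Both groupings agree for either type, but `FirstLabel` gives different answers if two operands are swapped. A first-wins map merge behaves the same way.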
Before: Ad-Hoc Merge Logic
// Each aggregate type had custom merge logic
fn merge_counts(a: usize, b: usize) -> usize {
    a + b
}
fn merge_averages(sum_a: f64, count_a: usize, sum_b: f64, count_b: usize) -> (f64, usize) {
    (sum_a + sum_b, count_a + count_b)
}
fn merge_maps(mut a: HashMap<String, Value>, b: HashMap<String, Value>) -> HashMap<String, Value> {
    for (k, v) in b {
        a.entry(k).or_insert(v);
    }
    a
}
// No guarantee these operations are associative
// No property tests verifying correctness
// 15 different aggregate types, 15 different implementations
After: Unified Semigroup Implementation
impl Semigroup for AggregateResult {
    fn combine(self, other: Self) -> Self {
        use AggregateResult::*;
        match (self, other) {
            // All 15 types follow the same pattern
            (Count(a), Count(b)) => Count(a.saturating_add(b)),
            (Sum(a), Sum(b)) => Sum(a + b),
            (Average(sum_a, count_a), Average(sum_b, count_b)) => {
                Average(sum_a + sum_b, count_a + count_b)
            }
            (Merge(mut a), Merge(b)) => {
                for (k, v) in b {
                    a.entry(k).or_insert(v);
                }
                Merge(a)
            }
            // ... other types follow same structure
            // Type mismatches handled at validation boundary
            _ => unreachable!("Use combine_homogeneous for type safety")
        }
    }
}
The Key Insight: Validation at Boundaries
The Semigroup combine only handles matching types. Type validation happens at the boundary using Stillwater’s homogeneous validation:
pub fn aggregate_map_results(
    results: Vec<AggregateResult>,
) -> Validation<AggregateResult, Vec<TypeMismatchError>> {
    if results.is_empty() {
        return Validation::success(AggregateResult::Count(0));
    }
    combine_homogeneous(results, std::mem::discriminant, TypeMismatchError::new)
}
This accumulates all type mismatches rather than failing on the first one—a core principle of applicative validation.
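The accumulate-everything behaviour can be sketched with the standard library alone. This is not how `combine_homogeneous` is implemented (its internals aren't shown here); the two-variant enum, the `TypeMismatch` struct, and `check_homogeneous` are assumptions made for illustration. The point is only that every offending element is reported, not just the first.

```rust
use std::mem::{discriminant, Discriminant};

/// Cut-down stand-in for the real enum (only two variants, for illustration).
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateResult {
    Count(usize),
    Sum(f64),
}

/// Hypothetical error record; the real TypeMismatchError may carry
/// different fields.
#[derive(Debug, PartialEq)]
pub struct TypeMismatch {
    pub index: usize,
    pub expected: Discriminant<AggregateResult>,
    pub found: Discriminant<AggregateResult>,
}

/// Compares every element's variant against the first element's variant and
/// collects ALL mismatches instead of returning at the first one.
pub fn check_homogeneous(results: &[AggregateResult]) -> Result<(), Vec<TypeMismatch>> {
    let Some(first) = results.first() else {
        return Ok(()); // nothing to compare
    };
    let expected = discriminant(first);
    let errors: Vec<TypeMismatch> = results
        .iter()
        .enumerate()
        .filter(|(_, r)| discriminant(*r) != expected)
        .map(|(index, r)| TypeMismatch {
            index,
            expected,
            found: discriminant(r),
        })
        .collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

Once this check passes, a `combine` that only handles matching variants can fold the list without ever reaching its mismatch arm.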
Property Tests Verify Associativity
proptest! {
    #[test]
    fn prop_count_associativity(a in 0usize..1000, b in 0usize..1000, c in 0usize..1000) {
        let x = AggregateResult::Count(a);
        let y = AggregateResult::Count(b);
        let z = AggregateResult::Count(c);
        // (x combine y) combine z == x combine (y combine z)
        let left = x.clone().combine(y.clone()).combine(z.clone());
        let right = x.combine(y.combine(z));
        prop_assert_eq!(left, right);
    }
}
All 15 aggregate types have property tests verifying associativity. This isn’t ceremony. The tests check, across randomly generated inputs, the exact property that parallel aggregation relies on.
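One caveat worth keeping in mind for the floating-point variants: integer saturating addition is exactly associative, but `f64` addition is only approximately so. The sketch below checks the integer case exhaustively over a few boundary values and shows the classic float counterexample. `approx_eq` and its tolerance are illustrative choices of mine, not values taken from Prodigy's test suite.

```rust
/// Exhaustively checks associativity of saturating addition over a small
/// sample set, which should include values near the overflow boundary.
pub fn saturating_add_is_associative(samples: &[usize]) -> bool {
    samples.iter().all(|&a| {
        samples.iter().all(|&b| {
            samples.iter().all(|&c| {
                a.saturating_add(b).saturating_add(c) == a.saturating_add(b.saturating_add(c))
            })
        })
    })
}

/// Compares two f64 values with a relative tolerance. The tolerance here is
/// an arbitrary illustrative choice.
pub fn approx_eq(x: f64, y: f64) -> bool {
    (x - y).abs() <= 1e-12 * x.abs().max(y.abs()).max(1.0)
}

/// Returns the same three numbers summed with left and right grouping.
pub fn float_groupings() -> (f64, f64) {
    let left = (0.1_f64 + 0.2) + 0.3;
    let right = 0.1_f64 + (0.2 + 0.3);
    (left, right)
}
```

The two groupings from `float_groupings` differ in the last bit, so an associativity property over `f64` sums probably needs a tolerance-based comparison like `approx_eq` in place of strict equality.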
Parallel Aggregation: Now Trivial
With associativity guaranteed, parallel aggregation becomes a one-liner:
pub fn parallel_aggregate(results: Vec<AggregateResult>) -> Option<AggregateResult> {
    results.into_par_iter().reduce_with(|a, b| a.combine(b))
}
Rayon’s reduce_with can split the work at arbitrary points because associativity guarantees the same result regardless of how the partial results are grouped.
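To see why grouping doesn't matter, here is a rough standard-library analogue of a chunked parallel reduce. It is a sketch, not Rayon's algorithm: `chunked_reduce` is a hypothetical helper that folds fixed-size chunks on scoped threads and then merges the partial results left to right.

```rust
use std::thread;

/// Reduces `items` with `combine`, folding each chunk on its own thread.
/// Partial results are merged in their original order, so only the grouping
/// changes, never the order of operands.
pub fn chunked_reduce<T, F>(items: Vec<T>, chunk_size: usize, combine: F) -> Option<T>
where
    T: Send,
    F: Fn(T, T) -> T + Sync,
{
    let combine = &combine;

    // Split the input into owned chunks (chunk size is clamped to at least 1).
    let mut chunks: Vec<Vec<T>> = Vec::new();
    let mut iter = items.into_iter().peekable();
    while iter.peek().is_some() {
        chunks.push(iter.by_ref().take(chunk_size.max(1)).collect());
    }

    // Fold every chunk on a scoped thread, then collect results in order.
    let partials: Vec<T> = thread::scope(|s| {
        let handles: Vec<_> = chunks
            .into_iter()
            .map(|chunk| s.spawn(move || chunk.into_iter().reduce(combine)))
            .collect();
        handles
            .into_iter()
            .filter_map(|h| h.join().expect("worker thread panicked"))
            .collect()
    });

    // Merge the per-chunk results sequentially.
    partials.into_iter().reduce(combine)
}
```

For any associative `combine`, the result should match a plain sequential `reduce` whatever `chunk_size` is chosen, including for non-commutative operations such as string concatenation.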
The Gains
| Metric | Before | After |
|---|---|---|
| Lines of code | ~450 | ~200 |
| Custom merge functions | 15 | 1 |
| Property tests | 0 | 33 |
| Panics in production | Possible | None (validation at boundaries) |
| Parallel-safe | Unknown | Associativity verified by property tests |
Pattern 2: Reader Pattern for Environment Access
The Problem: MapReduce workflow functions needed access to configuration, storage, executors, and other dependencies. The traditional approach threads these through every function signature:
async fn execute_agent(
    item: &Value,
    config: &MapConfig,
    worktree_manager: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<AgentResult> { ... }
Six parameters just for dependencies. Adding a new dependency meant updating every function in the call chain.
The Solution: The Reader pattern via Stillwater’s Effect::asks, which extracts dependencies from an environment at runtime.
Before: Parameter Threading
async fn execute_map_phase(
    items: Vec<Value>,
    config: &MapConfig,
    worktree_mgr: &WorktreeManager,
    executor: &CommandExecutor,
    storage: &Storage,
    job_id: &str,
    max_parallel: usize,
) -> Result<Vec<AgentResult>> {
    let mut results = Vec::new();
    for item in items {
        let result = execute_agent(
            &item, config, worktree_mgr, executor, storage, job_id, max_parallel
        ).await?;
        results.push(result);
    }
    Ok(results)
}
Every function explicitly passes dependencies to its callees. Tests require constructing all dependencies even when testing logic that doesn’t use them.
After: Reader Pattern with Effect::asks
/// Get the worktree manager from the environment
pub fn get_worktree_manager() -> Effect<Arc<WorktreeManager>, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.worktree_manager.clone())
}
/// Get max parallel setting
pub fn get_max_parallel() -> Effect<usize, MapReduceError, MapEnv> {
    Effect::asks(|env: &MapEnv| env.max_parallel)
}
/// Workflow code extracts what it needs
fn execute_agent(item: Value) -> Effect<AgentResult, MapReduceError, MapEnv> {
    get_worktree_manager()
        .and_then(|wt_mgr| create_worktree_effect(&item.id))
        .and_then(|worktree| execute_commands_effect(&item, &worktree))
}
Dependencies are extracted from the environment when needed, not threaded through signatures.
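If the mechanics of `asks` feel opaque, a stripped-down Reader fits in a few lines. This sketch is synchronous and infallible, so it is much simpler than Stillwater's `Effect`; the `Reader` type, the `Env` struct and its two fields, and `describe_job` are all made up for illustration.

```rust
/// Hypothetical environment; the real MapEnv holds more than this.
#[derive(Clone)]
pub struct Env {
    pub job_id: String,
    pub max_parallel: usize,
}

/// A deferred computation that needs an `Env` to produce a `T`.
pub struct Reader<T> {
    inner: Box<dyn Fn(&Env) -> T>,
}

impl<T: 'static> Reader<T> {
    /// Build a reader that extracts a value from the environment.
    pub fn asks(f: impl Fn(&Env) -> T + 'static) -> Self {
        Reader { inner: Box::new(f) }
    }

    /// Sequence a second computation that depends on the first result.
    /// Both steps receive the same environment.
    pub fn and_then<U: 'static>(self, f: impl Fn(T) -> Reader<U> + 'static) -> Reader<U> {
        Reader::asks(move |env| f((self.inner)(env)).run(env))
    }

    /// Supply the environment and execute.
    pub fn run(&self, env: &Env) -> T {
        (self.inner)(env)
    }
}

pub fn get_max_parallel() -> Reader<usize> {
    Reader::asks(|env| env.max_parallel)
}

pub fn get_job_id() -> Reader<String> {
    Reader::asks(|env| env.job_id.clone())
}

/// No dependency appears in the signature; each is pulled from `Env` on demand.
pub fn describe_job() -> Reader<String> {
    get_job_id().and_then(|id| {
        Reader::asks(move |env| format!("{id} (max {} parallel)", env.max_parallel))
    })
}
```

Nothing runs until `run(&env)` is called, which is what lets callers build a pipeline first and decide on the environment later.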
Local Overrides: Temporarily Modify Context
The Reader pattern also enables local modifications via Effect::local:
/// Run an effect with reduced concurrency
pub fn with_max_parallel<T: Send + 'static>(
    max_parallel: usize,
    effect: Effect<T, MapReduceError, MapEnv>,
) -> Effect<T, MapReduceError, MapEnv> {
    Effect::local(
        move |env: &MapEnv| MapEnv {
            max_parallel,
            ..env.clone()
        },
        effect,
    )
}
// Usage: run risky operations with limited concurrency
let effect = with_max_parallel(2, execute_agents(work_items));
The override only affects the wrapped effect—outer code sees the original environment.
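The reason the override can't leak is structural: the wrapped computation runs against a modified copy, while the caller's environment is only ever borrowed immutably. Here's a minimal synchronous sketch of that idea. The `Reader` type and `Env` fields are hypothetical stand-ins, not Stillwater's `Effect::local`.

```rust
/// Hypothetical environment with just enough fields for the example.
#[derive(Clone, Debug, PartialEq)]
pub struct Env {
    pub job_id: String,
    pub max_parallel: usize,
}

/// A deferred computation that needs an `Env` to produce a `T`.
pub struct Reader<T> {
    inner: Box<dyn Fn(&Env) -> T>,
}

impl<T: 'static> Reader<T> {
    pub fn asks(f: impl Fn(&Env) -> T + 'static) -> Self {
        Reader { inner: Box::new(f) }
    }

    /// Run `self` against a modified copy of the environment.
    /// The caller's `Env` is only borrowed, so it cannot be changed.
    pub fn local(self, modify: impl Fn(&Env) -> Env + 'static) -> Reader<T> {
        Reader::asks(move |env| {
            let scoped = modify(env);
            (self.inner)(&scoped)
        })
    }

    pub fn run(&self, env: &Env) -> T {
        (self.inner)(env)
    }
}

pub fn get_max_parallel() -> Reader<usize> {
    Reader::asks(|env| env.max_parallel)
}

/// Wraps any reader so that it sees a different `max_parallel`.
pub fn with_max_parallel<T: 'static>(max_parallel: usize, reader: Reader<T>) -> Reader<T> {
    reader.local(move |env| Env {
        max_parallel,
        ..env.clone()
    })
}
```

The cost of this approach is one clone of the environment per override, which is usually cheap when the heavy members sit behind `Arc`.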
Testing with Mock Environments
#[tokio::test]
async fn test_get_max_parallel() {
    let env = MockMapEnvBuilder::new()
        .with_max_parallel(10)
        .build();
    let effect = get_max_parallel();
    let result = effect.run(&env).await;
    assert_eq!(result.unwrap(), 10);
}
#[tokio::test]
async fn test_local_changes_dont_leak() {
    let env = MockMapEnvBuilder::new().with_max_parallel(5).build();
    // Override inside effect
    let inner = with_max_parallel(100, get_max_parallel());
    assert_eq!(inner.run(&env).await.unwrap(), 100);
    // Original environment unchanged
    let outer = get_max_parallel();
    assert_eq!(outer.run(&env).await.unwrap(), 5);
}
Tests use MockMapEnvBuilder to construct exactly the environment they need. No complex setup, no unused dependencies.
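The builder itself isn't shown in this post, so the following is only a guess at its general shape: sensible defaults for every field, plus one `with_*` method per field a test might care about. The `MapEnv` fields and the default values are assumptions, and the real environment holds more (worktree manager, storage, and so on).

```rust
/// Hypothetical, cut-down environment for the sketch.
#[derive(Debug, Clone, PartialEq)]
pub struct MapEnv {
    pub job_id: String,
    pub max_parallel: usize,
}

/// Sketch of a test builder: every field has a default, so a test only
/// names the fields it actually depends on.
#[derive(Debug, Clone)]
pub struct MockMapEnvBuilder {
    job_id: String,
    max_parallel: usize,
}

impl Default for MockMapEnvBuilder {
    fn default() -> Self {
        // Default values are arbitrary placeholders chosen for this example.
        Self {
            job_id: "test-job".to_string(),
            max_parallel: 1,
        }
    }
}

impl MockMapEnvBuilder {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn with_job_id(mut self, job_id: impl Into<String>) -> Self {
        self.job_id = job_id.into();
        self
    }

    pub fn with_max_parallel(mut self, max_parallel: usize) -> Self {
        self.max_parallel = max_parallel;
        self
    }

    pub fn build(self) -> MapEnv {
        MapEnv {
            job_id: self.job_id,
            max_parallel: self.max_parallel,
        }
    }
}
```

With defaults in place, adding a new field to the environment means touching the builder once, not every existing test.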
The Gains
| Metric | Before | After |
|---|---|---|
| Average function parameters | 6-8 | 1-2 |
| Test setup boilerplate | High | Minimal (builder pattern) |
| Adding new dependencies | Change N signatures | Add one getter |
| Local overrides | Manual save/restore | Effect::local |
| Composition | Manual threading | and_then/map |
Pattern 3: Pure Core, Imperative Shell
The Problem: Workflow execution mixed pure logic (variable expansion, command building, output parsing) with I/O operations (file access, shell commands, API calls). Testing required mocking I/O even for purely computational tests.
The Solution: Separate pure transformations into a pure/ module and I/O operations into an effects/ module. Pure functions are trivially testable; effects are testable with mock environments.
The Architecture
src/cook/workflow/
├── pure/                      # No I/O, no side effects
│   ├── command_builder.rs     # Build commands from templates
│   ├── output_parser.rs       # Parse command output
│   └── variable_expansion.rs  # Expand ${VAR} patterns
└── effects/                   # I/O encapsulated in Effects
    ├── claude.rs              # Claude API interactions
    ├── shell.rs               # Shell command execution
    ├── handler.rs             # Custom handler execution
    └── environment.rs         # Effect environment
Pure Module: No Mocks Needed
//! Pure workflow transformations module
//!
//! All functions in this module are:
//! - Pure: No I/O operations, no side effects
//! - Deterministic: Same inputs always produce same outputs
//! - Testable: No mocking required for unit tests
/// Expand variables in template string
pub fn expand_variables(template: &str, variables: &HashMap<String, String>) -> String {
    let mut result = template.to_string();
    // Expand ${VAR} first (more specific pattern)
    for (key, value) in variables {
        let placeholder = format!("${{{}}}", key);
        result = result.replace(&placeholder, value);
    }
    // Expand $VAR with word boundaries
    for (key, value) in variables {
        result = expand_simple_var(&result, key, value);
    }
    result
}
Testing is straightforward:
#[test]
fn test_expand_variables_mixed() {
    let template = "echo ${name} $value";
    let vars = HashMap::from([
        ("name".to_string(), "test".to_string()),
        ("value".to_string(), "123".to_string()),
    ]);
    let result = expand_variables(template, &vars);
    assert_eq!(result, "echo test 123");
}
No mock setup, no async runtime, no I/O stubs. Just input → output.
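The `expand_simple_var` helper is called above but not shown. One plausible shape, assuming "word boundary" means the variable name must not be followed by another identifier character (a letter, digit, or underscore), is sketched below. The actual Prodigy implementation may differ.

```rust
/// Replaces `$key` with `value` wherever the name is not immediately followed
/// by another identifier character, so `$value` does not match inside
/// `$values`. Inserted values are not rescanned.
pub fn expand_simple_var(input: &str, key: &str, value: &str) -> String {
    let needle = format!("${key}");
    let mut out = String::with_capacity(input.len());
    let mut rest = input;

    while let Some(pos) = rest.find(&needle) {
        let after = &rest[pos + needle.len()..];
        // A match only counts if the next character cannot extend the name.
        let extends_name = matches!(
            after.chars().next(),
            Some(c) if c.is_alphanumeric() || c == '_'
        );
        out.push_str(&rest[..pos]);
        out.push_str(if extends_name { needle.as_str() } else { value });
        rest = after;
    }

    out.push_str(rest);
    out
}
```

Because this is a pure function over strings, edge cases such as a variable at the end of the input or a longer name sharing a prefix can be pinned down with ordinary assertions.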
Property Tests for Pure Functions
proptest! {
    #[test]
    fn prop_variable_expansion_is_deterministic(
        template in ".*",
        vars in prop::collection::hash_map(valid_var_name(), safe_value(), 0..5),
    ) {
        let result1 = expand_variables(&template, &vars);
        let result2 = expand_variables(&template, &vars);
        prop_assert_eq!(result1, result2);
    }
    #[test]
    fn prop_variable_expansion_idempotent_for_safe_values(
        template in r"[a-zA-Z0-9 ${}_.,-]*",
        vars in prop::collection::hash_map(valid_var_name(), r"[a-zA-Z0-9 _-]*", 0..3),
    ) {
        let safe_vars: HashMap<String, String> = vars
            .into_iter()
            .filter(|(_, v)| !v.contains('$'))
            .collect();
        let result1 = expand_variables(&template, &safe_vars);
        let result2 = expand_variables(&result1, &safe_vars);
        // Idempotent when values don't contain variable references
        prop_assert_eq!(result1, result2);
    }
}
Property tests verify behavioral guarantees that unit tests might miss.
Effects Module: I/O in Composable Units
//! Effect-based I/O operations for workflow execution
//!
//! The effects module separates concerns:
//! - **Pure logic** lives in `pure/` module
//! - **I/O effects** live here
//! - **Environment** provides dependencies via injection
pub fn execute_shell_command_effect(
    command: &str,
    vars: &HashMap<String, String>,
) -> Effect<CommandOutput, CommandError, WorkflowEnv> {
    // 1. Pure: expand variables in command
    let expanded = expand_variables(command, vars);
    // 2. Effect: execute shell command
    Effect::from_async(move |env: &WorkflowEnv| {
        let shell = env.shell_executor.clone();
        async move {
            shell.execute(&expanded).await
                .map_err(|e| CommandError::ExecutionFailed {
                    message: e.to_string(),
                    exit_code: None,
                })
        }
    })
}
Effects compose via and_then:
let workflow_effect = execute_shell_command_effect("cargo build", &vars)
    .and_then(|_| execute_shell_command_effect("cargo test", &vars))
    .map(process_output);
// Execute with environment
let output = workflow_effect.run(&env).await?;
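The same split can be sketched without any effect library, which may help show what is pattern and what is Stillwater. In this synchronous, standard-library analogue, `build_command` is the pure core, and the `ShellExecutor` trait is the boundary where a real or fake shell plugs in. All names here (`build_command`, `ShellExecutor`, `RecordingShell`, `run_template`) are hypothetical.

```rust
use std::cell::RefCell;
use std::collections::HashMap;

/// Pure core: template in, command string out. No I/O.
/// Only handles the `${VAR}` form, to keep the sketch short.
pub fn build_command(template: &str, vars: &HashMap<String, String>) -> String {
    vars.iter().fold(template.to_string(), |acc, (k, v)| {
        acc.replace(&format!("${{{k}}}"), v)
    })
}

/// Imperative shell boundary: anything that can run a command line.
pub trait ShellExecutor {
    fn execute(&self, command: &str) -> Result<String, String>;
}

/// Test double that records commands instead of spawning processes.
#[derive(Default)]
pub struct RecordingShell {
    pub seen: RefCell<Vec<String>>,
}

impl ShellExecutor for RecordingShell {
    fn execute(&self, command: &str) -> Result<String, String> {
        self.seen.borrow_mut().push(command.to_string());
        Ok(format!("ran: {command}"))
    }
}

/// Thin effectful wrapper: expand purely, then hand off to the executor.
pub fn run_template(
    shell: &dyn ShellExecutor,
    template: &str,
    vars: &HashMap<String, String>,
) -> Result<String, String> {
    let command = build_command(template, vars);
    shell.execute(&command)
}
```

A test can assert on `build_command` directly with no setup at all, and on `run_template` by inspecting what `RecordingShell` captured. The production executor never needs to be involved.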
The Gains
| Metric | Before | After |
|---|---|---|
| Pure functions | Mixed with I/O | Isolated in pure/ |
| Unit tests requiring mocks | Most | Only effects/ |
| Property tests | Difficult | Natural |
| Code reuse | Limited | Pure functions composable |
| Testability | Mock-heavy | Mock-minimal |
Lessons Learned
1. Mathematical Guarantees Pay Off
The Semigroup trait’s associativity guarantee isn’t academic—it’s what makes parallel aggregation provably correct. Property tests verify the guarantee; production code depends on it.
2. Reader Pattern Scales
Threading 6 parameters through 10 functions is manageable. Threading 12 parameters through 50 functions isn’t. The Reader pattern’s overhead is constant; parameter threading’s overhead scales with codebase size.
3. Pure Functions Are Easier to Test, Period
Every function in pure/ can be tested with zero setup. No mocks, no async runtime, no environment construction. This isn’t ideology—it’s pragmatic reduction of test complexity.
4. Separation Creates Options
With pure logic separated from effects, I can:
- Test pure logic exhaustively with property tests
- Test effects with minimal mock environments
- Swap effect implementations (real vs mock) without touching business logic
- Reason about pure and effectful code independently
5. The Migration Was Incremental
None of this happened in one big refactor. Each spec (171, 174a-h, 175) was a focused change:
- Spec 171: Semigroup for aggregation
- Spec 174b: Pure workflow transformations
- Spec 174d: Effect modules
- Spec 175: Reader pattern helpers
Small, reviewable changes that compiled and passed tests at each step.
Conclusion
Migrating Prodigy to Stillwater wasn’t about adopting functional programming for its own sake. It was about solving real problems:
- Parallel aggregation needed to be provably correct → Semigroup’s associativity
- Dependency threading was becoming unmanageable → Reader pattern
- Testing required too much mock infrastructure → Pure core, imperative shell
Each pattern addressed a specific pain point with a proven solution. The result is code that’s more testable, more composable, and easier to reason about.
If you’re building Rust applications that deal with aggregation, dependency injection, or I/O-heavy workflows, consider whether these patterns might help. They’re not silver bullets, but they’re sharp tools for the right problems.
Resources
Projects:
- Prodigy - AI workflow orchestration
- Stillwater - Applicative validation and effects for Rust