
ADR 0034: Fan-in / map-reduce (list-of-upstream parameter binding)

Status: Accepted
Date: 2026-06-01
Supersedes: none (closes #257, which proposed the same change in deferred form)

Context

A common DAG topology, and the default shape of most ML workloads, is N independent map tasks feeding one reducer task that aggregates their outputs:

select_best([trial(lr) for lr in [0.001, 0.01, 0.05, 0.1, 0.5]])

Through v0.0.1-prealpha.22, the parser silently dropped the list-of-XComArg argument because _bind_call_arguments only handled a single XComArg or a JSON literal. At runtime, the reducer ran with empty kwargs and crashed with TypeError: select_best() missing 1 required positional argument: 'trials'. Both shipped example DAGs (fan_out_aggregate, montecarlo_pi) were broken end-to-end.

We had two pre-implementation options on the table during alpha-prep:

  1. Loud reject at compile time. Refuse fan-in with a clear error, delete the broken examples, and defer the wiring until after the alpha. Smallest surface change, honest failure, no false promises.
  2. Full pipeline implementation. Parser → schema → proto → agent → server. Touches every layer; the reducer parameter receives the upstreams as a list, with no manual XCom plumbing. Best UX, biggest surface change.

We chose option 2 because map-reduce is core to ML workflows, and ML users are the project's documented target audience. Shipping an alpha that rejects the natural expression of fan-in would hurt adoption more than the extra implementation risk would cost.

This ADR records the design choices that made option 2 feasible, so that they do not need to be relitigated later.

Decision

1. Activation criterion: argument shape at the call site.

The parser activates fan-in when a parameter is bound to a list (or tuple) in which every element is an XComArg (a task call). Equivalent forms:

# All three behave identically:
select_best([trial(lr) for lr in LRs])
combine([estimate(0), estimate(1), estimate(2)])
report((extract_a(), extract_b()))

A mixed list (one XComArg plus one literal) is silently dropped today and should become a loud error in a follow-up. Its semantics are ambiguous (per-element pull vs. JSON literal), and no current example needs it.

A list of pure literals (f(items=[1, 2, 3])) remains a JSON-literal capture under call_args, unchanged.
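The activation rule can be sketched as a small classifier over a single argument. In this sketch, XComArg is a hypothetical stand-in for the parser's record of a task call. The "mixed" result only marks where the planned loud error would go; today the parser drops such arguments. Treating an empty list as a literal is also an assumption of the sketch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class XComArg:
    """Hypothetical stand-in for the parser's record of one task call."""
    task_id: str


def classify_argument(value: Any) -> tuple[str, Any]:
    """Classify one call argument by shape (sketch of the activation rule).

    Returns ("xcom", [task_ids]), ("literal", value) or ("mixed", None).
    """
    if isinstance(value, XComArg):
        # Single upstream: still recorded as a one-element list.
        return "xcom", [value.task_id]
    if isinstance(value, (list, tuple)) and value:
        n_xcom = sum(isinstance(v, XComArg) for v in value)
        if n_xcom == len(value):
            # Every element is a task call: fan-in, in declaration order.
            return "xcom", [v.task_id for v in value]
        if n_xcom > 0:
            # Ambiguous shape; slated to become a loud error in a follow-up.
            return "mixed", None
    # Pure literals (and, in this sketch, an empty list) stay a JSON literal.
    return "literal", value
```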

2. Schema: always a list.

xcom_input[param] is always a list of upstream task_ids, even for the common single-upstream case. The pre-existing {"value": "extract"} shape becomes {"value": ["extract"]}. We picked list-always over polymorphic str | list[str] because:

  • The proto wire format is map<string, XComUpstreams> where XComUpstreams { repeated string task_ids = 1 }; that's a list at the wire level, so the schema and the domain types should match.
  • Polymorphic JSON values force every consumer (parser tests, Go unmarshal, validation, future producers) to handle both forms, which widens the bug surface.
  • The user-facing experience is unchanged: a single-upstream reducer receives the value directly, not wrapped in a list. The list lives in the spec only.
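With a list-always schema, a consumer needs only one code path. The check below is a hand-rolled sketch of the array[string], minItems: 1 rule. The helper name validate_xcom_input is hypothetical, and the real dag-schema.json may express more than this. The single-upstream case passes as a one-element list, while the pre-existing string shape is rejected.

```python
from typing import Any

# Assumed shape of the xcom_input value schema (array of strings, minItems: 1).
XCOM_INPUT_VALUE_SCHEMA = {"type": "array", "items": {"type": "string"}, "minItems": 1}


def validate_xcom_input(xcom_input: dict[str, Any]) -> list[str]:
    """Return error messages; an empty list means every value is list-shaped."""
    errors = []
    for param, upstreams in xcom_input.items():
        if not isinstance(upstreams, list):
            errors.append(f"{param}: expected a list of task_ids, got {type(upstreams).__name__}")
        elif len(upstreams) < XCOM_INPUT_VALUE_SCHEMA["minItems"]:
            errors.append(f"{param}: needs at least one upstream")
        elif not all(isinstance(t, str) for t in upstreams):
            errors.append(f"{param}: every task_id must be a string")
    return errors
```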

3. Wire format: a wrapper proto message, not a delimited string.

xcom_input_mapping on TaskSpec is map<string, XComUpstreams>. We considered three alternatives and rejected them:

  • Delimited string ("abc"): type abuse, breaks proto introspection tools, and fails silently if a task_id ever contains the delimiter (which it shouldn't, but an invariant that holds by luck is fragile).
  • Polymorphic oneof StringOrList: a oneof cannot be used directly as a map value in proto3 (it would still need a wrapping message), and the union forces every consumer to branch.
  • Parallel field xcom_input_many_mapping: leaks the schema decision into the proto layer, with two fields where one suffices.

Wrapper messages are idiomatic in proto3 and travel well across language bindings.
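A Python mirror of the wrapper shows why it round-trips where a delimited string does not. XComUpstreams mirrors the proto message. to_xcom_upstreams_map is a hypothetical counterpart of the Go helper toXComUpstreamsMap, not its actual code. The "," character stands in for whatever delimiter the rejected encoding would have used.

```python
from dataclasses import dataclass, field


@dataclass
class XComUpstreams:
    """Python mirror of the proto wrapper: `repeated string task_ids = 1`."""
    task_ids: list[str] = field(default_factory=list)


def to_xcom_upstreams_map(xcom_input: dict[str, list[str]]) -> dict[str, XComUpstreams]:
    """Domain map (param -> task_ids) to wire map<string, XComUpstreams> (sketch)."""
    return {param: XComUpstreams(list(ids)) for param, ids in xcom_input.items()}


# The rejected delimited-string encoding, for comparison (delimiter assumed ",").
def encode_delimited(ids: list[str], sep: str = ",") -> str:
    return sep.join(ids)


def decode_delimited(s: str, sep: str = ",") -> list[str]:
    return s.split(sep)
```

Suppose a task_id contains the delimiter, as in "a,b". The delimited form decodes ["a,b", "c"] back into three task_ids. The wrapper keeps the two it was given.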

4. Delivery: JSON array in the env var.

The agent fetches each upstream's return_value via the existing FetchXCom RPC (N round-trips), assembles the values into a JSON array in declaration order, and stamps LEOFLOW_XCOM_<PARAM>=[v0,v1,…,vN-1]. The runtime is unchanged: _resolve_kwargs already JSON-decodes that env var, so the function's list[T] parameter receives the list naturally.
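The delivery path can be sketched end to end. build_env and resolve_kwargs are hypothetical mirrors of the agent's buildEnv and the runtime's _resolve_kwargs, not their real code. The fetch callable stands in for one FetchXCom round-trip. Upper-casing the parameter name for <PARAM> is an assumption. Following the Implementation notes, a single upstream is stamped as its raw value and a fan-in parameter as a JSON array.

```python
import json
from typing import Any, Callable, Optional

# Hypothetical stand-in for one FetchXCom round-trip: returns the upstream's
# decoded return_value, or None when the upstream pushed no XCom.
Fetch = Callable[[str], Optional[Any]]


def build_env(xcom_input: dict[str, list[str]], fetch: Fetch) -> dict[str, str]:
    """Agent side (sketch): one env var per parameter."""
    env = {}
    for param, upstreams in xcom_input.items():
        values = [fetch(task_id) for task_id in upstreams]  # N fetches, declaration order
        payload = values[0] if len(upstreams) == 1 else values  # single upstream: raw value
        env[f"LEOFLOW_XCOM_{param.upper()}"] = json.dumps(payload)  # None encodes as null
    return env


def resolve_kwargs(params: list[str], env: dict[str, str]) -> dict[str, Any]:
    """Runtime side (sketch): JSON-decode each env var back into a kwarg."""
    return {p: json.loads(env[f"LEOFLOW_XCOM_{p.upper()}"]) for p in params}
```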

We did not introduce a bulk FetchXComs RPC. The marginal cost (N gRPC round-trips on the reducer's startup) is acceptable for the alpha's typical N (4-20). A bulk RPC is a future optimisation if Pro deployments push N higher.

5. Order: declaration order, not finish order.

The reducer receives upstreams in the order they appear in the list comprehension (or explicit list), not the order in which they completed. This matches the natural mental model: element i still pairs with the i-th hyperparameter, fold number, or shard ID. The reorder-by-completion that other schedulers perform would break that pairing.
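A toy run shows why completion order would mislead the reducer. The task_ids and scores are illustrative values, not results. select_best here is a hypothetical reducer that maps the best index back to its learning rate.

```python
LRS = [0.001, 0.01, 0.05, 0.1, 0.5]
# Hypothetical task_ids in declaration order: trial_0 ran with LRS[0], and so on.
declared = [f"trial_{i}" for i in range(len(LRS))]

# Illustrative (task_id, score) pairs in the order the upstreams happened to finish.
finished = [("trial_3", 0.91), ("trial_0", 0.72), ("trial_4", 0.40),
            ("trial_1", 0.88), ("trial_2", 0.93)]
by_task = dict(finished)

completion_order = [score for _, score in finished]  # what a finish-ordered list gives
declaration_order = [by_task[t] for t in declared]   # what the agent assembles


def select_best(trials: list[float]) -> float:
    """Hypothetical reducer: returns the learning rate of the best-scoring trial."""
    best_index = max(range(len(trials)), key=lambda i: trials[i])
    return LRS[best_index]
```

select_best(declaration_order) returns 0.05, the learning rate of trial_2. Fed the completion-ordered list, it would return 0.5 instead.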

6. Missing upstream: null in the slot.

If an upstream pushed no XCom (Airflow semantics: a missing XCom resolves to None), the agent emits null in its slot rather than skipping it or failing the reducer. The reducer always receives len(upstreams) elements; the user decides what null means.

trigger_rule="all_success" (the default) blocks the reducer entirely on a failed upstream, so null only surfaces under all_done or other opt-in trigger rules.
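Under all_done, the reducer owns the meaning of null. One possible policy skips absent slots while keeping index i paired with lrs[i]. The lrs parameter is a hypothetical addition that keeps the example self-contained.

```python
from typing import Optional


def select_best(trials: list[Optional[float]], lrs: list[float]) -> Optional[float]:
    """Hypothetical reducer that treats a null slot (None) as "no result".

    The list always has one slot per upstream, so index i still maps to lrs[i].
    """
    scored = [(score, lr) for score, lr in zip(trials, lrs) if score is not None]
    if not scored:
        return None  # every upstream was absent; this reducer chooses to return None
    return max(scored)[1]  # learning rate of the highest score
```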

7. Scope: same DagRun, same tenant.

Fan-in is scoped to N tasks in the same DagRun. The XCom lookup is keyed by (tenant, dag_id, run_id, upstream_task_id, "return_value"). We did not generalise to cross-DAG or cross-run aggregation; those are separate problems (cross-DAG triggers, asset-driven scheduling) that would force a different abstraction.
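The scoping follows from the lookup key. The in-memory store below is a toy, and its class names are hypothetical rather than the real XCom backend. It uses the same five-part key, so a reducer in one run never sees another run's values.

```python
from typing import Any, NamedTuple


class XComKey(NamedTuple):
    tenant: str
    dag_id: str
    run_id: str
    task_id: str
    key: str = "return_value"


class InMemoryXComStore:
    """Toy store keyed like the fan-in lookup; not the real backend."""

    def __init__(self) -> None:
        self._data: dict[XComKey, Any] = {}

    def push(self, key: XComKey, value: Any) -> None:
        self._data[key] = value

    def pull_upstreams(self, tenant: str, dag_id: str, run_id: str,
                       upstreams: list[str]) -> list[Any]:
        # Every lookup carries the reducer's own tenant, dag_id and run_id,
        # so values from other runs, DAGs or tenants are never visible.
        return [self._data.get(XComKey(tenant, dag_id, run_id, t)) for t in upstreams]
```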

8. N is compile-time.

The parser captures the count of upstreams when the DAG is loaded, so a range(SHARDS) loop produces exactly SHARDS tasks. Dynamic mapping (Airflow's .expand(), where N is discovered at runtime, e.g. from a query result) is rejected loudly by the parser today, so that a user does not assume it works. Lifting that limit is post-alpha work.
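The compile-time count follows from running the DAG file at load time: each call in the comprehension records one task. DagRecorder, Task and the <name>_<index> task_id scheme are hypothetical, loosely modelled on a decorator-based API. Only the behaviour (N fixed at load, .expand() refused loudly) comes from this ADR.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any

SHARDS = 4


@dataclass(frozen=True)
class XComArg:
    """Hypothetical handle for one task call's future return_value."""
    task_id: str


class DagRecorder:
    """Hypothetical load-time recorder: every task call adds exactly one task."""

    def __init__(self) -> None:
        self.task_ids: list[str] = []
        self._counts = Counter()

    def record(self, name: str) -> str:
        # Hypothetical naming scheme: <name>_<call index>.
        task_id = f"{name}_{self._counts[name]}"
        self._counts[name] += 1
        self.task_ids.append(task_id)
        return task_id


class Task:
    """Hypothetical decorated task; calling it at load time records a node."""

    def __init__(self, dag: DagRecorder, name: str) -> None:
        self.dag = dag
        self.name = name

    def __call__(self, *args: Any, **kwargs: Any) -> XComArg:
        return XComArg(self.dag.record(self.name))

    def expand(self, **kwargs: Any) -> XComArg:
        # N would only be known at runtime, so refuse loudly rather than guess.
        raise NotImplementedError(f"{self.name}.expand(): dynamic mapping is not supported")


dag = DagRecorder()
estimate = Task(dag, "estimate")
upstreams = [estimate(i) for i in range(SHARDS)]  # N is fixed once this line runs
```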

Consequences

Positive

  • Map-reduce DAGs work end-to-end without manual XCom plumbing. The reducer is an ordinary Python function with a list[T] parameter.
  • The DAG source reads exactly like sequential code โ€” no Airflow API inside the task body, no XCom pull boilerplate.
  • The schema is uniform (always-list) so future producers and consumers do not branch.

Negative

  • Five layers move together (parser, schema, domain, proto, agent). A contributor adding a new operator must understand the contract end-to-end.
  • Changing the pre-existing schema shape (xcom_input[param] = string) is a hard break. This is acceptable because pre-alpha users reinstall on every prealpha tag (#90 in memory: a fresh install resets the datastore). Once v0.1.0-alpha.1 is cut, further schema breaks need a migration step.
  • The 256 KB XCom ceiling now applies to the assembled array on the wire, not just each upstream's value. Reducers that pull many large payloads will hit it sooner.
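A quick size check shows how the ceiling bites sooner with fan-in. The numbers are illustrative. Reading "256 KB" as 256 KiB is an assumption, and encoded_size only approximates the wire size by measuring the JSON text.

```python
import json
from typing import Any

# Assumption: "256 KB" read as 256 KiB; the real limit constant may differ.
XCOM_CEILING_BYTES = 256 * 1024


def encoded_size(value: Any) -> int:
    """UTF-8 size of a value's JSON encoding (approximate wire size)."""
    return len(json.dumps(value).encode("utf-8"))


# Illustrative payloads: 20 upstreams, each returning a 16,000-character string.
values = ["x" * 16_000 for _ in range(20)]

each_fits = all(encoded_size(v) <= XCOM_CEILING_BYTES for v in values)
array_fits = encoded_size(values) <= XCOM_CEILING_BYTES
```

Each 16,000-character value encodes to 16,002 bytes and fits comfortably. The 20-element array encodes to 320,080 bytes and exceeds the ceiling.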

Limitations recorded for follow-up

  • Mixed lists ([task(), 42, task()]) are silently dropped instead of raising an error.
  • Dynamic mapping (.expand() and friends) remains unsupported.
  • Cross-DAG fan-in and cross-run aggregation are out of scope.
  • Bulk FetchXComs RPC to amortise the per-upstream gRPC round-trip cost on large N is a future optimisation.

Implementation

Shipped in PR #258 (commits 5db3fdf and 1d1b8ff):

  • Parser: _bind_call_arguments detects list-of-XComArg and emits xcom_input[param] = [task_ids…].
  • Schema (internal/domain/schemas/dag-schema.json, mirrored in docs/api/): xcom_input value type string → array[string], minItems: 1.
  • Go domain (internal/domain/dag.go): XComInput map[string]string → map[string][]string.
  • Proto (proto/agent.proto): new XComUpstreams wrapper; agent.pb.go regenerated via buf generate.
  • Agent (internal/agent/runner.go): buildEnv iterates over upstream lists. A single upstream stamps the raw value, fan-in stamps a JSON array, and an absent upstream becomes null.
  • Server (internal/agentrpc/server.go): declaresUpstream accepts the list shape; the helper toXComUpstreamsMap converts domain → proto.

Tests: parser 41/41 (including one new fan-in test). Go: all packages pass (including two new agent fan-in tests: happy path, and absent upstream → null). Lint A+ (0 issues).

Examples: examples/ml_hparam_search/ (new), plus examples/fan_out_aggregate/ and examples/montecarlo_pi/ (restored to working order). Cookbook page at docs/cookbook/map-reduce.md.