ADR 0034: Fan-in / map-reduce โ list-of-upstream parameter binding¶
Status: Accepted Date: 2026-06-01 Supersedes: none (closes #257 which proposed the same in deferred form)
Context¶
A common DAG topology โ and the default shape of most ML workloads โ is N independent map tasks โ one reducer task that aggregates their outputs:
Through v0.0.1-prealpha.22, the parser silently dropped the list-of-XComArg
argument because _bind_call_arguments only knew about single XComArg or
JSON literal. At runtime, the reducer ran with empty kwargs and crashed
with TypeError: select_best() missing 1 required positional argument:
'trials'. Both shipped example DAGs (fan_out_aggregate, montecarlo_pi)
were broken end-to-end.
We had two pre-implementation options on the table during alpha-prep:
- Loud reject at compile-time. Refuse fan-in with a clear error, delete the broken examples, defer the wiring post-alpha. Smallest surface change, honest failure, no false promises.
- Full pipeline implementation. Parser โ schema โ proto โ agent โ server. Touches every layer; reducer parameter receives the upstreams as a list, no manual XCom plumbing. Best UX, biggest surface change.
We chose option 2 because map-reduce is core to ML workflows (the project's documented target audience) and shipping an alpha that rejects the natural expression of fan-in would harm adoption more than the extra implementation risk costs.
This ADR records the design choices that made (2) feasible without future relitigation.
Decision¶
1. Activation criterion โ argument shape at the call site.¶
The parser activates fan-in when a parameter is bound to a list (or
tuple) where every element is a XComArg (a task call). Equivalent forms:
# All three behave identically:
select_best([trial(lr) for lr in LRs])
combine([estimate(0), estimate(1), estimate(2)])
report((extract_a(), extract_b()))
A mixed list (one XComArg + one literal) is silently dropped today and should become a loud error in a follow-up โ its semantics are ambiguous (per-element pull vs. JSON literal) and no current example needs it.
A list of pure literals (f(items=[1, 2, 3])) remains a JSON-literal
capture under call_args, unchanged.
2. Schema โ always a list.¶
xcom_input[param] is always a list of upstream task_ids, even for the
common single-upstream case. The pre-existing
{"value": "extract"} shape becomes {"value": ["extract"]}. We picked
list-always over polymorphic str | list[str] because:
- The proto wire format is
map<string, XComUpstreams>whereXComUpstreams { repeated string task_ids = 1 }; that's a list at the wire level, so the schema and the domain types should match. - Polymorphic JSON values force every consumer (parser tests, Go unmarshal, validation, future producers) to handle both forms โ bug surface.
- The user-facing experience is unchanged: a single-upstream reducer receives the value directly, not wrapped in a list. The list lives in the spec only.
3. Wire format โ a wrapper proto message, not a delimited string.¶
xcom_input_mapping on TaskSpec is map<string, XComUpstreams>. We
considered three alternatives and rejected them:
- Delimited string (
"a b c") โ type-abuse, breaks proto introspection tools, and silently failure-modes if the task_id ever contains the delimiter (which it shouldn't, but invariant-by-luck is fragile). - Polymorphic
oneof StringOrListโ Protobuf does not supportoneofinside a map value cleanly, and the union forces every consumer to branch. - Parallel field
xcom_input_many_mappingโ leaks the schema decision into the proto layer. Two fields where one suffices.
Wrapper messages are idiomatic in proto3 and travel well across language bindings.
4. Delivery โ JSON array in the env var.¶
The agent fetches each upstream's return_value via the existing
FetchXCom RPC (N round-trips), assembles them into a JSON array in
declaration order, and stamps LEOFLOW_XCOM_<PARAM>=[v0,v1,โฆ,vN-1]. The
runtime is unchanged โ _resolve_kwargs already JSON-decodes that env
var, so the function's list[T] parameter receives the list naturally.
We did not introduce a bulk FetchXComs RPC. The marginal cost (N gRPC
round-trips on the reducer's startup) is acceptable for the alpha's
typical N (4-20). A bulk RPC is a future optimisation if Pro deployments
push N higher.
5. Order โ declaration order, not finish order.¶
The reducer receives upstreams in the order they appear in the list comprehension (or explicit list), not the order they completed. This matches the natural mental model โ pairing indices with hyperparameters, fold numbers, or shard IDs survives the reorder-by-completion that other schedulers do.
6. Missing upstream โ null in the slot.¶
If an upstream pushed no XCom (Airflow semantics: a missing XCom resolves
to None), the agent emits null in its slot rather than skipping it or
failing the reducer. The reducer always receives len(upstreams)
elements; the user decides what null means.
trigger_rule="all_success" (the default) blocks the reducer entirely on
a failed upstream, so null only surfaces under all_done or other
opt-in trigger rules.
7. Scope โ same DagRun, same tenant.¶
Fan-in is scoped to N tasks in the same DagRun. The XCom lookup is keyed
by (tenant, dag_id, run_id, upstream_task_id, "return_value"). We did
not generalise to cross-DAG or cross-run aggregation; those are separate
problems (cross-DAG triggers, asset-driven scheduling) that would force a
different abstraction.
8. N is compile-time.¶
The parser captures the count of upstreams when the DAG is loaded. A
range(SHARDS) loop produces exactly SHARDS tasks. Dynamic mapping
(Airflow's .expand() โ N discovered at runtime, e.g. from a query
result) is explicitly rejected by the parser today. We reject it
loudly so a user does not assume it works; lifting that limit is post-alpha.
Consequences¶
Positive¶
- Map-reduce DAGs work end-to-end without manual XCom plumbing. The
reducer is an ordinary Python function with a
list[T]parameter. - The DAG source reads exactly like sequential code โ no Airflow API inside the task body, no XCom pull boilerplate.
- The schema is uniform (always-list) so future producers and consumers do not branch.
Negative¶
- Five layers move together (parser, schema, domain, proto, agent). A contributor adding a new operator must understand the contract end-to-end.
- The pre-existing schema shape (
xcom_input[param] = string) is a hard break. Acceptable because pre-alpha users reinstall on every prealpha tag (#90 in memory: fresh install resets datastore); afterv0.1.0-alpha.1cuts, further schema breaks need a migration step. - The 256 KB XCom ceiling now applies to the assembled array on the wire, not just each upstream's value. Reducers that pull many large payloads will hit it sooner.
Limitations recorded for follow-up¶
- Mixed lists (
[task(), 42, task()]) silently drop instead of erroring. - Dynamic mapping (
.expand()and friends) remains unsupported. - Cross-DAG fan-in and cross-run aggregation are out of scope.
- Bulk FetchXComs RPC to amortise the per-upstream gRPC round-trip cost on large N is a future optimisation.
Implementation¶
Shipped in PR #258 (commit 5db3fdf + 1d1b8ff):
- Parser:
_bind_call_argumentsdetects list-of-XComArg and emitsxcom_input[param] = [task_idsโฆ]. - Schema (
internal/domain/schemas/dag-schema.json, mirror indocs/api/):xcom_inputvalue typestringโarray[string],minItems: 1. - Go domain (
internal/domain/dag.go):XComInput map[string]stringโmap[string][]string. - Proto (
proto/agent.proto): newXComUpstreamswrapper, regenagent.pb.goviabuf generate. - Agent (
internal/agent/runner.go):buildEnviterates upstream lists; single-upstream stamps raw value, fan-in stamps JSON array, absent upstream โnull. - Server (
internal/agentrpc/server.go):declaresUpstreamaccepts list shape; helpertoXComUpstreamsMapconverts domain โ proto.
Tests: parser 41/41 (incl. one new fan-in test), Go all packages OK (incl. two new agent fan-in tests โ happy path + absent upstream โ null). Lint A+ (0 issues).
Examples: examples/ml_hparam_search/ (new), examples/fan_out_aggregate/
and examples/montecarlo_pi/ (restored functional). Cookbook page at
docs/cookbook/map-reduce.md.
Related¶
- ADR 0024 โ Parser structural shim: the dependency-free shim that captures DAG structure without running Airflow. Fan-in capture happens inside this layer.
- ADR 0032 โ Task return values stay out of log files: the durable storage for return values is XCom; the metadata-only log line announces them. Fan-in's chain-of-custody assumes this contract.
- Cookbook โ Map-reduce in Leoflow: the user-facing guide. This ADR is the immutable record; the cookbook is the living explanation.
- Issue #257: pre-implementation tracking issue, closed as superseded by PR #258.