DAG authoring
A Leoflow DAG is two files in a project directory, compiled into an immutable
artifact (dag.json plus a container image, versioned together; see ADR 0003).
```text
dags/my_pipeline/
  dag.py          # real Apache Airflow SDK 3.2.x code
  leoflow.yaml    # Leoflow deploy config (not an Airflow file)
```
Scaffold one with leoflow init dags/my_pipeline.
Workspace layout (multi-DAG, Lite only)
Lite-specific
Multi-DAG workspace discovery and the hot-reload watcher exist only in
leoflow lite, the developer-mode loop. Pro does not have a "workspace"; in Pro every DAG ships as its own image-and-dag.json pair, built by CI and registered via leoflow push dag.json. See The development → deploy lifecycle.
leoflow lite watches a workspace that can hold many DAGs as sibling
subdirectories. The default workspace is ~/leoflow/ (set by leoflow setup).
```text
~/leoflow/                  # workspace root
  recurring_print/          # one DAG project per subdir
    leoflow.yaml
    dag.py
  recurring_parallel/
    leoflow.yaml
    dag.py
  ml/
    train/                  # nested subdirs are scanned too
      leoflow.yaml
      dag.py
```
Recommended layout (best practice, not enforced): name the subdirectory the
same as the dag_id. The binding is by dag_id (yaml field, or the subdir
basename when no yaml is present; see below), but matching names make the
workspace easy for humans to navigate and to grep. For example, use ~/leoflow/sales_etl/
for a DAG named sales_etl.
Discovery rules
leoflow lite walks the workspace and treats every subdirectory containing a
dag.py (with or without a leoflow.yaml) as a project. The scan:
- Goes at most 5 levels deep from the workspace root. A DAG at <ws>/a/b/c/d/dag.py is the deepest valid case; deeper paths are skipped.
- Skips the exclude_paths defaults: .git, __pycache__, *.pyc, .venv, venv, and any other hidden directory (.*).
- Fails loudly on a duplicate dag_id: if two subdirs resolve to the same id, lite refuses to compile any of them and prints both paths so you can rename one. There is no last-write-wins.
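The rules above can be sketched as a small directory walk. This is a minimal illustration under stated assumptions, not Leoflow's scanner. The names discover, DuplicateDagIdError and MAX_DIR_DEPTH are hypothetical. The dag_id resolver defaults to the subdir basename (the no-yaml case). The exclude patterns are hard-coded from the list above.

```python
import fnmatch
from pathlib import Path
from typing import Callable, Dict, List

# Defaults copied from the rules above; hidden directories (.*) are also skipped.
EXCLUDE_PATTERNS = (".git", "__pycache__", "*.pyc", ".venv", "venv")
# <ws>/a/b/c/d/dag.py is the deepest valid case, so a project dir is at most 4 levels down.
MAX_DIR_DEPTH = 4


class DuplicateDagIdError(Exception):
    """Raised when two project directories resolve to the same dag_id (hypothetical)."""


def _excluded(name: str) -> bool:
    return name.startswith(".") or any(fnmatch.fnmatch(name, p) for p in EXCLUDE_PATTERNS)


def discover(
    workspace: Path, resolve_dag_id: Callable[[Path], str] = lambda d: d.name
) -> Dict[str, Path]:
    """Map dag_id -> project directory for every subdir that contains a dag.py."""
    found: Dict[str, Path] = {}
    clashes: List[str] = []

    def walk(directory: Path, depth: int) -> None:
        # depth 0 is the workspace root itself, which is not a project.
        if depth > 0 and (directory / "dag.py").is_file():
            dag_id = resolve_dag_id(directory)
            if dag_id in found:
                clashes.append(f"duplicate dag_id {dag_id!r}: {found[dag_id]} and {directory}")
            else:
                found[dag_id] = directory
        if depth == MAX_DIR_DEPTH:
            return  # anything deeper is skipped
        for child in sorted(directory.iterdir()):
            if child.is_dir() and not _excluded(child.name):
                walk(child, depth + 1)

    walk(workspace, 0)
    if clashes:
        # No last-write-wins: refuse everything and name both paths.
        raise DuplicateDagIdError("\n".join(clashes))
    return found
```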
Every compile log line names the resolved config source โ either the absolute
path of the leoflow.yaml that was loaded, or auto-defaults: <subdir> when
none exists. This is the one line to grep when "which config did it pick up?"
is the debugging question.
leoflow.yaml is optional
A subdir with just a dag.py is a valid project. Leoflow synthesizes a config
with dag_id = <subdir-basename> and every other field filled from the schema
defaults (see Configuration → Defaults). Add a
leoflow.yaml when you need to pin a Python version, declare dependencies, set
per-task overrides, or change the dag_id to something other than the subdir
name.
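Here is a minimal sketch of the fallback, together with the config-source label described under Discovery rules. It makes three assumptions. First, the caller supplies a YAML loader (for example yaml.safe_load from PyYAML). Second, SCHEMA_DEFAULTS is a placeholder dict; the real defaults are documented in Configuration → Defaults and are not reproduced here. Third, "<subdir>" in the log label means the basename. resolve_config is a hypothetical helper. The sketch deliberately does not guess what happens when a leoflow.yaml omits dag_id.

```python
import copy
from pathlib import Path
from typing import Callable, Optional, Tuple

# Placeholder values only; the real schema defaults live in Configuration → Defaults.
SCHEMA_DEFAULTS: dict = {"schema_version": "1.0", "tags": [], "dependencies": []}


def resolve_config(
    project_dir: Path, load_yaml: Optional[Callable[[str], dict]] = None
) -> Tuple[dict, str]:
    """Return (config, source_label) for one project directory (hypothetical helper)."""
    config = copy.deepcopy(SCHEMA_DEFAULTS)  # never mutate the shared defaults
    yaml_path = project_dir / "leoflow.yaml"
    if yaml_path.is_file():
        if load_yaml is None:
            raise ValueError(f"{yaml_path} exists but no YAML loader was supplied")
        # User-written fields override the schema defaults.
        config.update(load_yaml(yaml_path.read_text()) or {})
        return config, str(yaml_path.resolve())
    # No leoflow.yaml: synthesize dag_id from the subdir basename.
    config["dag_id"] = project_dir.name
    return config, f"auto-defaults: {project_dir.name}"
```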
dag.py: the Airflow dialect
dag.py is real Airflow SDK 3.2.x code, imported by the Python parser via the
Airflow DagBag. TaskFlow and classic operators both work:
```python
from airflow.sdk import DAG, task


@task
def extract() -> dict:
    return {"rows": 100}


@task
def transform(data: dict) -> dict:
    return {"rows": data["rows"], "doubled": data["rows"] * 2}


with DAG("my_pipeline", schedule="@daily", catchup=False, tags=["etl"]):
    transform(extract())
```
Supported task types
Leoflow accepts a closed set of three task types today. Anything outside the set is a hard compile error: never silently dropped, never silently translated. The list is deliberately small so that what runs in production matches what you wrote.
| Task type | Airflow operator | Where it runs | Notes |
|---|---|---|---|
| python | @task (TaskFlow) or PythonOperator | agent in a pod (Pro) / subprocess (Lite) | The general-purpose escape hatch: any provider library you pip install is callable from inside a @task. |
| bash | BashOperator | agent | Executes a shell command. |
| http_api | HttpOperator (airflow.providers.http) | inline in the control plane (no pod, no agent) | Synchronous outbound HTTP. Use it for lightweight webhooks/probes; for anything with retries, throughput, or auth headers you control, prefer @task + requests in python. |
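To make the closed-set rule concrete, here is a minimal sketch of a lookup. It maps each task's operator to one of the three task types and raises on anything else. The table keys are operator or decorator names used as plain labels, and UnsupportedTaskError is a hypothetical name. Leoflow's compiler presumably works from the Airflow objects loaded through DagBag rather than from strings, and this sketch does not attempt that.

```python
# Hypothetical lookup: operator (or decorator) label -> Leoflow task type.
SUPPORTED_TASK_TYPES = {
    "@task": "python",
    "PythonOperator": "python",
    "BashOperator": "bash",
    "HttpOperator": "http_api",
}


class UnsupportedTaskError(ValueError):
    """A task whose operator is outside the closed set (hypothetical)."""


def classify(task_id: str, operator_name: str) -> str:
    """Return the task type, or fail loudly; nothing is dropped or translated."""
    try:
        return SUPPORTED_TASK_TYPES[operator_name]
    except KeyError:
        allowed = sorted(set(SUPPORTED_TASK_TYPES.values()))
        raise UnsupportedTaskError(
            f"task {task_id!r}: {operator_name} is not a supported task type (allowed: {allowed})"
        ) from None
```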
Also supported:
- Trigger rules: all_success, all_failed, all_done, one_success, one_failed.
- XCom: TaskFlow data-flow (transform(extract())) is resolved automatically into typed inputs (xcom_input).
- Schedule: cron strings (0 * * * *) and presets (@daily).
- Dependencies: linear ordering via TaskFlow calls or a >> b.
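Here is a simplified model of the five supported trigger rules, following the standard Airflow meaning of each rule. The rule decides whether a task can run now ("run"), must keep waiting ("wait"), or can no longer fire ("never"). The sketch assumes only three upstream states (success, failed, pending). Airflow's real state machine also has skipped and upstream_failed, which are left out here. decide is a hypothetical name, and an unknown rule raises instead of falling back to a default.

```python
from typing import Sequence


def decide(rule: str, upstream: Sequence[str]) -> str:
    """Return "run", "wait" or "never" given upstream states (success/failed/pending)."""
    if not upstream:
        return "run"  # a root task has nothing to wait for
    succeeded = upstream.count("success")
    failed = upstream.count("failed")
    finished = succeeded + failed == len(upstream)

    if rule == "all_success":
        if failed:
            return "never"
        return "run" if finished else "wait"
    if rule == "all_failed":
        if succeeded:
            return "never"
        return "run" if finished else "wait"
    if rule == "all_done":
        return "run" if finished else "wait"
    if rule == "one_success":
        if succeeded:
            return "run"  # fires as soon as one upstream succeeds
        return "never" if finished else "wait"
    if rule == "one_failed":
        if failed:
            return "run"  # fires as soon as one upstream fails
        return "never" if finished else "wait"
    raise ValueError(f"unsupported trigger rule: {rule!r}")
```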
Connector cookbooks (postgres, mysql, sqlite, redis, http) are all
python tasks that read a managed Connection (AIRFLOW_CONN_*) injected as an env var; they are not new operator types.
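In Airflow's convention, an AIRFLOW_CONN_<CONN_ID> environment variable (conn_id upper-cased) holds the connection, commonly as a URI such as postgres://user:password@host:5432/dbname. The hypothetical helper below parses that URI form with the standard library, to show what a connector task actually receives. It does not handle the JSON form Airflow also accepts. Inside a real task you would normally let Airflow's own connection lookup or the driver library do this parsing.

```python
import os
from typing import Mapping, Optional
from urllib.parse import unquote, urlsplit


def read_connection(conn_id: str, environ: Mapping[str, str] = os.environ) -> dict:
    """Parse an AIRFLOW_CONN_<CONN_ID> URI into its parts (URI form only; hypothetical)."""
    var = f"AIRFLOW_CONN_{conn_id.upper()}"
    if var not in environ:
        raise KeyError(f"{var} is not set")
    parts = urlsplit(environ[var])

    def dec(value: Optional[str]) -> Optional[str]:
        # Credentials are percent-encoded inside the URI (e.g. "@" -> "%40").
        return unquote(value) if value else None

    return {
        "conn_type": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "login": dec(parts.username),
        "password": dec(parts.password),
        "schema": parts.path.lstrip("/") or None,
    }
```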
Not supported: leoflow compile rejects these
If your DAG uses any of these, compile fails, by design
The contract is loud rejection, not silent mistranslation: every "skipped" branch would otherwise actually execute at runtime, so a DAG that imports a sensor or interpolates a Jinja template is refused at compile time with a clear error naming the construct (#225).
The unsupported set, with the things Airflow users most often expect to "just work" called out first:
- Sensors: FileSensor, S3KeySensor, ExternalTaskSensor, and every custom BaseSensorOperator subclass. Pre-alpha Leoflow has no sensor runtime; an async/sensor scheduler is post-alpha work. Workaround: poll inside a @task and let the run drive itself.
- Jinja templating: {{ ds }}, {{ ti }}, {{ var.value.x }}, and every templates_dict= knob. The control plane never re-parses Python, and the templating step is intentionally not implemented. Workaround: build the values inside the @task from the airflow.sdk context.
- Branching (BranchPythonOperator, @task.branch) and short-circuit (@task.short_circuit, ShortCircuitOperator): refused for now, because the current scheduler does not model skipped-vs-executed downstream paths. On the post-alpha backlog if user demand justifies the scheduler change.
- Virtualenv operators (PythonVirtualenvOperator, @task.virtualenv): refused because each DAG already ships as its own image with its own dependencies; spinning up a venv at runtime is the problem Leoflow's one-image-per-DAG model already solved.
- Dynamic task mapping (.expand/.partial): refused; map-reduce fan-out is on the post-alpha roadmap, tracked separately.
- KubernetesPodOperator: refused; the pod is already the runtime substrate for every Leoflow task, so wrapping a user task in another pod is redundant and adds an isolation hole.
- Datasets / Assets triggers: not implemented yet; the asset graph is an Airflow 3.x feature on the post-alpha backlog.
- Provider operators (S3, Postgres, Snowflake, …): refused. Do the work inside a @task instead; your DAG's image already has the libraries (declare them in leoflow.yaml's dependencies).
- Per-task default_args in dag.py are ignored at the parser level; use leoflow.yaml's tasks.<id>: override block instead, which is checked at compile time.
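The sensor workaround (poll inside a @task) can be as small as the loop below. This is a hedged sketch. poll_until is a hypothetical helper, and the sleep and clock parameters exist only so it can be tested without real waiting. The timeout turns a stuck wait into an ordinary task failure instead of an unbounded run.

```python
import time
from typing import Callable


def poll_until(
    check: Callable[[], bool],
    *,
    timeout_s: float,
    interval_s: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Call check() until it returns True; raise TimeoutError once timeout_s has elapsed."""
    deadline = clock() + timeout_s
    while True:
        if check():
            return
        if clock() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s}s")
        sleep(interval_s)
```

Inside dag.py you would call it from a @task body, for example poll_until(lambda: Path("/data/in.csv").exists(), timeout_s=3600, interval_s=60). The retry settings from leoflow.yaml then decide what happens after a TimeoutError.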
leoflow.yaml: deploy config
These are Leoflow concerns, not Airflow operator attributes. You cannot invent kwargs on an operator, because the parser imports real Airflow, which would raise an error.
```yaml
schema_version: "1.0"
dag_id: my_pipeline
description: Daily ETL.
owner: data-eng
tags: [etl]
python_version: "3.11"
dependencies:            # pip packages baked into the image
  - pandas==2.1.0
defaults:                # DAG-level defaults (applied to every task)
  retries: 1
  retry_delay_seconds: 30
staging:                 # opt-in shared per-run RWX volume (ADR 0022)
  enabled: false
tasks:                   # per-task overrides, keyed by task_id (ADR 0023)
  transform:
    retries: 3
    resources:
      requests: { cpu: "2", memory: 4Gi }
```
Binding + override layers (ADR 0023)
Config binds to the DAG by dag_id and to tasks by task_id. Three layers,
most specific wins:
- tasks.<id> is merged at compile time onto the task in dag.json.
- Platform defaults are applied at dispatch time, filling only gaps the artifact left empty (this keeps the artifact portable across clusters).
staging is DAG-level only: one RWX volume is shared atomically by the whole run, so it cannot be per-task.
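One way to picture the layering is as two merges at two different times. This sketch makes four assumptions. The middle layer is the DAG-level defaults: block from leoflow.yaml. Nested mappings such as resources merge key by key. The platform-default values are illustrative. Treating a per-task staging key as an error is inferred from the rule above. deep_merge, compile_task_settings and dispatch_settings are hypothetical names.

```python
from copy import deepcopy


def deep_merge(base: dict, override: dict) -> dict:
    """Return a copy of base with override applied; nested dicts merge key by key."""
    out = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(out.get(key), dict):
            out[key] = deep_merge(out[key], value)
        else:
            out[key] = deepcopy(value)
    return out


def compile_task_settings(dag_defaults: dict, task_override: dict) -> dict:
    """Compile time: tasks.<id> wins over DAG-level defaults; the result goes into dag.json."""
    if "staging" in task_override:
        # Assumption: rejected loudly, since staging is DAG-level only.
        raise ValueError("staging is DAG-level only and cannot be set per task")
    return deep_merge(dag_defaults, task_override)


def dispatch_settings(artifact: dict, platform_defaults: dict) -> dict:
    """Dispatch time: platform defaults fill only gaps; artifact values always win."""
    return deep_merge(platform_defaults, artifact)
```

With the leoflow.yaml above, transform compiles to retries: 3 plus the DAG-level retry_delay_seconds: 30. At dispatch, a platform default can add a key the artifact lacks (for example a resources limit) but never changes retries.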
Guardrails (fail loudly, never silently)
- A tasks: entry naming a task_id absent from the DAG → compile error.
- A duplicate task_id key in the YAML → parse error.
- Across a monorepo, a duplicate dag_id is a CI-gate concern (one image per DAG).
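The two guardrails that do not need a YAML parser can be sketched in a few lines. Both function names are hypothetical. The duplicate-task_id-key check is left out because it depends on the YAML loader being able to report duplicate keys at all.

```python
from typing import Dict, List


def check_task_overrides(dag_task_ids: List[str], override_ids: List[str]) -> None:
    """Compile-time guard: every tasks.<id> entry must name a task that exists in the DAG."""
    unknown = sorted(set(override_ids) - set(dag_task_ids))
    if unknown:
        raise ValueError(f"tasks: entries name task_ids absent from the DAG: {unknown}")


def check_unique_dag_ids(dag_id_by_path: Dict[str, str]) -> None:
    """CI-gate guard: no two DAG projects in the repo may share a dag_id."""
    paths_by_id: Dict[str, List[str]] = {}
    for path, dag_id in dag_id_by_path.items():
        paths_by_id.setdefault(dag_id, []).append(path)
    clashes = {d: sorted(p) for d, p in paths_by_id.items() if len(p) > 1}
    if clashes:
        raise ValueError(f"duplicate dag_id across the repo: {clashes}")
```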
The development → deploy lifecycle
```mermaid
flowchart LR
    I[leoflow init] --> D[leoflow lite<br/>hot-reload loop]
    D -->|save & iterate| D
    D --> G[git push]
    G --> CI[CI: leoflow compile --build --push]
    CI --> REG[leoflow push dag.json]
    REG --> PROD[(control plane<br/>immutable artifact)]
```
1 · Develop (fast, isolated)
```shell
leoflow init dags/my_pipeline    # scaffold dag.py + leoflow.yaml
leoflow lite dags/my_pipeline    # cluster mode: real pods on an isolated k3d cluster
# or: leoflow lite --executor=subprocess dags/my_pipeline   (fastest, host venv)
```
Open http://localhost:8088. The UI is marked Leoflow Lite (silver
edition badge); log in with the admin password generated by leoflow setup
(or run leoflow lite reset-password if you misplaced it). Edit
dag.py/leoflow.yaml and save; Leoflow recompiles, re-runs the guardrails,
and re-registers. A bad binding prints an error in the terminal immediately.
Dev is fully isolated from Demo/Pro (its own database, cluster, and ports), so there is no split brain. See Operating modes.
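The save-and-recompile loop needs some way to notice edits. The sketch below polls the modification times of dag.py and leoflow.yaml. It is a stand-in for illustration only, because this page does not say how Leoflow's watcher detects changes. snapshot, changed and watch are hypothetical names. on_change is where a recompile, guardrail pass and re-register would go.

```python
import time
from pathlib import Path
from typing import Callable, Dict, Set

WATCHED = ("dag.py", "leoflow.yaml")


def snapshot(project_dir: Path) -> Dict[str, int]:
    """Map each watched file that exists to its modification time in nanoseconds."""
    snap: Dict[str, int] = {}
    for name in WATCHED:
        path = project_dir / name
        if path.is_file():
            snap[name] = path.stat().st_mtime_ns
    return snap


def changed(before: Dict[str, int], after: Dict[str, int]) -> Set[str]:
    """Files added, removed, or modified between two snapshots."""
    return {n for n in before.keys() | after.keys() if before.get(n) != after.get(n)}


def watch(project_dir: Path, on_change: Callable[[Set[str]], None], interval_s: float = 0.5) -> None:
    """Poll forever, calling on_change(files) after each detected save."""
    last = snapshot(project_dir)
    while True:
        time.sleep(interval_s)
        current = snapshot(project_dir)
        diff = changed(last, current)
        if diff:
            on_change(diff)  # e.g. recompile, re-run guardrails, re-register
        last = current
```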
2 · Deploy (authoritative, immutable)
On git push, CI compiles + builds + pushes the artifact. The same parser,
overlay, and guardrails run as a gate, so what you tested in Dev is what ships:
```shell
leoflow compile dags/my_pipeline --image ghcr.io/org/my_pipeline:$GIT_SHA --build --push
leoflow push dag.json
```
Full, copy-pasteable pipelines for GitHub Actions, GitLab CI, Google Cloud Build/Run, and generic runners are in CI/CD & deploy examples.
See also: Concepts & glossary · Operating modes · HTTP API · ADR 0023: binding & overrides.