Python runtime API

The leoflow_runtime package runs your task callable inside the container and bridges its return value to XCom. It is installed in the DAG image and in the dev venv. Your dag.py is written against the Apache Airflow Task SDK (from airflow.sdk import DAG, task); at run time the agent invokes leoflow_runtime to execute the callable.

leoflow_runtime.runner

Run a user task callable and capture its return value.

return_value_path()

Return the path the task's return value is written to.

Overridable via LEOFLOW_RETURN_VALUE_PATH (primarily for tests).

Source code in runtime/python/leoflow_runtime/runner.py
def return_value_path() -> str:
    """Return the path the task's return value is written to.

    Overridable via ``LEOFLOW_RETURN_VALUE_PATH`` (primarily for tests).
    """
    return os.environ.get("LEOFLOW_RETURN_VALUE_PATH", DEFAULT_RETURN_VALUE_PATH)
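A test can redirect the output file by setting the environment variable before the runner resolves the path. The sketch below re-declares the function locally so it runs without the package installed; the value given to DEFAULT_RETURN_VALUE_PATH is a placeholder, since the real default is not shown on this page.

```python
import os
from unittest import mock

# Placeholder only: the package's actual default path is not documented here.
DEFAULT_RETURN_VALUE_PATH = "/hypothetical/default/return_value.json"


def return_value_path() -> str:
    """Local stand-in that mirrors the listing above."""
    return os.environ.get("LEOFLOW_RETURN_VALUE_PATH", DEFAULT_RETURN_VALUE_PATH)


# With the override set, the environment value is returned.
with mock.patch.dict(os.environ, {"LEOFLOW_RETURN_VALUE_PATH": "/tmp/test_return.json"}):
    overridden = return_value_path()

# With the variable absent, the function falls back to the default.
with mock.patch.dict(os.environ):
    os.environ.pop("LEOFLOW_RETURN_VALUE_PATH", None)
    fallback = return_value_path()
```

Because the variable is read on every call rather than at import time, a test can set it after importing the runner.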

run(entrypoint)

Import and call module:callable, writing a non-None return as JSON.

The agent reads the file and pushes it as the task's return_value XCom. A None return writes nothing, so downstream tasks see no XCom.
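Since the return value is serialized with a plain json.dumps call (see the source listing), a task should return only JSON-compatible values. The following sketch uses a hypothetical helper, to_xcom_payload, to show what that implies for a few common return types; it illustrates standard json behavior and is not part of leoflow_runtime.

```python
import json
from datetime import datetime, timezone


def to_xcom_payload(result):
    """Hypothetical helper: serialize the same way a bare json.dumps does."""
    return json.dumps(result)


# Dicts, lists, strings, numbers, booleans and None serialize directly.
# A tuple is written as a JSON list, so it reads back as a list downstream.
ok = to_xcom_payload({"rows": 3, "tags": ("a", "b")})

# A datetime is not JSON-serializable and raises TypeError.
try:
    to_xcom_payload({"finished_at": datetime.now(timezone.utc)})
    failed = False
except TypeError:
    failed = True

# One possible workaround: convert to a string before returning.
finished = datetime(2024, 1, 1, tzinfo=timezone.utc)
safe = to_xcom_payload({"finished_at": finished.isoformat()})
```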

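Emits lifecycle lines so the UI's log panel is informative even when the user function is silent: loading <entrypoint>; resolved kwargs: [...] listing the argument names only (when any); and, on success, returned <type> (<n> B XCom) or returned None (no XCom pushed). If the function raises, a user function <name> raised <Type>: <message> line is logged and the exception is re-raised so the traceback reaches stderr, where the agent captures it. Every lifecycle line carries the [leoflow] prefix via _lifecycle, so it is distinguishable from user print() output.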

Source code in runtime/python/leoflow_runtime/runner.py
def run(entrypoint: str) -> None:
    """Import and call ``module:callable``, writing a non-None return as JSON.

    The agent reads the file and pushes it as the task's ``return_value`` XCom.
    A None return writes nothing, so downstream tasks see no XCom.

    Emits ``[leoflow]``-prefixed lifecycle lines (via :func:`_lifecycle`) so the
    UI's log panel is informative even when the user function is silent:
    ``loading <entrypoint>``, ``resolved kwargs: [...]`` (argument names only,
    when any), and ``returned <type> (<n> B XCom)`` on success. On raise, a
    ``user function <name> raised ...`` line is logged before the exception is
    re-raised. The prefix distinguishes these lines from user ``print()``.
    """
    module_name, sep, fn_name = entrypoint.partition(":")
    if not sep or not module_name or not fn_name:
        raise ValueError(f"entrypoint must be 'module:callable', got {entrypoint!r}")

    _lifecycle(f"loading {entrypoint}")
    module = importlib.import_module(module_name)
    fn = getattr(module, fn_name)
    # Airflow TaskFlow @task decorators are not executed when called directly;
    # calling them returns an XComArg (a task reference), not the function's
    # result. Unwrap to the underlying Python function so we run the user's code
    # and capture its real return value.
    if hasattr(fn, "function"):
        fn = fn.function
    kwargs = _resolve_kwargs(fn)
    if kwargs:
        # Log keys only: the values can carry XCom-pulled secrets or any
        # user payload; per ADR 0032 they belong in the XCom tab, not in the
        # log file. The pulled lines above already report each XCom source
        # and its wire size so the operator can correlate.
        _lifecycle(f"resolved kwargs: {sorted(kwargs.keys())}")

    try:
        result = fn(**kwargs)
    except Exception as exc:  # noqa: BLE001 - surface user errors to the log
        _lifecycle(f"user function {fn_name} raised {type(exc).__name__}: {exc}")
        # Stdout is line-buffered or `-u` unbuffered; flush stderr too so the
        # ordering in the log panel matches the wall-clock order.
        sys.stdout.flush()
        sys.stderr.flush()
        # Re-raise so Python's default handler emits the traceback to stderr,
        # where the agent captures it.
        raise

    if result is None:
        _lifecycle("returned None (no XCom pushed)")
        return
    # Mention the type + payload size so the user can confirm the right value
    # is leaving the task without dumping huge return values into the log.
    payload = json.dumps(result)
    _lifecycle(f"returned {type(result).__name__} ({len(payload)} B XCom)")
    with open(return_value_path(), "w", encoding="utf-8") as f:
        f.write(payload)
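The flow in the listing can be tried end to end without Airflow or the package installed. The sketch below is a simplified re-implementation, not the runner itself: run_sketch omits lifecycle logging and kwargs resolution, FakeTask is a hypothetical stand-in for a decorated task that exposes a .function attribute, and demo_dag is a throwaway module registered only for this example.

```python
import importlib
import json
import os
import sys
import tempfile
import types


class FakeTask:
    """Hypothetical stand-in for a decorated task exposing ``.function``."""

    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwargs):
        # Calling the wrapper yields a reference, not the function's result.
        return "<task reference>"


def _double(x=21):
    return {"doubled": x * 2}


# Register a throwaway module so importlib can resolve "demo_dag:double".
demo = types.ModuleType("demo_dag")
demo.double = FakeTask(_double)
sys.modules["demo_dag"] = demo


def run_sketch(entrypoint: str, out_path: str) -> None:
    """Simplified run(): parse, import, unwrap, call, write JSON."""
    module_name, sep, fn_name = entrypoint.partition(":")
    if not sep or not module_name or not fn_name:
        raise ValueError(f"entrypoint must be 'module:callable', got {entrypoint!r}")
    fn = getattr(importlib.import_module(module_name), fn_name)
    if hasattr(fn, "function"):
        fn = fn.function  # unwrap to the plain Python function
    result = fn()
    if result is None:
        return  # nothing is written, so there is no XCom to push
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(result))


with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "return_value.json")
    run_sketch("demo_dag:double", out_path)
    with open(out_path, encoding="utf-8") as f:
        written = json.load(f)  # what the agent would read back

# An entrypoint without the "module:callable" shape is rejected up front.
try:
    run_sketch("demo_dag", "unused")
    bad_entrypoint_rejected = False
except ValueError:
    bad_entrypoint_rejected = True
```

Without the unwrap step, the file would contain the wrapper's return value rather than the dictionary produced by the user function.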

leoflow_runtime.xcom

Access XCom inputs injected into the task container by the agent.

xcom_pull(name, default=None)

Return the upstream XCom mapped to name, or default if absent.

The agent injects each declared input as LEOFLOW_XCOM_<NAME>=<json>; the name is matched case-insensitively.

Source code in runtime/python/leoflow_runtime/xcom.py
def xcom_pull(name: str, default: Any = None) -> Any:
    """Return the upstream XCom mapped to ``name``, or ``default`` if absent.

    The agent injects each declared input as ``LEOFLOW_XCOM_<NAME>=<json>``;
    the name is matched case-insensitively.
    """
    raw = os.environ.get(XCOM_ENV_PREFIX + name.upper())
    if raw is None:
        return default
    return json.loads(raw)
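The lookup can be exercised locally by simulating what the agent injects. This sketch re-declares xcom_pull so it runs standalone; the value of XCOM_ENV_PREFIX is assumed from the LEOFLOW_XCOM_<NAME> pattern in the docstring, and row_count is a made-up input name.

```python
import json
import os
from typing import Any
from unittest import mock

# Assumed from the documented LEOFLOW_XCOM_<NAME> pattern.
XCOM_ENV_PREFIX = "LEOFLOW_XCOM_"


def xcom_pull(name: str, default: Any = None) -> Any:
    """Local stand-in that mirrors the listing above."""
    raw = os.environ.get(XCOM_ENV_PREFIX + name.upper())
    if raw is None:
        return default
    return json.loads(raw)


# Simulate the agent injecting one declared input as JSON.
injected = {"LEOFLOW_XCOM_ROW_COUNT": json.dumps({"rows": 128})}
with mock.patch.dict(os.environ, injected):
    by_lower = xcom_pull("row_count")   # upper-cased before the lookup
    by_mixed = xcom_pull("Row_Count")   # same variable, different casing
    missing = xcom_pull("not_declared", default=[])
```

Note that default is returned only when the variable is absent; an injected JSON null decodes to None regardless of the default supplied.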