executor

import "github.com/neochaotic/leoflow/internal/executor"

Package executor runs task instances via Kubernetes, Docker, a subprocess, or inline HTTP, selected by the Router (ADR 0002).

Index

func BuildPod

func BuildPod(req Request) *corev1.Pod

BuildPod constructs the pod spec for a task instance. It is pure (modulo the random name suffix) and unit-tested independently of any cluster.

func StagingClaimName

func StagingClaimName(dagID, runID string) string

StagingClaimName returns the deterministic PVC name for a run's staging volume. The name must be DNS-safe and stable across retries and clear+re-run, so that the same PVC is re-attached (ADR 0022).

type Executor

Executor runs or dispatches a task. For synchronous executors (inline HTTP) the returned error reflects the task outcome; for asynchronous executors (Kubernetes/Docker/subprocess) it reflects dispatch, and the agent reports the final state over gRPC.

type Executor interface {
    Execute(ctx context.Context, req Request) error
}

type FailureReporter

FailureReporter marks a task instance failed when its pod failed without the agent reporting a final state. The implementation must be idempotent and must act only on a non-terminal task instance.

type FailureReporter interface {
    FailTask(ctx context.Context, taskInstanceID, reason string) error
}

type InlineConfig

InlineConfig bundles an InlineRunner's dependencies. XCom and Logs are optional; when either is nil, shipping of the corresponding output (XCom values or logs) is skipped.

type InlineConfig struct {
    Sink        StateSink
    Metrics     InlineMetrics
    XCom        XComPusher
    Logs        logs.Sink
    Concurrency int
    MaxSeconds  int
    UserAgent   string
}

type InlineHTTPExecutor

InlineHTTPExecutor runs http_api tasks in-process (no pod, no agent), with retries and a per-request timeout.

type InlineHTTPExecutor struct {
    // contains filtered or unexported fields
}

func NewInlineHTTPExecutor

func NewInlineHTTPExecutor(client *http.Client, maxRetries int) *InlineHTTPExecutor

NewInlineHTTPExecutor builds an executor with the given underlying client (nil uses a default) and retry count.

func (*InlineHTTPExecutor) Execute

func (e *InlineHTTPExecutor) Execute(ctx context.Context, req Request) error

Execute performs the request and returns nil on a success status, retrying transient failures with exponential backoff.

func (*InlineHTTPExecutor) Run

func (e *InlineHTTPExecutor) Run(ctx context.Context, req Request) ([]byte, error)

Run performs the request like Execute but also returns the response body of the successful attempt, so callers can capture it as an XCom value.

type InlineMetrics

InlineMetrics records inline task execution metrics.

type InlineMetrics interface {
    RecordTaskTransition(from, to, dagID string)
    RecordTaskDuration(dagID, taskID, taskType string, seconds float64)
}

type InlineRunner

InlineRunner executes http_api tasks declared with execution_mode: inline as goroutines in the control plane, bounded by a concurrency semaphore and a per-task duration cap (ADR 0002). On success it ships the response body as the task's return_value XCom and writes a summary log line.

type InlineRunner struct {
    // contains filtered or unexported fields
}

func NewInlineRunner

func NewInlineRunner(cfg InlineConfig) *InlineRunner

NewInlineRunner builds an InlineRunner from the given config.

func (*InlineRunner) Start

func (r *InlineRunner) Start(ctx context.Context, runID, dagID, tenantID string, tryNumber int, task domain.TaskSpec) (bool, error)

Start begins inline execution of task. It returns (false, nil) when the concurrency limit is reached, so the scheduler retries on the next tick. It returns (false, error) when the task is invalid for inline execution (such as a timeout above the per-task duration cap). On success it marks the task running, launches the work, and returns (true, nil).

func (*InlineRunner) Wait

func (r *InlineRunner) Wait()

Wait blocks until every in-flight inline task has finished. It is used for graceful shutdown and by tests.

type KubernetesExecutor

KubernetesExecutor runs each task as an ephemeral pod (ADR 0002).

type KubernetesExecutor struct {
    // contains filtered or unexported fields
}

func NewKubernetesExecutor

func NewKubernetesExecutor(clientset kubernetes.Interface, namespace string) *KubernetesExecutor

NewKubernetesExecutor builds an executor creating pods in the given namespace.

func (*KubernetesExecutor) Execute

func (e *KubernetesExecutor) Execute(ctx context.Context, req Request) error

Execute creates the task pod. The agent inside the pod reports state over gRPC.

func (*KubernetesExecutor) GCStagingClaims

func (e *KubernetesExecutor) GCStagingClaims(ctx context.Context, ttl time.Duration) error

GCStagingClaims reclaims per-run staging PVCs according to the lifecycle tracked in the metadatabase (ADR 0022):

- a successful run frees its volume immediately;
- a failed run keeps it until ttl has elapsed since the run's terminal time (for clear+re-run safety);
- an orphaned volume (one whose run row is gone) is reclaimed.

Each deletion is recorded with its reason. GCStagingClaims is a no-op when no StagingStore is wired.

func (*KubernetesExecutor) SetStagingStore

func (e *KubernetesExecutor) SetStagingStore(s StagingStore)

SetStagingStore wires the metadatabase-backed staging-volume lifecycle store (ADR 0022). With no store set, provisioning is not recorded and GC is a no-op.

type Reconciler

Reconciler detects task pods that failed without the agent reporting state and marks the corresponding task instance failed, so retries and run finalization can proceed instead of stranding the task. It also garbage-collects finished pods once they age out.

type Reconciler struct {
    // contains filtered or unexported fields
}

func NewReconciler

func NewReconciler(clientset kubernetes.Interface, namespace string, reporter FailureReporter) *Reconciler

NewReconciler builds a Reconciler over the given cluster and failure reporter.

func (*Reconciler) Reconcile

func (r *Reconciler) Reconcile(ctx context.Context) error

Reconcile lists the managed task pods and, for each failed pod, reports its task instance through the FailureReporter. It also garbage-collects finished pods older than the grace period.

type Request

Request bundles everything an executor needs to run a single task instance.

type Request struct {
    TaskInstanceID string
    TenantID       string
    DagID          string
    RunID          string
    TaskID         string
    TryNumber      int

    Image           string
    ImagePullPolicy string
    Operator        string
    Entrypoint      string
    Env             map[string]string
    Resources       domain.Resources
    Execution       domain.Execution
    TimeoutSeconds  int

    // Source is the dag.py text captured at compile time. The SubprocessExecutor
    // materializes it to a per-TI temp dir so `python -m leoflow_runtime
    // dag:<task>` can import it from there via importlib. This is how multi-DAG
    // Lite setups avoid the ModuleNotFoundError that hit Lima 2026-06-01 when the
    // agent's global workdir didn't carry the user's dag.py. Empty for Pro (the
    // container image already carries the source); ignored by the K8s executor.
    Source string

    // HTTPRequest is set for http_api tasks (run by InlineHTTPExecutor).
    HTTPRequest *domain.HTTPRequest

    // Agent connection details injected into the worker environment.
    ControlPlaneAddr string
    AgentToken       string

    // StagingClaim, when set, is the name of the per-DAG-run RWX PVC mounted at
    // /staging in the task pod for large intermediate data shared across the run
    // (ADR 0022). Empty means no staging volume. StagingSize/StagingStorageClass
    // are used to provision the claim on first use.
    StagingClaim        string
    StagingSize         string
    StagingStorageClass string
    // StagingAccessMode is the PVC access mode (default ReadWriteMany; single-node
    // dev uses ReadWriteOnce). Empty means ReadWriteMany.
    StagingAccessMode string

    // AgentTLSCAConfigMap, when set, is the name of a ConfigMap holding the CA
    // (key ca.crt) the agent uses to verify the control plane's gRPC TLS cert
    // (issue #58). It is mounted into the task pod and selects TLS for the agent.
    AgentTLSCAConfigMap string

    // TaskSecretName, when set, is a Kubernetes Secret mounted read-only into the
    // task pod at TaskSecretMountPath. It carries a credential a task references by
    // path (e.g. a GCP service-account key via the connection's key_path), keeping
    // the key in the cluster's secret store rather than in Leoflow (ADR 0035).
    TaskSecretName      string
    TaskSecretMountPath string
}

type Router

Router selects the executor for a task: http_api tasks run inline in the control plane; everything else uses the configured standard executor.

type Router struct {
    // contains filtered or unexported fields
}

func NewRouter

func NewRouter(standard, inline Executor) *Router

NewRouter builds a Router over the standard (k8s/docker/subprocess) and inline HTTP executors.

func (*Router) ExecutorFor

func (r *Router) ExecutorFor(operator string) Executor

ExecutorFor returns the executor responsible for a task of the given operator.

type StagingStore

StagingStore persists the per-run staging-volume lifecycle in the metadatabase (ADR 0022). Provisioning records an active row, GC marks it deleted with a reason, and GC reads the active set joined with each run's state. Each volume is identified by its deterministic PVC name (unique per namespace).

type StagingStore interface {
    RecordStagingVolume(ctx context.Context, tenantID, dagID, runID, pvcName, size string) error
    MarkStagingDeleted(ctx context.Context, pvcName, reason string) error
    ListActiveStagingVolumes(ctx context.Context) ([]domain.StagingVolumeState, error)
}

type StateSink

StateSink records the state transitions of an inline task.

type StateSink interface {
    Transition(ctx context.Context, runID, taskID string, state domain.TaskState) error
}

type SubprocessExecutor

SubprocessExecutor runs the agent as a host subprocess with no isolation. It is for dev mode only and logs a prominent warning on construction.

type SubprocessExecutor struct {
    // contains filtered or unexported fields
}

func NewSubprocessExecutor

func NewSubprocessExecutor(agentPath string, logger *slog.Logger) *SubprocessExecutor

NewSubprocessExecutor builds a SubprocessExecutor running the given agent binary. It warns that user code runs unsandboxed. The per-DAG venv root is read from LEOFLOW_LITE_VENVS_ROOT at construction time so the executor can pick the right Python for each task without a follow-up call.

func (*SubprocessExecutor) Execute

func (e *SubprocessExecutor) Execute(ctx context.Context, req Request) error

Execute launches the agent subprocess and returns once it has started, just as the Kubernetes executor returns once it has created a pod. The agent reports its own task state over gRPC, and returning early lets the scheduler record the task as queued before the agent finishes. Running the agent synchronously here would let it report success before the scheduler recorded queued, and the later queued write would then overwrite that success. A non-zero exit is therefore NOT a synchronous error; only a failure to start is. The process is reaped in the background.

func (*SubprocessExecutor) SetWorkDir

func (e *SubprocessExecutor) SetWorkDir(dir string)

SetWorkDir sets the working directory the agent runs in. In a task pod the image's WORKDIR holds the DAG code; on a dev host `leoflow dev` points this at the project directory so the agent can import the user's dag.py. Empty keeps the parent process's working directory.

type XComPusher

XComPusher stores an inline task's output as an XCom value.

type XComPusher interface {
    Push(ctx context.Context, key xcom.Key, value []byte, contentType string, schema map[string]any) error
}

Generated by gomarkdoc