executor¶
Package executor runs task instances via Kubernetes, Docker, a subprocess, or inline HTTP, selected by the Router (ADR 0002).
Index¶
- func BuildPod(req Request) *corev1.Pod
- func StagingClaimName(dagID, runID string) string
- type Executor
- type FailureReporter
- type InlineConfig
- type InlineHTTPExecutor
- func NewInlineHTTPExecutor(client *http.Client, maxRetries int) *InlineHTTPExecutor
- func (e *InlineHTTPExecutor) Execute(ctx context.Context, req Request) error
- func (e *InlineHTTPExecutor) Run(ctx context.Context, req Request) ([]byte, error)
- type InlineMetrics
- type InlineRunner
- func NewInlineRunner(cfg InlineConfig) *InlineRunner
- func (r *InlineRunner) Start(ctx context.Context, runID, dagID, tenantID string, tryNumber int, task domain.TaskSpec) (bool, error)
- func (r *InlineRunner) Wait()
- type KubernetesExecutor
- func NewKubernetesExecutor(clientset kubernetes.Interface, namespace string) *KubernetesExecutor
- func (e *KubernetesExecutor) Execute(ctx context.Context, req Request) error
- func (e *KubernetesExecutor) GCStagingClaims(ctx context.Context, ttl time.Duration) error
- func (e *KubernetesExecutor) SetStagingStore(s StagingStore)
- type Reconciler
- func NewReconciler(clientset kubernetes.Interface, namespace string, reporter FailureReporter) *Reconciler
- func (r *Reconciler) Reconcile(ctx context.Context) error
- type Request
- type Router
- func NewRouter(standard, inline Executor) *Router
- func (r *Router) ExecutorFor(operator string) Executor
- type StagingStore
- type StateSink
- type SubprocessExecutor
- func NewSubprocessExecutor(agentPath string, logger *slog.Logger) *SubprocessExecutor
- func (e *SubprocessExecutor) Execute(ctx context.Context, req Request) error
- func (e *SubprocessExecutor) SetWorkDir(dir string)
- type XComPusher
func BuildPod¶
BuildPod constructs the pod spec for a task instance. It is pure (modulo the random name suffix) and unit-tested independently of any cluster.
func StagingClaimName¶
StagingClaimName is the deterministic PVC name for a run's staging volume. It must be stable across retries and clear+re-run so the same PVC is re-attached (ADR 0022), and DNS-safe.
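One way to satisfy both requirements (stable and DNS-safe) is to hash the identifiers. The sketch below is an assumption about how such a name could be derived, not the package's actual scheme; `stagingClaimNameSketch`, the `staging-` prefix, and the 32-character digest length are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// stagingClaimNameSketch is a hypothetical stand-in for StagingClaimName.
// Hashing the (dagID, runID) pair makes the result deterministic, limits it to
// lowercase hex characters, and keeps it well under the 63-character limit of
// a DNS label, whatever characters the IDs contain.
func stagingClaimNameSketch(dagID, runID string) string {
	// The NUL separator keeps ("a", "bc") and ("ab", "c") from colliding.
	sum := sha256.Sum256([]byte(dagID + "\x00" + runID))
	return "staging-" + hex.EncodeToString(sum[:])[:32]
}

func main() {
	// The try number is deliberately not an input, so a retry or a
	// clear+re-run of the same run resolves to the same claim.
	fmt.Println(stagingClaimNameSketch("etl_daily", "manual__2026-01-01"))
}
```

A hashed name is opaque to operators; a real implementation might keep a readable, sanitized prefix of the DAG ID in front of the digest.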
type Executor¶
Executor runs or dispatches a task. For synchronous executors (inline HTTP) the returned error reflects the task outcome; for asynchronous executors (Kubernetes/Docker/subprocess) it reflects dispatch, and the agent reports the final state over gRPC.
type FailureReporter¶
FailureReporter marks a task instance failed when its pod failed without the agent reporting. The implementation must be idempotent and only act on a non-terminal task instance.
type FailureReporter interface {
FailTask(ctx context.Context, taskInstanceID, reason string) error
}
type InlineConfig¶
InlineConfig bundles an InlineRunner's dependencies. XCom and Logs are optional; when nil, the corresponding shipping is skipped.
type InlineConfig struct {
Sink StateSink
Metrics InlineMetrics
XCom XComPusher
Logs logs.Sink
Concurrency int
MaxSeconds int
UserAgent string
}
type InlineHTTPExecutor¶
InlineHTTPExecutor runs http_api tasks in-process (no pod, no agent), with retries and a per-request timeout.
func NewInlineHTTPExecutor¶
NewInlineHTTPExecutor builds an executor with the given underlying client (nil uses a default) and retry count.
func (*InlineHTTPExecutor) Execute¶
Execute performs the request and returns nil on a success status, retrying transient failures with exponential backoff.
func (*InlineHTTPExecutor) Run¶
Run performs the request like Execute but also returns the response body of the successful attempt, so callers can capture it as an XCom value.
type InlineMetrics¶
InlineMetrics records inline task execution metrics.
type InlineMetrics interface {
RecordTaskTransition(from, to, dagID string)
RecordTaskDuration(dagID, taskID, taskType string, seconds float64)
}
type InlineRunner¶
InlineRunner executes http_api tasks declared with execution_mode: inline as goroutines in the control plane, bounded by a concurrency semaphore and a per-task duration cap (ADR 0002). On success it ships the response body as the task's return_value XCom and writes a summary log line.
func NewInlineRunner¶
NewInlineRunner builds an InlineRunner from the given config.
func (*InlineRunner) Start¶
func (r *InlineRunner) Start(ctx context.Context, runID, dagID, tenantID string, tryNumber int, task domain.TaskSpec) (bool, error)
Start begins inline execution of task. It returns (false, nil) when the concurrency limit is reached so the scheduler retries on the next tick, and (false, error) when the task is invalid for inline execution (timeout above the cap). On success it marks the task running, launches the work, and returns (true, nil).
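The three return shapes of Start can be reproduced with a buffered channel used as a non-blocking semaphore. This is a reduced sketch, assuming a hypothetical `runnerSketch` type; state transitions, XCom shipping, and logging are left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// runnerSketch is a hypothetical, reduced InlineRunner: a buffered channel is
// the concurrency semaphore and a WaitGroup backs wait.
type runnerSketch struct {
	slots      chan struct{}
	maxSeconds int
	wg         sync.WaitGroup
}

func newRunnerSketch(concurrency, maxSeconds int) *runnerSketch {
	return &runnerSketch{slots: make(chan struct{}, concurrency), maxSeconds: maxSeconds}
}

// start mirrors the documented contract: (false, error) for a task that is
// invalid for inline execution, (false, nil) when every slot is busy, and
// (true, nil) once the work has been launched.
func (r *runnerSketch) start(timeoutSeconds int, work func()) (bool, error) {
	if timeoutSeconds > r.maxSeconds {
		return false, errors.New("timeout exceeds the inline cap")
	}
	select {
	case r.slots <- struct{}{}: // slot acquired without blocking
	default:
		return false, nil // at capacity: the caller retries on its next tick
	}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer func() { <-r.slots }() // runs first, so the slot is free before wait returns
		work()
	}()
	return true, nil
}

// wait blocks until every in-flight task has finished.
func (r *runnerSketch) wait() { r.wg.Wait() }

func main() {
	r := newRunnerSketch(1, 30)
	release := make(chan struct{})
	first, _ := r.start(10, func() { <-release })
	second, _ := r.start(10, func() {}) // the only slot is taken
	_, err := r.start(60, func() {})    // timeout above the cap
	close(release)
	r.wait()
	fmt.Println(first, second, err)
}
```

Returning (false, nil) instead of blocking keeps a scheduler tick from stalling behind slow HTTP calls.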
func (*InlineRunner) Wait¶
Wait blocks until every in-flight inline task has finished. It is used for graceful shutdown and by tests.
type KubernetesExecutor¶
KubernetesExecutor runs each task as an ephemeral pod (ADR 0002).
func NewKubernetesExecutor¶
NewKubernetesExecutor builds an executor creating pods in the given namespace.
func (*KubernetesExecutor) Execute¶
Execute creates the task pod. The agent inside the pod reports state over gRPC.
func (*KubernetesExecutor) GCStagingClaims¶
GCStagingClaims reclaims per-run staging PVCs according to the lifecycle tracked in the metadatabase (ADR 0022): a successful run frees its volume immediately; a failed run keeps it until ttl has elapsed since the run's terminal time, so a clear+re-run can reuse it; an orphaned volume (run row gone) is reclaimed. Each deletion is recorded with its reason. It is a no-op when no StagingStore is wired.
func (*KubernetesExecutor) SetStagingStore¶
SetStagingStore wires the metadatabase-backed staging-volume lifecycle store (ADR 0022). With no store set, provisioning is not recorded and GC is a no-op.
type Reconciler¶
Reconciler detects task pods that failed without the agent reporting state and marks the corresponding task instance failed, so retries and run finalization can proceed instead of stranding the task. It also garbage-collects finished pods once they age out.
func NewReconciler¶
func NewReconciler(clientset kubernetes.Interface, namespace string, reporter FailureReporter) *Reconciler
NewReconciler builds a Reconciler over the given cluster and failure reporter.
func (*Reconciler) Reconcile¶
Reconcile lists managed task pods, reports each failed one's task instance, and garbage-collects finished pods older than the grace period.
type Request¶
Request bundles everything an executor needs to run a single task instance.
type Request struct {
TaskInstanceID string
TenantID string
DagID string
RunID string
TaskID string
TryNumber int
Image string
ImagePullPolicy string
Operator string
Entrypoint string
Env map[string]string
Resources domain.Resources
Execution domain.Execution
TimeoutSeconds int
// Source is the dag.py text captured at compile time. The SubprocessExecutor
// materializes it to a per-TI temp dir so `python -m leoflow_runtime
// dag:<task>` can importlib it from there; this is how multi-DAG Lite setups
// avoid the ModuleNotFoundError that hit Lima 2026-06-01 when the agent's
// global workdir didn't carry the user's dag.py. Empty for Pro (the
// container image already carries the source); ignored by the K8s executor.
Source string
// HTTPRequest is set for http_api tasks (run by InlineHTTPExecutor).
HTTPRequest *domain.HTTPRequest
// Agent connection details injected into the worker environment.
ControlPlaneAddr string
AgentToken string
// StagingClaim, when set, is the name of the per-DAG-run RWX PVC mounted at
// /staging in the task pod for large intermediate data shared across the run
// (ADR 0022). Empty means no staging volume. StagingSize/StagingStorageClass
// are used to provision the claim on first use.
StagingClaim string
StagingSize string
StagingStorageClass string
// StagingAccessMode is the PVC access mode (default ReadWriteMany; single-node
// dev uses ReadWriteOnce). Empty means ReadWriteMany.
StagingAccessMode string
// AgentTLSCAConfigMap, when set, is the name of a ConfigMap holding the CA
// (key ca.crt) the agent uses to verify the control plane's gRPC TLS cert
// (issue #58). It is mounted into the task pod and selects TLS for the agent.
AgentTLSCAConfigMap string
// TaskSecretName, when set, is a Kubernetes Secret mounted read-only into the
// task pod at TaskSecretMountPath. It carries a credential a task references by
// path (e.g. a GCP service-account key via the connection's key_path), keeping
// the key in the cluster's secret store rather than in Leoflow (ADR 0035).
TaskSecretName string
TaskSecretMountPath string
}
type Router¶
Router selects the executor for a task: http_api tasks run inline in the control plane; everything else uses the configured standard executor.
func NewRouter¶
NewRouter builds a Router over the standard (k8s/docker/subprocess) and inline HTTP executors.
func (*Router) ExecutorFor¶
ExecutorFor returns the executor responsible for a task of the given operator.
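The routing rule is a single branch on the operator. The sketch below assumes the operator string for inline tasks is exactly `http_api`; the `executor` interface, `named` type, and `routerSketch` are hypothetical stand-ins for the package's Executor and Router.

```go
package main

import "fmt"

// executor is a hypothetical stand-in for the package's Executor interface,
// reduced to a name so the routing decision is observable.
type executor interface{ Name() string }

type named string

func (n named) Name() string { return string(n) }

// routerSketch holds the two executors a Router chooses between.
type routerSketch struct {
	standard executor // Kubernetes, Docker, or subprocess
	inline   executor // inline HTTP
}

// executorFor sends http_api tasks to the inline executor and everything else
// to the standard one. The literal "http_api" is an assumption.
func (r *routerSketch) executorFor(operator string) executor {
	if operator == "http_api" {
		return r.inline
	}
	return r.standard
}

func main() {
	r := &routerSketch{standard: named("kubernetes"), inline: named("inline-http")}
	fmt.Println(r.executorFor("http_api").Name(), r.executorFor("python").Name())
}
```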
type StagingStore¶
StagingStore persists the per-run staging-volume lifecycle in the metadatabase (ADR 0022): provisioning records an active row, GC marks it deleted with a reason, and GC reads the active set joined with each run's state. Volumes are identified by the deterministic PVC name (unique per namespace).
type StagingStore interface {
RecordStagingVolume(ctx context.Context, tenantID, dagID, runID, pvcName, size string) error
MarkStagingDeleted(ctx context.Context, pvcName, reason string) error
ListActiveStagingVolumes(ctx context.Context) ([]domain.StagingVolumeState, error)
}
type StateSink¶
StateSink records the state transitions of an inline task.
type StateSink interface {
Transition(ctx context.Context, runID, taskID string, state domain.TaskState) error
}
type SubprocessExecutor¶
SubprocessExecutor runs the agent as a host subprocess with no isolation. It is for dev mode only and logs a prominent warning on construction.
func NewSubprocessExecutor¶
NewSubprocessExecutor builds a SubprocessExecutor running the given agent binary. It warns that user code runs unsandboxed. The per-DAG venv root is read from LEOFLOW_LITE_VENVS_ROOT at construction time so the executor can pick the right Python for each task without a follow-up call.
func (*SubprocessExecutor) Execute¶
Execute launches the agent subprocess and returns once it has started, like the Kubernetes executor creating a pod. The agent reports its own task state over gRPC, so the scheduler can record the task as queued before the agent finishes; running it synchronously here would let the agent report success before the scheduler recorded queued, and the queued write would clobber it. A non-zero exit is therefore NOT a synchronous error; only a failure to start is. The process is reaped in the background.
func (*SubprocessExecutor) SetWorkDir¶
SetWorkDir sets the working directory the agent runs in. In a task pod the image's WORKDIR holds the DAG code; on a dev host `leoflow dev` points this at the project directory so the agent can import the user's dag.py. An empty value keeps the parent process's working directory.
type XComPusher¶
XComPusher stores an inline task's output as an XCom value.
type XComPusher interface {
Push(ctx context.Context, key xcom.Key, value []byte, contentType string, schema map[string]any) error
}
Generated by gomarkdoc