Skip to content

dispatch

import "github.com/neochaotic/leoflow/internal/dispatch"

Package dispatch launches pod-path task instances: it resolves a task's execution context, mints the agent's identity token, and routes the request to the executor. It implements scheduler.Dispatcher.

Index

Variables

ErrAtCapacity is returned by BufferedDispatcher.Dispatch when the buffered queue cannot accept another request. The scheduler treats it exactly like a transient inner-dispatcher error: log + metric + leave the TI scheduled so the next tick re-tries. It is the backpressure signal that bounds tick latency under load (ADR 0031: tick rate decoupled from executor latency).

var ErrAtCapacity = errors.New("dispatch buffer at capacity; will retry next tick")

type BufferConfig

BufferConfig sizes the BufferedDispatcher's worker pool. BufferSize=0 means "passthrough sync" (Lite mode, zero overhead): no goroutines spawned, no channel, the inner dispatcher is called inline. Any BufferSize>0 spawns max(Workers, 1) worker goroutines and a buffered channel of BufferSize slots.

type BufferConfig struct {
    BufferSize int
    Workers    int
}

type BufferedDispatcher

BufferedDispatcher fronts a synchronous Inner dispatcher with a bounded worker pool, so the scheduler tick is never blocked by a slow remote API call. ADR 0031: two-phase scheduler โ€” planning sync, dispatch async.

type BufferedDispatcher struct {
    // contains filtered or unexported fields
}

func NewBuffered

func NewBuffered(inner Inner, sink FailureSink, logger *slog.Logger, metrics MetricsRecorder, cfg BufferConfig) *BufferedDispatcher

NewBuffered constructs a BufferedDispatcher. BufferSize=0 returns a passthrough that is byte-for-byte equivalent to using the inner dispatcher directly (Lite path). BufferSize>0 spawns the worker pool.

func (*BufferedDispatcher) Close

func (b *BufferedDispatcher) Close()

Close stops accepting new dispatches, drains the in-flight queue, and waits for every worker to finish. Calling Close more than once is safe.

func (*BufferedDispatcher) Dispatch

func (b *BufferedDispatcher) Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error

Dispatch hands a task off to the inner dispatcher. In passthrough mode the inner call happens inline. In buffered mode the request is enqueued non- blockingly: success returns nil immediately (the scheduler then records the TI as `queued`); a full channel returns ErrAtCapacity (the scheduler treats it as a transient failure and leaves the TI scheduled).

type Dispatcher

Dispatcher builds executor requests for queued pod-path tasks and runs them.

type Dispatcher struct {
    // contains filtered or unexported fields
}

func NewDispatcher

func NewDispatcher(exec executor.Executor, resolver Resolver, issuer TokenIssuer, controlAddr string, tokenTTL time.Duration) *Dispatcher

NewDispatcher builds a Dispatcher that launches tasks via exec, resolves their context with resolver, mints tokens with issuer (valid for tokenTTL), and tells the agent to reach the control plane at controlAddr.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error

Dispatch resolves the task, mints its agent token, and executes it.

func (*Dispatcher) SetAgentTLSCAConfigMap

func (d *Dispatcher) SetAgentTLSCAConfigMap(name string)

SetAgentTLSCAConfigMap configures the CA ConfigMap mounted into task pods so agents verify the control plane's gRPC TLS cert (issue #58). Empty = the agent stays on the insecure channel (dev).

func (*Dispatcher) SetPlatformDefaults

func (d *Dispatcher) SetPlatformDefaults(p PlatformDefaults)

SetPlatformDefaults configures the per-cluster task defaults applied at dispatch to fill gaps the DAG artifact left empty (ADR 0023, layer L0).

func (*Dispatcher) SetTaskSecret

func (d *Dispatcher) SetTaskSecret(name, mountPath string)

SetTaskSecret configures a Kubernetes Secret mounted read-only into every task pod at mountPath, so tasks can read a credential (e.g. a GCP service-account key referenced by a connection's key_path) from the cluster's secret store rather than from Leoflow (ADR 0035). Empty name = nothing mounted.

type FailureSink

FailureSink lets a worker report that an asynchronously-dispatched task failed inside the inner dispatcher, so the scheduler can fail the TI with a clear reason. Without this callback a `queued` TI whose dispatch failed would sit forever (no reaper targets `queued`; ADR 0031 #128 only targets `running`).

type FailureSink interface {
    MarkTaskDispatchFailed(ctx context.Context, runID, taskID, reason string) error
}

type Inner

Inner is the underlying synchronous dispatcher BufferedDispatcher wraps โ€” matches scheduler.Dispatcher exactly so production wires through one type.

type Inner interface {
    Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
}

type MetricsRecorder

MetricsRecorder records dispatch-pool observability signals.

type MetricsRecorder interface {
    RecordDispatchQueueDepth(depth int)
    RecordDispatchAtCapacity()
    RecordDispatchLatencySeconds(seconds float64)
    RecordDispatchInnerError()
}

type PlatformDefaults

PlatformDefaults are per-cluster task defaults applied at dispatch to fill gaps the DAG artifact left empty (ADR 0023, layer L0). They are the lowest precedence (task override > DAG default > platform default) and never replace a value baked into dag.json, so the artifact stays portable across clusters.

type PlatformDefaults struct {
    // StagingSize/StagingStorageClass default the per-run staging volume when the
    // DAG enabled staging but did not pin them (e.g. the cluster's RWX class).
    StagingSize         string
    StagingStorageClass string
    // StagingAccessMode is the PVC access mode for the staging volume (default
    // ReadWriteMany; single-node dev uses ReadWriteOnce).
    StagingAccessMode string
    // Resources defaults a task's requests/limits when neither the task override
    // nor the DAG set any.
    Resources *domain.Resources
}

type Resolved

Resolved is the execution context the dispatcher needs to launch a task.

type Resolved struct {
    TaskInstanceID  string
    TenantID        string
    Image           string
    ImagePullPolicy string
    TryNumber       int
    // Staging carries the DAG's opt-in staging-volume config (ADR 0022); nil or
    // disabled means no per-run volume.
    Staging *domain.StagingConfig
    // Source is the dag.py text captured at compile time, threaded to the
    // SubprocessExecutor so Lite tasks can importlib their DAG without relying
    // on a globally-correct workdir. Empty for Pro (the container image carries
    // the source) and ignored by the KubernetesExecutor.
    Source string
}

type Resolver

Resolver loads a task instance's execution context from storage.

type Resolver interface {
    ResolveTask(ctx context.Context, runID, taskID string) (Resolved, error)
}

type TokenIssuer

TokenIssuer mints a per-task-instance agent token.

type TokenIssuer interface {
    IssueAgentToken(id auth.AgentIdentity, ttl time.Duration) (string, error)
}

Generated by gomarkdoc