dispatch¶
Package dispatch launches pod-path task instances: it resolves a task's execution context, mints the agent's identity token, and routes the request to the executor. It implements scheduler.Dispatcher.
Index¶
- Variables
- type BufferConfig
- type BufferedDispatcher
- func NewBuffered(inner Inner, sink FailureSink, logger *slog.Logger, metrics MetricsRecorder, cfg BufferConfig) *BufferedDispatcher
- func (b *BufferedDispatcher) Close()
- func (b *BufferedDispatcher) Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
- type Dispatcher
- func NewDispatcher(exec executor.Executor, resolver Resolver, issuer TokenIssuer, controlAddr string, tokenTTL time.Duration) *Dispatcher
- func (d *Dispatcher) Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
- func (d *Dispatcher) SetAgentTLSCAConfigMap(name string)
- func (d *Dispatcher) SetPlatformDefaults(p PlatformDefaults)
- func (d *Dispatcher) SetTaskSecret(name, mountPath string)
- type FailureSink
- type Inner
- type MetricsRecorder
- type PlatformDefaults
- type Resolved
- type Resolver
- type TokenIssuer
Variables¶
ErrAtCapacity is returned by BufferedDispatcher.Dispatch when the buffered queue cannot accept another request. The scheduler treats it exactly like a transient inner-dispatcher error: it logs the error, records a metric, and leaves the TI scheduled so the next tick retries. It is the backpressure signal that bounds tick latency under load (ADR 0031: tick rate decoupled from executor latency).
type BufferConfig¶
BufferConfig sizes the BufferedDispatcher's worker pool. BufferSize=0 means "passthrough sync" (Lite mode, zero overhead): no goroutines spawned, no channel, the inner dispatcher is called inline. Any BufferSize>0 spawns max(Workers, 1) worker goroutines and a buffered channel of BufferSize slots.
type BufferedDispatcher¶
BufferedDispatcher fronts a synchronous Inner dispatcher with a bounded worker pool, so the scheduler tick is never blocked by a slow remote API call. ADR 0031: two-phase scheduler, with planning sync and dispatch async.
func NewBuffered¶
func NewBuffered(inner Inner, sink FailureSink, logger *slog.Logger, metrics MetricsRecorder, cfg BufferConfig) *BufferedDispatcher
NewBuffered constructs a BufferedDispatcher. BufferSize=0 returns a passthrough that is byte-for-byte equivalent to using the inner dispatcher directly (Lite path). BufferSize>0 spawns the worker pool.
func (*BufferedDispatcher) Close¶
Close stops accepting new dispatches, drains the in-flight queue, and waits for every worker to finish. Calling Close more than once is safe.
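The drain-and-wait behaviour described for Close can be sketched with a `sync.Once` guarding the channel close and a `sync.WaitGroup` tracking workers. This is a minimal illustration, not the package's implementation: `pool`, `newPool`, and the string task type are hypothetical stand-ins, and a real dispatcher would also need to reject sends that arrive after Close.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical stand-in for a buffered dispatcher's worker pool.
type pool struct {
	queue     chan string
	wg        sync.WaitGroup
	closeOnce sync.Once
	mu        sync.Mutex
	handled   []string
}

// newPool starts at least one worker, mirroring the documented max(Workers, 1).
func newPool(workers, bufferSize int) *pool {
	if workers < 1 {
		workers = 1
	}
	p := &pool{queue: make(chan string, bufferSize)}
	for i := 0; i < workers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			// The loop ends once the queue is closed and fully drained.
			for task := range p.queue {
				p.mu.Lock()
				p.handled = append(p.handled, task)
				p.mu.Unlock()
			}
		}()
	}
	return p
}

// Close closes the queue exactly once, then waits for every worker to exit.
func (p *pool) Close() {
	p.closeOnce.Do(func() { close(p.queue) })
	p.wg.Wait()
}

func main() {
	p := newPool(0, 4)
	p.queue <- "t1"
	p.queue <- "t2"
	p.Close()
	p.Close() // the second call is a no-op
	fmt.Println(len(p.handled)) // prints 2
}
```

Closing the channel rather than signalling through a separate flag lets workers finish whatever is already queued before they exit, which matches the "drains the in-flight queue" wording.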
func (*BufferedDispatcher) Dispatch¶
func (b *BufferedDispatcher) Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
Dispatch hands a task off to the inner dispatcher. In passthrough mode the inner call happens inline. In buffered mode the request is enqueued without blocking: on success Dispatch returns nil immediately (the scheduler then records the TI as `queued`); if the channel is full it returns ErrAtCapacity (the scheduler treats it as a transient failure and leaves the TI scheduled).
type Dispatcher¶
Dispatcher builds executor requests for queued pod-path tasks and runs them.
func NewDispatcher¶
func NewDispatcher(exec executor.Executor, resolver Resolver, issuer TokenIssuer, controlAddr string, tokenTTL time.Duration) *Dispatcher
NewDispatcher builds a Dispatcher that launches tasks via exec, resolves their context with resolver, mints tokens with issuer (valid for tokenTTL), and tells the agent to reach the control plane at controlAddr.
func (*Dispatcher) Dispatch¶
Dispatch resolves the task, mints its agent token, and executes it.
func (*Dispatcher) SetAgentTLSCAConfigMap¶
SetAgentTLSCAConfigMap configures the CA ConfigMap mounted into task pods so agents verify the control plane's gRPC TLS cert (issue #58). Empty = the agent stays on the insecure channel (dev).
func (*Dispatcher) SetPlatformDefaults¶
SetPlatformDefaults configures the per-cluster task defaults applied at dispatch to fill gaps the DAG artifact left empty (ADR 0023, layer L0).
func (*Dispatcher) SetTaskSecret¶
SetTaskSecret configures a Kubernetes Secret mounted read-only into every task pod at mountPath, so tasks can read a credential (e.g. a GCP service-account key referenced by a connection's key_path) from the cluster's secret store rather than from Leoflow (ADR 0035). Empty name = nothing mounted.
type FailureSink¶
FailureSink lets a worker report that an asynchronously-dispatched task failed inside the inner dispatcher, so the scheduler can fail the TI with a clear reason. Without this callback a `queued` TI whose dispatch failed would sit forever (no reaper targets `queued`; ADR 0031 #128 only targets `running`).
type FailureSink interface {
MarkTaskDispatchFailed(ctx context.Context, runID, taskID, reason string) error
}
type Inner¶
Inner is the underlying synchronous dispatcher BufferedDispatcher wraps. It matches scheduler.Dispatcher exactly so production wires through one type.
type Inner interface {
Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
}
type MetricsRecorder¶
MetricsRecorder records dispatch-pool observability signals.
type MetricsRecorder interface {
RecordDispatchQueueDepth(depth int)
RecordDispatchAtCapacity()
RecordDispatchLatencySeconds(seconds float64)
RecordDispatchInnerError()
}
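A minimal recorder that satisfies this interface can be built from atomic counters, which is handy in unit tests. `countingRecorder` is a hypothetical type; a production recorder would more likely forward to a metrics library.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// MetricsRecorder is copied from the package documentation.
type MetricsRecorder interface {
	RecordDispatchQueueDepth(depth int)
	RecordDispatchAtCapacity()
	RecordDispatchLatencySeconds(seconds float64)
	RecordDispatchInnerError()
}

// countingRecorder is a hypothetical test double backed by atomic counters,
// so it is safe to share between worker goroutines.
type countingRecorder struct {
	queueDepth   atomic.Int64 // last observed depth (gauge-like)
	atCapacity   atomic.Int64 // number of rejected enqueues
	observations atomic.Int64 // number of latency samples seen
	innerErrors  atomic.Int64 // number of inner-dispatcher failures
}

func (r *countingRecorder) RecordDispatchQueueDepth(depth int) { r.queueDepth.Store(int64(depth)) }
func (r *countingRecorder) RecordDispatchAtCapacity()          { r.atCapacity.Add(1) }
func (r *countingRecorder) RecordDispatchLatencySeconds(seconds float64) {
	r.observations.Add(1) // the sample value itself is discarded in this sketch
}
func (r *countingRecorder) RecordDispatchInnerError() { r.innerErrors.Add(1) }

// Compile-time check that the test double satisfies the interface.
var _ MetricsRecorder = (*countingRecorder)(nil)

func main() {
	rec := &countingRecorder{}
	var m MetricsRecorder = rec
	m.RecordDispatchQueueDepth(3)
	m.RecordDispatchAtCapacity()
	m.RecordDispatchLatencySeconds(0.25)
	m.RecordDispatchInnerError()
	fmt.Println(rec.queueDepth.Load(), rec.atCapacity.Load(),
		rec.observations.Load(), rec.innerErrors.Load()) // prints 3 1 1 1
}
```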
type PlatformDefaults¶
PlatformDefaults are per-cluster task defaults applied at dispatch to fill gaps the DAG artifact left empty (ADR 0023, layer L0). They are the lowest precedence (task override > DAG default > platform default) and never replace a value baked into dag.json, so the artifact stays portable across clusters.
type PlatformDefaults struct {
// StagingSize/StagingStorageClass default the per-run staging volume when the
// DAG enabled staging but did not pin them (e.g. the cluster's RWX class).
StagingSize string
StagingStorageClass string
// StagingAccessMode is the PVC access mode for the staging volume (default
// ReadWriteMany; single-node dev uses ReadWriteOnce).
StagingAccessMode string
// Resources defaults a task's requests/limits when neither the task override
// nor the DAG set any.
Resources *domain.Resources
}
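The precedence rule (task override > DAG default > platform default) amounts to picking the first value that is set. A small sketch for a string field such as the staging storage class, assuming an empty string means "unset"; `firstNonEmpty` and the class names are hypothetical.

```go
package main

import "fmt"

// firstNonEmpty returns the first non-empty value, scanning from highest
// to lowest precedence. It returns "" when every layer is unset.
func firstNonEmpty(values ...string) string {
	for _, v := range values {
		if v != "" {
			return v
		}
	}
	return ""
}

func main() {
	taskOverride := ""
	dagDefault := ""
	platformDefault := "fast-rwx" // hypothetical cluster storage class

	// Only the platform layer is set, so it fills the gap.
	fmt.Println(firstNonEmpty(taskOverride, dagDefault, platformDefault)) // prints fast-rwx

	// A value set in the DAG wins over the platform default.
	dagDefault = "standard"
	fmt.Println(firstNonEmpty(taskOverride, dagDefault, platformDefault)) // prints standard
}
```

Because the platform layer is consulted last, a value present in the artifact is never replaced, which is what keeps dag.json portable across clusters.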
type Resolved¶
Resolved is the execution context the dispatcher needs to launch a task.
type Resolved struct {
TaskInstanceID string
TenantID string
Image string
ImagePullPolicy string
TryNumber int
// Staging carries the DAG's opt-in staging-volume config (ADR 0022); nil or
// disabled means no per-run volume.
Staging *domain.StagingConfig
// Source is the dag.py text captured at compile time, threaded to the
// SubprocessExecutor so Lite tasks can importlib their DAG without relying
// on a globally-correct workdir. Empty for Pro (the container image carries
// the source) and ignored by the KubernetesExecutor.
Source string
}
type Resolver¶
Resolver loads a task instance's execution context from storage.
type Resolver interface {
ResolveTask(ctx context.Context, runID, taskID string) (Resolved, error)
}
type TokenIssuer¶
TokenIssuer mints a per-task-instance agent token.
type TokenIssuer interface {
IssueAgentToken(id auth.AgentIdentity, ttl time.Duration) (string, error)
}
Generated by gomarkdoc