scheduler¶
Package scheduler implements the Leoflow scheduling state machine and loop.
Index¶
- Constants
- func CanTransition(from, to domain.TaskState) bool
- func CanTransitionDagRun(from, to domain.DagRunState) bool
- func FinalizeRun(run RunState) (domain.DagRunState, bool)
- func IsAgentLost(c AgentLostCandidate, threshold time.Duration, now time.Time) bool
- func IsDispatchLost(c StaleQueuedCandidate, threshold time.Duration, now time.Time) bool
- func IsOrphaned(c ReapCandidate, threshold time.Duration, now time.Time) bool
- type AgentLostCandidate
- type DispatchLostReapStore
- type Dispatcher
- type HeartbeatReapStore
- type InlineRunner
- type Leader
- func NewLeader(pool *pgxpool.Pool) *Leader
- func (l *Leader) HoldsLock(ctx context.Context) (bool, error)
- func (l *Leader) Release(ctx context.Context) error
- func (l *Leader) TryAcquire(ctx context.Context) (bool, error)
- type PlannedTransition
- func PlanRun(run RunState) []PlannedTransition
- type ReapCandidate
- type ReapStore
- type Recorder
- type RunState
- type ScheduledDAG
- type Scheduler
- func NewScheduler(store Store, logger *slog.Logger, interval time.Duration) *Scheduler
- func (s *Scheduler) ClearSteppingDown()
- func (s *Scheduler) Heartbeat() (bool, time.Time)
- func (s *Scheduler) MarkSteppingDown(reason string)
- func (s *Scheduler) RecordReacquireSince(stepDownAt time.Time)
- func (s *Scheduler) Run(ctx context.Context) error
- func (s *Scheduler) SetAgentLostThreshold(d time.Duration)
- func (s *Scheduler) SetDispatchLostThreshold(d time.Duration)
- func (s *Scheduler) SetDispatcher(d Dispatcher)
- func (s *Scheduler) SetInlineRunner(r InlineRunner)
- func (s *Scheduler) SetLeading(on bool)
- func (s *Scheduler) SetOrphanThreshold(d time.Duration)
- func (s *Scheduler) SetRecorder(r Recorder)
- func (s *Scheduler) SetStepTimeout(d time.Duration)
- func (s *Scheduler) Step(ctx context.Context) error
- func (s *Scheduler) SteppingDown() bool
- type StaleQueuedCandidate
- type Store
- type TriggerDecision
- func EvaluateTriggerRule(rule domain.TriggerRule, upstreams []domain.TaskState) TriggerDecision
Constants¶
LockID is the fixed Postgres advisory-lock id gating scheduler leadership ("LeoFlow" in hex), per ADR 0009.
func CanTransition¶
CanTransition reports whether a task instance may move from one state to another under the Leoflow state machine.
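A table-driven check is one common way to implement a predicate like CanTransition. The sketch below is hypothetical. `TaskState` stands in for `domain.TaskState`, and its string values are assumed. The table contains only the edges this page mentions or clearly implies, so it is not the package's actual table.

```go
package main

// TaskState stands in for domain.TaskState; the string values are
// assumptions made for this sketch.
type TaskState string

const (
	None           TaskState = "none"
	Scheduled      TaskState = "scheduled"
	Queued         TaskState = "queued"
	Running        TaskState = "running"
	Failed         TaskState = "failed"
	Skipped        TaskState = "skipped"
	UpstreamFailed TaskState = "upstream_failed"
	UpForRetry     TaskState = "up_for_retry"
)

// allowed holds only the edges described or implied on this page; the real
// table in the scheduler package is presumably larger.
var allowed = map[TaskState][]TaskState{
	None:       {Scheduled, Skipped, UpstreamFailed}, // decided by the trigger rule
	Scheduled:  {Queued},
	Queued:     {Running, Failed}, // -> running is assumed; -> failed is dispatch_lost
	Running:    {Failed},          // agent_lost (the success path is omitted here)
	Failed:     {UpForRetry},      // only while retry budget remains
	UpForRetry: {None},            // reset for the next try
}

// canTransition is a hypothetical stand-in for CanTransition.
func canTransition(from, to TaskState) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}
```

Keeping the edges in data rather than in nested conditionals makes the state machine easy to audit and to test exhaustively.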
func CanTransitionDagRun¶
CanTransitionDagRun reports whether a dag run may move from one state to another.
func FinalizeRun¶
FinalizeRun reports the terminal dag-run state once every task is terminal. A failed task that still has retry budget counts as non-terminal, so the run keeps running until the retry resolves. The boolean is false while any task is still non-terminal.
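The rule above can be sketched as follows. The sketch assumes three things that this page does not state: a failed task has budget left while `tries < maxTries`, success and skipped are the non-failing terminal states, and the run fails if any task ended as failed or upstream_failed. `finalizeRun` and the string values are hypothetical.

```go
package main

type TaskState string
type DagRunState string

const (
	TaskSuccess        TaskState = "success"
	TaskFailed         TaskState = "failed"
	TaskSkipped        TaskState = "skipped"
	TaskUpstreamFailed TaskState = "upstream_failed"

	RunSuccess DagRunState = "success"
	RunFailed  DagRunState = "failed"
)

// hasRetryBudget reports whether a failed task will be retried. A missing
// maxTries entry means no budget, as in RunState.
func hasRetryBudget(task string, tries, maxTries map[string]int) bool {
	limit, ok := maxTries[task]
	return ok && tries[task] < limit
}

// finalizeRun returns the terminal run state and true once every task is
// terminal, or "" and false while any task is still active.
func finalizeRun(states map[string]TaskState, tries, maxTries map[string]int) (DagRunState, bool) {
	anyFailed := false
	for task, st := range states {
		switch st {
		case TaskSuccess, TaskSkipped:
			// Terminal and not a failure.
		case TaskUpstreamFailed:
			anyFailed = true
		case TaskFailed:
			if hasRetryBudget(task, tries, maxTries) {
				return "", false // a retry is pending, so the run keeps running
			}
			anyFailed = true
		default:
			return "", false // none, scheduled, queued, running, up_for_retry, ...
		}
	}
	if anyFailed {
		return RunFailed, true
	}
	return RunSuccess, true
}
```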
func IsAgentLost¶
IsAgentLost reports whether the agent has been silent long enough to be declared lost. A zero LastHeartbeat (never reported) is treated as alive, not lost: the TI may be inline (no agent ever exists), or it may simply not have completed its first heartbeat interval yet. The reaper only fires on TIs that heartbeated at least once and then went silent; this is the "do no harm" rule of ADR 0031. Future timestamps (clock skew) are treated as alive.
func IsDispatchLost¶
IsDispatchLost reports whether a queued TI has been waiting long enough to be declared dispatch-lost. A zero QueuedAt is treated as alive: a TI without that stamp is too poorly observed to reap safely. Future timestamps (clock skew) are treated as alive. This mirrors IsAgentLost's "do no harm" rule (ADR 0031).
func IsOrphaned¶
IsOrphaned reports whether a running run has been quiet long enough to be declared orphaned. A zero LastActivity (no progress signal at all) counts as orphaned: a running run with no recorded activity, not even its own started_at, is by definition a run that nothing is touching. Future timestamps (clock skew) are treated as fresh; the reaper is a backstop, not a clock arbiter, so it errs on the side of leaving recent-looking runs alone.
type AgentLostCandidate¶
AgentLostCandidate is one task instance in `running` whose agent may have gone silent, with the timestamp of its most recent heartbeat. The reaper compares the gap from this stamp to "now" against a stall threshold; a non-zero gap larger than the threshold means the agent is presumed gone and the TI is failed with reason "agent_lost".
type AgentLostCandidate struct {
TaskInstanceID string
DagRunID string
DagID string
TaskID string
LastHeartbeat time.Time
}
type DispatchLostReapStore¶
DispatchLostReapStore is the slice of scheduler.Store the dispatch-lost reaper needs. The full scheduler.Store embeds this interface so production wires through one type; unit tests fake just this surface.
type DispatchLostReapStore interface {
// ListStaleQueuedCandidates returns every `queued` TI alongside the
// timestamp it entered the queue. The threshold decision is purely in Go
// so the SQL stays simple.
ListStaleQueuedCandidates(ctx context.Context) ([]StaleQueuedCandidate, error)
// MarkTaskDispatchLost transitions one TI to `failed` with
// error_message='dispatch_lost'. The WHERE state='queued' guard makes
// this idempotent: a second call on a now-non-queued TI is a no-op.
MarkTaskDispatchLost(ctx context.Context, taskInstanceID string) error
}
type Dispatcher¶
Dispatcher launches a task instance for execution. The scheduler dispatches a task as it becomes queued; the concrete implementation builds the executor request and routes it to the right executor.
type Dispatcher interface {
Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
}
type HeartbeatReapStore¶
HeartbeatReapStore is the slice of scheduler.Store the TI heartbeat reaper needs. The full scheduler.Store embeds this interface so production wires through one type; unit tests fake just this surface.
type HeartbeatReapStore interface {
// ListAgentLostCandidates returns every `running` TI whose last heartbeat
// is non-null (it has heartbeated at least once). The reaper applies the
// threshold per candidate so the SQL stays simple and the decision is
// purely in Go.
ListAgentLostCandidates(ctx context.Context) ([]AgentLostCandidate, error)
// MarkTaskAgentLost transitions one TI to `failed` with
// error_message='agent_lost'. The WHERE state='running' guard makes this
// idempotent: a second call on a now-failed TI is a no-op.
MarkTaskAgentLost(ctx context.Context, taskInstanceID string) error
}
type InlineRunner¶
InlineRunner executes an inline http_api task out of band in the control plane. Start reports whether the task was launched (false without error means it should be retried on the next tick); the runner owns the task's state once started, so the scheduler does not record a queued transition for it.
type InlineRunner interface {
Start(ctx context.Context, runID, dagID, tenantID string, tryNumber int, task domain.TaskSpec) (started bool, err error)
}
type Leader¶
Leader acquires and releases the advisory lock that restricts the scheduler loop to a single replica. It must run on a dedicated single-connection pool so the session holding the lock is stable.
func NewLeader¶
NewLeader builds a Leader over a dedicated (single-connection) pool.
func (*Leader) HoldsLock¶
HoldsLock reports whether this leader's session still holds the advisory lock. The lock is session-scoped: if the dedicated connection dropped (network blip, idle reap, lifetime recycle) and was replaced, the new session does not hold it and another replica may have taken over. In that case HoldsLock returns false, letting the caller step down instead of running on as a stale leader (the split-brain guard). A query error (connection down) is returned so the caller treats it as lost leadership too.
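This page does not show the query HoldsLock runs. One plausible approach is to look up the current session's advisory lock in `pg_locks`. For a single bigint key, Postgres shows the high-order 32 bits in `classid`, the low-order 32 bits in `objid`, and sets `objsubid` to 1. The SQL constants and `advisoryLockKeys` below are hypothetical and use only built-in Postgres functions. Running them against a database would need a driver such as pgx, which is not shown here.

```go
package main

// Hypothetical SQL a Leader could run on its single-connection pool.
const (
	tryAcquireSQL = `SELECT pg_try_advisory_lock($1)` // non-blocking, session-scoped
	releaseSQL    = `SELECT pg_advisory_unlock($1)`
	// pg_backend_pid() limits the check to the current session, so a
	// replacement connection (a new session) reports false.
	holdsLockSQL = `SELECT EXISTS (
  SELECT 1 FROM pg_locks
  WHERE locktype = 'advisory' AND granted
    AND pid = pg_backend_pid()
    AND classid = $1 AND objid = $2 AND objsubid = 1)`
)

// advisoryLockKeys splits a bigint lock key into the (classid, objid) pair
// that pg_locks displays for it.
func advisoryLockKeys(id int64) (classid, objid uint32) {
	u := uint64(id) // keep the two's-complement bits of negative keys
	return uint32(u >> 32), uint32(u)
}
```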
func (*Leader) Release¶
Release frees the scheduler advisory lock.
func (*Leader) TryAcquire¶
TryAcquire attempts to take the scheduler advisory lock without blocking.
type PlannedTransition¶
PlannedTransition is a decided state change for a task instance within a run.
func PlanRun¶
PlanRun computes the task transitions for one dag run. It first handles retries (a failed task with retry budget moves to up_for_retry, and an up_for_retry task is reset to none with try_number+1), then plans the rest from the resulting effective states: none -> scheduled (or skipped / upstream_failed, per the trigger rule) and scheduled -> queued. A retriable failed task is treated as still active, so downstream tasks wait rather than seeing a failure. The result is deterministic: identical inputs yield identical output.
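The sketch below shows only the retry phase, plus one way to get the determinism guarantee. Go randomizes map iteration order, so the sketch sorts task IDs before planning. `planRetries` and `transition` are hypothetical. The budget test `tries < maxTries` is assumed, and the retry-delay cooldown gate is left out.

```go
package main

import "sort"

type transition struct {
	TaskID   string
	From, To string
}

// planRetries plans failed -> up_for_retry (while budget remains) and
// up_for_retry -> none, visiting tasks in sorted order so that identical
// inputs always yield identical output.
func planRetries(states map[string]string, tries, maxTries map[string]int) []transition {
	ids := make([]string, 0, len(states))
	for id := range states {
		ids = append(ids, id)
	}
	sort.Strings(ids)

	var out []transition
	for _, id := range ids {
		switch states[id] {
		case "failed":
			if limit, ok := maxTries[id]; ok && tries[id] < limit {
				out = append(out, transition{id, "failed", "up_for_retry"})
			}
		case "up_for_retry":
			out = append(out, transition{id, "up_for_retry", "none"})
		}
	}
	return out
}
```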
type ReapCandidate¶
ReapCandidate is one running dag run the reaper is considering, with the timestamp of its most recent observable activity (max of the run's started_at and its task instances' started_at / ended_at). The reaper compares the gap from this stamp to "now" against a stall threshold; a non-zero gap larger than the threshold means the run is orphaned and should be failed.
type ReapStore¶
ReapStore is the slice of scheduler.Store the reaper needs. The full scheduler.Store embeds this interface so production wires through one type; the unit tests fake just this surface.
type ReapStore interface {
// ListReapCandidates returns every dag_run currently in 'running' state
// alongside its last-activity timestamp. The query is the authority on what
// "running" means and how to compute the timestamp; the reaper only decides
// whether each one has been quiet for too long.
ListReapCandidates(ctx context.Context) ([]ReapCandidate, error)
// ReapRun transitions a run to 'failed' with an "orphaned" note and fails
// any still-active task instances. It is idempotent: a second call on the
// same run is a no-op.
ReapRun(ctx context.Context, runID string) error
}
type Recorder¶
Recorder records scheduler metrics. observability.Metrics implements it.
type Recorder interface {
RecordSchedulerDecision(decisionType string)
RecordTaskTransition(from, to, dagID string)
// RecordUndispatchable counts tasks queued with no executor to launch them.
RecordUndispatchable(reason string)
// RecordSchedulerStepDown counts a leader step-down by reason; the rate
// of this counter is the operator-facing SLI for leader churn (#311). A
// nil Recorder is tolerated by the scheduler so tests need not stub it.
RecordSchedulerStepDown(reason string)
// ObserveSchedulerReacquire records the wall-clock duration of a
// step-down → re-acquire cycle (#311). Operators alert on P99 to spot
// churn that starts to add scheduling latency.
ObserveSchedulerReacquire(d time.Duration)
}
type RunState¶
RunState is the scheduler's snapshot of a dag run: its topology and the current state of each task.
type RunState struct {
RunID string
DagID string
TenantID string
State domain.DagRunState
Tasks []domain.TaskSpec
States map[string]domain.TaskState
// Tries and MaxTries hold the current and maximum attempt counts per task,
// driving retry decisions. Absent entries mean no retry budget.
Tries map[string]int
MaxTries map[string]int
// EndedAt holds the failure timestamp per task (only meaningful for tasks
// currently in `up_for_retry`). Combined with RetryDelaySeconds and Now, it
// gates the `up_for_retry → none` transition so retries honor the
// user-declared backoff (issue #201). Absent entries mean no cooldown applies.
EndedAt map[string]*time.Time
RetryDelaySeconds map[string]int
// Now is the wall-clock value the planner compares against EndedAt. Zero
// means "skip the cooldown gate", so legacy callers and tests that don't set
// retry_delay get the previous (immediate-retry) behavior.
Now time.Time
}
type ScheduledDAG¶
ScheduledDAG is a cron-scheduled DAG and the logical date of its latest run. Catchup and StartDate drive the per-tick catchup decision (#129): when a leader has been down across multiple slots, catchup=true backfills every missed slot while catchup=false jumps straight to the most recent one.
type ScheduledDAG struct {
DagID string
Schedule string
LastLogical *time.Time
StartDate *time.Time
Catchup bool
// MaxActiveRuns caps how many runs of this DAG may be active (queued or
// running) at once. Zero means "unlimited" (Airflow-faithful: a missing
// limit is unbounded). The scheduler enforces the cap in createDueRuns:
// once the active-run count reaches this value, additional due slots are
// skipped on this tick (issue #200).
MaxActiveRuns int
}
type Scheduler¶
Scheduler advances dag runs by applying the planning rules each tick.
func NewScheduler¶
NewScheduler builds a Scheduler over the given store, ticking every interval.
func (*Scheduler) ClearSteppingDown¶
ClearSteppingDown ends the step-down window opened by MarkSteppingDown. It is idempotent: calling it when no step-down is active is a no-op.
func (*Scheduler) Heartbeat¶
Heartbeat reports whether the scheduling loop is live and when it last ticked. Only a leader is expected to tick, so a non-leader (a follower, or an instance that stepped down after losing the lock) reports healthy without ticking: it is correctly idle, not stalled. A leader is healthy during the startup grace period (before its first tick) and while ticks stay within a small multiple of the loop interval; a stalled leader reports unhealthy so the UI/monitor surfaces it.
func (*Scheduler) MarkSteppingDown¶
MarkSteppingDown records that a graceful step-down has begun. The campaign loop calls it BEFORE canceling the scheduler's run context, so any in-flight reaper or Step that returns "context canceled" inside the window logs at WARN (expected) instead of ERROR. It also increments the step-down counter labeled by reason, so operators can alert on the *rate* of churn (rate(...[5m])) instead of grepping log content. ClearSteppingDown closes the window; outside it, context.Canceled stays at ERROR. That is the tripwire: an unconditional downgrade to WARN would silently swallow an unexpected cancel.
func (*Scheduler) RecordReacquireSince¶
RecordReacquireSince records the time spent stepped down (#311). The campaign loop calls it immediately after a successful re-acquire, passing the timestamp captured at the moment of step-down. A zero stepDownAt (no prior step-down, i.e. the first acquisition at boot) is ignored.
func (*Scheduler) Run¶
Run drives the scheduling loop until ctx is canceled. The loop is crash-proof: a panic or error in a tick is recovered and logged, and the scheduler keeps ticking. It may fall behind, but it never dies (the critical invariant).
func (*Scheduler) SetAgentLostThreshold¶
SetAgentLostThreshold overrides the silence window the TI heartbeat reaper uses to declare a task's agent lost (optional; mainly for tests). The default is defaultAgentLostThreshold.
func (*Scheduler) SetDispatchLostThreshold¶
SetDispatchLostThreshold overrides the wait window the dispatch-lost reaper uses to declare a queued task's dispatch lost (optional; mainly for tests). The default is defaultDispatchLostThreshold.
func (*Scheduler) SetDispatcher¶
SetDispatcher attaches the executor dispatcher (optional; without it the scheduler advances state only and launches nothing).
func (*Scheduler) SetInlineRunner¶
SetInlineRunner attaches the inline http_api runner (optional; without it inline http_api tasks fall back to the standard queued dispatch path).
func (*Scheduler) SetLeading¶
SetLeading marks whether this instance currently holds scheduler leadership. The leadership manager sets it true while the loop runs and false when it steps down (lost lock) or stops. Becoming leader resets the tick clock so the startup grace applies afresh and a stale pre-step-down heartbeat is not mistaken for a stall. It governs Heartbeat: only a leader is expected to tick.
func (*Scheduler) SetOrphanThreshold¶
SetOrphanThreshold overrides the stall-detection window the reaper uses to declare a running dag run orphaned (optional; mainly for tests). The default is defaultOrphanThreshold.
func (*Scheduler) SetRecorder¶
SetRecorder attaches a metrics recorder (optional).
func (*Scheduler) SetStepTimeout¶
SetStepTimeout overrides the per-tick timeout (optional; mainly for tests).
func (*Scheduler) Step¶
Step runs one deterministic scheduling iteration over every active run. Each run is advanced in isolation (see advanceSafely): a panic or error in one run is contained, so it never blocks the other runs or new-run creation. The reaper runs regardless of whether createDueRuns succeeds: the two share no dependency, and silencing the reaper when scheduling has a hiccup would let orphans accumulate exactly when the operator is most likely to notice that the counter is wrong. The first non-nil infra-level error is returned (and logged by the caller); the later phases still execute.
func (*Scheduler) SteppingDown¶
SteppingDown exposes the current step-down state for tests and callers that want to classify an error themselves (the four scheduler.go log sites call logSchedulerError with this value).
type StaleQueuedCandidate¶
StaleQueuedCandidate is one task instance in `queued` whose dispatch may have been lost, typically because the scheduler crashed mid-tick between committing the scheduled→queued transition and actually dispatching the TI to an executor. The reaper compares the gap from QueuedAt to "now" against a dispatch-lost threshold; a non-zero gap larger than the threshold means the dispatch is presumed gone and the TI is failed with reason `dispatch_lost`. This unblocks the orphan-run reaper, which keeps a stuck run out of its candidate set as long as any of its TIs looks active (#202).
type StaleQueuedCandidate struct {
TaskInstanceID string
DagRunID string
DagID string
TaskID string
QueuedAt time.Time
}
type Store¶
Store is the scheduler's view of persistent state. The concrete implementation is sqlc-backed; tests use a fake.
type Store interface {
ActiveRuns(ctx context.Context) ([]RunState, error)
MaterializeTasks(ctx context.Context, runID string, tasks []domain.TaskSpec) error
ApplyTransition(ctx context.Context, runID, taskID string, to domain.TaskState) error
// ResetForRetry returns a task to 'none' and increments its try number so a
// retry re-evaluates and re-runs it.
ResetForRetry(ctx context.Context, runID, taskID string) error
SetRunState(ctx context.Context, runID string, state domain.DagRunState) error
ScheduledDAGs(ctx context.Context) ([]ScheduledDAG, error)
CreateScheduledRun(ctx context.Context, dagID string, logical time.Time) error
// SetTaskNote attaches operational context to a task instance (shown in the
// UI), e.g. why it is queued but not running.
SetTaskNote(ctx context.Context, runID, taskID, note string) error
// ReapStore methods drive the orphan reaper (#120): they list running runs
// that have gone quiet and fail them so the dashboard counter is correct.
ReapStore
// HeartbeatReapStore methods drive the TI heartbeat reaper (#128): they
// list `running` task instances whose agent has gone silent and fail them
// as `agent_lost` so the dashboard counter recovers from agent-side crashes.
HeartbeatReapStore
// DispatchLostReapStore methods drive the dispatch-lost reaper (#202):
// they list `queued` task instances whose dispatch has been pending past
// the threshold and fail them as `dispatch_lost`. This unblocks the
// orphan-run reaper for runs stuck by a mid-tick scheduler crash, which
// would otherwise keep stuck queued TIs out of the candidate set forever
// (the orphan reaper's "no active TI" safety guard).
DispatchLostReapStore
}
type TriggerDecision¶
TriggerDecision is the scheduler's decision for a task given its trigger rule and the states of its upstream tasks.
Possible scheduler decisions for a task.
const (
// DecisionWait means the upstreams are not yet settled; re-evaluate later.
DecisionWait TriggerDecision = iota
// DecisionSchedule means dependencies are satisfied; move the task to scheduled.
DecisionSchedule
// DecisionSkip means the trigger rule can no longer be satisfied; skip the task.
DecisionSkip
// DecisionUpstreamFailed means a required upstream failed; propagate the failure.
DecisionUpstreamFailed
)
func EvaluateTriggerRule¶
EvaluateTriggerRule decides what to do with a task given its trigger rule and the current states of its upstream tasks. A task with no upstreams (a root task) is always scheduled.
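This page does not list the trigger rules themselves. The sketch below models a single rule, an Airflow-style `all_success`, to show how the four decisions fall out. That rule's semantics are assumed here, so Leoflow's `domain.TriggerRule` values may differ. `evaluateAllSuccess` and the state strings are hypothetical.

```go
package main

type TaskState string
type TriggerDecision int

const (
	DecisionWait TriggerDecision = iota
	DecisionSchedule
	DecisionSkip
	DecisionUpstreamFailed
)

const (
	Success        TaskState = "success"
	Failed         TaskState = "failed"
	UpstreamFailed TaskState = "upstream_failed"
	Skipped        TaskState = "skipped"
)

// evaluateAllSuccess: schedule only when every upstream succeeded; propagate
// a failure at once; skip if an upstream was skipped; otherwise wait.
func evaluateAllSuccess(upstreams []TaskState) TriggerDecision {
	if len(upstreams) == 0 {
		return DecisionSchedule // a root task is always scheduled
	}
	successes, skipped := 0, false
	for _, s := range upstreams {
		switch s {
		case Failed, UpstreamFailed:
			return DecisionUpstreamFailed // a required upstream failed
		case Skipped:
			skipped = true
		case Success:
			successes++
		}
	}
	if skipped {
		return DecisionSkip // all_success can no longer be satisfied
	}
	if successes == len(upstreams) {
		return DecisionSchedule
	}
	return DecisionWait // some upstreams are still active
}
```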
Generated by gomarkdoc