scheduler

import "github.com/neochaotic/leoflow/internal/scheduler"

Package scheduler implements the Leoflow scheduling state machine and loop.

Constants

LockID is the fixed Postgres advisory-lock id gating scheduler leadership ("LeoFlow" in hex), per ADR 0009.

const LockID int64 = 0x4C656F466C6F77
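The hex value can be checked by packing the seven ASCII bytes of "LeoFlow" big-endian into an int64 (Postgres advisory-lock keys are bigint). This is a minimal sketch. `lockIDFromName` is a hypothetical helper and is not part of the package.

```go
package main

import "fmt"

// LockID as documented for the scheduler package.
const LockID int64 = 0x4C656F466C6F77

// lockIDFromName is a hypothetical helper: it packs up to 8 ASCII bytes
// big-endian into an int64, which is how "LeoFlow" maps to LockID.
func lockIDFromName(name string) int64 {
	var id int64
	for i := 0; i < len(name) && i < 8; i++ {
		id = id<<8 | int64(name[i])
	}
	return id
}

func main() {
	fmt.Println(lockIDFromName("LeoFlow") == LockID) // true
	fmt.Printf("%#x\n", LockID)                      // 0x4c656f466c6f77
}
```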

func CanTransition

func CanTransition(from, to domain.TaskState) bool

CanTransition reports whether a task instance may move from one state to another under the Leoflow state machine.
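A state machine like this is naturally expressed as an explicit edge table, where any edge not listed is rejected. The sketch below assumes string-valued states and uses a hypothetical `canTransition`. The following edges come from this page: none → scheduled/skipped/upstream_failed, scheduled → queued, failed → up_for_retry, up_for_retry → none, queued → failed (dispatch_lost), and running → failed (agent_lost). The edges queued → running and running → success are assumptions. The real table may contain more edges.

```go
package main

import "fmt"

// TaskState mirrors domain.TaskState for this sketch (assumed string-based).
type TaskState string

const (
	None           TaskState = "none"
	Scheduled      TaskState = "scheduled"
	Queued         TaskState = "queued"
	Running        TaskState = "running"
	Success        TaskState = "success"
	Failed         TaskState = "failed"
	UpForRetry     TaskState = "up_for_retry"
	Skipped        TaskState = "skipped"
	UpstreamFailed TaskState = "upstream_failed"
)

// allowed lists permitted edges; queued->running and running->success
// are assumptions about the executor path.
var allowed = map[TaskState][]TaskState{
	None:       {Scheduled, Skipped, UpstreamFailed},
	Scheduled:  {Queued},
	Queued:     {Running, Failed}, // Failed: dispatch_lost reaper
	Running:    {Success, Failed}, // Failed: agent_lost reaper
	Failed:     {UpForRetry},
	UpForRetry: {None},
}

// canTransition is a hypothetical stand-in for CanTransition: a lookup in
// the edge table, so anything not listed is rejected.
func canTransition(from, to TaskState) bool {
	for _, s := range allowed[from] {
		if s == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canTransition(Scheduled, Queued)) // true
	fmt.Println(canTransition(Success, Running))  // false
}
```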

func CanTransitionDagRun

func CanTransitionDagRun(from, to domain.DagRunState) bool

CanTransitionDagRun reports whether a dag run may move from one state to another.

func FinalizeRun

func FinalizeRun(run RunState) (domain.DagRunState, bool)

FinalizeRun reports the terminal dag-run state once every task is terminal. A failed task that still has retry budget counts as non-terminal, so the run keeps running until the retry resolves. The boolean is false while any task is still non-terminal.
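Here is a minimal sketch of the FinalizeRun rule. It rests on several assumptions: a trimmed, hypothetical `runSnapshot` (a subset of RunState with string states), and the rule that a task has retry budget while Tries < MaxTries. It also assumes the terminal run state is `failed` when any task ended failed or upstream_failed, and `success` otherwise. The page does not spell out that last mapping.

```go
package main

import "fmt"

// runSnapshot is a trimmed, hypothetical stand-in for RunState.
type runSnapshot struct {
	States   map[string]string
	Tries    map[string]int
	MaxTries map[string]int
}

// hasRetryBudget assumes a task may retry while its try count is below
// MaxTries; absent map entries read as 0, i.e. no budget.
func hasRetryBudget(r runSnapshot, id string) bool {
	return r.Tries[id] < r.MaxTries[id]
}

// finalizeRun sketches FinalizeRun. Choosing "failed" when any task failed
// or upstream_failed, and "success" otherwise, is an assumption.
func finalizeRun(r runSnapshot) (string, bool) {
	anyFailed := false
	for id, st := range r.States {
		switch st {
		case "success", "skipped":
			// terminal, no failure
		case "upstream_failed":
			anyFailed = true
		case "failed":
			if hasRetryBudget(r, id) {
				return "", false // a retry is still pending: non-terminal
			}
			anyFailed = true
		default:
			return "", false // none, scheduled, queued, running, up_for_retry
		}
	}
	if anyFailed {
		return "failed", true
	}
	return "success", true
}

func main() {
	r := runSnapshot{
		States:   map[string]string{"extract": "success", "load": "failed"},
		Tries:    map[string]int{"load": 1},
		MaxTries: map[string]int{"load": 3},
	}
	_, done := finalizeRun(r)
	fmt.Println(done) // false: "load" still has retry budget
	r.Tries["load"] = 3
	state, done := finalizeRun(r)
	fmt.Println(state, done) // failed true
}
```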

func IsAgentLost

func IsAgentLost(c AgentLostCandidate, threshold time.Duration, now time.Time) bool

IsAgentLost reports whether the agent has been silent long enough to be declared lost. A zero LastHeartbeat (never reported) is treated as alive, not lost. The TI may be inline (no agent ever exists), or it may simply not have completed its first interval yet. The reaper only fires on TIs that heartbeated at least once and then went silent; this is the "do no harm" rule of ADR 0031. Future timestamps (clock skew) are treated as alive.

func IsDispatchLost

func IsDispatchLost(c StaleQueuedCandidate, threshold time.Duration, now time.Time) bool

IsDispatchLost reports whether a queued TI has been waiting long enough to be declared dispatch-lost. A zero QueuedAt is treated as alive, because a TI without that stamp is too poorly observed to reap safely. Future timestamps (clock skew) are also treated as alive. This mirrors IsAgentLost's "do no harm" rule (ADR 0031).

func IsOrphaned

func IsOrphaned(c ReapCandidate, threshold time.Duration, now time.Time) bool

IsOrphaned reports whether a running run has been quiet long enough to be declared orphaned. A zero LastActivity (no progress signal at all) counts as orphaned: a running run with no recorded activity, not even a started_at, is by definition a run nothing is touching. Future timestamps (clock skew) are treated as fresh. The reaper is a backstop, not a clock arbiter, so it errs on the side of leaving recent-looking runs alone.

type AgentLostCandidate

AgentLostCandidate is one task instance in `running` whose agent may have gone silent, with the timestamp of its most recent heartbeat. The reaper compares the gap from this stamp to "now" against a stall threshold; a non-zero gap larger than the threshold means the agent is presumed gone and the TI is failed with reason "agent_lost".

type AgentLostCandidate struct {
    TaskInstanceID string
    DagRunID       string
    DagID          string
    TaskID         string
    LastHeartbeat  time.Time
}

type DispatchLostReapStore

DispatchLostReapStore is the slice of scheduler.Store the dispatch-lost reaper needs. The full scheduler.Store embeds this interface so production wires through one type; unit tests fake just this surface.

type DispatchLostReapStore interface {
    // ListStaleQueuedCandidates returns every `queued` TI alongside the
    // timestamp it entered the queue. The threshold decision is purely in Go
    // so the SQL stays simple.
    ListStaleQueuedCandidates(ctx context.Context) ([]StaleQueuedCandidate, error)
    // MarkTaskDispatchLost transitions one TI to `failed` with
    // error_message='dispatch_lost'. The WHERE state='queued' guard makes
    // this idempotent: a second call on a now-non-queued TI is a no-op.
    MarkTaskDispatchLost(ctx context.Context, taskInstanceID string) error
}

type Dispatcher

Dispatcher launches a task instance for execution. The scheduler dispatches a task as it becomes queued; the concrete implementation builds the executor request and routes it to the right executor.

type Dispatcher interface {
    Dispatch(ctx context.Context, runID, dagID string, task domain.TaskSpec) error
}

type HeartbeatReapStore

HeartbeatReapStore is the slice of scheduler.Store the TI heartbeat reaper needs. The full scheduler.Store embeds this interface so production wires through one type; unit tests fake just this surface.

type HeartbeatReapStore interface {
    // ListAgentLostCandidates returns every `running` TI whose last heartbeat
    // is non-null (it has heartbeated at least once). The reaper applies the
    // threshold per candidate so the SQL stays simple and the decision is
    // purely in Go.
    ListAgentLostCandidates(ctx context.Context) ([]AgentLostCandidate, error)
    // MarkTaskAgentLost transitions one TI to `failed` with
    // error_message='agent_lost'. The WHERE state='running' guard makes this
    // idempotent: a second call on a now-failed TI is a no-op.
    MarkTaskAgentLost(ctx context.Context, taskInstanceID string) error
}

type InlineRunner

InlineRunner executes an inline http_api task out of band in the control plane. Start reports whether the task was launched (false without error means it should be retried on the next tick); the runner owns the task's state once started, so the scheduler does not record a queued transition for it.

type InlineRunner interface {
    Start(ctx context.Context, runID, dagID, tenantID string, tryNumber int, task domain.TaskSpec) (started bool, err error)
}

type Leader

Leader acquires and releases the advisory lock that restricts the scheduler loop to a single replica. It must run on a dedicated single-connection pool so the session holding the lock is stable.

type Leader struct {
    // contains filtered or unexported fields
}

func NewLeader

func NewLeader(pool *pgxpool.Pool) *Leader

NewLeader builds a Leader over a dedicated (single-connection) pool.

func (*Leader) HoldsLock

func (l *Leader) HoldsLock(ctx context.Context) (bool, error)

HoldsLock reports whether this leader's session still holds the advisory lock. The lock is session-scoped, so if the dedicated connection dropped (network blip, idle reap, lifetime recycle) and was replaced, the new session does not hold it and another replica may have taken over. In that case this returns false, letting the caller step down instead of running on as a stale leader (the split-brain guard). A query error (connection down) is surfaced so the caller treats it as lost leadership too.

func (*Leader) Release

func (l *Leader) Release(ctx context.Context) error

Release frees the scheduler advisory lock.

func (*Leader) TryAcquire

func (l *Leader) TryAcquire(ctx context.Context) (bool, error)

TryAcquire attempts to take the scheduler advisory lock without blocking.

type PlannedTransition

PlannedTransition is a decided state change for a task instance within a run.

type PlannedTransition struct {
    TaskID string
    To     domain.TaskState
}

func PlanRun

func PlanRun(run RunState) []PlannedTransition

PlanRun computes the task transitions for one dag run. It first handles retries: a failed task with retry budget moves to up_for_retry, and an up_for_retry task is reset to none with try_number+1 (subject to the retry-delay cooldown described under RunState). It then plans the rest from the resulting effective states: none -> scheduled (or skipped / upstream_failed per the trigger rule) and scheduled -> queued. A retriable failed task is treated as still active, so downstream tasks wait rather than seeing a failure. The result is deterministic: identical inputs yield identical output.

type ReapCandidate

ReapCandidate is one running dag run the reaper is considering, with the timestamp of its most recent observable activity (max of the run's started_at and its task instances' started_at / ended_at). The reaper compares the gap from this stamp to "now" against a stall threshold; a non-zero gap larger than the threshold means the run is orphaned and should be failed.

type ReapCandidate struct {
    RunID        string
    DagID        string
    LastActivity time.Time
}

type ReapStore

ReapStore is the slice of scheduler.Store the reaper needs. The full scheduler.Store embeds this interface so production wires through one type; the unit tests fake just this surface.

type ReapStore interface {
    // ListReapCandidates returns every dag_run currently in 'running' state
    // alongside its last-activity timestamp. The query is the authority on what
    // "running" means and how to compute the timestamp; the reaper only decides
    // whether each one has been quiet for too long.
    ListReapCandidates(ctx context.Context) ([]ReapCandidate, error)
    // ReapRun transitions a run to 'failed' with an "orphaned" note and fails
    // any still-active task instances. It is idempotent: a second call on the
    // same run is a no-op.
    ReapRun(ctx context.Context, runID string) error
}

type Recorder

Recorder records scheduler metrics. observability.Metrics implements it.

type Recorder interface {
    RecordSchedulerDecision(decisionType string)
    RecordTaskTransition(from, to, dagID string)
    // RecordUndispatchable counts tasks queued with no executor to launch them.
    RecordUndispatchable(reason string)
    // RecordSchedulerStepDown counts a leader step-down by reason. The rate
    // of this counter is the operator-facing SLI for leader churn (#311). A
    // nil Recorder is tolerated by the scheduler so tests need not stub it.
    RecordSchedulerStepDown(reason string)
    // ObserveSchedulerReacquire records the wall-clock duration of a step-down
    // → re-acquire cycle (#311). Operators alert on P99 to spot churn that
    // starts to add scheduling latency.
    ObserveSchedulerReacquire(d time.Duration)
}

type RunState

RunState is the scheduler's snapshot of a dag run: its topology and the current state of each task.

type RunState struct {
    RunID    string
    DagID    string
    TenantID string
    State    domain.DagRunState
    Tasks    []domain.TaskSpec
    States   map[string]domain.TaskState
    // Tries and MaxTries hold the current and maximum attempt counts per task,
    // driving retry decisions. Absent entries mean no retry budget.
    Tries    map[string]int
    MaxTries map[string]int
    // EndedAt holds the failure timestamp per task (only meaningful for tasks
    // currently in `up_for_retry`). Combined with RetryDelaySeconds + Now, it
    // gates the `up_for_retry → none` transition so retries honor user-
    // declared backoff (issue #201). Absent entries mean no cooldown applies.
    EndedAt           map[string]*time.Time
    RetryDelaySeconds map[string]int
    // Now is the wall-clock value the planner compares against EndedAt. Zero
    // means "skip the cooldown gate" so legacy callers + tests that don't set
    // retry_delay get the previous (immediate-retry) behavior.
    Now time.Time
}

type ScheduledDAG

ScheduledDAG is a cron-scheduled DAG and the logical date of its latest run. Catchup and StartDate drive the per-tick catchup decision (#129): when a leader has been down across multiple slots, catchup=true backfills every missed slot while catchup=false jumps straight to the most recent one.

type ScheduledDAG struct {
    DagID       string
    Schedule    string
    LastLogical *time.Time
    StartDate   *time.Time
    Catchup     bool
    // MaxActiveRuns caps how many runs of this DAG may be active (queued or
    // running) at once. Zero means "unlimited" (Airflow-faithful: a missing
    // limit is unbounded). The scheduler enforces the cap in createDueRuns:
    // once the active-run count reaches this value, additional due slots are
    // skipped on this tick (issue #200).
    MaxActiveRuns int
}

type Scheduler

Scheduler advances dag runs by applying the planning rules each tick.

type Scheduler struct {
    // contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(store Store, logger *slog.Logger, interval time.Duration) *Scheduler

NewScheduler builds a Scheduler over the given store, ticking every interval.

func (*Scheduler) ClearSteppingDown

func (s *Scheduler) ClearSteppingDown()

ClearSteppingDown ends the step-down window opened by MarkSteppingDown. It is idempotent: calling it when no step-down is active is a no-op.

func (*Scheduler) Heartbeat

func (s *Scheduler) Heartbeat() (bool, time.Time)

Heartbeat reports whether the scheduling loop is live and when it last ticked. Only a leader is expected to tick, so a non-leader (a follower, or an instance that stepped down after losing the lock) reports healthy without ticking β€” it is correctly idle, not stalled. A leader is healthy during the startup grace (before its first tick) and while ticks stay within a small multiple of the loop interval; a stalled leader goes unhealthy so the UI/monitor surfaces it.

func (*Scheduler) MarkSteppingDown

func (s *Scheduler) MarkSteppingDown(reason string)

MarkSteppingDown records that a graceful step-down has begun. The campaign loop calls this BEFORE canceling the scheduler's run-context, so any in-flight reaper/Step that returns "context canceled" inside the window logs at WARN (expected) instead of ERROR. It also increments the step-down counter labeled by reason, so operators can alert on the *rate* of churn (rate(...[5m])) instead of grepping log content. ClearSteppingDown closes the window; outside it, context.Canceled stays ERROR. That is the tripwire: it catches an unexpected cancel that a blanket downgrade to WARN would silently swallow.

func (*Scheduler) RecordReacquireSince

func (s *Scheduler) RecordReacquireSince(stepDownAt time.Time)

RecordReacquireSince records the time spent stepped down (#311). It is called by the campaign loop immediately after a successful re-acquire, with the timestamp captured at the moment of step-down. A zero stepDownAt (no prior step-down, as on the first acquisition at boot) is ignored.

func (*Scheduler) Run

func (s *Scheduler) Run(ctx context.Context) error

Run drives the scheduling loop until ctx is canceled. The loop is crash-proof: a panic or error in a tick is recovered and logged, so the scheduler keeps ticking. It may fall behind, but it never dies (the critical invariant).

func (*Scheduler) SetAgentLostThreshold

func (s *Scheduler) SetAgentLostThreshold(d time.Duration)

SetAgentLostThreshold overrides the silence window the TI heartbeat reaper uses to declare a task's agent lost (optional; mainly for tests). The default is defaultAgentLostThreshold.

func (*Scheduler) SetDispatchLostThreshold

func (s *Scheduler) SetDispatchLostThreshold(d time.Duration)

SetDispatchLostThreshold overrides the wait window the dispatch-lost reaper uses to declare a queued task's dispatch lost (optional; mainly for tests). The default is defaultDispatchLostThreshold.

func (*Scheduler) SetDispatcher

func (s *Scheduler) SetDispatcher(d Dispatcher)

SetDispatcher attaches the executor dispatcher (optional; without it the scheduler advances state only and launches nothing).

func (*Scheduler) SetInlineRunner

func (s *Scheduler) SetInlineRunner(r InlineRunner)

SetInlineRunner attaches the inline http_api runner (optional; without it inline http_api tasks fall back to the standard queued dispatch path).

func (*Scheduler) SetLeading

func (s *Scheduler) SetLeading(on bool)

SetLeading marks whether this instance currently holds scheduler leadership. The leadership manager sets it true while the loop runs and false when it steps down (lost lock) or stops. Becoming leader resets the tick clock so the startup grace applies afresh and a stale pre-step-down heartbeat is not mistaken for a stall. It governs Heartbeat: only a leader is expected to tick.

func (*Scheduler) SetOrphanThreshold

func (s *Scheduler) SetOrphanThreshold(d time.Duration)

SetOrphanThreshold overrides the stall-detection window the reaper uses to declare a running dag run orphaned (optional; mainly for tests). The default is defaultOrphanThreshold.

func (*Scheduler) SetRecorder

func (s *Scheduler) SetRecorder(r Recorder)

SetRecorder attaches a metrics recorder (optional).

func (*Scheduler) SetStepTimeout

func (s *Scheduler) SetStepTimeout(d time.Duration)

SetStepTimeout overrides the per-tick timeout (optional; mainly for tests).

func (*Scheduler) Step

func (s *Scheduler) Step(ctx context.Context) error

Step runs one deterministic scheduling iteration over every active run. Each run is advanced in isolation (see advanceSafely): a panic or error in one run is contained, so it never blocks the other runs or new-run creation. The reaper runs regardless of whether createDueRuns succeeds. The two share no dependency, and silencing the reaper when scheduling has a hiccup would let orphans accumulate exactly when the operator is most likely to notice the counter is wrong. The first non-nil infra-level error is returned (the caller logs it); the later phases still execute.

func (*Scheduler) SteppingDown

func (s *Scheduler) SteppingDown() bool

SteppingDown exposes the current step-down state for tests and callers that want to classify an error themselves (the four scheduler.go log sites call logSchedulerError with this value).

type StaleQueuedCandidate

StaleQueuedCandidate is one task instance in `queued` whose dispatch may have been lost — typically because the scheduler crashed mid-tick between committing the scheduled→queued transition and actually dispatching the TI to an executor. The reaper compares the gap from QueuedAt to "now" against a dispatch-lost threshold; a non-zero gap larger than the threshold means the dispatch is presumed gone and the TI is failed with reason `dispatch_lost`. This unblocks the orphan-run reaper, which keeps stuck runs out of its candidate set as long as any TI looks active (#202).

type StaleQueuedCandidate struct {
    TaskInstanceID string
    DagRunID       string
    DagID          string
    TaskID         string
    QueuedAt       time.Time
}

type Store

Store is the scheduler's view of persistent state. The concrete implementation is sqlc-backed; tests use a fake.

type Store interface {
    ActiveRuns(ctx context.Context) ([]RunState, error)
    MaterializeTasks(ctx context.Context, runID string, tasks []domain.TaskSpec) error
    ApplyTransition(ctx context.Context, runID, taskID string, to domain.TaskState) error
    // ResetForRetry returns a task to 'none' and increments its try number so a
    // retry re-evaluates and re-runs it.
    ResetForRetry(ctx context.Context, runID, taskID string) error
    SetRunState(ctx context.Context, runID string, state domain.DagRunState) error
    ScheduledDAGs(ctx context.Context) ([]ScheduledDAG, error)
    CreateScheduledRun(ctx context.Context, dagID string, logical time.Time) error
    // SetTaskNote attaches operational context to a task instance (shown in the
    // UI), e.g. why it is queued but not running.
    SetTaskNote(ctx context.Context, runID, taskID, note string) error
    // ReapStore methods drive the orphan reaper (#120): they list running runs
    // that have gone quiet and fail them so the dashboard counter is correct.
    ReapStore
    // HeartbeatReapStore methods drive the TI heartbeat reaper (#128): they
    // list `running` task instances whose agent has gone silent and fail them
    // as `agent_lost` so the dashboard counter recovers from agent-side crashes.
    HeartbeatReapStore
    // DispatchLostReapStore methods drive the dispatch-lost reaper (#202):
    // they list `queued` task instances whose dispatch has been pending past
    // the threshold and fail them as `dispatch_lost`. This unblocks the
    // orphan-run reaper for runs stuck by a mid-tick scheduler crash, which
    // would otherwise keep stuck queued TIs out of the candidate set forever
    // (the orphan reaper's "no active TI" safety guard).
    DispatchLostReapStore
}

type TriggerDecision

TriggerDecision is the scheduler's decision for a task given its trigger rule and the states of its upstream tasks.

type TriggerDecision int

Possible scheduler decisions for a task.

const (
    // DecisionWait means the upstreams are not yet settled; re-evaluate later.
    DecisionWait TriggerDecision = iota
    // DecisionSchedule means dependencies are satisfied; move the task to scheduled.
    DecisionSchedule
    // DecisionSkip means the trigger rule can no longer be satisfied; skip the task.
    DecisionSkip
    // DecisionUpstreamFailed means a required upstream failed; propagate the failure.
    DecisionUpstreamFailed
)

func EvaluateTriggerRule

func EvaluateTriggerRule(rule domain.TriggerRule, upstreams []domain.TaskState) TriggerDecision

EvaluateTriggerRule decides what to do with a task given its trigger rule and the current states of its upstream tasks. A task with no upstreams (a root task) is always scheduled.
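To illustrate how one trigger rule maps upstream states to a TriggerDecision, here is a sketch that assumes Airflow-style `all_success` semantics. The rule name, the string state values, and the precedence (failure first, then wait, then skip) are all assumptions. The actual rules and their evaluation live in domain.TriggerRule and EvaluateTriggerRule.

```go
package main

import "fmt"

// TriggerDecision and its constants mirror the ones documented above.
type TriggerDecision int

const (
	DecisionWait TriggerDecision = iota
	DecisionSchedule
	DecisionSkip
	DecisionUpstreamFailed
)

var decisionNames = []string{"wait", "schedule", "skip", "upstream_failed"}

// evaluateAllSuccess sketches a single rule under assumed Airflow-style
// all_success semantics, with states as plain strings.
func evaluateAllSuccess(upstreams []string) TriggerDecision {
	if len(upstreams) == 0 {
		return DecisionSchedule // root task: always scheduled
	}
	settled, anySkipped := 0, false
	for _, st := range upstreams {
		switch st {
		case "failed", "upstream_failed":
			return DecisionUpstreamFailed // a required upstream failed
		case "skipped":
			anySkipped = true
			settled++
		case "success":
			settled++
		}
	}
	if settled < len(upstreams) {
		return DecisionWait // some upstream is still active
	}
	if anySkipped {
		return DecisionSkip // all_success can no longer be satisfied
	}
	return DecisionSchedule
}

func main() {
	for _, ups := range [][]string{nil, {"success", "running"}, {"success", "skipped"}, {"success", "failed"}} {
		fmt.Println(decisionNames[evaluateAllSuccess(ups)])
	}
	// prints schedule, wait, skip, upstream_failed (one per line)
}
```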

Generated by gomarkdoc