storage

import "github.com/neochaotic/leoflow/internal/storage"

Package storage wraps the Postgres and Redis connections used by the control plane, exposing the sqlc-generated query set and health checks.

func AttachRedisObservability

func AttachRedisObservability(ctx context.Context, r *Redis, m RedisMetrics, interval time.Duration) func()

AttachRedisObservability registers the metrics hook and starts a goroutine that scrapes the pool stats every interval. The returned function cancels the scraper goroutine; the caller defers it (typically in the datastore cleanup chain). In Lite, where Redis is nil, the call is a no-op.

The scraper is a closure so the "last seen cumulative timeouts" counter (used to compute per-scrape DELTAS for the Prometheus counter, since go-redis exposes Timeouts as a cumulative value) is goroutine-local — no shared mutable state.
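The delta computation described above can be sketched as follows. This is a minimal illustration rather than the package's code: `newTimeoutDelta` is a hypothetical name, and the handling of a counter that goes backwards is an assumption.

```go
package main

import "fmt"

// newTimeoutDelta returns a closure that converts a cumulative timeout count
// into per-scrape deltas. The "last seen" value lives only inside the closure,
// so nothing outside the scraper can read or mutate it.
func newTimeoutDelta() func(cumulative uint32) uint32 {
	var last uint32
	return func(cumulative uint32) uint32 {
		if cumulative < last {
			// Assumption: a counter that went backwards means the underlying
			// client was recreated, so the new value is reported as the delta.
			last = cumulative
			return cumulative
		}
		d := cumulative - last
		last = cumulative
		return d
	}
}

func main() {
	delta := newTimeoutDelta()
	for _, cumulative := range []uint32{0, 3, 3, 7} {
		fmt.Println(delta(cumulative))
	}
}
```

Each returned delta is what a scraper would add to the Prometheus counter, for example by calling RecordRedisPoolTimeout that many times.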

func NewLeaderPool

func NewLeaderPool(ctx context.Context, cfg config.DatabaseSection) (*pgxpool.Pool, error)

NewLeaderPool opens a dedicated single-connection pool for the scheduler advisory lock, so the session holding the lock is stable (ADR 0009).

type ExecutionStore

ExecutionStore resolves task execution context from Postgres. It implements both agentrpc.Store (serving the in-pod agent) and dispatch.Resolver (feeding the pod-path dispatcher) over the same dag_version spec.

type ExecutionStore struct {
    // contains filtered or unexported fields
}

func NewExecutionStore

func NewExecutionStore(pg *Postgres) *ExecutionStore

NewExecutionStore builds an ExecutionStore over the given Postgres connection.

func (*ExecutionStore) FailTask

func (s *ExecutionStore) FailTask(ctx context.Context, taskInstanceID, reason string) error

FailTask marks a task instance failed by its ID, but only while it is still active (scheduled/queued/running), so it never clobbers a terminal state. It implements executor.FailureReporter for the pod reconciler.

func (*ExecutionStore) RecordHeartbeat

func (s *ExecutionStore) RecordHeartbeat(ctx context.Context, id auth.AgentIdentity) error

RecordHeartbeat stamps last_heartbeat_at on the agent's TI so the scheduler's heartbeat reaper (#128) can tell a live task from one whose agent has gone silent. The SQL guard skips terminal rows — a late heartbeat after a terminal report is a no-op, not a regression.

func (*ExecutionStore) ReportState

func (s *ExecutionStore) ReportState(ctx context.Context, id auth.AgentIdentity, state domain.TaskState, exitCode int, errMsg string) error

ReportState records a state transition reported by the agent, persisting the exit code and error message and stamping started/ended/duration timestamps.

func (*ExecutionStore) ResolveTask

func (s *ExecutionStore) ResolveTask(ctx context.Context, runID, taskID string) (dispatch.Resolved, error)

ResolveTask returns the dispatcher's execution context for a run's task.

func (*ExecutionStore) TaskSpec

func (s *ExecutionStore) TaskSpec(ctx context.Context, id auth.AgentIdentity) (agentrpc.TaskSpec, error)

TaskSpec returns the agent-facing execution spec for a task instance.

type LogReader

LogReader resolves a task attempt's log location from API-facing identifiers, reads the stored log from the log sink, and tails the attempt's live lines.

type LogReader struct {
    // contains filtered or unexported fields
}

func NewLogReader

func NewLogReader(pg *Postgres, sink logs.Sink, tailer logs.Tailer) *LogReader

NewLogReader builds a LogReader over the given Postgres connection, sink, and live-tail tailer (tailer may be nil to disable following).

func (*LogReader) ReadLogs

func (r *LogReader) ReadLogs(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (io.ReadCloser, error)

ReadLogs resolves the run reference (tenant name -> id, run_id -> dag_run id), then opens the stored log for the task attempt. It returns domain.ErrNotFound when the run or its log file is absent. See issue #21 for the resolution cost.

func (*LogReader) Tail

func (r *LogReader) Tail(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (lines <-chan string, cancel func(), err error)

Tail subscribes to the task attempt's live log lines, returning a line channel and a cancel function. It resolves the run reference the same way ReadLogs does so the channel matches what the agent publishes.

type Postgres

Postgres holds a pgx connection pool and the generated query set.

type Postgres struct {
    Pool    *pgxpool.Pool
    Queries *queries.Queries
}

func NewPostgres

func NewPostgres(ctx context.Context, cfg config.DatabaseSection) (*Postgres, error)

NewPostgres opens a connection pool and verifies connectivity, retrying transient failures during boot for up to pgStartupBudget. Before 2026-06, the first failed ping was fatal to the server, so a docker compose startup race or any Pro failover blip became a hard crash. The retry loop keeps Lite boot ergonomic and Pro startup resilient under realistic upstream-PG dynamics. A truly broken setup (wrong DSN, bad auth) still surfaces quickly because the underlying error is wrapped into the final error.

func (*Postgres) Close

func (p *Postgres) Close()

Close releases the connection pool.

func (*Postgres) Ping

func (p *Postgres) Ping(ctx context.Context) error

Ping checks database connectivity (used by /readyz).

type Redis

Redis wraps a go-redis client used for XCom and locks.

type Redis struct {
    Client *redis.Client
}

func NewRedis

func NewRedis(ctx context.Context, cfg config.RedisSection) (*Redis, error)

NewRedis connects to Redis and verifies connectivity.

func (*Redis) Close

func (r *Redis) Close() error

Close releases the Redis client.

func (*Redis) Ping

func (r *Redis) Ping(ctx context.Context) error

Ping checks Redis connectivity (used by /readyz).

type RedisMetrics

RedisMetrics is the subset of observability.Metrics that the redis hook and pool scraper need. It is declared as a local interface so internal/storage doesn't import internal/observability, which avoids an import cycle and lets tests inject a fake without standing up a Prometheus registry.

type RedisMetrics interface {
    RecordRedisCommandFailure(reason string)
    RecordRedisDialFailure(reason string)
    ObserveRedisDialDuration(d time.Duration)
    UpdateRedisPoolStats(active, idle, total uint32)
    RecordRedisPoolTimeout()
}

type Repository

Repository implements the API resource and auth user-store interfaces over Postgres using the sqlc-generated query set.

type Repository struct {
    // contains filtered or unexported fields
}

func NewRepository

func NewRepository(pg *Postgres) *Repository

NewRepository builds a Repository backed by the given Postgres connection.

func (*Repository) AddFavorite

func (r *Repository) AddFavorite(ctx context.Context, tenant, userID, dagID string) error

AddFavorite marks a DAG as a favorite for the user (idempotent).

func (*Repository) BootstrapAdmin

func (r *Repository) BootstrapAdmin(ctx context.Context, tenant, email, password string) (bool, error)

BootstrapAdmin creates a default admin user with the given password when the tenant has no users yet, assigning the seeded admin role. It returns whether a user was created (false when users already exist).

func (*Repository) BootstrapAdminHash

func (r *Repository) BootstrapAdminHash(ctx context.Context, tenant, email, hash string) (bool, error)

BootstrapAdminHash provisions the Lite admin from a precomputed bcrypt hash (so the plaintext never reaches the control plane). It RECONCILES: if the admin already exists, its password is reset to this hash. The Lite config (admin_password_hash) is the source of truth, so the password the setup printed always logs in — even against a pre-existing or stale database — without anyone having to wipe Docker volumes. The only sanctioned way to change the password, `reset-password`, also writes the config, so the two never drift. Returns true only when the admin was newly created (false when an existing one was reconciled). See cmd/leoflow-server bootstrapAdmin.

func (*Repository) ClearDagHistory

func (r *Repository) ClearDagHistory(ctx context.Context, tenant, dagID string) error

ClearDagHistory deletes a DAG's runs (cascading task instances and XCom index rows) while keeping the DAG and its versions registered — the safe "clear" the UI trash maps to (ADR 0020). Returns ErrNotFound when the DAG is absent.

func (*Repository) ClearImportError

func (r *Repository) ClearImportError(ctx context.Context, tenant, filename string) error

ClearImportError removes any recorded error for a file (a good re-import).

func (*Repository) ClearTaskInstances

func (r *Repository) ClearTaskInstances(ctx context.Context, tenant, dagID, runID string, taskIDs []string, onlyFailed, resetDagRun bool) (int, error)

ClearTaskInstances resets tasks to none for re-run, optionally resetting the parent run to queued. When onlyFailed is true, only tasks currently in a failed-ish state (failed, upstream_failed, up_for_retry) are reset; with an empty taskIDs and onlyFailed, every failed task in the run is cleared. It returns the number of task instances actually reset.
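The selection rule can be illustrated in memory as below. This is a sketch under stated assumptions, not the SQL the repository runs: `taskInstance` and `selectForClear` are hypothetical, and treating an empty taskIDs as "no ID filter" when onlyFailed is false is an assumption the text above does not confirm.

```go
package main

import "fmt"

// taskInstance is a hypothetical, reduced view of a task instance row.
type taskInstance struct {
	TaskID string
	State  string
}

// failedish lists the states the documentation names as "failed-ish".
var failedish = map[string]bool{"failed": true, "upstream_failed": true, "up_for_retry": true}

// selectForClear returns the task IDs that would be reset to none.
func selectForClear(tis []taskInstance, taskIDs []string, onlyFailed bool) []string {
	wanted := make(map[string]bool, len(taskIDs))
	for _, id := range taskIDs {
		wanted[id] = true
	}
	var out []string
	for _, ti := range tis {
		// Assumption: an empty taskIDs slice means "do not filter by ID".
		if len(taskIDs) > 0 && !wanted[ti.TaskID] {
			continue
		}
		if onlyFailed && !failedish[ti.State] {
			continue
		}
		out = append(out, ti.TaskID)
	}
	return out
}

func main() {
	tis := []taskInstance{
		{"extract", "success"},
		{"transform", "failed"},
		{"load", "upstream_failed"},
	}
	fmt.Println(selectForClear(tis, nil, true))
	fmt.Println(selectForClear(tis, []string{"extract", "transform"}, true))
	fmt.Println(len(selectForClear(tis, []string{"extract"}, false)))
}
```

The length of the returned slice corresponds to the count that ClearTaskInstances reports.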

func (*Repository) CreateDagRun

func (r *Repository) CreateDagRun(ctx context.Context, tenant, dagID string, run domain.DagRun) (domain.DagRun, error)

CreateDagRun inserts a new run for a DAG at its current version. The per-DAG max_active_runs cap (#200) is enforced here for any caller that goes through the repository — manual triggers via the API, scripted backfills, and any future programmatic trigger path — so the contract is honored in one place. A cap of zero is treated as "unlimited" to match the scheduler path (see `Scheduler.hasHeadroom`). The check races with concurrent inserts, but the small overshoot window is bounded by the number of concurrent writers and lets us avoid an advisory lock on the hot path.
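The cap rule reduces to a one-line predicate. The sketch below uses a hypothetical helper name, `underCap`, and is not the body of `Scheduler.hasHeadroom`.

```go
package main

import "fmt"

// underCap reports whether another run may start. A cap of zero means
// "unlimited", matching the behaviour described above.
func underCap(activeRuns, maxActiveRuns int) bool {
	return maxActiveRuns == 0 || activeRuns < maxActiveRuns
}

func main() {
	fmt.Println(underCap(5, 0), underCap(1, 2), underCap(2, 2))
}
```

Since the count and the insert are not atomic, two concurrent callers can both observe `activeRuns < maxActiveRuns` and both insert, which is the bounded overshoot the documentation accepts.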

func (*Repository) DagStats

func (r *Repository) DagStats(ctx context.Context, tenant string) (domain.DagStats, error)

DagStats returns the home dashboard's DAG counters: the total active DAG count plus how many DAGs have a latest run in the failed/running/queued state.

func (*Repository) DeleteConnection

func (r *Repository) DeleteConnection(ctx context.Context, tenant, connID string) error

DeleteConnection removes a connection, returning ErrNotFound when none matched.

func (*Repository) DeleteDag

func (r *Repository) DeleteDag(ctx context.Context, tenant, dagID string) error

DeleteDag removes a DAG and (via ON DELETE CASCADE) its versions, runs, task instances, and XCom index rows. It returns ErrNotFound when no DAG matched.

func (*Repository) DeleteDagRun

func (r *Repository) DeleteDagRun(ctx context.Context, tenant, dagID, runID string) error

DeleteDagRun removes one run (and, by cascade, its task instances and XCom). It returns domain.ErrNotFound when no run with that id exists for the DAG, so the API can answer 404 rather than a silent 204 for a bad id.

func (*Repository) DeleteVariable

func (r *Repository) DeleteVariable(ctx context.Context, tenant, key string) error

DeleteVariable removes a variable, returning ErrNotFound when none matched.

func (*Repository) FavoriteDagIDs

func (r *Repository) FavoriteDagIDs(ctx context.Context, tenant, userID string) (map[string]bool, error)

FavoriteDagIDs returns the set of DAG ids the user has favorited.

func (*Repository) FindUserByLogin

func (r *Repository) FindUserByLogin(ctx context.Context, tenant, username string) (*auth.User, string, error)

FindUserByLogin loads a user and its bcrypt hash for authentication.

func (*Repository) GetConnection

func (r *Repository) GetConnection(ctx context.Context, tenant, connID string) (domain.Connection, error)

GetConnection returns a connection with extra decrypted; the password is not returned (write-only). Returns ErrNotFound when absent.

func (*Repository) GetCurrentSpec

func (r *Repository) GetCurrentSpec(ctx context.Context, tenant, dagID string) (domain.DAGSpec, error)

GetCurrentSpec returns the parsed spec of the DAG's current version, or domain.ErrNotFound if the DAG or its current version does not exist.

func (*Repository) GetDag

func (r *Repository) GetDag(ctx context.Context, tenant, dagID string) (domain.DAG, error)

GetDag returns a single DAG by its user-facing id.

func (*Repository) GetDagRun

func (r *Repository) GetDagRun(ctx context.Context, tenant, dagID, runID string) (domain.DagRun, error)

GetDagRun returns a single run by its run id.

func (*Repository) GetVariable

func (r *Repository) GetVariable(ctx context.Context, tenant, key string) (domain.Variable, error)

GetVariable returns one variable by key, or ErrNotFound.

func (*Repository) HistoricalMetrics

func (r *Repository) HistoricalMetrics(ctx context.Context, tenant string, since, until time.Time) (domain.HistoricalMetrics, error)

HistoricalMetrics returns run- and task-instance state counts for runs whose logical date falls within [since, until], keyed by Leoflow state name.

func (*Repository) LatestRunsForDags

func (r *Repository) LatestRunsForDags(ctx context.Context, tenant string, dagIDs []string, perDag int) (map[string][]domain.DagRun, error)

LatestRunsForDags returns up to perDag most-recent runs for each named DAG, keyed by dag_id, in a single windowed query (no per-DAG round trips).

func (*Repository) ListAuditLogs

func (r *Repository) ListAuditLogs(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.AuditLogEntry, int, error)

ListAuditLogs returns a page of audit-log entries for the tenant, newest first, optionally filtered to a single DAG (dagID == "" means no filter).

func (*Repository) ListConnections

func (r *Repository) ListConnections(ctx context.Context, tenant string, limit, offset int) ([]domain.Connection, int, error)

ListConnections returns a page of connections (no passwords) and the total.

func (*Repository) ListDagRuns

func (r *Repository) ListDagRuns(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.DagRun, int, error)

ListDagRuns returns a page of runs for a DAG and the total count.

func (*Repository) ListDagVersions

func (r *Repository) ListDagVersions(ctx context.Context, tenant, dagID string) ([]domain.DagVersion, error)

ListDagVersions returns the DAG's versions, newest first, with a 1-based version_number the UI uses to query version-scoped structure.

func (*Repository) ListDags

func (r *Repository) ListDags(ctx context.Context, tenant string, limit, offset int) ([]domain.DAG, int, error)

ListDags returns a page of DAGs for the tenant and the total count.

func (*Repository) ListDagsFiltered

func (r *Repository) ListDagsFiltered(ctx context.Context, tenant, runState string, paused *bool, limit, offset int) ([]domain.DAG, int, error)

ListDagsFiltered returns a page of active DAGs for the tenant, optionally filtered by paused state and/or latest-run state, with the matching total. An empty runState or nil paused disables that filter.

func (*Repository) ListImportErrors

func (r *Repository) ListImportErrors(ctx context.Context, tenant string) ([]domain.ImportError, error)

ListImportErrors returns the tenant's DAG parse/compile errors, newest first.

func (*Repository) ListTaskInstanceAttempts

func (r *Repository) ListTaskInstanceAttempts(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.TaskInstance, error)

ListTaskInstanceAttempts returns every attempt for (run, task), oldest first — the current task_instances row UNIONed with all archived task_instance_history rows. The UI's /tries endpoint needs this to render one navigable tab per attempt; without history, a cleared task shows only the latest attempt and the user cannot inspect prior failures (Lima bug #241).

func (*Repository) ListTaskInstances

func (r *Repository) ListTaskInstances(ctx context.Context, tenant, dagID, runID string, _, _ int) ([]domain.TaskInstance, int, error)

ListTaskInstances returns the task instances of a run.

func (*Repository) ListVariables

func (r *Repository) ListVariables(ctx context.Context, tenant string, limit, offset int) ([]domain.Variable, int, error)

ListVariables returns a page of variables for the tenant and the total count.

func (*Repository) RecordTaskActionAudit

func (r *Repository) RecordTaskActionAudit(ctx context.Context, tenant, userID, action, dagID, runID, taskID string, tryNumber int) error

RecordTaskActionAudit logs a task-level action (clear, mark state) with the acting user and the run/task/try in metadata, so the Audit Log view shows the owner and the task columns. Scoped to the DAG (resource_id = dag_id) so it appears on the DAG's Audit Log tab.

func (*Repository) RegisterDagVersion

func (r *Repository) RegisterDagVersion(ctx context.Context, tenant string, spec domain.DAGSpec, specHash string) (bool, error)

RegisterDagVersion upserts the DAG and inserts a version keyed by specHash, setting it as current. It is idempotent: an existing hash yields created=false.

func (*Repository) RemoveFavorite

func (r *Repository) RemoveFavorite(ctx context.Context, tenant, userID, dagID string) error

RemoveFavorite clears a DAG's favorite mark for the user (idempotent).

func (*Repository) SecretConnectionURIs

func (r *Repository) SecretConnectionURIs(ctx context.Context, tenantID string) (map[string]string, error)

SecretConnectionURIs returns the tenant's connections as conn_id→Airflow URI (password decrypted), for delivering to task pods (ADR 0021). The agent exports them as AIRFLOW_CONN_\<CONN_ID>. tenantID is the tenant UUID carried by the agent token. Never expose these in UI/API responses.

func (*Repository) SecretVariables

func (r *Repository) SecretVariables(ctx context.Context, tenantID string) (map[string]string, error)

SecretVariables returns the tenant's variables as key→value, for delivering to task pods (ADR 0021). The agent exports them as AIRFLOW_VAR_\<KEY>. tenantID is the tenant UUID carried by the agent token (not the tenant name).
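For illustration, the maps returned by SecretConnectionURIs and SecretVariables could be turned into environment entries roughly as follows. `envName` and `buildEnv` are hypothetical helpers. Upper-casing the key follows Airflow's environment-variable convention and is an assumption about what the agent does.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// envName builds an Airflow-style variable name from a prefix and a key.
// Assumption: the key is upper-cased, as Airflow's own convention expects.
func envName(prefix, key string) string {
	return prefix + strings.ToUpper(key)
}

// buildEnv renders both maps as sorted KEY=value entries.
func buildEnv(conns, vars map[string]string) []string {
	env := make([]string, 0, len(conns)+len(vars))
	for id, uri := range conns {
		env = append(env, envName("AIRFLOW_CONN_", id)+"="+uri)
	}
	for k, v := range vars {
		env = append(env, envName("AIRFLOW_VAR_", k)+"="+v)
	}
	sort.Strings(env) // deterministic order for logs-free comparison in tests
	return env
}

func main() {
	conns := map[string]string{"my_db": "postgres://user:secret@db:5432/app"}
	vars := map[string]string{"region": "eu"}
	for _, e := range buildEnv(conns, vars) {
		fmt.Println(e)
	}
}
```

The resulting entries contain decrypted credentials, so they belong only in the task pod's environment and never in UI or API responses.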

func (*Repository) SetCipher

func (r *Repository) SetCipher(c secrets.Cipher)

SetCipher attaches the encryption cipher used for connection secrets (ADR 0019). Without it, connection writes fail rather than storing plaintext.

func (*Repository) SetConnection

func (r *Repository) SetConnection(ctx context.Context, tenant string, c domain.Connection) error

SetConnection creates or updates a connection, encrypting password and extra at rest. It fails if no encryption cipher is configured (never stores a credential in plaintext — ADR 0019).

func (*Repository) SetDagRunState

func (r *Repository) SetDagRunState(ctx context.Context, tenant, dagID, runID, state string) error

SetDagRunState sets a DAG run's state directly, backing the UI's mark run success/failed actions. Terminal states stamp ended_at; re-opening to a non-terminal state clears it. started_at is preserved.

func (*Repository) SetImportError

func (r *Repository) SetImportError(ctx context.Context, tenant string, e domain.ImportError) error

SetImportError records (or replaces) the parse/compile error for a file.

func (*Repository) SetPaused

func (r *Repository) SetPaused(ctx context.Context, tenant, dagID string, paused bool) (domain.DAG, error)

SetPaused toggles the paused flag of a DAG.

func (*Repository) SetTaskInstanceState

func (r *Repository) SetTaskInstanceState(ctx context.Context, tenant, dagID, runID, taskID, state string) error

SetTaskInstanceState sets a task instance's state directly, backing the UI's "mark success"/"mark failed" actions. It does not run the task.

func (*Repository) SetUserPassword

func (r *Repository) SetUserPassword(ctx context.Context, tenant, email, hash string) (bool, error)

SetUserPassword sets a user's bcrypt hash by email, returning whether a user was updated (false when no such user exists). Used by `leoflow lite reset-password`.

func (*Repository) SetVariable

func (r *Repository) SetVariable(ctx context.Context, tenant string, v domain.Variable) error

SetVariable creates or updates a variable.

func (*Repository) TaskInstancesForRuns

func (r *Repository) TaskInstancesForRuns(ctx context.Context, tenant, dagID string, runIDs []string) ([]domain.TaskInstance, error)

TaskInstancesForRuns returns the task instances of the given runs of a DAG in one query, ordered by run_id, task_id, try_number, for the grid summaries.

func (*Repository) TenantUUID

func (r *Repository) TenantUUID(ctx context.Context, name string) (string, error)

TenantUUID resolves a tenant name to its UUID string — the form the agent token carries and that the secret-delivery methods expect.

type SchedulerStore

SchedulerStore is the sqlc-backed implementation of scheduler.Store.

type SchedulerStore struct {
    // contains filtered or unexported fields
}

func NewSchedulerStore

func NewSchedulerStore(pg *Postgres) *SchedulerStore

NewSchedulerStore builds a SchedulerStore over the given Postgres connection.

func (*SchedulerStore) ActiveRuns

func (s *SchedulerStore) ActiveRuns(ctx context.Context) ([]scheduler.RunState, error)

ActiveRuns returns every queued/running run with its topology and task states.

func (*SchedulerStore) ApplyTransition

func (s *SchedulerStore) ApplyTransition(ctx context.Context, runID, taskID string, to domain.TaskState) error

ApplyTransition moves a task instance to a new state.

func (*SchedulerStore) CreateScheduledRun

func (s *SchedulerStore) CreateScheduledRun(ctx context.Context, dagID string, logical time.Time) error

CreateScheduledRun inserts a scheduled run for a DAG (idempotent on run_id).

func (*SchedulerStore) ListActiveStagingVolumes

func (s *SchedulerStore) ListActiveStagingVolumes(ctx context.Context) ([]domain.StagingVolumeState, error)

ListActiveStagingVolumes returns active staging volumes joined with their DAG run's state (empty when the run row is gone), for the GC (ADR 0022).

func (*SchedulerStore) ListAgentLostCandidates

func (s *SchedulerStore) ListAgentLostCandidates(ctx context.Context) ([]scheduler.AgentLostCandidate, error)

ListAgentLostCandidates returns every `running` TI with a non-null last_heartbeat_at, for the scheduler's TI heartbeat reaper (#128). The reaper applies the threshold per row so the SQL stays simple.

func (*SchedulerStore) ListReapCandidates

func (s *SchedulerStore) ListReapCandidates(ctx context.Context) ([]scheduler.ReapCandidate, error)

ListReapCandidates returns every dag_run currently in 'running' state with the timestamp of its most recent activity, for the scheduler's orphan reaper. The query (sqlc.runs.ListOrphanCandidates) is the authority on how to compute the timestamp; the reaper only decides whether each one is past its threshold.

func (*SchedulerStore) ListStaleQueuedCandidates

func (s *SchedulerStore) ListStaleQueuedCandidates(ctx context.Context) ([]scheduler.StaleQueuedCandidate, error)

ListStaleQueuedCandidates returns every `queued` TI with its queued_at, for the dispatch-lost reaper (#202). The reaper applies the threshold per row so the SQL stays simple.

func (*SchedulerStore) MarkStagingDeleted

func (s *SchedulerStore) MarkStagingDeleted(ctx context.Context, pvcName, reason string) error

MarkStagingDeleted records that a staging volume's PVC was deleted and why (run_succeeded | ttl_expired | orphaned).

func (*SchedulerStore) MarkTaskAgentLost

func (s *SchedulerStore) MarkTaskAgentLost(ctx context.Context, taskInstanceID string) error

MarkTaskAgentLost transitions one TI to `failed` with the agent_lost reason. The WHERE state='running' guard makes this idempotent and prevents a late terminal report from being overwritten: if the row has already moved, the update touches zero rows and the method returns nil.

func (*SchedulerStore) MarkTaskDispatchFailed

func (s *SchedulerStore) MarkTaskDispatchFailed(ctx context.Context, runID, taskID, reason string) error

MarkTaskDispatchFailed transitions a TI to `failed` after its asynchronous dispatch failed inside the BufferedDispatcher worker (#127). The SQL guard only targets scheduled/queued rows, so a TI that already moved to running or terminal between the worker accepting the request and the dispatch failing is left alone (defense in depth — the agent's late progress report wins over the dispatcher's "I failed" claim).

func (*SchedulerStore) MarkTaskDispatchLost

func (s *SchedulerStore) MarkTaskDispatchLost(ctx context.Context, taskInstanceID string) error

MarkTaskDispatchLost transitions one TI to `failed` with the dispatch_lost reason. The WHERE state='queued' guard makes this idempotent: a TI that has since been dispatched (real progress landed) is left alone.

func (*SchedulerStore) MaterializeTasks

func (s *SchedulerStore) MaterializeTasks(ctx context.Context, runID string, tasks []domain.TaskSpec) error

MaterializeTasks creates a none-state task instance for each task in the run.

func (*SchedulerStore) ReapRun

func (s *SchedulerStore) ReapRun(ctx context.Context, runID string) error

ReapRun fails an orphaned dag run, then any of its still-active task instances, inside a single transaction. The run UPDATE comes first and is guarded by `state = 'running'`: if zero rows are touched, the run was no longer running (a competing finalizer beat us) and we abort with a clean rollback — the TI table is never touched. This guarantees we cannot leave a run as `success`/`failed` while flipping its TIs to `failed (orphaned)`. Idempotent: a second call on an already-failed run no-ops.
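The guard-first ordering can be illustrated with an in-memory stand-in for the two tables. This is only a sketch of the invariant, not the transactional SQL: `memStore` and `reapRun` are hypothetical, and the boolean return replaces the real method's "touched rows" check.

```go
package main

import "fmt"

// memStore is a hypothetical in-memory stand-in for the run and TI tables.
type memStore struct {
	runState  map[string]string            // run id -> state
	taskState map[string]map[string]string // run id -> task id -> state
}

// active lists the non-terminal task states named elsewhere in this package.
var active = map[string]bool{"scheduled": true, "queued": true, "running": true}

// reapRun fails the run first, guarded on state == "running". When the guard
// does not match it returns false without touching any task instance.
func (s *memStore) reapRun(runID string) bool {
	if s.runState[runID] != "running" {
		return false // a competing finalizer already moved the run
	}
	s.runState[runID] = "failed"
	for taskID, st := range s.taskState[runID] {
		if active[st] {
			s.taskState[runID][taskID] = "failed"
		}
	}
	return true
}

func main() {
	s := &memStore{
		runState: map[string]string{"r1": "running", "r2": "success"},
		taskState: map[string]map[string]string{
			"r1": {"a": "success", "b": "running"},
			"r2": {"a": "running"},
		},
	}
	fmt.Println(s.reapRun("r1"), s.taskState["r1"]["a"], s.taskState["r1"]["b"])
	fmt.Println(s.reapRun("r2"), s.taskState["r2"]["a"])
	fmt.Println(s.reapRun("r1"))
}
```

In the real method both updates share one transaction, so a failed guard rolls back cleanly instead of returning early.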

func (*SchedulerStore) RecordStagingVolume

func (s *SchedulerStore) RecordStagingVolume(ctx context.Context, tenantID, dagID, runID, pvcName, size string) error

RecordStagingVolume records a per-run staging volume as active, keyed by PVC name (idempotent — called per task as the PVC is ensured). ADR 0022.

func (*SchedulerStore) ResetForRetry

func (s *SchedulerStore) ResetForRetry(ctx context.Context, runID, taskID string) error

ResetForRetry returns a task instance to 'none', clears its timestamps, and increments its try number so the scheduler re-evaluates and re-runs it.

func (*SchedulerStore) ScheduledDAGs

func (s *SchedulerStore) ScheduledDAGs(ctx context.Context) ([]scheduler.ScheduledDAG, error)

ScheduledDAGs returns active, unpaused, cron-scheduled DAGs with the logical date of their most recent run.

func (*SchedulerStore) SetRunState

func (s *SchedulerStore) SetRunState(ctx context.Context, runID string, state domain.DagRunState) error

SetRunState updates a run's state.

func (*SchedulerStore) SetTaskNote

func (s *SchedulerStore) SetTaskNote(ctx context.Context, runID, taskID, note string) error

SetTaskNote attaches operational context to a task instance, shown in the UI.

type XComIndex

XComIndex is the Postgres-backed XCom metadata index. It implements xcom.Index, recording each pushed value so the API can find and list it.

type XComIndex struct {
    // contains filtered or unexported fields
}

func NewXComIndex

func NewXComIndex(pg *Postgres) *XComIndex

NewXComIndex builds an XComIndex over the given Postgres connection.

func (*XComIndex) PurgeExpired

func (x *XComIndex) PurgeExpired(ctx context.Context) error

PurgeExpired deletes xcom_index rows past their expiry. Redis expires the values natively; this reclaims the metadata rows.

func (*XComIndex) RecordXCom

func (x *XComIndex) RecordXCom(ctx context.Context, e xcom.IndexEntry) error

RecordXCom upserts the metadata for a pushed XCom value.

type XComReader

XComReader reads XCom values for the API: it resolves the Redis key from the Postgres index by name and fetches the value from the backend.

type XComReader struct {
    // contains filtered or unexported fields
}

func NewXComReader

func NewXComReader(pg *Postgres, backend xcom.Backend) *XComReader

NewXComReader builds an XComReader over the given Postgres connection and XCom backend.

func (*XComReader) GetXCom

func (r *XComReader) GetXCom(ctx context.Context, tenant, dagID, runID, taskID, key string) (xcom.Entry, error)

GetXCom returns the XCom entry for the named value, or domain.ErrNotFound when it is absent or expired (in the index or in Redis).

func (*XComReader) ListXComEntries

func (r *XComReader) ListXComEntries(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.XComEntryMeta, error)

ListXComEntries returns the metadata of every non-expired XCom pushed by a task instance (keys and timestamps, no values), for the XCom list view.

Generated by gomarkdoc