api¶
Package api implements the Airflow-compatible HTTP control plane (ADR 0007).
Index¶
- Constants
- Variables
- func AbortProblem(c *gin.Context, status int, title, detail string)
- func CORS(allowed []string) gin.HandlerFunc
- func DevBypassAuth() gin.HandlerFunc
- func JWTAuth(authn auth.Authenticator) gin.HandlerFunc
- func NewServer(deps Dependencies) *gin.Engine
- func NoStoreOnVolatileRoutes() gin.HandlerFunc
- func Observe(metrics Metrics, tracer trace.Tracer) gin.HandlerFunc
- func RequestID() gin.HandlerFunc
- func RequirePermission(action, resource string) gin.HandlerFunc
- func StructuredLogger(logger *slog.Logger) gin.HandlerFunc
- func UserFromContext(c *gin.Context) (*auth.User, bool)
- type AuditLogReader
- type AuditWriter
- type ConnectionStore
- type ConnectionTester
- type DagLatestRunsReader
- type DagRepository
- type DagRunRepository
- type DagSpecReader
- type DagVersionLister
- type DagVersionRepository
- type DashboardStatsReader
- type Dependencies
- type ExecutorInfo
- type FavoriteStore
- type HealthChecker
- type Heartbeater
- type ImportErrorStore
- type LogReader
- type Metrics
- type Problem
- type TaskInstanceRepository
- type TaskSummaryReader
- type UIServer
- type VariableStore
- type WorkspaceFS
- type XComReader
Constants¶
DefaultUIAutoRefreshIntervalSeconds is the production-safe value returned by /ui/config when no explicit override is configured. Lite overrides this to a smaller value (typically 5s) for a snappy inner-loop dev experience; Pro keeps 30s so the SPA's polling does not hammer a shared metadata DB.
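The fallback rule can be sketched in a few lines. This is a minimal illustration, assuming the default is the 30s value mentioned above; the lowercase constant and the `effectiveRefreshInterval` helper are hypothetical names, not the package's own code.

```go
package main

import "fmt"

// defaultUIAutoRefreshIntervalSeconds mirrors the documented 30s default.
// Hypothetical stand-in for the exported constant.
const defaultUIAutoRefreshIntervalSeconds = 30

// effectiveRefreshInterval returns the configured interval, or the default
// when the configured value is zero or negative.
func effectiveRefreshInterval(configured int) int {
	if configured <= 0 {
		return defaultUIAutoRefreshIntervalSeconds
	}
	return configured
}

func main() {
	fmt.Println(effectiveRefreshInterval(0)) // no override: prints 30
	fmt.Println(effectiveRefreshInterval(5)) // Lite-style override: prints 5
}
```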
Variables¶
ErrNotFound is returned by repositories when a resource does not exist.
func AbortProblem¶
AbortProblem writes an RFC 7807 problem response and stops the handler chain.
func CORS¶
CORS allows the configured origins (use "*" to allow any).
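The matching rule described above can be sketched as a plain function. This is an assumption-laden illustration: `originAllowed` is a hypothetical helper, and exact string comparison of origins is assumed rather than taken from the package.

```go
package main

import "fmt"

// originAllowed reports whether origin matches the allow-list.
// A "*" entry allows any origin; other entries must match exactly.
func originAllowed(allowed []string, origin string) bool {
	for _, a := range allowed {
		if a == "*" || a == origin {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"https://ui.example.com"}
	fmt.Println(originAllowed(allowed, "https://ui.example.com"))
	fmt.Println(originAllowed(allowed, "https://evil.example.com"))
	fmt.Println(originAllowed([]string{"*"}, "https://anything.example.com"))
}
```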
func DevBypassAuth¶
DevBypassAuth authenticates EVERY request as a fixed admin user, with no token required. It exists solely for `leoflow dev` (the local, unsandboxed loop) so a developer reaches the UI without logging in. It must only be wired under the explicit dev opt-in (config auth.dev_no_auth); the server logs a prominent warning when it is active. NEVER enable this in production.
func JWTAuth¶
JWTAuth validates the bearer token on protected routes and stores the user.
func NewServer¶
NewServer builds the gin engine with the full middleware chain, health and metrics endpoints, embedded Scalar docs, and the auth token endpoint.
func NoStoreOnVolatileRoutes¶
NoStoreOnVolatileRoutes stamps every response served from the SPA-facing JSON surface (`/api/v2/*` and `/ui/*`) with `Cache-Control: no-store, must-revalidate` so the browser HTTP cache does not return a pre-mutation payload after a PATCH/POST/DELETE.
Why this exists (#211, #271): mark-state PATCH succeeds in single-digit ms; TanStack Query then invalidates its in-memory cache and re-fetches. Without this header, the browser's HTTP cache layer can serve the OLD response to that re-fetch (the original GET response had no explicit caching directive, so the browser falls back to heuristic caching). The SPA renders stale state until the next "natural" refresh; the observable symptom is "marking as failed takes forever".
Static assets (`/ide/vs/*` for the Monaco bundle) are content-hashed and SHOULD cache, so they are explicitly excluded.
We deliberately use "no-store" rather than "no-cache": no-store forbids the browser from writing the response anywhere, which is the strongest guarantee we can give a TanStack-backed SPA. "must-revalidate" is added for older intermediaries (proxies / SW) that may not honor no-store alone. This is ADR-0017-compatible: no SPA changes.
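The behavior described above can be sketched with the standard library instead of gin. This is a minimal illustration, not the package's implementation: `isVolatile`, `noStore`, and `cacheControlFor` are hypothetical names, and the prefix matching is an assumption about how "`/api/v2/*` and `/ui/*`" is evaluated.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// isVolatile reports whether path belongs to the SPA-facing JSON surface.
func isVolatile(path string) bool {
	return strings.HasPrefix(path, "/api/v2/") || strings.HasPrefix(path, "/ui/")
}

// noStore sets Cache-Control on volatile routes before calling next.
// Static assets such as /ide/vs/* are left untouched so they can cache.
func noStore(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isVolatile(r.URL.Path) {
			w.Header().Set("Cache-Control", "no-store, must-revalidate")
		}
		next.ServeHTTP(w, r)
	})
}

// cacheControlFor sends a GET for path through the middleware and returns
// the Cache-Control header the client would see.
func cacheControlFor(path string) string {
	h := noStore(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Header().Get("Cache-Control")
}

func main() {
	fmt.Printf("%q\n", cacheControlFor("/api/v2/dags"))
	fmt.Printf("%q\n", cacheControlFor("/ide/vs/loader.js")) // hypothetical asset path
}
```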
func Observe¶
Observe wraps each request in an OTel span and records HTTP metrics (ADR 0010). A nil tracer falls back to the global (no-op) tracer; nil metrics are skipped, so the middleware is safe in tests.
func RequestID¶
RequestID assigns a request id (honoring an inbound X-Request-Id) and echoes it.
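A standard-library sketch of the honor-or-generate behavior follows. It is illustrative only: the names are hypothetical, and the generated id format (16 random bytes, hex-encoded) is an assumption, since the source does not say how ids are produced.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newID returns a random 32-character hex id (assumed format).
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "unknown"
	}
	return hex.EncodeToString(b)
}

// requestID reuses an inbound X-Request-Id or generates one, then echoes it
// on the response before calling next.
func requestID(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		id := r.Header.Get("X-Request-Id")
		if id == "" {
			id = newID()
		}
		w.Header().Set("X-Request-Id", id)
		next.ServeHTTP(w, r)
	})
}

// echoedID sends one request through the middleware and returns the
// X-Request-Id header found on the response.
func echoedID(inbound string) string {
	h := requestID(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
	req := httptest.NewRequest(http.MethodGet, "/api/v2/dags", nil)
	if inbound != "" {
		req.Header.Set("X-Request-Id", inbound)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Header().Get("X-Request-Id")
}

func main() {
	fmt.Println(echoedID("abc-123")) // inbound id is echoed unchanged
	fmt.Println(len(echoedID("")))   // generated id length
}
```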
func RequirePermission¶
RequirePermission enforces an RBAC permission on a route.
func StructuredLogger¶
StructuredLogger logs one structured line per request (ADR 0010).
func UserFromContext¶
UserFromContext returns the authenticated user stored by JWTAuth.
type AuditLogReader¶
AuditLogReader lists recorded actions for the Audit Log view. dagID == "" means no DAG filter.
type AuditLogReader interface {
ListAuditLogs(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.AuditLogEntry, int, error)
}
type AuditWriter¶
AuditWriter records task-level actions (clear, mark state) for the Audit Log view, with the acting user and the run/task in the entry.
type AuditWriter interface {
RecordTaskActionAudit(ctx context.Context, tenant, userID, action, dagID, runID, taskID string, tryNumber int) error
}
type ConnectionStore¶
ConnectionStore reads and writes Airflow-style Connections for the Admin UI. Password is encrypted at rest by the store (ADR 0019) and never returned.
type ConnectionStore interface {
ListConnections(ctx context.Context, tenant string, limit, offset int) ([]domain.Connection, int, error)
GetConnection(ctx context.Context, tenant, connID string) (domain.Connection, error)
SetConnection(ctx context.Context, tenant string, c domain.Connection) error
DeleteConnection(ctx context.Context, tenant, connID string) error
}
type ConnectionTester¶
ConnectionTester checks whether a connection's endpoint is usable. The default implementation tests reachability (TCP dial / HTTP) from the control plane; full credential/provider validation would need the provider hooks (Python), which the Go control plane does not run.
type ConnectionTester interface {
Test(ctx context.Context, c domain.Connection) (ok bool, message string)
}
type DagLatestRunsReader¶
DagLatestRunsReader fetches the most-recent runs for a set of DAGs in one query, so /ui/dags can embed run history without an N+1.
type DagLatestRunsReader interface {
LatestRunsForDags(ctx context.Context, tenant string, dagIDs []string, perDag int) (map[string][]domain.DagRun, error)
}
type DagRepository¶
DagRepository reads, updates, and deletes registered DAGs.
type DagRepository interface {
ListDags(ctx context.Context, tenant string, limit, offset int) ([]domain.DAG, int, error)
GetDag(ctx context.Context, tenant, dagID string) (domain.DAG, error)
SetPaused(ctx context.Context, tenant, dagID string, paused bool) (domain.DAG, error)
DeleteDag(ctx context.Context, tenant, dagID string) error
ClearDagHistory(ctx context.Context, tenant, dagID string) error
ListDagsFiltered(ctx context.Context, tenant, runState string, paused *bool, limit, offset int) ([]domain.DAG, int, error)
}
type DagRunRepository¶
DagRunRepository reads and creates DAG runs.
type DagRunRepository interface {
ListDagRuns(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.DagRun, int, error)
GetDagRun(ctx context.Context, tenant, dagID, runID string) (domain.DagRun, error)
CreateDagRun(ctx context.Context, tenant, dagID string, run domain.DagRun) (domain.DagRun, error)
SetDagRunState(ctx context.Context, tenant, dagID, runID, state string) error
DeleteDagRun(ctx context.Context, tenant, dagID, runID string) error
}
type DagSpecReader¶
DagSpecReader reads the parsed spec of a DAG's current version, the source of task topology for the grid and graph views.
type DagSpecReader interface {
GetCurrentSpec(ctx context.Context, tenant, dagID string) (domain.DAGSpec, error)
}
type DagVersionLister¶
DagVersionLister lists a DAG's registered versions. The Airflow UI fetches this to resolve a version_number before requesting version-scoped structure (the Graph view); without it the graph never loads. See docs/ui-compatibility.md.
type DagVersionLister interface {
ListDagVersions(ctx context.Context, tenant, dagID string) ([]domain.DagVersion, error)
}
type DagVersionRepository¶
DagVersionRepository registers compiled DAG versions.
type DagVersionRepository interface {
// RegisterDagVersion upserts the DAG and inserts a version keyed by
// specHash, reporting whether a new version was created (false if the hash
// already existed; the push is idempotent).
RegisterDagVersion(ctx context.Context, tenant string, spec domain.DAGSpec, specHash string) (bool, error)
}
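The idempotent-push contract can be sketched with an in-memory store. This is a hypothetical stand-in, not a real `DagVersionRepository`: it drops `context.Context`, `tenant`, and `domain.DAGSpec`, and the use of SHA-256 for the spec hash is an assumption.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// versionStore tracks which spec hashes have been registered per DAG.
type versionStore struct {
	mu     sync.Mutex
	hashes map[string]map[string]bool // dagID -> set of spec hashes
}

func newVersionStore() *versionStore {
	return &versionStore{hashes: map[string]map[string]bool{}}
}

// register records specHash for dagID and reports whether a new version
// was created. Registering the same hash again is a no-op returning false.
func (s *versionStore) register(dagID, specHash string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.hashes[dagID] == nil {
		s.hashes[dagID] = map[string]bool{}
	}
	if s.hashes[dagID][specHash] {
		return false
	}
	s.hashes[dagID][specHash] = true
	return true
}

// hashSpec hashes the serialized spec bytes (SHA-256 is assumed here).
func hashSpec(spec []byte) string {
	sum := sha256.Sum256(spec)
	return hex.EncodeToString(sum[:])
}

func main() {
	s := newVersionStore()
	h := hashSpec([]byte(`{"dag_id":"etl","tasks":["extract"]}`))
	fmt.Println(s.register("etl", h)) // first push: true
	fmt.Println(s.register("etl", h)) // repeated push: false
}
```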
type DashboardStatsReader¶
DashboardStatsReader backs the home dashboard widgets with real counts.
type DashboardStatsReader interface {
DagStats(ctx context.Context, tenant string) (domain.DagStats, error)
HistoricalMetrics(ctx context.Context, tenant string, since, until time.Time) (domain.HistoricalMetrics, error)
}
type Dependencies¶
Dependencies bundles everything the HTTP server needs.
type Dependencies struct {
Logger *slog.Logger
Authenticator auth.Authenticator
RateLimiter *auth.RateLimiter
Registry *prometheus.Registry
Metrics Metrics
Tracer trace.Tracer
HealthChecks map[string]HealthChecker
CORSOrigins []string
TokenTTLSecs int
// InstanceName is shown in the UI navbar (Airflow's instance_name). Empty
// falls back to "Leoflow"; `leoflow dev` sets it to mark the DEV environment.
InstanceName string
// UIAutoRefreshIntervalSeconds controls the SPA's polling cadence for DAG /
// DagRun / task-instance state refresh (Airflow's auto_refresh_interval).
// Non-positive (the zero default) falls back to DefaultUIAutoRefreshIntervalSeconds
// (30s, production-safe). `leoflow lite` sets it to ~5s for a snappy inner loop.
UIAutoRefreshIntervalSeconds int
// DevNoAuth replaces JWT auth with a dev-only bypass that authenticates every
// request as an admin (no login). It is for `leoflow dev` only and must never
// be set in production. See DevBypassAuth.
DevNoAuth bool
// InlineHTTPMaxDurationSeconds caps inline http_api task timeouts at push
// time. Zero falls back to domain.DefaultInlineMaxDurationSeconds.
InlineHTTPMaxDurationSeconds int
// Resource repositories. Routes for nil repositories are not registered.
Dags DagRepository
DagRuns DagRunRepository
Tasks TaskInstanceRepository
Versions DagVersionRepository
Xcoms XComReader
Logs LogReader
Specs DagSpecReader
LatestRuns DagLatestRunsReader
TaskSummary TaskSummaryReader
DagVersions DagVersionLister
DashboardStats DashboardStatsReader
AuditLog AuditLogReader
Variables VariableStore
Connections ConnectionStore
ConnectionTest ConnectionTester
Favorites FavoriteStore
ImportErrors ImportErrorStore
Audit AuditWriter
ExecutorInfo ExecutorInfo
// Workspace backs the Lite web editor (ADR 0025). When nil the editor's
// filesystem API is not registered (Production, or Lite without a workspace).
Workspace WorkspaceFS
// MonacoDir is the directory holding the pinned Monaco bundle that
// `leoflow setup` fetched; the editor page is served Monaco from it. Empty or
// missing makes the page show a setup hint instead of a broken editor.
MonacoDir string
// ExamplesFS backs the IDE's "Download examples" button, typically the
// `embed.FS` shipped from the leoflow root package. Nil disables the button.
ExamplesFS fs.FS
// SchedulerHealth reports the scheduler's heartbeat for /monitor/health.
// When nil the component reports healthy (single-process role assumption).
SchedulerHealth Heartbeater
// UI serves the embedded SPA. When nil the server is API-only.
UI UIServer
}
type ExecutorInfo¶
ExecutorInfo describes the control plane's execution capacity. It surfaces whether pod dispatch is available, the cluster-level answer to "why is a task stuck queued" (#46/#47). The stock Airflow UI has no widget for it, but operators (curl/monitoring) and a future custom Cluster Activity view consume it. Cluster Activity in Airflow 3.2 is otherwise the Home dashboard, already backed by /api/v2/monitor/health (#33) and /ui/dashboard/* (#39).
type ExecutorInfo struct {
PodDispatchEnabled bool
TaskNamespace string
AgentControlPlaneAddr string
InlineConcurrency int
}
type FavoriteStore¶
FavoriteStore persists per-user DAG favorites (the DAG-list star).
type FavoriteStore interface {
AddFavorite(ctx context.Context, tenant, userID, dagID string) error
RemoveFavorite(ctx context.Context, tenant, userID, dagID string) error
FavoriteDagIDs(ctx context.Context, tenant, userID string) (map[string]bool, error)
}
type HealthChecker¶
HealthChecker reports dependency health for readiness checks.
type Heartbeater¶
Heartbeater reports a long-running component's liveness and last heartbeat for the monitor health endpoint. The scheduler implements it.
type ImportErrorStore¶
ImportErrorStore reads and writes DAG parse/compile errors that back Airflow's "Import Errors" banner on the home dashboard. The `leoflow dev` watcher writes an entry on a failed compile and clears it on the next good compile; the public GET /api/v2/importErrors feed is what the UI polls.
type ImportErrorStore interface {
ListImportErrors(ctx context.Context, tenant string) ([]domain.ImportError, error)
SetImportError(ctx context.Context, tenant string, e domain.ImportError) error
ClearImportError(ctx context.Context, tenant, filename string) error
}
type LogReader¶
LogReader streams a task attempt's stored logs and, for running tasks, tails new lines live.
type LogReader interface {
ReadLogs(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (io.ReadCloser, error)
Tail(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (<-chan string, func(), error)
}
type Metrics¶
Metrics records HTTP request metrics. observability.Metrics implements it.
type Problem¶
Problem is an RFC 7807 problem-details response body.
type Problem struct {
Type string `json:"type"`
Title string `json:"title"`
Status int `json:"status"`
Detail string `json:"detail,omitempty"`
Instance string `json:"instance,omitempty"`
}
type TaskInstanceRepository¶
TaskInstanceRepository reads task instances, clears them for re-run, and sets their state directly (the UI's mark-success/failed actions).
type TaskInstanceRepository interface {
ListTaskInstances(ctx context.Context, tenant, dagID, runID string, limit, offset int) ([]domain.TaskInstance, int, error)
// ListTaskInstanceAttempts returns every attempt for (run, task), oldest
// first: the current row UNIONed with the archived history. The UI's
// /tries endpoint needs all attempts to render its navigable tabs.
ListTaskInstanceAttempts(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.TaskInstance, error)
ClearTaskInstances(ctx context.Context, tenant, dagID, runID string, taskIDs []string, onlyFailed, resetDagRun bool) (int, error)
SetTaskInstanceState(ctx context.Context, tenant, dagID, runID, taskID, state string) error
}
type TaskSummaryReader¶
TaskSummaryReader fetches task instances across a set of runs of a DAG, the source for the grid's per-cell state summaries.
type TaskSummaryReader interface {
TaskInstancesForRuns(ctx context.Context, tenant, dagID string, runIDs []string) ([]domain.TaskInstance, error)
}
type UIServer¶
UIServer serves the embedded single-page app: static assets and an index.html shell that the SPA's client-side router falls back to. It is satisfied by internal/ui.Server. When nil, the server runs API-only and unknown paths return 404 instead of the SPA shell.
type UIServer interface {
StaticHandler() http.Handler
Index(w http.ResponseWriter, basePath string)
}
type VariableStore¶
VariableStore reads and writes Airflow-style Variables for the Admin UI.
type VariableStore interface {
ListVariables(ctx context.Context, tenant string, limit, offset int) ([]domain.Variable, int, error)
GetVariable(ctx context.Context, tenant, key string) (domain.Variable, error)
SetVariable(ctx context.Context, tenant string, v domain.Variable) error
DeleteVariable(ctx context.Context, tenant, key string) error
}
type WorkspaceFS¶
WorkspaceFS is the workspace-confined filesystem backing the Lite web editor (ADR 0025). Every path is relative to the workspace root and confined to it.
type WorkspaceFS interface {
Tree() ([]workspace.Entry, error)
Read(rel string) ([]byte, error)
Write(rel string, data []byte) error
Create(rel string, dir bool) error
Move(from, to string) error
Delete(rel string) error
}
type XComReader¶
XComReader reads stored XCom values and lists a task instance's XCom keys for the read API.
type XComReader interface {
GetXCom(ctx context.Context, tenant, dagID, runID, taskID, key string) (xcom.Entry, error)
ListXComEntries(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.XComEntryMeta, error)
}
Generated by gomarkdoc