api

import "github.com/neochaotic/leoflow/internal/api"

Package api implements the Airflow-compatible HTTP control plane (ADR 0007).

Constants

DefaultUIAutoRefreshIntervalSeconds is the production-safe value returned by /ui/config when no explicit override is configured. Lite overrides this to a smaller value (typically 5s) for a snappy inner-loop dev experience; Pro keeps 30s so the SPA's polling does not hammer a shared metadata DB.

const DefaultUIAutoRefreshIntervalSeconds = 30
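
The fallback rule can be sketched as follows. This is a minimal illustration, assuming the rule documented on Dependencies.UIAutoRefreshIntervalSeconds (non-positive values fall back to the default); `resolveAutoRefresh` is a hypothetical helper name, not part of the package.

```go
package main

import "fmt"

// DefaultUIAutoRefreshIntervalSeconds mirrors the documented constant.
const DefaultUIAutoRefreshIntervalSeconds = 30

// resolveAutoRefresh is a hypothetical helper: any non-positive configured
// value (including the zero default) falls back to the production-safe default.
func resolveAutoRefresh(configured int) int {
	if configured <= 0 {
		return DefaultUIAutoRefreshIntervalSeconds
	}
	return configured
}

func main() {
	fmt.Println(resolveAutoRefresh(0)) // unset: falls back to the default
	fmt.Println(resolveAutoRefresh(5)) // explicit override, e.g. a Lite-style value
}
```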

Variables

ErrNotFound is returned by repositories when a resource does not exist.

var ErrNotFound = domain.ErrNotFound

func AbortProblem

func AbortProblem(c *gin.Context, status int, title, detail string)

AbortProblem writes an RFC 7807 problem response and stops the handler chain.

func CORS

func CORS(allowed []string) gin.HandlerFunc

CORS allows the configured origins (use "*" to allow any).
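
The origin check described above might look like the sketch below. It assumes exact string matching plus the `"*"` wildcard; `originAllowed` is a hypothetical helper and the real middleware may match differently.

```go
package main

import "fmt"

// originAllowed reports whether origin is permitted by the allowed list.
// A "*" entry allows any origin; otherwise the match is exact (an assumption).
func originAllowed(allowed []string, origin string) bool {
	for _, a := range allowed {
		if a == "*" || a == origin {
			return true
		}
	}
	return false
}

func main() {
	allowed := []string{"https://ui.example.com"} // hypothetical origin
	fmt.Println(originAllowed(allowed, "https://ui.example.com"))
	fmt.Println(originAllowed(allowed, "https://evil.example.com"))
	fmt.Println(originAllowed([]string{"*"}, "https://anything.example.com"))
}
```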

func DevBypassAuth

func DevBypassAuth() gin.HandlerFunc

DevBypassAuth authenticates EVERY request as a fixed admin user, with no token required. It exists solely for `leoflow dev` (the local, unsandboxed loop) so a developer reaches the UI without logging in. It must only be wired under the explicit dev opt-in (config auth.dev_no_auth); the server logs a prominent warning when it is active. NEVER enable this in production.

func JWTAuth

func JWTAuth(authn auth.Authenticator) gin.HandlerFunc

JWTAuth validates the bearer token on protected routes and stores the user.

func NewServer

func NewServer(deps Dependencies) *gin.Engine

NewServer builds the gin engine with the full middleware chain, health and metrics endpoints, embedded Scalar docs, and the auth token endpoint.

func NoStoreOnVolatileRoutes

func NoStoreOnVolatileRoutes() gin.HandlerFunc

NoStoreOnVolatileRoutes stamps every response served from the SPA-facing JSON surface (`/api/v2/*` and `/ui/*`) with `Cache-Control: no-store, must-revalidate` so the browser HTTP cache does not return a pre-mutation payload after a PATCH/POST/DELETE.

Why this exists (#211, #271): a mark-state PATCH succeeds in single-digit milliseconds; TanStack Query then invalidates its in-memory cache and re-fetches. Without this header, the browser's HTTP cache layer can serve the OLD response to that re-fetch (the original GET response had no explicit caching directive, so the browser falls back to heuristic caching). The SPA renders stale state until the next "natural" refresh; the observable symptom is "marking as failed takes forever".

Static assets (`/ide/vs/*` for the Monaco bundle) are content-hashed and SHOULD cache, so they are explicitly excluded.

We deliberately use "no-store" rather than "no-cache": no-store forbids the browser from writing the response anywhere, which is the strongest guarantee we can give a TanStack-backed SPA. "must-revalidate" is added for older intermediaries (proxies / service workers) that may not honor no-store alone. This is ADR-0017-compatible: no SPA changes.
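
The behavior can be approximated with a standard-library middleware. This is a sketch, not the gin implementation: it assumes simple prefix matching on `/api/v2/` and `/ui/`, and `noStore`, `cacheControlFor`, and the sample paths are hypothetical names used only for illustration.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// noStore is a net/http stand-in for the documented gin middleware. It stamps
// the header only on the SPA-facing JSON surface and leaves other paths alone.
func noStore(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		p := r.URL.Path
		if strings.HasPrefix(p, "/api/v2/") || strings.HasPrefix(p, "/ui/") {
			w.Header().Set("Cache-Control", "no-store, must-revalidate")
		}
		next.ServeHTTP(w, r)
	})
}

// cacheControlFor sends one GET through the middleware and returns the
// Cache-Control header of the response.
func cacheControlFor(path string) string {
	h := noStore(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	}))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Result().Header.Get("Cache-Control")
}

func main() {
	fmt.Printf("%q\n", cacheControlFor("/api/v2/dags"))      // volatile JSON route
	fmt.Printf("%q\n", cacheControlFor("/ide/vs/loader.js")) // static asset, untouched
}
```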

func Observe

func Observe(metrics Metrics, tracer trace.Tracer) gin.HandlerFunc

Observe wraps each request in an OTel span and records HTTP metrics (ADR 0010). A nil tracer falls back to the global (no-op) tracer; nil metrics are skipped, so the middleware is safe in tests.

func RequestID

func RequestID() gin.HandlerFunc

RequestID assigns a request id (honoring an inbound X-Request-Id) and echoes it.
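
A standard-library sketch of the same idea follows. The id format (32 hex characters from crypto/rand) is an assumption, as are the helper names `requestID`, `newID`, and `echoedID`; the package itself uses gin and may generate ids differently.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/http/httptest"
)

const headerRequestID = "X-Request-Id"

// newID returns 16 random bytes as hex (an assumed format).
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "unavailable" // fallback for the sketch only
	}
	return hex.EncodeToString(b)
}

// requestID honors an inbound X-Request-Id, otherwise generates one, and
// echoes the chosen id on the response.
func requestID(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		id := r.Header.Get(headerRequestID)
		if id == "" {
			id = newID()
		}
		w.Header().Set(headerRequestID, id)
		next.ServeHTTP(w, r)
	})
}

// echoedID runs one request through the middleware and returns the echoed id.
func echoedID(inbound string) string {
	h := requestID(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	if inbound != "" {
		req.Header.Set(headerRequestID, inbound)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Result().Header.Get(headerRequestID)
}

func main() {
	fmt.Println(echoedID("trace-123")) // inbound id is preserved
	fmt.Println(len(echoedID("")))     // generated id length
}
```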

func RequirePermission

func RequirePermission(action, resource string) gin.HandlerFunc

RequirePermission enforces an RBAC permission on a route.

func StructuredLogger

func StructuredLogger(logger *slog.Logger) gin.HandlerFunc

StructuredLogger logs one structured line per request (ADR 0010).

func UserFromContext

func UserFromContext(c *gin.Context) (*auth.User, bool)

UserFromContext returns the authenticated user stored by JWTAuth.

type AuditLogReader

AuditLogReader lists recorded actions for the Audit Log view. dagID == "" means no DAG filter.

type AuditLogReader interface {
    ListAuditLogs(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.AuditLogEntry, int, error)
}

type AuditWriter

AuditWriter records task-level actions (clear, mark state) for the Audit Log view, with the acting user and the run/task in the entry.

type AuditWriter interface {
    RecordTaskActionAudit(ctx context.Context, tenant, userID, action, dagID, runID, taskID string, tryNumber int) error
}

type ConnectionStore

ConnectionStore reads and writes Airflow-style Connections for the Admin UI. Password is encrypted at rest by the store (ADR 0019) and never returned.

type ConnectionStore interface {
    ListConnections(ctx context.Context, tenant string, limit, offset int) ([]domain.Connection, int, error)
    GetConnection(ctx context.Context, tenant, connID string) (domain.Connection, error)
    SetConnection(ctx context.Context, tenant string, c domain.Connection) error
    DeleteConnection(ctx context.Context, tenant, connID string) error
}

type ConnectionTester

ConnectionTester checks whether a connection's endpoint is usable. The default implementation tests reachability (TCP dial / HTTP) from the control plane; full credential/provider validation would need the provider hooks (Python), which the Go control plane does not run.

type ConnectionTester interface {
    Test(ctx context.Context, c domain.Connection) (ok bool, message string)
}
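
A reachability-only tester in the spirit of the description could be sketched as below. The `Connection` struct is a hypothetical stand-in for domain.Connection (its real fields are not shown here), and `tcpTester` and `probeLocal` are illustrative names. Only a TCP dial is attempted, so credentials are never validated.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"time"
)

// Connection is a hypothetical stand-in for domain.Connection.
type Connection struct {
	Host string
	Port int
}

// tcpTester checks reachability by opening and closing a TCP connection.
type tcpTester struct{ timeout time.Duration }

func (t tcpTester) Test(ctx context.Context, c Connection) (ok bool, message string) {
	d := net.Dialer{Timeout: t.timeout}
	addr := net.JoinHostPort(c.Host, strconv.Itoa(c.Port))
	conn, err := d.DialContext(ctx, "tcp", addr)
	if err != nil {
		return false, "unreachable: " + err.Error()
	}
	_ = conn.Close()
	return true, "reachable at " + addr
}

// probeLocal tests a throwaway loopback listener while it is open and again
// after it has been closed.
func probeLocal() (whileOpen, afterClose bool) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, false
	}
	port := ln.Addr().(*net.TCPAddr).Port
	t := tcpTester{timeout: time.Second}
	target := Connection{Host: "127.0.0.1", Port: port}
	whileOpen, _ = t.Test(context.Background(), target)
	_ = ln.Close()
	afterClose, _ = t.Test(context.Background(), target)
	return whileOpen, afterClose
}

func main() {
	open, closed := probeLocal()
	fmt.Println("while listening:", open, "after close:", closed)
}
```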

type DagLatestRunsReader

DagLatestRunsReader fetches the most-recent runs for a set of DAGs in one query, so /ui/dags can embed run history without an N+1.

type DagLatestRunsReader interface {
    LatestRunsForDags(ctx context.Context, tenant string, dagIDs []string, perDag int) (map[string][]domain.DagRun, error)
}
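
The shape of the result can be illustrated with an in-memory stand-in. This is only a sketch: the real reader presumably issues one database query, and `DagRun` here is a hypothetical simplification of domain.DagRun with an assumed `Start` field for ordering.

```go
package main

import (
	"fmt"
	"sort"
)

// DagRun is a hypothetical, reduced stand-in for domain.DagRun.
type DagRun struct {
	DagID string
	RunID string
	Start int64 // unix seconds; assumed ordering key
}

// latestRunsForDags groups the newest perDag runs per requested DAG in one
// pass over the data, instead of one lookup per DAG.
func latestRunsForDags(all []DagRun, dagIDs []string, perDag int) map[string][]DagRun {
	want := make(map[string]bool, len(dagIDs))
	for _, id := range dagIDs {
		want[id] = true
	}
	sorted := append([]DagRun(nil), all...) // copy so the input is not reordered
	sort.SliceStable(sorted, func(i, j int) bool { return sorted[i].Start > sorted[j].Start })

	out := make(map[string][]DagRun, len(dagIDs))
	for _, r := range sorted {
		if want[r.DagID] && len(out[r.DagID]) < perDag {
			out[r.DagID] = append(out[r.DagID], r)
		}
	}
	return out
}

// sampleRuns returns hypothetical data for the demonstration.
func sampleRuns() []DagRun {
	return []DagRun{
		{DagID: "etl", RunID: "etl-1", Start: 100},
		{DagID: "etl", RunID: "etl-2", Start: 200},
		{DagID: "etl", RunID: "etl-3", Start: 300},
		{DagID: "report", RunID: "report-1", Start: 150},
		{DagID: "other", RunID: "other-1", Start: 400},
	}
}

func main() {
	got := latestRunsForDags(sampleRuns(), []string{"etl", "report"}, 2)
	for _, id := range []string{"etl", "report"} {
		fmt.Println(id, got[id])
	}
}
```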

type DagRepository

DagRepository reads, updates, and deletes registered DAGs.

type DagRepository interface {
    ListDags(ctx context.Context, tenant string, limit, offset int) ([]domain.DAG, int, error)
    GetDag(ctx context.Context, tenant, dagID string) (domain.DAG, error)
    SetPaused(ctx context.Context, tenant, dagID string, paused bool) (domain.DAG, error)
    DeleteDag(ctx context.Context, tenant, dagID string) error
    ClearDagHistory(ctx context.Context, tenant, dagID string) error
    ListDagsFiltered(ctx context.Context, tenant, runState string, paused *bool, limit, offset int) ([]domain.DAG, int, error)
}
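
The list methods take `limit` and `offset` and return a slice plus an `int`. The sketch below assumes that integer is the total number of matching rows before pagination, which this page does not state explicitly; `paginate` is a hypothetical helper showing that contract over an in-memory slice.

```go
package main

import "fmt"

// paginate returns one page of items plus the total count before paging.
// Treating the int as "total entries" is an assumption about the interface.
func paginate[T any](items []T, limit, offset int) ([]T, int) {
	total := len(items)
	if offset < 0 {
		offset = 0
	}
	if limit <= 0 || offset >= total {
		return nil, total
	}
	end := offset + limit
	if end > total {
		end = total
	}
	return items[offset:end], total
}

func main() {
	dagIDs := []string{"a", "b", "c"} // hypothetical DAG ids
	page, total := paginate(dagIDs, 2, 2)
	fmt.Println(page, total)
}
```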

type DagRunRepository

DagRunRepository reads, creates, and deletes DAG runs, and sets their state.

type DagRunRepository interface {
    ListDagRuns(ctx context.Context, tenant, dagID string, limit, offset int) ([]domain.DagRun, int, error)
    GetDagRun(ctx context.Context, tenant, dagID, runID string) (domain.DagRun, error)
    CreateDagRun(ctx context.Context, tenant, dagID string, run domain.DagRun) (domain.DagRun, error)
    SetDagRunState(ctx context.Context, tenant, dagID, runID, state string) error
    DeleteDagRun(ctx context.Context, tenant, dagID, runID string) error
}

type DagSpecReader

DagSpecReader reads the parsed spec of a DAG's current version, the source of task topology for the grid and graph views.

type DagSpecReader interface {
    GetCurrentSpec(ctx context.Context, tenant, dagID string) (domain.DAGSpec, error)
}

type DagVersionLister

DagVersionLister lists a DAG's registered versions. The Airflow UI fetches this to resolve a version_number before requesting version-scoped structure (the Graph view); without it the graph never loads. See docs/ui-compatibility.md.

type DagVersionLister interface {
    ListDagVersions(ctx context.Context, tenant, dagID string) ([]domain.DagVersion, error)
}

type DagVersionRepository

DagVersionRepository registers compiled DAG versions.

type DagVersionRepository interface {
    // RegisterDagVersion upserts the DAG and inserts a version keyed by
    // specHash, reporting whether a new version was created (false if the hash
    // already existed; the push is idempotent).
    RegisterDagVersion(ctx context.Context, tenant string, spec domain.DAGSpec, specHash string) (bool, error)
}
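
The idempotency contract can be illustrated with an in-memory registry. This sketch makes several assumptions: SHA-256 over the raw spec bytes as the hash (the real hashing scheme is not documented here), a map instead of a database, and hypothetical names `memVersions`, `Register`, and `hashSpec`.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// memVersions is a hypothetical in-memory stand-in for a version store.
type memVersions struct {
	seen map[string]bool // key: tenant/dagID/specHash
}

func newMemVersions() *memVersions {
	return &memVersions{seen: make(map[string]bool)}
}

// Register reports true only when the (tenant, dag, hash) triple is new, so
// pushing the same spec twice creates no second version.
func (m *memVersions) Register(tenant, dagID, specHash string) bool {
	key := tenant + "/" + dagID + "/" + specHash
	if m.seen[key] {
		return false
	}
	m.seen[key] = true
	return true
}

// hashSpec derives a content hash from the spec bytes (SHA-256 is an assumption).
func hashSpec(raw []byte) string {
	sum := sha256.Sum256(raw)
	return hex.EncodeToString(sum[:])
}

func main() {
	m := newMemVersions()
	h := hashSpec([]byte("spec-v1"))
	fmt.Println(m.Register("default", "etl", h)) // first push creates a version
	fmt.Println(m.Register("default", "etl", h)) // identical push is a no-op
}
```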

type DashboardStatsReader

DashboardStatsReader backs the home dashboard widgets with real counts.

type DashboardStatsReader interface {
    DagStats(ctx context.Context, tenant string) (domain.DagStats, error)
    HistoricalMetrics(ctx context.Context, tenant string, since, until time.Time) (domain.HistoricalMetrics, error)
}

type Dependencies

Dependencies bundles everything the HTTP server needs.

type Dependencies struct {
    Logger        *slog.Logger
    Authenticator auth.Authenticator
    RateLimiter   *auth.RateLimiter
    Registry      *prometheus.Registry
    Metrics       Metrics
    Tracer        trace.Tracer
    HealthChecks  map[string]HealthChecker
    CORSOrigins   []string
    TokenTTLSecs  int
    // InstanceName is shown in the UI navbar (Airflow's instance_name). Empty
    // falls back to "Leoflow"; `leoflow dev` sets it to mark the DEV environment.
    InstanceName string
    // UIAutoRefreshIntervalSeconds controls the SPA's polling cadence for DAG /
    // DagRun / task-instance state refresh (Airflow's auto_refresh_interval).
    // Non-positive (the zero default) falls back to DefaultUIAutoRefreshIntervalSeconds
    // (30s, production-safe). `leoflow lite` sets it to ~5s for a snappy inner loop.
    UIAutoRefreshIntervalSeconds int
    // DevNoAuth replaces JWT auth with a dev-only bypass that authenticates every
    // request as an admin (no login). It is for `leoflow dev` only and must never
    // be set in production. See DevBypassAuth.
    DevNoAuth bool

    // InlineHTTPMaxDurationSeconds caps inline http_api task timeouts at push
    // time. Zero falls back to domain.DefaultInlineMaxDurationSeconds.
    InlineHTTPMaxDurationSeconds int

    // Resource repositories. Routes for nil repositories are not registered.
    Dags           DagRepository
    DagRuns        DagRunRepository
    Tasks          TaskInstanceRepository
    Versions       DagVersionRepository
    Xcoms          XComReader
    Logs           LogReader
    Specs          DagSpecReader
    LatestRuns     DagLatestRunsReader
    TaskSummary    TaskSummaryReader
    DagVersions    DagVersionLister
    DashboardStats DashboardStatsReader
    AuditLog       AuditLogReader
    Variables      VariableStore
    Connections    ConnectionStore
    ConnectionTest ConnectionTester
    Favorites      FavoriteStore
    ImportErrors   ImportErrorStore
    Audit          AuditWriter
    ExecutorInfo   ExecutorInfo

    // Workspace backs the Lite web editor (ADR 0025). When nil the editor's
    // filesystem API is not registered (Production, or Lite without a workspace).
    Workspace WorkspaceFS

    // MonacoDir is the directory holding the pinned Monaco bundle that
    // `leoflow setup` fetched; the editor page is served Monaco from it. Empty or
    // missing makes the page show a setup hint instead of a broken editor.
    MonacoDir string

    // ExamplesFS backs the IDE's "Download examples" button, typically the
    // `embed.FS` shipped from the leoflow root package. Nil disables the button.
    ExamplesFS fs.FS

    // SchedulerHealth reports the scheduler's heartbeat for /monitor/health.
    // When nil the component reports healthy (single-process role assumption).
    SchedulerHealth Heartbeater

    // UI serves the embedded SPA. When nil the server is API-only.
    UI  UIServer
}

type ExecutorInfo

ExecutorInfo describes the control plane's execution capacity. It surfaces whether pod dispatch is available, which is the cluster-level answer to "why is a task stuck queued" (#46/#47). The stock Airflow UI has no widget for it, but operators (curl/monitoring) and a future custom Cluster Activity view consume it. Cluster Activity in Airflow 3.2 is otherwise the Home dashboard, already backed by /api/v2/monitor/health (#33) and /ui/dashboard/* (#39).

type ExecutorInfo struct {
    PodDispatchEnabled    bool
    TaskNamespace         string
    AgentControlPlaneAddr string
    InlineConcurrency     int
}

type FavoriteStore

FavoriteStore persists per-user DAG favorites (the DAG-list star).

type FavoriteStore interface {
    AddFavorite(ctx context.Context, tenant, userID, dagID string) error
    RemoveFavorite(ctx context.Context, tenant, userID, dagID string) error
    FavoriteDagIDs(ctx context.Context, tenant, userID string) (map[string]bool, error)
}

type HealthChecker

HealthChecker reports dependency health for readiness checks.

type HealthChecker interface {
    Ping(ctx context.Context) error
}
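
A readiness probe over a `map[string]HealthChecker` (the shape of Dependencies.HealthChecks) might aggregate results as in this sketch. The check names, the `pingFunc` adapter, and the `failing` helper are hypothetical; how the real readiness endpoint reports failures is not specified here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sort"
)

// HealthChecker mirrors the documented interface.
type HealthChecker interface {
	Ping(ctx context.Context) error
}

// pingFunc adapts a plain function to HealthChecker.
type pingFunc func(ctx context.Context) error

func (f pingFunc) Ping(ctx context.Context) error { return f(ctx) }

// failing returns the sorted names of every check whose Ping returned an error.
func failing(ctx context.Context, checks map[string]HealthChecker) []string {
	var bad []string
	for name, c := range checks {
		if err := c.Ping(ctx); err != nil {
			bad = append(bad, name)
		}
	}
	sort.Strings(bad)
	return bad
}

// failingDemo runs the aggregation over two hypothetical dependencies.
func failingDemo() []string {
	checks := map[string]HealthChecker{
		"database": pingFunc(func(context.Context) error { return nil }),
		"cache":    pingFunc(func(context.Context) error { return errors.New("connection refused") }),
	}
	return failing(context.Background(), checks)
}

func main() {
	bad := failingDemo()
	fmt.Println("ready:", len(bad) == 0, "failing:", bad)
}
```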

type Heartbeater

Heartbeater reports a long-running component's liveness and last heartbeat for the monitor health endpoint. The scheduler implements it.

type Heartbeater interface {
    Heartbeat() (healthy bool, last time.Time)
}
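
One way a component could satisfy this interface is sketched below. The staleness threshold (`maxAge`), the injectable clock, and the names `heartbeat`, `Touch`, and `healthyAfterSeconds` are assumptions made for illustration; the scheduler's actual implementation is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeat records the last beat and treats it as healthy while it is
// younger than maxAge (the threshold is an assumption of this sketch).
type heartbeat struct {
	mu     sync.Mutex
	last   time.Time
	maxAge time.Duration
	now    func() time.Time // injectable clock for deterministic tests
}

// Touch records a beat at the current time.
func (h *heartbeat) Touch() {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.last = h.now()
}

// Heartbeat matches the documented method shape.
func (h *heartbeat) Heartbeat() (healthy bool, last time.Time) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.last.IsZero() {
		return false, h.last // never beat yet
	}
	return h.now().Sub(h.last) <= h.maxAge, h.last
}

// healthyAfterSeconds beats once, advances a fake clock, and reports health.
func healthyAfterSeconds(elapsed int) bool {
	clock := time.Unix(1_700_000_000, 0)
	h := &heartbeat{maxAge: 30 * time.Second, now: func() time.Time { return clock }}
	h.Touch()
	clock = clock.Add(time.Duration(elapsed) * time.Second)
	ok, _ := h.Heartbeat()
	return ok
}

func main() {
	fmt.Println(healthyAfterSeconds(10)) // recent beat
	fmt.Println(healthyAfterSeconds(60)) // stale beat
}
```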

type ImportErrorStore

ImportErrorStore reads and writes DAG parse/compile errors that back Airflow's "Import Errors" banner on the home dashboard. The `leoflow dev` watcher writes an entry on a failed compile and clears it on the next good compile; the public GET /api/v2/importErrors feed is what the UI polls.

type ImportErrorStore interface {
    ListImportErrors(ctx context.Context, tenant string) ([]domain.ImportError, error)
    SetImportError(ctx context.Context, tenant string, e domain.ImportError) error
    ClearImportError(ctx context.Context, tenant, filename string) error
}

type LogReader

LogReader streams a task attempt's stored logs and, for running tasks, tails new lines live.

type LogReader interface {
    ReadLogs(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (io.ReadCloser, error)
    Tail(ctx context.Context, tenant, dagID, runID, taskID string, tryNumber int) (<-chan string, func(), error)
}

type Metrics

Metrics records HTTP request metrics. observability.Metrics implements it.

type Metrics interface {
    RecordHTTPRequest(method, path string, status int, dur time.Duration)
}

type Problem

Problem is an RFC 7807 problem-details response body.

type Problem struct {
    Type     string `json:"type"`
    Title    string `json:"title"`
    Status   int    `json:"status"`
    Detail   string `json:"detail,omitempty"`
    Instance string `json:"instance,omitempty"`
}
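
To show what such a body looks like on the wire, here is a standard-library sketch in the spirit of AbortProblem. It assumes `"about:blank"` for Type and the `application/problem+json` media type, both taken from RFC 7807 rather than from this package; `writeProblem` and `renderProblem` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Problem mirrors the documented struct.
type Problem struct {
	Type     string `json:"type"`
	Title    string `json:"title"`
	Status   int    `json:"status"`
	Detail   string `json:"detail,omitempty"`
	Instance string `json:"instance,omitempty"`
}

// writeProblem writes an RFC 7807 body; empty Detail and Instance are omitted.
func writeProblem(w http.ResponseWriter, status int, title, detail string) {
	w.Header().Set("Content-Type", "application/problem+json")
	w.WriteHeader(status)
	_ = json.NewEncoder(w).Encode(Problem{
		Type:   "about:blank", // RFC 7807 default; an assumption for this package
		Title:  title,
		Status: status,
		Detail: detail,
	})
}

// renderProblem captures the response for inspection.
func renderProblem(status int, title, detail string) (code int, body string) {
	rec := httptest.NewRecorder()
	writeProblem(rec, status, title, detail)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	code, body := renderProblem(http.StatusNotFound, "Not Found", "dag not found")
	fmt.Println(code, body)
}
```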

type TaskInstanceRepository

TaskInstanceRepository reads task instances, clears them for re-run, and sets their state directly (the UI's mark-success/failed actions).

type TaskInstanceRepository interface {
    ListTaskInstances(ctx context.Context, tenant, dagID, runID string, limit, offset int) ([]domain.TaskInstance, int, error)
    // ListTaskInstanceAttempts returns every attempt for (run, task), oldest
    // first: the current row UNIONed with the archived history. The UI's
    // /tries endpoint needs all attempts to render its navigable tabs.
    ListTaskInstanceAttempts(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.TaskInstance, error)
    ClearTaskInstances(ctx context.Context, tenant, dagID, runID string, taskIDs []string, onlyFailed, resetDagRun bool) (int, error)
    SetTaskInstanceState(ctx context.Context, tenant, dagID, runID, taskID, state string) error
}

type TaskSummaryReader

TaskSummaryReader fetches task instances across a set of runs of a DAG, the source for the grid's per-cell state summaries.

type TaskSummaryReader interface {
    TaskInstancesForRuns(ctx context.Context, tenant, dagID string, runIDs []string) ([]domain.TaskInstance, error)
}

type UIServer

UIServer serves the embedded single-page app: static assets and an index.html shell that the SPA's client-side router falls back to. It is satisfied by internal/ui.Server. When nil, the server runs API-only and unknown paths return 404 instead of the SPA shell.

type UIServer interface {
    StaticHandler() http.Handler
    Index(w http.ResponseWriter, basePath string)
}

type VariableStore

VariableStore reads and writes Airflow-style Variables for the Admin UI.

type VariableStore interface {
    ListVariables(ctx context.Context, tenant string, limit, offset int) ([]domain.Variable, int, error)
    GetVariable(ctx context.Context, tenant, key string) (domain.Variable, error)
    SetVariable(ctx context.Context, tenant string, v domain.Variable) error
    DeleteVariable(ctx context.Context, tenant, key string) error
}

type WorkspaceFS

WorkspaceFS is the workspace-confined filesystem backing the Lite web editor (ADR 0025). Every path is relative to the workspace root and confined to it.

type WorkspaceFS interface {
    Tree() ([]workspace.Entry, error)
    Read(rel string) ([]byte, error)
    Write(rel string, data []byte) error
    Create(rel string, dir bool) error
    Move(from, to string) error
    Delete(rel string) error
}
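
Path confinement of this kind is commonly done by resolving the relative path against the root and rejecting anything that climbs out. The sketch below is lexical only (it does not resolve symlinks) and uses hypothetical names `resolve`, `confined`, and the root `/ws`; it is not the actual internal/workspace implementation.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
	"strings"
)

var errEscapes = errors.New("path escapes workspace root")

// resolve joins rel onto root and rejects absolute paths and any result that
// lands outside root. The check is purely lexical; symlinks are not followed.
func resolve(root, rel string) (string, error) {
	if filepath.IsAbs(rel) {
		return "", errEscapes
	}
	full := filepath.Join(root, rel) // Join also cleans ".." segments
	back, err := filepath.Rel(root, full)
	if err != nil || back == ".." || strings.HasPrefix(back, ".."+string(filepath.Separator)) {
		return "", errEscapes
	}
	return full, nil
}

// confined reports whether rel stays inside a hypothetical root.
func confined(rel string) bool {
	_, err := resolve(filepath.FromSlash("/ws"), filepath.FromSlash(rel))
	return err == nil
}

func main() {
	for _, rel := range []string{"dags/etl.py", "../secrets", "dags/../../etc/passwd"} {
		fmt.Println(rel, confined(rel))
	}
}
```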

type XComReader

XComReader reads stored XCom values and lists a task instance's XCom keys for the read API.

type XComReader interface {
    GetXCom(ctx context.Context, tenant, dagID, runID, taskID, key string) (xcom.Entry, error)
    ListXComEntries(ctx context.Context, tenant, dagID, runID, taskID string) ([]domain.XComEntryMeta, error)
}

Generated by gomarkdoc