domain

import "github.com/neochaotic/leoflow/internal/domain"

Package domain defines the core Leoflow types (DAG, Task, project config) and validates them against the canonical JSON Schemas in docs/api.

Constants

DefaultInlineMaxDurationSeconds is the fallback cap on inline http_api task duration when the server does not configure one. See ADR 0002.

const DefaultInlineMaxDurationSeconds = 300

Variables

ErrConflict is returned when a write conflicts with an existing resource (e.g. a duplicate dag run for the same logical date). The API maps it to 409.

var ErrConflict = errors.New("resource already exists")

ErrNotFound is returned when a requested resource does not exist.

var ErrNotFound = errors.New("resource not found")

func IsCronlessSchedule

func IsCronlessSchedule(expr string) bool

IsCronlessSchedule reports whether expr is empty (manual-only) or a recognized non-cron Airflow schedule. Such a schedule is valid but is never run on a cron, so callers skip cron handling for it without treating it as an error.

func IsOnceSchedule

func IsOnceSchedule(expr string) bool

IsOnceSchedule reports whether expr is Airflow's "@once": a DAG that runs exactly one time (the first time the scheduler sees it) and never again.
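Together, the two predicates let a scheduler classify a schedule before it ever reaches a cron parser. This is a minimal sketch, assuming the recognized non-cron set is exactly @once and @continuous (the two named under ValidateSchedule). isCronless, isOnce, and shouldTriggerOnce are hypothetical re-implementations of the documented contracts, not the package's code; the real functions may normalize input differently.

```go
package main

import "fmt"

// nonCronSchedules is the assumed set of Airflow presets that are valid
// schedules but are never handed to a cron parser.
var nonCronSchedules = map[string]bool{
	"@once":       true,
	"@continuous": true,
}

// isCronless follows the documented IsCronlessSchedule contract: an empty
// expression (manual-only) or a recognized non-cron preset.
func isCronless(expr string) bool {
	return expr == "" || nonCronSchedules[expr]
}

// isOnce follows the documented IsOnceSchedule contract.
func isOnce(expr string) bool {
	return expr == "@once"
}

// shouldTriggerOnce is a hypothetical scheduler check: an @once DAG gets a
// run the first time it is seen and never again.
func shouldTriggerOnce(expr string, hasAnyRun bool) bool {
	return isOnce(expr) && !hasAnyRun
}

func main() {
	for _, expr := range []string{"", "@once", "@continuous", "0 3 * * *"} {
		fmt.Printf("%q cronless=%v once=%v\n", expr, isCronless(expr), isOnce(expr))
	}
	fmt.Println(shouldTriggerOnce("@once", false), shouldTriggerOnce("@once", true))
}
```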

type AuditLogEntry

AuditLogEntry is one recorded action against a resource; these entries are the source for the UI's Audit Log table. ResourceID carries the DAG id for DAG-scoped events.

type AuditLogEntry struct {
    ID           int64
    When         time.Time
    Action       string
    ResourceType string
    ResourceID   string
    Owner        string
    Extra        string
}

type BuildConfig

BuildConfig controls how the container image is built from the project.

type BuildConfig struct {
    Dockerfile string            `json:"dockerfile,omitempty" yaml:"dockerfile,omitempty"`
    Context    string            `json:"context,omitempty" yaml:"context,omitempty"`
    Platforms  []string          `json:"platforms,omitempty" yaml:"platforms,omitempty"`
    Labels     map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
}

type ConfigDefaults

ConfigDefaults holds task defaults applied to every task generated from the project at compile time.

type ConfigDefaults struct {
    Retries                 int               `json:"retries,omitempty" yaml:"retries,omitempty"`
    RetryDelaySeconds       int               `json:"retry_delay_seconds,omitempty" yaml:"retry_delay_seconds,omitempty"`
    ExecutionTimeoutSeconds int               `json:"execution_timeout_seconds,omitempty" yaml:"execution_timeout_seconds,omitempty"`
    Resources               *DefaultResources `json:"resources,omitempty" yaml:"resources,omitempty"`
}

type Connection

Connection is an Airflow-style connection: credentials/endpoints for operators, managed from the Admin UI. Password and Extra are encrypted at rest (ADR 0019); Password is write-only and never returned by the API.

type Connection struct {
    ConnID      string
    ConnType    string
    Host        string
    Schema      string
    Login       string
    Password    string
    Port        *int
    Extra       string
    Description string
}

type DAG

DAG is a registered DAG with its scheduling metadata (distinct from DAGSpec, which is the compiled artifact).

type DAG struct {
    DagID          string
    Description    string
    Owner          string
    Tags           []string
    Schedule       *string
    ScheduleTZ     string
    StartDate      *time.Time
    IsPaused       bool
    IsActive       bool
    MaxActiveRuns  int
    Catchup        bool
    LastParsedTime *time.Time
}

type DAGSpec

DAGSpec is the canonical serialized representation of a DAG consumed by the control plane. It mirrors docs/api/dag-schema.json.

type DAGSpec struct {
    SchemaVersion string       `json:"schema_version"`
    DagID         string       `json:"dag_id"`
    DagVersion    string       `json:"dag_version"`
    Image         string       `json:"image"`
    Description   string       `json:"description,omitempty"`
    Owner         string       `json:"owner,omitempty"`
    Tags          []string     `json:"tags,omitempty"`
    Schedule      *string      `json:"schedule,omitempty"`
    ScheduleTZ    string       `json:"schedule_timezone,omitempty"`
    StartDate     string       `json:"start_date,omitempty"`
    EndDate       *string      `json:"end_date,omitempty"`
    MaxActiveRuns int          `json:"max_active_runs,omitempty"`
    Catchup       bool         `json:"catchup,omitempty"`
    DefaultArgs   *DefaultArgs `json:"default_args,omitempty"`
    // Staging, when enabled, requests an ephemeral RWX volume shared by the run's
    // tasks at /staging (ADR 0022). nil/disabled means no staging volume.
    Staging *StagingConfig `json:"staging,omitempty"`
    Tasks   []TaskSpec     `json:"tasks"`
    // Source is the original dag.py text, captured at compile time so the UI's
    // Code tab can show the Python a human wrote (not the compiled spec). It is
    // part of the artifact: changing it produces a new version.
    Source string `json:"source,omitempty"`
}

func (*DAGSpec) CanonicalHash

func (d *DAGSpec) CanonicalHash() (string, error)

CanonicalHash returns the SHA-256 of the spec's canonical JSON encoding. Go's struct marshaling is deterministic (fixed field order, sorted map keys), so identical specs hash identically. The hash is used to deduplicate DAG versions.
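The determinism claim can be checked directly: encoding/json writes struct fields in declaration order and sorts map keys, so two equal values marshal to the same bytes. This sketch uses miniSpec, a hypothetical trimmed-down stand-in for DAGSpec, and a local canonicalHash that hashes json.Marshal output; it illustrates the idea rather than reproducing the method's source.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// miniSpec is a hypothetical, trimmed-down stand-in for DAGSpec.
type miniSpec struct {
	DagID  string            `json:"dag_id"`
	Image  string            `json:"image"`
	Env    map[string]string `json:"env,omitempty"`
	Source string            `json:"source,omitempty"`
}

// canonicalHash hashes the JSON encoding. Map keys are emitted sorted, so
// the result does not depend on how the map was populated.
func canonicalHash(s *miniSpec) (string, error) {
	b, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a := &miniSpec{DagID: "etl", Image: "etl:1", Env: map[string]string{}}
	a.Env["B"] = "2"
	a.Env["A"] = "1"
	b := &miniSpec{DagID: "etl", Image: "etl:1", Env: map[string]string{"A": "1", "B": "2"}}

	ha, _ := canonicalHash(a)
	hb, _ := canonicalHash(b)
	fmt.Println(ha == hb) // true: same content, same hash

	b.Source = "# edited dag.py"
	hc, _ := canonicalHash(b)
	fmt.Println(ha == hc) // false: Source is part of the artifact
}
```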

func (*DAGSpec) Validate

func (d *DAGSpec) Validate() error

Validate checks the DAGSpec against the canonical dag.json schema and returns a joined error describing every schema violation, or nil when valid.

func (*DAGSpec) ValidateInlineExecution

func (d *DAGSpec) ValidateInlineExecution(maxInlineSeconds int) error

ValidateInlineExecution rejects inline http_api tasks whose execution_timeout_seconds exceeds the server's inline duration cap. Such a task must declare execution_mode: pod. maxInlineSeconds is the server limit.
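The rule reduces to one comparison per inline task, with every violation reported together. This is a sketch under stated assumptions: task is a hypothetical trimmed TaskSpec, a task with no execution_timeout_seconds is skipped (the real method may instead fall back to default_args), and main shows the caller applying DefaultInlineMaxDurationSeconds when the server configures no cap, although where that fallback actually happens is not stated above.

```go
package main

import (
	"errors"
	"fmt"
)

const defaultInlineMaxDurationSeconds = 300 // mirrors DefaultInlineMaxDurationSeconds

// task is a hypothetical trimmed-down TaskSpec.
type task struct {
	ID             string
	Type           string // "python", "bash", or "http_api"
	Mode           string // "", "inline", or "pod"
	TimeoutSeconds *int
}

// runsInline applies the documented default: http_api tasks are inline
// unless they declare pod; other types always run in a pod.
func runsInline(t task) bool {
	return t.Type == "http_api" && t.Mode != "pod"
}

// validateInline returns one joined error covering every inline task whose
// timeout exceeds the cap, or nil when there are none.
func validateInline(tasks []task, maxInlineSeconds int) error {
	var errs []error
	for _, t := range tasks {
		if !runsInline(t) || t.TimeoutSeconds == nil {
			continue // assumption: an unset timeout is not checked here
		}
		if *t.TimeoutSeconds > maxInlineSeconds {
			errs = append(errs, fmt.Errorf(
				"task %q: execution_timeout_seconds %d exceeds inline cap %d; set execution_mode: pod",
				t.ID, *t.TimeoutSeconds, maxInlineSeconds))
		}
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	serverCap := 0 // not configured on this server
	if serverCap <= 0 {
		serverCap = defaultInlineMaxDurationSeconds
	}
	long := 900
	tasks := []task{
		{ID: "ping", Type: "http_api", TimeoutSeconds: &long},
		{ID: "export", Type: "http_api", Mode: "pod", TimeoutSeconds: &long},
	}
	fmt.Println(validateInline(tasks, serverCap))
}
```

errors.Join requires Go 1.20 or later.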

func (*DAGSpec) ValidateSchedule

func (d *DAGSpec) ValidateSchedule() error

ValidateSchedule checks that a DAG's cron schedule is parseable. An empty or absent schedule (manual-only) and the recognized non-cron Airflow schedules (@once, @continuous) are valid. A malformed cron expression, such as a 4-field cron or a typo, is rejected here so that it fails loudly at compile time. Otherwise the scheduler silently fails to parse it and the DAG simply never runs, with no error surfaced anywhere, which is the worst failure mode. The parser is robfig/cron's ParseStandard, the same one the scheduler uses, so whatever validates here is exactly what the scheduler can run (see scheduler/cron.go).
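The ordering matters: cronless schedules are accepted before any parsing, and everything else goes through one shared parser. In this sketch the parser is injected as a function value so a validator and a scheduler could share it; the comment shows how robfig/cron v3's ParseStandard might be wired in. validateSchedule is a hypothetical name, and fiveFieldStandIn, used by main, is a deliberately crude placeholder that only counts fields; it is not a cron parser.

```go
package main

import (
	"fmt"
	"strings"
)

// parseFunc is the shared parser. With github.com/robfig/cron/v3 it could be:
//
//	func(expr string) error { _, err := cron.ParseStandard(expr); return err }
type parseFunc func(expr string) error

// validateSchedule accepts manual-only and recognized non-cron presets, then
// defers to the same parser the scheduler would use.
func validateSchedule(schedule *string, parse parseFunc) error {
	if schedule == nil || *schedule == "" || *schedule == "@once" || *schedule == "@continuous" {
		return nil
	}
	if err := parse(*schedule); err != nil {
		return fmt.Errorf("invalid schedule %q: %w", *schedule, err)
	}
	return nil
}

// fiveFieldStandIn only counts fields; it stands in for a real cron parser.
func fiveFieldStandIn(expr string) error {
	if n := len(strings.Fields(expr)); n != 5 {
		return fmt.Errorf("expected 5 fields, got %d", n)
	}
	return nil
}

func main() {
	for _, s := range []string{"0 3 * * *", "0 3 * *", "@once"} {
		fmt.Printf("%-12q -> %v\n", s, validateSchedule(&s, fiveFieldStandIn))
	}
}
```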

type DagRun

DagRun is an execution of a DAG, identified by dag_id + run_id.

type DagRun struct {
    DagID       string
    RunID       string
    LogicalDate time.Time
    State       DagRunState
    RunType     string
    QueuedAt    time.Time
    StartedAt   *time.Time
    EndedAt     *time.Time
    Note        string
}

type DagRunState

DagRunState is the lifecycle state of a DagRun. The values mirror the dag_run_state enum in the database (migration 003).

type DagRunState string

DAG run lifecycle states.

const (
    // DagRunStateQueued means the run has been created but not started.
    DagRunStateQueued DagRunState = "queued"
    // DagRunStateRunning means at least one task instance is active.
    DagRunStateRunning DagRunState = "running"
    // DagRunStateSuccess means every leaf task reached a successful terminal state.
    DagRunStateSuccess DagRunState = "success"
    // DagRunStateFailed means the run finished with at least one failure.
    DagRunStateFailed DagRunState = "failed"
)

func (DagRunState) IsTerminal

func (s DagRunState) IsTerminal() bool

IsTerminal reports whether the dag run state is final.
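Of the four states above, only success and failed can plausibly be final; queued and running still advance. A sketch of that switch on a local copy of the type, inferred from the constants rather than taken from the package's source:

```go
package main

import "fmt"

type DagRunState string

const (
	DagRunStateQueued  DagRunState = "queued"
	DagRunStateRunning DagRunState = "running"
	DagRunStateSuccess DagRunState = "success"
	DagRunStateFailed  DagRunState = "failed"
)

// IsTerminal treats success and failed as final; queued and running can
// still move to another state.
func (s DagRunState) IsTerminal() bool {
	switch s {
	case DagRunStateSuccess, DagRunStateFailed:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []DagRunState{DagRunStateQueued, DagRunStateRunning, DagRunStateSuccess, DagRunStateFailed} {
		fmt.Println(s, s.IsTerminal())
	}
}
```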

type DagStats

DagStats holds the home dashboard's DAG counters: the number of active DAGs and how many of them have their latest run in each state.

type DagStats struct {
    Active  int
    Failed  int
    Running int
    Queued  int
}

type DagVersion

DagVersion is a registered version of a DAG. VersionNumber is the 1-based ordinal the UI uses (the stored version label is free-form).

type DagVersion struct {
    ID            string
    VersionNumber int
    CreatedAt     time.Time
    // Version is the deployment label that produced this snapshot: a git describe
    // (tag/SHA) in production, or "dev-<timestamp>" in dev. It is the stable
    // per-deployment identifier under a stable dag_id.
    Version string
}

type DefaultArgs

DefaultArgs holds retry and timeout defaults applied to every task in a DAG.

type DefaultArgs struct {
    Retries                 int `json:"retries,omitempty"`
    RetryDelaySeconds       int `json:"retry_delay_seconds,omitempty"`
    ExecutionTimeoutSeconds int `json:"execution_timeout_seconds,omitempty"`
}

type DefaultResources

DefaultResources expresses default CPU and memory for generated tasks.

type DefaultResources struct {
    CPU    string `json:"cpu,omitempty" yaml:"cpu,omitempty"`
    Memory string `json:"memory,omitempty" yaml:"memory,omitempty"`
}

type Execution

Execution carries executor-specific placement hints for a task.

type Execution struct {
    NodeSelector    map[string]string `json:"node_selector,omitempty" yaml:"node_selector,omitempty"`
    Tolerations     []map[string]any  `json:"tolerations,omitempty" yaml:"tolerations,omitempty"`
    ServiceAccount  string            `json:"service_account,omitempty" yaml:"service_account,omitempty"`
    ImagePullPolicy string            `json:"image_pull_policy,omitempty" yaml:"image_pull_policy,omitempty"`
}

type ExecutionMode

ExecutionMode selects how a task runs. It is only meaningful for http_api tasks; python and bash tasks always run in a pod.

type ExecutionMode string

Supported execution modes. See docs/api/dag-schema.json and ADR 0002.

const (
    // ExecutionModeInline runs an http_api task as a goroutine in the control
    // plane, capped at the server's inline duration limit.
    ExecutionModeInline ExecutionMode = "inline"
    // ExecutionModePod runs a task inside a worker pod via the agent.
    ExecutionModePod ExecutionMode = "pod"
)

type HTTPRequest

HTTPRequest is the request executed directly by the control plane for http_api tasks.

type HTTPRequest struct {
    Method             string            `json:"method"`
    URL                string            `json:"url"`
    Headers            map[string]string `json:"headers,omitempty"`
    Body               any               `json:"body,omitempty"`
    TimeoutSeconds     int               `json:"timeout_seconds,omitempty"`
    SuccessStatusCodes []int             `json:"success_status_codes,omitempty"`
}

type HistoricalMetrics

HistoricalMetrics holds run- and task-instance counts grouped by state over a time window, keyed by the Leoflow state name (e.g. "success", "up_for_retry").

type HistoricalMetrics struct {
    RunStates map[string]int
    TIStates  map[string]int
}

type ImportError

ImportError is a DAG parse/compile failure surfaced as Airflow's "Import Errors" banner on the home dashboard. It is keyed by Filename; a successful re-import of the same file clears it. The `leoflow dev` watcher writes these on a failed compile and removes them on the next good compile.

type ImportError struct {
    // ID is the stable identifier of the error record.
    ID string
    // Filename is the DAG source path that failed to import.
    Filename string
    // StackTrace is the human-readable parse/compile error (traceback).
    StackTrace string
    // BundleName is the originating bundle (empty when unknown).
    BundleName string
    // Timestamp is when the error was recorded.
    Timestamp time.Time
}

type LeoflowConfig

LeoflowConfig is the developer-facing project configuration parsed from leoflow.yaml. It mirrors docs/api/leoflow-yaml-schema.json and is consumed by `leoflow compile` to build an image and emit a DAGSpec.

type LeoflowConfig struct {
    SchemaVersion  string          `json:"schema_version,omitempty" yaml:"schema_version,omitempty"`
    DagID          string          `json:"dag_id" yaml:"dag_id"`
    Description    string          `json:"description,omitempty" yaml:"description,omitempty"`
    Owner          string          `json:"owner,omitempty" yaml:"owner,omitempty"`
    Tags           []string        `json:"tags,omitempty" yaml:"tags,omitempty"`
    PythonVersion  string          `json:"python_version,omitempty" yaml:"python_version,omitempty"`
    BaseImage      string          `json:"base_image,omitempty" yaml:"base_image,omitempty"`
    Dependencies   []string        `json:"dependencies,omitempty" yaml:"dependencies,omitempty"`
    SystemPackages []string        `json:"system_packages,omitempty" yaml:"system_packages,omitempty"`
    DagSource      string          `json:"dag_source,omitempty" yaml:"dag_source,omitempty"`
    IncludePaths   []string        `json:"include_paths,omitempty" yaml:"include_paths,omitempty"`
    ExcludePaths   []string        `json:"exclude_paths,omitempty" yaml:"exclude_paths,omitempty"`
    Build          *BuildConfig    `json:"build,omitempty" yaml:"build,omitempty"`
    Registry       *RegistryConfig `json:"registry,omitempty" yaml:"registry,omitempty"`
    Defaults       *ConfigDefaults `json:"defaults,omitempty" yaml:"defaults,omitempty"`
    // Staging requests the opt-in per-DAG-run shared volume (ADR 0022). It is a
    // Leoflow deployment concern (not an Airflow DAG attribute), so it lives in
    // leoflow.yaml and the compiler overlays it onto the produced dag.json.
    Staging *StagingConfig `json:"staging,omitempty" yaml:"staging,omitempty"`
    // Tasks holds per-task overrides bound by task_id (ADR 0023). Each entry's
    // key must match a task_id in the compiled DAG; the compiler errors on an
    // unknown id rather than silently dropping it.
    Tasks map[string]*TaskConfig `json:"tasks,omitempty" yaml:"tasks,omitempty"`
}

func (*LeoflowConfig) ApplyDefaults

func (c *LeoflowConfig) ApplyDefaults()

ApplyDefaults fills zero-valued fields with the defaults declared in the canonical JSON Schema (internal/domain/schemas/leoflow-yaml-schema.json). Explicit user-set values are preserved; nested structs (Build, Registry) are instantiated when nil so their own defaults can be applied. The method is idempotent: a second call after the first is a no-op.

Centralizing defaults here (instead of scattered `if x == ""` fallbacks at each consumer) is what lets the multi-DAG workspace synthesize a working config when a subdir ships no leoflow.yaml, while keeping the resolved values debuggable from one place.
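The pattern described above (fill only zero values, instantiate nil nested structs first, stay idempotent) looks roughly like this. It is a sketch on trimmed local types; the default values are hypothetical placeholders, since the real ones are declared in the JSON Schema and are not listed here.

```go
package main

import "fmt"

// Hypothetical default values; the real ones come from the JSON Schema.
const (
	defaultSchemaVersion = "1"
	defaultDockerfile    = "Dockerfile"
	defaultTagStrategy   = "git-sha"
)

type BuildConfig struct{ Dockerfile string }
type RegistryConfig struct{ TagStrategy string }

type LeoflowConfig struct {
	SchemaVersion string
	Build         *BuildConfig
	Registry      *RegistryConfig
}

// ApplyDefaults fills only zero values and creates nil nested structs first,
// so explicit settings survive and a second call changes nothing.
func (c *LeoflowConfig) ApplyDefaults() {
	if c.SchemaVersion == "" {
		c.SchemaVersion = defaultSchemaVersion
	}
	if c.Build == nil {
		c.Build = &BuildConfig{}
	}
	if c.Build.Dockerfile == "" {
		c.Build.Dockerfile = defaultDockerfile
	}
	if c.Registry == nil {
		c.Registry = &RegistryConfig{}
	}
	if c.Registry.TagStrategy == "" {
		c.Registry.TagStrategy = defaultTagStrategy
	}
}

func main() {
	// The explicit Dockerfile is kept; everything else is defaulted.
	c := &LeoflowConfig{Build: &BuildConfig{Dockerfile: "docker/Dockerfile.dev"}}
	c.ApplyDefaults()
	c.ApplyDefaults() // idempotent
	fmt.Println(c.SchemaVersion, c.Build.Dockerfile, c.Registry.TagStrategy)
}
```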

func (*LeoflowConfig) Validate

func (c *LeoflowConfig) Validate() error

Validate checks the LeoflowConfig against the canonical leoflow.yaml schema and returns a joined error describing every violation, or nil when valid.

type RegistryConfig

RegistryConfig describes where the built image is pushed and how it is tagged.

type RegistryConfig struct {
    URL         string `json:"url,omitempty" yaml:"url,omitempty"`
    AuthMethod  string `json:"auth_method,omitempty" yaml:"auth_method,omitempty"`
    ImageName   string `json:"image_name,omitempty" yaml:"image_name,omitempty"`
    TagStrategy string `json:"tag_strategy,omitempty" yaml:"tag_strategy,omitempty"`
}

type ResourceQuantity

ResourceQuantity expresses CPU and memory in Kubernetes notation.

type ResourceQuantity struct {
    CPU    string `json:"cpu,omitempty" yaml:"cpu,omitempty"`
    Memory string `json:"memory,omitempty" yaml:"memory,omitempty"`
}

type Resources

Resources holds Kubernetes-style resource requests and limits for a task.

type Resources struct {
    Requests *ResourceQuantity `json:"requests,omitempty" yaml:"requests,omitempty"`
    Limits   *ResourceQuantity `json:"limits,omitempty" yaml:"limits,omitempty"`
}

type Secret

Secret references a credential injected into the worker at run time.

type Secret struct {
    Name      string `json:"name"`
    Source    string `json:"source"`
    Reference string `json:"reference,omitempty"`
}

type StagingConfig

StagingConfig is the opt-in per-DAG-run shared staging volume (ADR 0022). Size is a Kubernetes quantity (e.g. "5Gi"); an empty StorageClass selects the cluster's default RWX storage class.

type StagingConfig struct {
    Enabled      bool   `json:"enabled" yaml:"enabled"`
    Size         string `json:"size,omitempty" yaml:"size,omitempty"`
    StorageClass string `json:"storage_class,omitempty" yaml:"storage_class,omitempty"`
}

type StagingVolumeState

StagingVolumeState is a tracked per-run staging volume joined with its DAG run's state, used by the GC to decide deletion (ADR 0022). RunState is empty when the run row is gone (orphan); RunEndedAt is the run's terminal time, used for the post-terminal TTL on failed runs.

type StagingVolumeState struct {
    // PVCName is the staging PersistentVolumeClaim's name.
    PVCName string
    // RunState is the DAG run's state ("success", "failed", "running", …), or
    // empty when the run no longer exists.
    RunState string
    // RunEndedAt is when the run reached a terminal state, if known.
    RunEndedAt *time.Time
    // CreatedAt is when the volume was provisioned. The GC never deletes a volume
    // younger than the TTL when its run cannot be resolved, so a lookup miss can
    // never reclaim an active run's fresh volume.
    CreatedAt time.Time
}

type TaskConfig

TaskConfig holds the leoflow.yaml per-task overrides bound by task_id (ADR 0023). Every field is optional; a set field overrides the value compiled from the DAG (most specific wins: task override > DAG default_args). These are Leoflow deployment concerns, not Airflow operator attributes.

type TaskConfig struct {
    Retries                 *int              `json:"retries,omitempty" yaml:"retries,omitempty"`
    RetryDelaySeconds       *int              `json:"retry_delay_seconds,omitempty" yaml:"retry_delay_seconds,omitempty"`
    ExecutionTimeoutSeconds *int              `json:"execution_timeout_seconds,omitempty" yaml:"execution_timeout_seconds,omitempty"`
    Env                     map[string]string `json:"env,omitempty" yaml:"env,omitempty"`
    Resources               *Resources        `json:"resources,omitempty" yaml:"resources,omitempty"`
    Execution               *Execution        `json:"execution,omitempty" yaml:"execution,omitempty"`
}
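Because the scalar overrides are pointers, "unset" (nil) stays distinguishable from an explicit zero, which is what allows an override such as retries: 0. A sketch of the precedence for a single field, assuming the order is task override, then the task's own compiled value, then DAG default_args; pick and intPtr are hypothetical helpers, and how Env, Resources, and Execution are merged is not covered.

```go
package main

import "fmt"

// pick returns the most specific value that is set: a leoflow.yaml task
// override, then the task's compiled value, then the DAG default_args.
func pick(override, compiled *int, dagDefault int) int {
	if override != nil {
		return *override
	}
	if compiled != nil {
		return *compiled
	}
	return dagDefault
}

func intPtr(v int) *int { return &v }

func main() {
	dagRetries := 3
	fmt.Println(pick(nil, nil, dagRetries))             // 3: DAG default_args
	fmt.Println(pick(nil, intPtr(1), dagRetries))       // 1: compiled task value
	fmt.Println(pick(intPtr(0), intPtr(1), dagRetries)) // 0: explicit override wins
}
```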

type TaskInstance

TaskInstance is an execution of a task within a DagRun.

type TaskInstance struct {
    DagID     string
    RunID     string
    TaskID    string
    MapIndex  int
    TryNumber int
    MaxTries  int
    State     TaskState
    Operator  string
    // ScheduledAt and QueuedAt record when the instance first entered the
    // scheduled and queued states (Airflow's scheduled_when / queued_when).
    ScheduledAt *time.Time
    QueuedAt    *time.Time
    StartedAt   *time.Time
    EndedAt     *time.Time
    Duration    *float64
    Hostname    string
    // Note is operational context shown in the UI's task panel, for example why
    // a task is queued but not running (no executor available).
    Note string
}

type TaskSpec

TaskSpec describes a single unit of work within a DAG.

type TaskSpec struct {
    TaskID                  string              `json:"task_id"`
    Type                    TaskType            `json:"type"`
    DependsOn               []string            `json:"depends_on,omitempty"`
    TriggerRule             TriggerRule         `json:"trigger_rule,omitempty"`
    Retries                 *int                `json:"retries,omitempty"`
    RetryDelaySeconds       *int                `json:"retry_delay_seconds,omitempty"`
    ExecutionTimeoutSeconds *int                `json:"execution_timeout_seconds,omitempty"`
    ExecutionMode           ExecutionMode       `json:"execution_mode,omitempty"`
    Entrypoint              string              `json:"entrypoint,omitempty"`
    HTTPRequest             *HTTPRequest        `json:"http_request,omitempty"`
    Env                     map[string]string   `json:"env,omitempty"`
    Secrets                 []Secret            `json:"secrets,omitempty"`
    Resources               *Resources          `json:"resources,omitempty"`
    Execution               *Execution          `json:"execution,omitempty"`
    XComInput               map[string][]string `json:"xcom_input,omitempty"`
    XComSchema              map[string]any      `json:"xcom_schema,omitempty"`
    // CallArgs carries TaskFlow literal call arguments captured at compile time
    // (#115). The agent serializes the whole map as a single env var
    // LEOFLOW_CALL_ARGS_JSON; the runtime decodes and delivers each value to
    // the user function. XCom upstreams take precedence at runtime over a
    // same-name literal (the deterministic merge owned by leoflow_runtime).
    // Named call_args (not params) to leave the term free for Airflow's
    // DAG-run params semantic (#148).
    CallArgs map[string]any `json:"call_args,omitempty"`
}

func (TaskSpec) EffectiveExecutionMode

func (t TaskSpec) EffectiveExecutionMode() ExecutionMode

EffectiveExecutionMode returns the task's execution mode, applying the defaults: http_api tasks default to inline, every other type runs in a pod.
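The defaulting rule fits in a few lines. This sketch uses local copies of the types; it assumes a python or bash task is always reported as pod even if it somehow declares another mode (the schema may reject that combination before this method is ever called).

```go
package main

import "fmt"

type TaskType string
type ExecutionMode string

const (
	TaskTypePython  TaskType = "python"
	TaskTypeBash    TaskType = "bash"
	TaskTypeHTTPAPI TaskType = "http_api"

	ExecutionModeInline ExecutionMode = "inline"
	ExecutionModePod    ExecutionMode = "pod"
)

type TaskSpec struct {
	Type          TaskType
	ExecutionMode ExecutionMode
}

// EffectiveExecutionMode: an http_api task uses its declared mode, defaulting
// to inline; every other type runs in a pod.
func (t TaskSpec) EffectiveExecutionMode() ExecutionMode {
	if t.Type != TaskTypeHTTPAPI {
		return ExecutionModePod
	}
	if t.ExecutionMode == "" {
		return ExecutionModeInline
	}
	return t.ExecutionMode
}

func main() {
	for _, t := range []TaskSpec{
		{Type: TaskTypeHTTPAPI},
		{Type: TaskTypeHTTPAPI, ExecutionMode: ExecutionModePod},
		{Type: TaskTypePython},
		{Type: TaskTypeBash},
	} {
		fmt.Println(t.Type, t.EffectiveExecutionMode())
	}
}
```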

type TaskState

TaskState is the lifecycle state of a TaskInstance. The values mirror the task_state enum in the database (migration 003).

type TaskState string

Task lifecycle states.

const (
    // TaskStateNone is the initial state: the task has not been considered yet.
    TaskStateNone TaskState = "none"
    // TaskStateScheduled means dependencies are satisfied and the task is waiting to be dispatched.
    TaskStateScheduled TaskState = "scheduled"
    // TaskStateQueued means the executor has been asked to start the task.
    TaskStateQueued TaskState = "queued"
    // TaskStateRunning means the task is executing.
    TaskStateRunning TaskState = "running"
    // TaskStateSuccess means the task finished successfully.
    TaskStateSuccess TaskState = "success"
    // TaskStateFailed means the task finished with an error.
    TaskStateFailed TaskState = "failed"
    // TaskStateSkipped means the task was deliberately not run.
    TaskStateSkipped TaskState = "skipped"
    // TaskStateUpstreamFailed means a required upstream failed, so the task cannot run.
    TaskStateUpstreamFailed TaskState = "upstream_failed"
    // TaskStateUpForRetry means the task failed but has retries remaining.
    TaskStateUpForRetry TaskState = "up_for_retry"
)

func (TaskState) IsTerminal

func (s TaskState) IsTerminal() bool

IsTerminal reports whether the task state is final (no further automatic transitions occur from it).
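Reading the constant comments, four states look final: success, failed, skipped, and upstream_failed. up_for_retry is not, because a retry is still pending. A sketch of that classification on a local copy of the type (an inference from the comments, not the package's source):

```go
package main

import "fmt"

type TaskState string

const (
	TaskStateNone           TaskState = "none"
	TaskStateScheduled      TaskState = "scheduled"
	TaskStateQueued         TaskState = "queued"
	TaskStateRunning        TaskState = "running"
	TaskStateSuccess        TaskState = "success"
	TaskStateFailed         TaskState = "failed"
	TaskStateSkipped        TaskState = "skipped"
	TaskStateUpstreamFailed TaskState = "upstream_failed"
	TaskStateUpForRetry     TaskState = "up_for_retry"
)

// IsTerminal excludes up_for_retry on purpose: the task still has a
// retry pending, so it will leave that state automatically.
func (s TaskState) IsTerminal() bool {
	switch s {
	case TaskStateSuccess, TaskStateFailed, TaskStateSkipped, TaskStateUpstreamFailed:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(TaskStateUpForRetry.IsTerminal(), TaskStateUpstreamFailed.IsTerminal())
}
```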

type TaskType

TaskType enumerates the kinds of work a task can perform.

type TaskType string

Supported task types. See docs/api/dag-schema.json.

const (
    // TaskTypePython runs a Python callable identified by an entrypoint.
    TaskTypePython TaskType = "python"
    // TaskTypeBash runs a shell command supplied as the entrypoint.
    TaskTypeBash TaskType = "bash"
    // TaskTypeHTTPAPI performs an outbound HTTP request from the control plane.
    TaskTypeHTTPAPI TaskType = "http_api"
)

type TriggerRule

TriggerRule decides whether a task runs based on its upstreams' states.

type TriggerRule string

Supported trigger rules for the MVP. See docs/api/dag-schema.json.

const (
    // TriggerRuleAllSuccess runs when every upstream succeeded (default).
    TriggerRuleAllSuccess TriggerRule = "all_success"
    // TriggerRuleAllFailed runs when every upstream failed.
    TriggerRuleAllFailed TriggerRule = "all_failed"
    // TriggerRuleAllDone runs once every upstream finished, regardless of state.
    TriggerRuleAllDone TriggerRule = "all_done"
    // TriggerRuleOneSuccess runs as soon as one upstream succeeds.
    TriggerRuleOneSuccess TriggerRule = "one_success"
    // TriggerRuleOneFailed runs as soon as one upstream fails.
    TriggerRuleOneFailed TriggerRule = "one_failed"
)
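Each rule can be read as a predicate over the upstream states. This sketch assumes "done" means success, failed, skipped, or upstream_failed, and that upstream_failed counts as a failure (as it does in Airflow). It only answers "may the task run now?", not the separate question of when a task should instead be marked upstream_failed or skipped; ready is a hypothetical name.

```go
package main

import "fmt"

type TriggerRule string

const (
	TriggerRuleAllSuccess TriggerRule = "all_success"
	TriggerRuleAllFailed  TriggerRule = "all_failed"
	TriggerRuleAllDone    TriggerRule = "all_done"
	TriggerRuleOneSuccess TriggerRule = "one_success"
	TriggerRuleOneFailed  TriggerRule = "one_failed"
)

// isDone is the assumed set of finished upstream states.
func isDone(s string) bool {
	switch s {
	case "success", "failed", "skipped", "upstream_failed":
		return true
	}
	return false
}

// isFailure is the assumed set of failed upstream states.
func isFailure(s string) bool { return s == "failed" || s == "upstream_failed" }

// ready reports whether a task with the given rule may run now. An empty
// rule falls back to all_success, the documented default.
func ready(rule TriggerRule, upstream []string) bool {
	var done, ok, bad int
	for _, s := range upstream {
		if isDone(s) {
			done++
		}
		if s == "success" {
			ok++
		}
		if isFailure(s) {
			bad++
		}
	}
	n := len(upstream)
	switch rule {
	case "", TriggerRuleAllSuccess:
		return ok == n
	case TriggerRuleAllFailed:
		return bad == n
	case TriggerRuleAllDone:
		return done == n
	case TriggerRuleOneSuccess:
		return ok > 0
	case TriggerRuleOneFailed:
		return bad > 0
	default:
		return false
	}
}

func main() {
	up := []string{"success", "failed", "running"}
	for _, r := range []TriggerRule{TriggerRuleAllSuccess, TriggerRuleAllFailed, TriggerRuleAllDone, TriggerRuleOneSuccess, TriggerRuleOneFailed} {
		fmt.Println(r, ready(r, up))
	}
}
```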

type Variable

Variable is a tenant-scoped key/value setting consumed by DAGs and managed from the Admin UI. Value is stored as-is (plaintext for the MVP); the API masks values of secret-ish keys.

type Variable struct {
    Key         string
    Value       string
    Description string
}
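The text does not say which keys count as secret-ish. A common approach, similar to Airflow's own sensitive-field masking, is a case-insensitive substring match against a list of sensitive words. In this sketch the word list, the "***" placeholder, and the names sensitiveWords and maskedValue are all hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// sensitiveWords is a hypothetical list; Leoflow's actual list may differ.
var sensitiveWords = []string{"password", "passwd", "secret", "token", "api_key", "apikey", "private_key"}

// maskedValue returns the value an API response would show for a variable:
// masked when the lower-cased key contains any sensitive word.
func maskedValue(key, value string) string {
	k := strings.ToLower(key)
	for _, w := range sensitiveWords {
		if strings.Contains(k, w) {
			return "***"
		}
	}
	return value
}

func main() {
	fmt.Println(maskedValue("S3_BUCKET", "raw-data"))     // raw-data
	fmt.Println(maskedValue("SLACK_API_KEY", "xoxb-123")) // ***
}
```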

type XComEntryMeta

XComEntryMeta is the metadata for one stored XCom value (without the value payload); these entries are the source for a task instance's XCom list. Leoflow XComs are unmapped, so MapIndex is -1.

type XComEntryMeta struct {
    Key       string
    Timestamp time.Time
    MapIndex  int
}

Generated by gomarkdoc