domain
Package domain defines the core Leoflow types (DAG, Task, project config) and validates them against the canonical JSON Schemas in docs/api.
Index
- Constants
- Variables
- func IsCronlessSchedule(expr string) bool
- func IsOnceSchedule(expr string) bool
- type AuditLogEntry
- type BuildConfig
- type ConfigDefaults
- type Connection
- type DAG
- type DAGSpec
- func (d *DAGSpec) CanonicalHash() (string, error)
- func (d *DAGSpec) Validate() error
- func (d *DAGSpec) ValidateInlineExecution(maxInlineSeconds int) error
- func (d *DAGSpec) ValidateSchedule() error
- type DagRun
- type DagRunState
- func (s DagRunState) IsTerminal() bool
- type DagStats
- type DagVersion
- type DefaultArgs
- type DefaultResources
- type Execution
- type ExecutionMode
- type HTTPRequest
- type HistoricalMetrics
- type ImportError
- type LeoflowConfig
- func (c *LeoflowConfig) ApplyDefaults()
- func (c *LeoflowConfig) Validate() error
- type RegistryConfig
- type ResourceQuantity
- type Resources
- type Secret
- type StagingConfig
- type StagingVolumeState
- type TaskConfig
- type TaskInstance
- type TaskSpec
- func (t TaskSpec) EffectiveExecutionMode() ExecutionMode
- type TaskState
- func (s TaskState) IsTerminal() bool
- type TaskType
- type TriggerRule
- type Variable
- type XComEntryMeta
Constants
DefaultInlineMaxDurationSeconds is the fallback cap on inline http_api task duration when the server does not configure one. See ADR 0002.
Variables
ErrConflict is returned when a write conflicts with an existing resource (e.g. a duplicate dag run for the same logical date). The API maps it to HTTP 409 Conflict.
ErrNotFound is returned when a requested resource does not exist.
func IsCronlessSchedule
IsCronlessSchedule reports whether expr is empty (manual-only) or a recognized non-cron Airflow schedule such as @once or @continuous. Such a schedule is valid but never runs on a cron, so callers skip cron handling for it without treating it as an error.
func IsOnceSchedule
IsOnceSchedule reports whether expr is Airflow's "@once" schedule: a DAG that runs exactly one time (when the scheduler first sees it) and never again.
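Taking the recognized non-cron schedules to be exactly @once and @continuous (the pair ValidateSchedule lists), the two predicates could be sketched as follows. Exact string matching with no trimming or case folding is an assumption, and the lowercase names mark these as illustrations rather than the package's code.

```go
package main

import "fmt"

// isOnceSchedule mirrors the documented behavior of IsOnceSchedule.
func isOnceSchedule(expr string) bool {
	return expr == "@once"
}

// isCronlessSchedule mirrors IsCronlessSchedule, assuming the recognized
// non-cron schedules are exactly @once and @continuous.
func isCronlessSchedule(expr string) bool {
	switch expr {
	case "", "@once", "@continuous":
		return true
	}
	return false
}

func main() {
	for _, expr := range []string{"", "@once", "@continuous", "0 6 * * *"} {
		fmt.Printf("%q cronless=%v once=%v\n", expr, isCronlessSchedule(expr), isOnceSchedule(expr))
	}
}
```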
type AuditLogEntry
AuditLogEntry is one recorded action against a resource and is the source for the UI's Audit Log table. ResourceID carries the DAG id for DAG-scoped events.
type AuditLogEntry struct {
ID int64
When time.Time
Action string
ResourceType string
ResourceID string
Owner string
Extra string
}
type BuildConfig
BuildConfig controls how the container image is built from the project.
type BuildConfig struct {
Dockerfile string `json:"dockerfile,omitempty" yaml:"dockerfile,omitempty"`
Context string `json:"context,omitempty" yaml:"context,omitempty"`
Platforms []string `json:"platforms,omitempty" yaml:"platforms,omitempty"`
Labels map[string]string `json:"labels,omitempty" yaml:"labels,omitempty"`
}
type ConfigDefaults
ConfigDefaults holds task defaults applied to every task generated from the project at compile time.
type ConfigDefaults struct {
Retries int `json:"retries,omitempty" yaml:"retries,omitempty"`
RetryDelaySeconds int `json:"retry_delay_seconds,omitempty" yaml:"retry_delay_seconds,omitempty"`
ExecutionTimeoutSeconds int `json:"execution_timeout_seconds,omitempty" yaml:"execution_timeout_seconds,omitempty"`
Resources *DefaultResources `json:"resources,omitempty" yaml:"resources,omitempty"`
}
type Connection
Connection is an Airflow-style connection: credentials/endpoints for operators, managed from the Admin UI. Password and Extra are encrypted at rest (ADR 0019); Password is write-only and never returned by the API.
type Connection struct {
ConnID string
ConnType string
Host string
Schema string
Login string
Password string
Port *int
Extra string
Description string
}
type DAG
DAG is a registered DAG with its scheduling metadata (distinct from DAGSpec, which is the compiled artifact).
type DAG struct {
DagID string
Description string
Owner string
Tags []string
Schedule *string
ScheduleTZ string
StartDate *time.Time
IsPaused bool
IsActive bool
MaxActiveRuns int
Catchup bool
LastParsedTime *time.Time
}
type DAGSpec
DAGSpec is the canonical serialized representation of a DAG consumed by the control plane. It mirrors docs/api/dag-schema.json.
type DAGSpec struct {
SchemaVersion string `json:"schema_version"`
DagID string `json:"dag_id"`
DagVersion string `json:"dag_version"`
Image string `json:"image"`
Description string `json:"description,omitempty"`
Owner string `json:"owner,omitempty"`
Tags []string `json:"tags,omitempty"`
Schedule *string `json:"schedule,omitempty"`
ScheduleTZ string `json:"schedule_timezone,omitempty"`
StartDate string `json:"start_date,omitempty"`
EndDate *string `json:"end_date,omitempty"`
MaxActiveRuns int `json:"max_active_runs,omitempty"`
Catchup bool `json:"catchup,omitempty"`
DefaultArgs *DefaultArgs `json:"default_args,omitempty"`
// Staging, when enabled, requests an ephemeral RWX volume shared by the run's
// tasks at /staging (ADR 0022). nil/disabled means no staging volume.
Staging *StagingConfig `json:"staging,omitempty"`
Tasks []TaskSpec `json:"tasks"`
// Source is the original dag.py text, captured at compile time so the UI's
// Code tab can show the Python a human wrote (not the compiled spec). It is
// part of the artifact: changing it produces a new version.
Source string `json:"source,omitempty"`
}
func (*DAGSpec) CanonicalHash
CanonicalHash returns the SHA-256 of the spec's canonical JSON encoding. Go's struct marshaling is deterministic (fields in declaration order, map keys sorted), so identical specs hash identically; the hash is used to deduplicate DAG versions.
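The determinism argument can be checked with a small sketch: two values whose maps are populated in different orders marshal to the same bytes and therefore the same digest. miniSpec is a hypothetical, trimmed-down stand-in for DAGSpec, and hex encoding of the digest is an assumption (the doc only says SHA-256).

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// miniSpec is a hypothetical, trimmed-down stand-in for DAGSpec.
type miniSpec struct {
	DagID string            `json:"dag_id"`
	Env   map[string]string `json:"env,omitempty"`
}

// canonicalHash returns the hex-encoded SHA-256 of v's JSON encoding.
// encoding/json writes struct fields in declaration order and sorts map keys.
func canonicalHash(v any) (string, error) {
	b, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(b)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a := miniSpec{DagID: "etl", Env: map[string]string{"A": "1", "B": "2"}}
	b := miniSpec{DagID: "etl", Env: map[string]string{"B": "2", "A": "1"}}
	ha, _ := canonicalHash(a)
	hb, _ := canonicalHash(b)
	fmt.Println(ha == hb) // true
}
```

Note that the guarantee holds only while every field has a deterministic encoding; values such as nested map[string]any are also sorted, so they remain safe.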
func (*DAGSpec) Validate
Validate checks the DAGSpec against the canonical dag.json schema and returns a joined error describing every schema violation, or nil when valid.
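The "joined error" contract maps naturally onto errors.Join (Go 1.20+), which returns nil when nothing was collected. In this minimal sketch, a few hand-written checks on a hypothetical miniSpec stand in for the JSON Schema validator the package actually uses; the specific rules are illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// miniSpec is a hypothetical stand-in for DAGSpec.
type miniSpec struct {
	DagID string
	Image string
	Tasks []string
}

// validate collects every violation rather than stopping at the first,
// then joins them. errors.Join returns nil for an empty list.
func validate(s miniSpec) error {
	var errs []error
	if s.DagID == "" {
		errs = append(errs, errors.New("dag_id: required"))
	}
	if s.Image == "" {
		errs = append(errs, errors.New("image: required"))
	}
	if len(s.Tasks) == 0 {
		errs = append(errs, errors.New("tasks: at least one task required"))
	}
	return errors.Join(errs...)
}

func main() {
	fmt.Println(validate(miniSpec{DagID: "etl", Image: "registry.example/etl:1", Tasks: []string{"extract"}})) // <nil>
	if err := validate(miniSpec{}); err != nil {
		fmt.Println(err) // one violation per line
	}
}
```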
func (*DAGSpec) ValidateInlineExecution
ValidateInlineExecution rejects inline http_api tasks whose execution_timeout_seconds exceeds the server's inline duration cap. Such a task must declare execution_mode: pod. maxInlineSeconds is the server limit.
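A minimal sketch of this check, combined with the defaulting rule described under EffectiveExecutionMode (http_api defaults to inline, every other type runs in a pod). taskSpec, effectiveMode, validateInline, and intPtr are hypothetical names. Skipping tasks with no declared timeout, returning only the first violation, and forcing non-http_api tasks to pod regardless of a declared mode are all assumptions.

```go
package main

import "fmt"

// taskSpec is a hypothetical, trimmed-down stand-in for TaskSpec.
type taskSpec struct {
	TaskID                  string
	Type                    string // "python", "bash", or "http_api"
	ExecutionMode           string // "", "inline", or "pod"
	ExecutionTimeoutSeconds *int
}

// effectiveMode applies the documented defaults.
func effectiveMode(t taskSpec) string {
	if t.Type != "http_api" {
		return "pod"
	}
	if t.ExecutionMode == "" {
		return "inline"
	}
	return t.ExecutionMode
}

// validateInline rejects inline tasks whose timeout exceeds the server cap.
func validateInline(tasks []taskSpec, maxInlineSeconds int) error {
	for _, t := range tasks {
		if effectiveMode(t) != "inline" || t.ExecutionTimeoutSeconds == nil {
			continue
		}
		if *t.ExecutionTimeoutSeconds > maxInlineSeconds {
			return fmt.Errorf("task %q: execution_timeout_seconds %d exceeds the inline cap of %d; set execution_mode: pod",
				t.TaskID, *t.ExecutionTimeoutSeconds, maxInlineSeconds)
		}
	}
	return nil
}

func intPtr(n int) *int { return &n }

func main() {
	tasks := []taskSpec{
		{TaskID: "ping", Type: "http_api", ExecutionTimeoutSeconds: intPtr(30)},
		{TaskID: "slow_export", Type: "http_api", ExecutionTimeoutSeconds: intPtr(900)},
		{TaskID: "slow_pod", Type: "http_api", ExecutionMode: "pod", ExecutionTimeoutSeconds: intPtr(900)},
	}
	// 300 is an arbitrary example cap, not DefaultInlineMaxDurationSeconds.
	fmt.Println(validateInline(tasks, 300)) // reports slow_export
}
```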
func (*DAGSpec) ValidateSchedule
ValidateSchedule checks that a DAG's cron schedule is parseable. An empty or absent schedule (manual-only) and the recognized non-cron Airflow schedules (@once, @continuous) are valid. A malformed cron expression, such as a 4-field cron or a typo, is rejected here so that it fails loudly at compile time. Otherwise the scheduler silently fails to parse it and the DAG never runs, with no error surfaced anywhere, which is the worst failure mode. The parser is robfig/cron's ParseStandard, the same one the scheduler uses, so anything that validates here is exactly what the scheduler can run (see scheduler/cron.go).
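The control flow above can be sketched without the third-party dependency by injecting the parser. In Leoflow the parse function would wrap robfig/cron's ParseStandard; here fiveFields is a deliberately crude, hypothetical stand-in, and validateSchedule is an illustrative name.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// validateSchedule accepts an absent or empty schedule and the non-cron
// schedules @once and @continuous; anything else must satisfy parse.
func validateSchedule(schedule *string, parse func(string) error) error {
	if schedule == nil {
		return nil
	}
	switch expr := *schedule; expr {
	case "", "@once", "@continuous":
		return nil
	default:
		if err := parse(expr); err != nil {
			return fmt.Errorf("invalid schedule %q: %w", expr, err)
		}
		return nil
	}
}

// fiveFields only checks the field count of a standard cron expression.
// Unlike ParseStandard, it also rejects descriptors such as @daily.
func fiveFields(expr string) error {
	if len(strings.Fields(expr)) != 5 {
		return errors.New("expected exactly 5 fields")
	}
	return nil
}

func main() {
	for _, expr := range []string{"", "@once", "0 6 * * *", "0 6 * *"} {
		e := expr
		fmt.Printf("%q -> %v\n", expr, validateSchedule(&e, fiveFields))
	}
}
```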
type DagRun
DagRun is an execution of a DAG, identified by dag_id + run_id.
type DagRun struct {
DagID string
RunID string
LogicalDate time.Time
State DagRunState
RunType string
QueuedAt time.Time
StartedAt *time.Time
EndedAt *time.Time
Note string
}
type DagRunState
DagRunState is the lifecycle state of a DagRun. The values mirror the dag_run_state enum in the database (migration 003).
const (
// DagRunStateQueued means the run has been created but not started.
DagRunStateQueued DagRunState = "queued"
// DagRunStateRunning means at least one task instance is active.
DagRunStateRunning DagRunState = "running"
// DagRunStateSuccess means every leaf task reached a successful terminal state.
DagRunStateSuccess DagRunState = "success"
// DagRunStateFailed means the run finished with at least one failure.
DagRunStateFailed DagRunState = "failed"
)
func (DagRunState) IsTerminal
IsTerminal reports whether the dag run state is final.
type DagStats
DagStats holds the home dashboard's DAG counters: the number of active DAGs and how many have a latest run in each state.
type DagVersion
DagVersion is a registered version of a DAG. VersionNumber is the 1-based ordinal the UI uses (the stored version label is free-form).
type DagVersion struct {
ID string
VersionNumber int
CreatedAt time.Time
// Version is the deployment label that produced this snapshot: a git describe
// (tag/SHA) in production, or "dev-<timestamp>" in dev. It is the stable
// per-deployment identifier under a stable dag_id.
Version string
}
type DefaultArgs
DefaultArgs holds retry and timeout defaults applied to every task in a DAG.
type DefaultArgs struct {
Retries int `json:"retries,omitempty"`
RetryDelaySeconds int `json:"retry_delay_seconds,omitempty"`
ExecutionTimeoutSeconds int `json:"execution_timeout_seconds,omitempty"`
}
type DefaultResources
DefaultResources expresses default CPU and memory for generated tasks.
type DefaultResources struct {
CPU string `json:"cpu,omitempty" yaml:"cpu,omitempty"`
Memory string `json:"memory,omitempty" yaml:"memory,omitempty"`
}
type Execution
Execution carries executor-specific placement hints for a task.
type Execution struct {
NodeSelector map[string]string `json:"node_selector,omitempty" yaml:"node_selector,omitempty"`
Tolerations []map[string]any `json:"tolerations,omitempty" yaml:"tolerations,omitempty"`
ServiceAccount string `json:"service_account,omitempty" yaml:"service_account,omitempty"`
ImagePullPolicy string `json:"image_pull_policy,omitempty" yaml:"image_pull_policy,omitempty"`
}
type ExecutionMode
ExecutionMode selects how a task runs. It is only meaningful for http_api tasks; python and bash tasks always run in a pod.
Supported execution modes. See docs/api/dag-schema.json and ADR 0002.
const (
// ExecutionModeInline runs an http_api task as a goroutine in the control
// plane, capped at the server's inline duration limit.
ExecutionModeInline ExecutionMode = "inline"
// ExecutionModePod runs a task inside a worker pod via the agent.
ExecutionModePod ExecutionMode = "pod"
)
type HTTPRequest
HTTPRequest is the request executed directly by the control plane for http_api tasks.
type HTTPRequest struct {
Method string `json:"method"`
URL string `json:"url"`
Headers map[string]string `json:"headers,omitempty"`
Body any `json:"body,omitempty"`
TimeoutSeconds int `json:"timeout_seconds,omitempty"`
SuccessStatusCodes []int `json:"success_status_codes,omitempty"`
}
type HistoricalMetrics
HistoricalMetrics holds run- and task-instance counts grouped by state over a time window, keyed by the Leoflow state name (e.g. "success", "up_for_retry").
type ImportError
ImportError is a DAG parse/compile failure surfaced as Airflow's "Import Errors" banner on the home dashboard. It is keyed by Filename; a successful re-import of the same file clears it. The `leoflow dev` watcher writes these on a failed compile and removes them on the next good compile.
type ImportError struct {
// ID is the stable identifier of the error record.
ID string
// Filename is the DAG source path that failed to import.
Filename string
// StackTrace is the human-readable parse/compile error (traceback).
StackTrace string
// BundleName is the originating bundle (empty when unknown).
BundleName string
// Timestamp is when the error was recorded.
Timestamp time.Time
}
type LeoflowConfig
LeoflowConfig is the developer-facing project configuration parsed from leoflow.yaml. It mirrors docs/api/leoflow-yaml-schema.json and is consumed by `leoflow compile` to build an image and emit a DAGSpec.
type LeoflowConfig struct {
SchemaVersion string `json:"schema_version,omitempty" yaml:"schema_version,omitempty"`
DagID string `json:"dag_id" yaml:"dag_id"`
Description string `json:"description,omitempty" yaml:"description,omitempty"`
Owner string `json:"owner,omitempty" yaml:"owner,omitempty"`
Tags []string `json:"tags,omitempty" yaml:"tags,omitempty"`
PythonVersion string `json:"python_version,omitempty" yaml:"python_version,omitempty"`
BaseImage string `json:"base_image,omitempty" yaml:"base_image,omitempty"`
Dependencies []string `json:"dependencies,omitempty" yaml:"dependencies,omitempty"`
SystemPackages []string `json:"system_packages,omitempty" yaml:"system_packages,omitempty"`
DagSource string `json:"dag_source,omitempty" yaml:"dag_source,omitempty"`
IncludePaths []string `json:"include_paths,omitempty" yaml:"include_paths,omitempty"`
ExcludePaths []string `json:"exclude_paths,omitempty" yaml:"exclude_paths,omitempty"`
Build *BuildConfig `json:"build,omitempty" yaml:"build,omitempty"`
Registry *RegistryConfig `json:"registry,omitempty" yaml:"registry,omitempty"`
Defaults *ConfigDefaults `json:"defaults,omitempty" yaml:"defaults,omitempty"`
// Staging requests the opt-in per-DAG-run shared volume (ADR 0022). It is a
// Leoflow deployment concern (not an Airflow DAG attribute), so it lives in
// leoflow.yaml and the compiler overlays it onto the produced dag.json.
Staging *StagingConfig `json:"staging,omitempty" yaml:"staging,omitempty"`
// Tasks holds per-task overrides bound by task_id (ADR 0023). Each entry's
// key must match a task_id in the compiled DAG; the compiler errors on an
// unknown id rather than silently dropping it.
Tasks map[string]*TaskConfig `json:"tasks,omitempty" yaml:"tasks,omitempty"`
}
func (*LeoflowConfig) ApplyDefaults
ApplyDefaults fills zero-valued fields with the defaults declared in the canonical JSON Schema (internal/domain/schemas/leoflow-yaml-schema.json). Explicit user-set values are preserved; nested structs (Build, Registry) are instantiated when nil so their own defaults can be applied. The method is idempotent: a second call after the first is a no-op.
Centralizing defaults here (instead of scattered `if x == ""` fallbacks at each consumer) is what lets the multi-DAG workspace synthesize a working config when a subdir ships no leoflow.yaml, while keeping the resolved values debuggable from one place.
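The three properties claimed for ApplyDefaults (fill only zero values, instantiate nil nested structs, idempotence) can be illustrated with a small sketch. The config and buildConfig types are hypothetical and trimmed down, and the placeholder strings are NOT the defaults declared in leoflow-yaml-schema.json.

```go
package main

import "fmt"

// Hypothetical, trimmed-down stand-ins for LeoflowConfig and BuildConfig.
type buildConfig struct {
	Dockerfile string
	Context    string
}

type config struct {
	PythonVersion string
	Build         *buildConfig
}

// Placeholders only; the real values come from the canonical schema.
const (
	placeholderPython     = "<schema-default-python>"
	placeholderDockerfile = "<schema-default-dockerfile>"
	placeholderContext    = "<schema-default-context>"
)

// applyDefaults fills zero-valued fields, creates a nil nested struct so its
// own defaults can apply, and leaves explicit values untouched. A second
// call finds nothing zero-valued, so it is a no-op.
func (c *config) applyDefaults() {
	if c.PythonVersion == "" {
		c.PythonVersion = placeholderPython
	}
	if c.Build == nil {
		c.Build = &buildConfig{}
	}
	if c.Build.Dockerfile == "" {
		c.Build.Dockerfile = placeholderDockerfile
	}
	if c.Build.Context == "" {
		c.Build.Context = placeholderContext
	}
}

func main() {
	c := config{PythonVersion: "3.12"} // explicit user value is preserved
	c.applyDefaults()
	first := *c.Build
	c.applyDefaults() // idempotent
	fmt.Println(c.PythonVersion, *c.Build == first)
}
```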
func (*LeoflowConfig) Validate
Validate checks the LeoflowConfig against the canonical leoflow.yaml schema and returns a joined error describing every violation, or nil when valid.
type RegistryConfig
RegistryConfig describes where the built image is pushed and how it is tagged.
type RegistryConfig struct {
URL string `json:"url,omitempty" yaml:"url,omitempty"`
AuthMethod string `json:"auth_method,omitempty" yaml:"auth_method,omitempty"`
ImageName string `json:"image_name,omitempty" yaml:"image_name,omitempty"`
TagStrategy string `json:"tag_strategy,omitempty" yaml:"tag_strategy,omitempty"`
}
type ResourceQuantity
ResourceQuantity expresses CPU and memory in Kubernetes notation.
type ResourceQuantity struct {
CPU string `json:"cpu,omitempty" yaml:"cpu,omitempty"`
Memory string `json:"memory,omitempty" yaml:"memory,omitempty"`
}
type Resources
Resources holds Kubernetes-style resource requests and limits for a task.
type Resources struct {
Requests *ResourceQuantity `json:"requests,omitempty" yaml:"requests,omitempty"`
Limits *ResourceQuantity `json:"limits,omitempty" yaml:"limits,omitempty"`
}
type Secret
Secret references a credential injected into the worker at run time.
type Secret struct {
Name string `json:"name"`
Source string `json:"source"`
Reference string `json:"reference,omitempty"`
}
type StagingConfig
StagingConfig is the opt-in per-DAG-run shared staging volume (ADR 0022). Size is a Kubernetes quantity (e.g. "5Gi"); an empty StorageClass selects the cluster's default RWX storage class.
type StagingConfig struct {
Enabled bool `json:"enabled" yaml:"enabled"`
Size string `json:"size,omitempty" yaml:"size,omitempty"`
StorageClass string `json:"storage_class,omitempty" yaml:"storage_class,omitempty"`
}
type StagingVolumeState
StagingVolumeState is a tracked per-run staging volume joined with its DAG run's state, used by the GC to decide deletion (ADR 0022). RunState is empty when the run row is gone (orphan); RunEndedAt is the run's terminal time, used for the post-terminal TTL on failed runs.
type StagingVolumeState struct {
// PVCName is the staging PersistentVolumeClaim's name.
PVCName string
// RunState is the DAG run's state ("success", "failed", "running", …), or
// empty when the run no longer exists.
RunState string
// RunEndedAt is when the run reached a terminal state, if known.
RunEndedAt *time.Time
// CreatedAt is when the volume was provisioned. The GC never deletes a volume
// younger than the TTL when its run cannot be resolved, so a lookup miss can
// never reclaim an active run's fresh volume.
CreatedAt time.Time
}
type TaskConfig
TaskConfig holds the leoflow.yaml per-task overrides bound by task_id (ADR 0023). Every field is optional; a set field overrides the value compiled from the DAG (most specific wins: task override > DAG default_args). These are Leoflow deployment concerns, not Airflow operator attributes.
type TaskConfig struct {
Retries *int `json:"retries,omitempty" yaml:"retries,omitempty"`
RetryDelaySeconds *int `json:"retry_delay_seconds,omitempty" yaml:"retry_delay_seconds,omitempty"`
ExecutionTimeoutSeconds *int `json:"execution_timeout_seconds,omitempty" yaml:"execution_timeout_seconds,omitempty"`
Env map[string]string `json:"env,omitempty" yaml:"env,omitempty"`
Resources *Resources `json:"resources,omitempty" yaml:"resources,omitempty"`
Execution *Execution `json:"execution,omitempty" yaml:"execution,omitempty"`
}
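A sketch of how the compiler might overlay these overrides onto compiled tasks. It follows the documented rules (a set field wins, an unknown task_id is an error) and rejects unknown ids before touching anything. The trimmed-down types, the names applyOverride and applyOverrides, and merging Env per key instead of replacing the whole map are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// Hypothetical, trimmed-down stand-ins for TaskSpec and TaskConfig.
type taskSpec struct {
	Retries                 *int
	ExecutionTimeoutSeconds *int
	Env                     map[string]string
}

type taskConfig struct {
	Retries                 *int
	ExecutionTimeoutSeconds *int
	Env                     map[string]string
}

// applyOverride lets every field the override sets win; nil fields keep
// the compiled value. Env is merged key by key (an assumption).
func applyOverride(t *taskSpec, o *taskConfig) {
	if o == nil {
		return
	}
	if o.Retries != nil {
		t.Retries = o.Retries
	}
	if o.ExecutionTimeoutSeconds != nil {
		t.ExecutionTimeoutSeconds = o.ExecutionTimeoutSeconds
	}
	if len(o.Env) > 0 {
		merged := make(map[string]string, len(t.Env)+len(o.Env))
		for k, v := range t.Env {
			merged[k] = v
		}
		for k, v := range o.Env {
			merged[k] = v
		}
		t.Env = merged
	}
}

// applyOverrides errors on an unknown task_id instead of dropping it.
func applyOverrides(tasks map[string]*taskSpec, overrides map[string]*taskConfig) error {
	ids := make([]string, 0, len(overrides))
	for id := range overrides {
		ids = append(ids, id)
	}
	sort.Strings(ids) // deterministic error when several ids are unknown
	for _, id := range ids {
		if _, ok := tasks[id]; !ok {
			return fmt.Errorf("tasks.%s: no task with this task_id in the compiled DAG", id)
		}
	}
	for _, id := range ids {
		applyOverride(tasks[id], overrides[id])
	}
	return nil
}

func intPtr(n int) *int { return &n }

func main() {
	// Compiled values, e.g. retries inherited from the DAG's default_args.
	load := &taskSpec{Retries: intPtr(1), ExecutionTimeoutSeconds: intPtr(600), Env: map[string]string{"MODE": "batch"}}
	err := applyOverrides(map[string]*taskSpec{"load": load}, map[string]*taskConfig{
		"load": {Retries: intPtr(3), Env: map[string]string{"MODE": "full", "DEBUG": "1"}},
	})
	fmt.Println(err, *load.Retries, *load.ExecutionTimeoutSeconds, load.Env["MODE"], load.Env["DEBUG"])
}
```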
type TaskInstance
TaskInstance is an execution of a task within a DagRun.
type TaskInstance struct {
DagID string
RunID string
TaskID string
MapIndex int
TryNumber int
MaxTries int
State TaskState
Operator string
// ScheduledAt and QueuedAt record when the instance first entered the
// scheduled and queued states (Airflow's scheduled_when / queued_when).
ScheduledAt *time.Time
QueuedAt *time.Time
StartedAt *time.Time
EndedAt *time.Time
Duration *float64
Hostname string
// Note is operational context shown in the UI's task panel, e.g. why a task
// is queued but not running (no executor available).
Note string
}
type TaskSpec
TaskSpec describes a single unit of work within a DAG.
type TaskSpec struct {
TaskID string `json:"task_id"`
Type TaskType `json:"type"`
DependsOn []string `json:"depends_on,omitempty"`
TriggerRule TriggerRule `json:"trigger_rule,omitempty"`
Retries *int `json:"retries,omitempty"`
RetryDelaySeconds *int `json:"retry_delay_seconds,omitempty"`
ExecutionTimeoutSeconds *int `json:"execution_timeout_seconds,omitempty"`
ExecutionMode ExecutionMode `json:"execution_mode,omitempty"`
Entrypoint string `json:"entrypoint,omitempty"`
HTTPRequest *HTTPRequest `json:"http_request,omitempty"`
Env map[string]string `json:"env,omitempty"`
Secrets []Secret `json:"secrets,omitempty"`
Resources *Resources `json:"resources,omitempty"`
Execution *Execution `json:"execution,omitempty"`
XComInput map[string][]string `json:"xcom_input,omitempty"`
XComSchema map[string]any `json:"xcom_schema,omitempty"`
// CallArgs carries TaskFlow literal call arguments captured at compile time
// (#115). The agent serializes the whole map as a single env var
// LEOFLOW_CALL_ARGS_JSON; the runtime decodes and delivers each value to
// the user function. XCom upstreams take precedence at runtime over a
// same-name literal (the deterministic merge owned by leoflow_runtime).
// Named call_args (not params) to leave the term free for Airflow's
// DAG-run params semantic (#148).
CallArgs map[string]any `json:"call_args,omitempty"`
}
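The CallArgs comment says the agent passes the whole map as one environment variable. A minimal sketch of that encoding, assuming plain json.Marshal and the "KEY=value" form used by os/exec's Cmd.Env; callArgsEnv is a hypothetical name, and how the real agent assembles the pod environment is not documented here. encoding/json sorts map keys, so the resulting value is stable across runs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// callArgsEnv encodes TaskFlow literal call arguments into a single
// LEOFLOW_CALL_ARGS_JSON entry in "KEY=value" form.
func callArgsEnv(args map[string]any) (string, error) {
	b, err := json.Marshal(args)
	if err != nil {
		return "", err // e.g. a value that has no JSON encoding
	}
	return "LEOFLOW_CALL_ARGS_JSON=" + string(b), nil
}

func main() {
	env, err := callArgsEnv(map[string]any{"limit": 100, "table": "events"})
	if err != nil {
		panic(err)
	}
	fmt.Println(env) // LEOFLOW_CALL_ARGS_JSON={"limit":100,"table":"events"}
}
```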
func (TaskSpec) EffectiveExecutionMode
EffectiveExecutionMode returns the task's execution mode, applying the defaults: http_api tasks default to inline, every other type runs in a pod.
type TaskState
TaskState is the lifecycle state of a TaskInstance. The values mirror the task_state enum in the database (migration 003).
const (
// TaskStateNone is the initial state: the task has not been considered yet.
TaskStateNone TaskState = "none"
// TaskStateScheduled means dependencies are satisfied and the task is waiting to be dispatched.
TaskStateScheduled TaskState = "scheduled"
// TaskStateQueued means the executor has been asked to start the task.
TaskStateQueued TaskState = "queued"
// TaskStateRunning means the task is executing.
TaskStateRunning TaskState = "running"
// TaskStateSuccess means the task finished successfully.
TaskStateSuccess TaskState = "success"
// TaskStateFailed means the task finished with an error.
TaskStateFailed TaskState = "failed"
// TaskStateSkipped means the task was deliberately not run.
TaskStateSkipped TaskState = "skipped"
// TaskStateUpstreamFailed means a required upstream failed, so the task cannot run.
TaskStateUpstreamFailed TaskState = "upstream_failed"
// TaskStateUpForRetry means the task failed but has retries remaining.
TaskStateUpForRetry TaskState = "up_for_retry"
)
func (TaskState) IsTerminal
IsTerminal reports whether the task state is final (no further automatic transitions occur from it).
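Reading the state comments against that definition, a plausible classification treats success, failed, skipped, and upstream_failed as final, while up_for_retry is not, since the task still has retries the scheduler will use. This set is an inference (it matches Airflow's finished states), not the package's actual switch, and taskState is a hypothetical local type.

```go
package main

import "fmt"

// taskState is a hypothetical local copy of TaskState.
type taskState string

// isTerminal reports whether no further automatic transition is expected.
func (s taskState) isTerminal() bool {
	switch s {
	case "success", "failed", "skipped", "upstream_failed":
		return true
	}
	return false
}

func main() {
	all := []taskState{"none", "scheduled", "queued", "running", "success", "failed", "skipped", "upstream_failed", "up_for_retry"}
	for _, s := range all {
		fmt.Printf("%-16s terminal=%v\n", s, s.isTerminal())
	}
}
```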
type TaskType
TaskType enumerates the kinds of work a task can perform.
Supported task types. See docs/api/dag-schema.json.
const (
// TaskTypePython runs a Python callable identified by an entrypoint.
TaskTypePython TaskType = "python"
// TaskTypeBash runs a shell command supplied as the entrypoint.
TaskTypeBash TaskType = "bash"
// TaskTypeHTTPAPI performs an outbound HTTP request from the control plane.
TaskTypeHTTPAPI TaskType = "http_api"
)
type TriggerRule
TriggerRule decides whether a task runs based on its upstreams' states.
Supported trigger rules for the MVP. See docs/api/dag-schema.json.
const (
// TriggerRuleAllSuccess runs when every upstream succeeded (default).
TriggerRuleAllSuccess TriggerRule = "all_success"
// TriggerRuleAllFailed runs when every upstream failed.
TriggerRuleAllFailed TriggerRule = "all_failed"
// TriggerRuleAllDone runs once every upstream finished, regardless of state.
TriggerRuleAllDone TriggerRule = "all_done"
// TriggerRuleOneSuccess runs as soon as one upstream succeeds.
TriggerRuleOneSuccess TriggerRule = "one_success"
// TriggerRuleOneFailed runs as soon as one upstream fails.
TriggerRuleOneFailed TriggerRule = "one_failed"
)
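A simplified sketch of evaluating these rules against the current upstream states. triggerSatisfied is a hypothetical name; treating an empty rule as all_success follows the "(default)" comment, while counting upstream_failed as a failure is an assumption modelled on Airflow. The sketch only answers "may the task start now"; deciding when a rule can never be met (and marking the task skipped or upstream_failed) is omitted. Note that with zero upstreams, all_* rules hold vacuously while one_* rules do not.

```go
package main

import "fmt"

// triggerSatisfied reports whether rule lets a task start, given its
// upstreams' current states.
func triggerSatisfied(rule string, upstream []string) bool {
	succeeded := func(s string) bool { return s == "success" }
	failed := func(s string) bool { return s == "failed" || s == "upstream_failed" }
	done := func(s string) bool {
		switch s {
		case "success", "failed", "skipped", "upstream_failed":
			return true
		}
		return false
	}
	count := func(pred func(string) bool) int {
		n := 0
		for _, s := range upstream {
			if pred(s) {
				n++
			}
		}
		return n
	}
	switch rule {
	case "", "all_success":
		return count(succeeded) == len(upstream)
	case "all_failed":
		return count(failed) == len(upstream)
	case "all_done":
		return count(done) == len(upstream)
	case "one_success":
		return count(succeeded) > 0 // does not wait for the other upstreams
	case "one_failed":
		return count(failed) > 0
	default:
		return false
	}
}

func main() {
	fmt.Println(triggerSatisfied("one_success", []string{"running", "success"}))
	fmt.Println(triggerSatisfied("all_success", []string{"running", "success"}))
}
```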
type Variable
Variable is a tenant-scoped key/value setting consumed by DAGs and managed from the Admin UI. Value is stored as-is (plaintext for the MVP); the API masks the values of keys whose names look secret.
type XComEntryMeta
XComEntryMeta is the metadata for one stored XCom value (without the value payload) and is the source for a task instance's XCom list. Leoflow XComs are unmapped, so MapIndex is -1.
Generated by gomarkdoc