Skip to content

agent

import "github.com/neochaotic/leoflow/internal/agent"

Package agent contains the worker-side logic that runs inside the task container: building the user process command, injecting XCom inputs, reading the return value, and retry backoff. The gRPC client lives in cmd/leoflow-agent.

Index

func Backoff

func Backoff(attempt int) (delay time.Duration, ok bool)

Backoff returns the delay before retry attempt n (1-based: 1s, 2s, 4s, 8s, 16s). ok is false once the maximum number of attempts is exceeded.

func BuildCommand

func BuildCommand(operator, entrypoint string) ([]string, error)

BuildCommand returns the argv to execute the user's task for the given operator. http_api tasks are executed by the control plane, not the agent.

func Dial

func Dial(addr, token string, allowInsecure bool, caFile string) (agentv1.AgentServiceClient, *grpc.ClientConn, error)

Dial connects to the control plane's AgentService, attaching the bearer token to every RPC. When allowInsecure is true (local development against a cluster without TLS) the transport is unencrypted; otherwise TLS 1.2+ is required. When caFile is set, the server certificate is verified against that CA (a self-signed / cluster CA); otherwise the system roots are used.

func NewReturnValuePath

func NewReturnValuePath() (path string, cleanup func() error, err error)

NewReturnValuePath returns a unique, agent-owned path for this task's return value, plus a cleanup. The agent runs one task per process, so a per-process temp dir keeps concurrent tasks and other users from ever sharing a single /tmp/leoflow_return_value.json (which collided โ€” permission denied across uids, clobbered across parallel tasks). The runtime is pointed here via the LEOFLOW_RETURN_VALUE_PATH env the runner injects.

func ReadReturnValue

func ReadReturnValue(path string) (value []byte, ok bool, err error)

ReadReturnValue reads the optional return-value file. ok is false (no error) when the file does not exist.

func XComEnvVar

func XComEnvVar(name string, value []byte) string

XComEnvVar formats an XCom input as a LEOFLOW_XCOM_\<NAME>=\<json> env entry.

type CommandRunner

CommandRunner executes the user task process, writing its stdout and stderr to the supplied writers and returning the process exit code.

type CommandRunner interface {
    Run(ctx context.Context, argv, env []string, stdout, stderr io.Writer) (exitCode int, err error)
}

func NewExecRunner

func NewExecRunner() CommandRunner

NewExecRunner returns a CommandRunner that executes tasks as child processes.

type LogSink

LogSink receives log lines produced by the user task. Sends are best-effort.

type LogSink interface {
    Send(line *agentv1.LogLine) error
    Close() error
}

func OpenLogSink

func OpenLogSink(ctx context.Context, client agentv1.AgentServiceClient) (LogSink, error)

OpenLogSink starts the StreamLogs RPC and returns a sink that forwards lines to it. It is the agent's first RPC, so it uses WaitForReady: with the lazy connection of grpc.NewClient the channel may not be established yet, and without this the stream would fail fast on a cold connection (the "opening log stream" EOF in #36) rather than waiting for the control plane to be reachable.

type NoopLogSink

NoopLogSink discards log lines. The agent falls back to it when the control plane log stream is unavailable (e.g. StreamLogs not yet implemented), so a task still runs even though its logs are not shipped this run.

type NoopLogSink struct{}

func (NoopLogSink) Close

func (NoopLogSink) Close() error

Close is a no-op.

func (NoopLogSink) Send

func (NoopLogSink) Send(*agentv1.LogLine) error

Send discards the line.

type Runner

Runner orchestrates a single task execution inside the worker container: it registers with the control plane, fetches the task spec and XCom inputs, runs the user process while streaming logs, pushes the return value, and reports the terminal state.

type Runner struct {
    Client     agentv1.AgentServiceClient
    Cmd        CommandRunner
    Sink       LogSink
    Hostname   string
    Version    string
    Env        []string // base process environment (typically os.Environ())
    ReturnPath string   // file the task writes its return value to; empty disables push
    // HeartbeatInterval is how often to ping the control plane while the task
    // runs; zero disables heartbeats.
    HeartbeatInterval time.Duration
}

func (*Runner) Run

func (r *Runner) Run(ctx context.Context) error

Run executes the task lifecycle and returns an error if the task failed.

Generated by gomarkdoc