agent¶
Package agent contains the worker-side logic that runs inside the task container: building the user process command, injecting XCom inputs, reading the return value, and computing the retry backoff. The gRPC client lives in cmd/leoflow-agent.
Index¶
- func Backoff(attempt int) (delay time.Duration, ok bool)
- func BuildCommand(operator, entrypoint string) ([]string, error)
- func Dial(addr, token string, allowInsecure bool, caFile string) (agentv1.AgentServiceClient, *grpc.ClientConn, error)
- func NewReturnValuePath() (path string, cleanup func() error, err error)
- func ReadReturnValue(path string) (value []byte, ok bool, err error)
- func XComEnvVar(name string, value []byte) string
- type CommandRunner
- func NewExecRunner() CommandRunner
- type LogSink
- func OpenLogSink(ctx context.Context, client agentv1.AgentServiceClient) (LogSink, error)
- type NoopLogSink
- func (NoopLogSink) Close() error
- func (NoopLogSink) Send(*agentv1.LogLine) error
- type Runner
- func (r *Runner) Run(ctx context.Context) error
func Backoff¶
Backoff returns the delay before the given retry attempt. Attempts are 1-based, and the delays are 1s, 2s, 4s, 8s, 16s. ok is false once the maximum number of attempts is exceeded.
func BuildCommand¶
BuildCommand returns the argv to execute the user's task for the given operator. http_api tasks are executed by the control plane, not the agent.
func Dial¶
func Dial(addr, token string, allowInsecure bool, caFile string) (agentv1.AgentServiceClient, *grpc.ClientConn, error)
Dial connects to the control plane's AgentService, attaching the bearer token to every RPC. When allowInsecure is true (local development against a cluster without TLS) the transport is unencrypted; otherwise TLS 1.2+ is required. When caFile is set, the server certificate is verified against that CA (a self-signed / cluster CA); otherwise the system roots are used.
func NewReturnValuePath¶
NewReturnValuePath returns a unique, agent-owned path for this task's return value, plus a cleanup function. The agent runs one task per process, so a per-process temp dir keeps concurrent tasks and other users from ever sharing a single /tmp/leoflow_return_value.json, which used to collide: permission denied across uids, and clobbered across parallel tasks. The runtime is pointed here via the LEOFLOW_RETURN_VALUE_PATH environment variable that the runner injects.
func ReadReturnValue¶
ReadReturnValue reads the optional return-value file. ok is false (no error) when the file does not exist.
func XComEnvVar¶
XComEnvVar formats an XCom input as a LEOFLOW_XCOM_<NAME>=<json> env entry.
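As an illustration of the entry format, the sketch below builds such a string. xcomEnvVarSketch is a hypothetical name, and upper-casing the XCom name is an assumption suggested by the <NAME> placeholder; the real function may normalise names differently or not at all.

```go
package main

import (
	"fmt"
	"strings"
)

// xcomEnvVarSketch is a hypothetical stand-in for XComEnvVar.
// Upper-casing the name is an assumption, not documented behaviour.
func xcomEnvVarSketch(name string, value []byte) string {
	return "LEOFLOW_XCOM_" + strings.ToUpper(name) + "=" + string(value)
}

func main() {
	entry := xcomEnvVarSketch("row_count", []byte(`{"rows":42}`))
	fmt.Println(entry) // prints LEOFLOW_XCOM_ROW_COUNT={"rows":42}
}
```

The resulting string can be appended to the env slice passed to the task process.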
type CommandRunner¶
CommandRunner executes the user task process, writing its stdout and stderr to the supplied writers and returning the process exit code.
type CommandRunner interface {
	Run(ctx context.Context, argv, env []string, stdout, stderr io.Writer) (exitCode int, err error)
}
func NewExecRunner¶
NewExecRunner returns a CommandRunner that executes tasks as child processes.
type LogSink¶
LogSink receives log lines produced by the user task. Sends are best-effort.
func OpenLogSink¶
OpenLogSink starts the StreamLogs RPC and returns a sink that forwards lines to it. It is the agent's first RPC, so it uses WaitForReady: with the lazy connection of grpc.NewClient the channel may not be established yet, and without this the stream would fail fast on a cold connection (the "opening log stream" EOF in #36) rather than waiting for the control plane to be reachable.
type NoopLogSink¶
NoopLogSink discards log lines. The agent falls back to it when the control plane log stream is unavailable (e.g. StreamLogs not yet implemented), so a task still runs even though its logs are not shipped for that run.
func (NoopLogSink) Close¶
Close is a no-op.
func (NoopLogSink) Send¶
Send discards the line.
type Runner¶
Runner orchestrates a single task execution inside the worker container: it registers with the control plane, fetches the task spec and XCom inputs, runs the user process while streaming logs, pushes the return value, and reports the terminal state.
type Runner struct {
	Client     agentv1.AgentServiceClient
	Cmd        CommandRunner
	Sink       LogSink
	Hostname   string
	Version    string
	Env        []string // base process environment (typically os.Environ())
	ReturnPath string   // file the task writes its return value to; empty disables push
	// HeartbeatInterval is how often to ping the control plane while the task
	// runs; zero disables heartbeats.
	HeartbeatInterval time.Duration
}
func (*Runner) Run¶
Run executes the task lifecycle and returns an error if the task failed.
Generated by gomarkdoc