agentrpc

import "github.com/neochaotic/leoflow/internal/agentrpc"

Package agentrpc implements the control-plane side of the agent gRPC protocol: it authenticates each in-pod agent by its per-task-instance token, serves the task specification, and records the state transitions the agent reports.

func RecoveryStreamInterceptor

func RecoveryStreamInterceptor(logger *slog.Logger) grpc.StreamServerInterceptor

RecoveryStreamInterceptor is RecoveryUnaryInterceptor for streaming handlers (e.g. log streaming), so a panic mid-stream returns Internal instead of crashing the control plane.

func RecoveryUnaryInterceptor

func RecoveryUnaryInterceptor(logger *slog.Logger) grpc.UnaryServerInterceptor

RecoveryUnaryInterceptor recovers panics in unary RPC handlers so a single malformed or unexpected request from an agent cannot crash the control plane. The panic is logged with its stack and translated to a gRPC Internal error; the server keeps serving. It should be the outermost interceptor so it also covers any later interceptor.

type Authenticator

Authenticator verifies an agent bearer token and resolves it to a task instance identity.

type Authenticator interface {
    AuthenticateAgent(token string) (*auth.AgentIdentity, error)
}

type LogPublisher

LogPublisher fans a log line out for live tailing (optional).

type LogPublisher interface {
    Publish(ctx context.Context, ref logs.Ref, line string) error
}

type LogSink

LogSink opens a writer for a task attempt's streamed logs.

type LogSink interface {
    Open(ref logs.Ref) (logs.LogWriter, error)
}

type SecretsStore

SecretsStore returns a tenant's Variables and Connections for delivery to a task pod (ADR 0021). Connection URIs carry decrypted credentials, so this is only ever served over the authenticated agent channel, never to the UI/API.

type SecretsStore interface {
    SecretVariables(ctx context.Context, tenant string) (map[string]string, error)
    SecretConnectionURIs(ctx context.Context, tenant string) (map[string]string, error)
}

type Server

Server implements agentv1.AgentServiceServer over a Store and Authenticator.

type Server struct {
    agentv1.UnimplementedAgentServiceServer
    // contains filtered or unexported fields
}

func NewServer

func NewServer(authn Authenticator, store Store, xcomSvc XComService) *Server

NewServer builds an AgentService server backed by the given authenticator, store, and XCom service.

func (*Server) FetchXCom

func (s *Server) FetchXCom(ctx context.Context, req *agentv1.FetchXComRequest) (*agentv1.FetchXComResponse, error)

FetchXCom returns an upstream task's value, but only from a task the caller declared as an XCom input within the same run (and, by construction, the same tenant), enforcing cross-tenant and cross-run isolation.

func (*Server) GetConnections

func (s *Server) GetConnections(ctx context.Context, _ *agentv1.GetConnectionsRequest) (*agentv1.GetConnectionsResponse, error)

GetConnections returns the calling task's tenant Connections as Airflow URIs for the agent to export as AIRFLOW_CONN_\<CONN_ID>.

func (*Server) GetTaskSpec

func (s *Server) GetTaskSpec(ctx context.Context, _ *agentv1.GetTaskSpecRequest) (*agentv1.TaskSpec, error)

GetTaskSpec returns the execution spec for the calling task instance.

func (*Server) GetVariables

func (s *Server) GetVariables(ctx context.Context, _ *agentv1.GetVariablesRequest) (*agentv1.GetVariablesResponse, error)

GetVariables returns the calling task's tenant Variables for the agent to export as AIRFLOW_VAR_\<KEY>.
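On the agent side, exporting the results of GetConnections and GetVariables amounts to prefixing each key. The sketch below is an illustration only: `envEntries` is a hypothetical helper, and uppercasing the key follows Airflow's environment-variable naming convention; whether the agent does the same is an assumption.

```go
package main

import (
	"sort"
	"strings"
)

// envEntries renders values as sorted "PREFIX_KEY=value" entries suitable
// for a process environment. Keys are uppercased (an assumption based on
// Airflow's convention for AIRFLOW_CONN_ and AIRFLOW_VAR_ variables).
func envEntries(prefix string, values map[string]string) []string {
	out := make([]string, 0, len(values))
	for k, v := range values {
		out = append(out, prefix+strings.ToUpper(k)+"="+v)
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}
```

A caller could append `envEntries("AIRFLOW_CONN_", conns)` and `envEntries("AIRFLOW_VAR_", vars)` to the task process environment before starting it.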

func (*Server) Heartbeat

func (s *Server) Heartbeat(ctx context.Context, _ *agentv1.HeartbeatRequest) (*agentv1.HeartbeatResponse, error)

Heartbeat stamps the per-TI liveness signal (#128) and returns the server clock so the agent can detect skew. A storage error while stamping the heartbeat is logged but does not fail the RPC: failing the call would risk the agent terminating itself unnecessarily on a transient DB blip. In the worst case, the scheduler reaper fails the TI as agent_lost on the next tick, which is the correct outcome under "do no harm" (ADR 0031).

func (*Server) PushXCom

func (s *Server) PushXCom(ctx context.Context, req *agentv1.PushXComRequest) (*agentv1.PushXComResponse, error)

PushXCom stores a value the task produced, keyed by the caller's identity. Size/schema violations are returned as a rejection, not a transport error, so the agent can fail the task with a clear reason.

func (*Server) Register

func (s *Server) Register(ctx context.Context, _ *agentv1.RegisterRequest) (*agentv1.RegisterResponse, error)

Register acknowledges an agent's startup and returns the server clock.

func (*Server) ReportState

func (s *Server) ReportState(ctx context.Context, req *agentv1.ReportStateRequest) (*agentv1.ReportStateResponse, error)

ReportState records a state transition the agent observed for its task.

func (*Server) SetLogPublisher

func (s *Server) SetLogPublisher(p LogPublisher)

SetLogPublisher attaches the live-tail publisher (optional). When set, StreamLogs publishes each line for the UI's live tail.

func (*Server) SetLogSink

func (s *Server) SetLogSink(sink LogSink)

SetLogSink attaches the log sink that StreamLogs writes to. Without it, StreamLogs reports Unimplemented.

func (*Server) SetSecrets

func (s *Server) SetSecrets(store SecretsStore, allowInsecure bool)

SetSecrets attaches the secrets store. allowInsecure permits serving secrets over a non-TLS channel, for local/dev only; production must use TLS (the handlers fail closed otherwise). See ADR 0021 / issue #58.

func (*Server) StreamLogs

func (s *Server) StreamLogs(stream agentv1.AgentService_StreamLogsServer) (err error)

StreamLogs receives the task's log lines and writes them through the sink, flushing on stream end so the logs survive the pod.

type Store

Store is the server's view of persistent task state.

type Store interface {
    // TaskSpec returns the execution spec for the identified task instance.
    TaskSpec(ctx context.Context, id auth.AgentIdentity) (TaskSpec, error)
    // ReportState records a state transition reported by the agent.
    ReportState(ctx context.Context, id auth.AgentIdentity, state domain.TaskState, exitCode int, errMsg string) error
    // RecordHeartbeat stamps last_heartbeat_at on the identified TI so the
    // scheduler's heartbeat reaper (#128) can tell live tasks from agent-lost
    // ones. The state guard inside the SQL skips already-terminal rows.
    RecordHeartbeat(ctx context.Context, id auth.AgentIdentity) error
}

type TaskSpec

TaskSpec is the execution specification the agent needs to run a task.

type TaskSpec struct {
    Operator         string
    Entrypoint       string
    DagVersion       string
    Environment      map[string]string
    XComInputMapping map[string][]string
    XComSchema       map[string]any
    TimeoutSeconds   int
    // CallArgsJSON carries TaskFlow literal call args captured by the parser
    // (#115). The agent injects this verbatim as LEOFLOW_CALL_ARGS_JSON; the
    // runtime decodes it. Empty when the task has no literals. The name
    // keeps Airflow's DAG-run `params` term free for a future feature (#148).
    CallArgsJSON string
}
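On the runtime side, decoding the injected LEOFLOW_CALL_ARGS_JSON value might look like the sketch below. The JSON shape is not specified in this reference, so treating it as a single JSON object is an assumption, and `decodeCallArgs` is a hypothetical helper.

```go
package main

import "encoding/json"

// decodeCallArgs parses the raw call-args string. An empty string means
// the task has no literals and yields an empty map. The payload is assumed
// to be a JSON object, which is an assumption made for this example.
func decodeCallArgs(raw string) (map[string]any, error) {
	if raw == "" {
		return map[string]any{}, nil
	}
	var args map[string]any
	if err := json.Unmarshal([]byte(raw), &args); err != nil {
		return nil, err
	}
	return args, nil
}
```

Note that `encoding/json` decodes JSON numbers into `float64` when the target is `any`, so integer literals need an explicit conversion after decoding.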

type XComService

XComService stores and retrieves XCom values for the agent.

type XComService interface {
    Push(ctx context.Context, key xcom.Key, value []byte, contentType string, schema map[string]any) error
    Fetch(ctx context.Context, key xcom.Key) (xcom.Entry, error)
}

Generated by gomarkdoc