Overview
Package dispatch implements the Phase 2 run-owner push-dispatch machinery.
Two internal HTTP endpoints are defined:
- POST /internal/dispatch – owner → worker: push a ready task to a specific worker.
- POST /internal/complete – worker → owner: report the task outcome back to the owner.
Both endpoints are guarded by the existing CAESIUM_INTERNAL_WAKEUP_TOKEN bearer-token check and run on the dedicated internal mTLS listener when owner mode is enabled.
When CAESIUM_RUN_OWNER_ENABLED=false (default), these handlers are never registered and the system behaves byte-identically to Phase 1.
Index
- Constants
- Variables
- func ClientTLSConfig(c MTLSConfig) (*tls.Config, error)
- func ClientTLSConfigFromHolder(holder *dispatchpki.MaterialHolder) (*tls.Config, error)
- func ConfigureInternalMTLS(clientTLS *tls.Config)
- func PostDispatch(ctx context.Context, targetURL, token string, req DispatchRequest) (bool, error)
- func ServerTLSConfig(c MTLSConfig) (*tls.Config, error)
- func ServerTLSConfigFromHolder(holder *dispatchpki.MaterialHolder) (*tls.Config, error)
- func WarnIfNoToken(token string)
- type CompleteRequest
- type CompleteResponse
- type DispatchLoop
- type DispatchLoopConfig
- type DispatchRequest
- type ErrorResponse
- type Handler
- type InboundDispatch
- type InternalServer
- type MTLSConfig
- type OwnerReader
- type PeerLister
- type PeerListerFunc
- type TaskPendingReader
- type WorkerSubmitter
Constants
const (
ReasonStaleGeneration = "stale_generation"
ReasonWrongWorker = "wrong_worker"
ReasonInvalidStatus = "invalid_status"
ReasonTaskNotRunning = "task_not_running"
ReasonNotOwner = "not_owner"
ReasonMissingRun = "missing_run"
ReasonMalformed = "malformed"
// ReasonContention labels caesium_complete_retryable_total when the owner
// could not apply a completion because of transient dqlite contention and
// answered 503 so the worker retries. It is NOT a fence violation.
ReasonContention = "contention"
)
Rejection reason labels for caesium_complete_rejected_total.
const (
DispatchReasonNetworkError = "network_error"
DispatchReasonWorkerRejected = "worker_rejected"
DispatchReasonNoPeers = "no_peers" // peer discovery returned empty list (bootstrap)
DispatchReasonPeerDiscoveryError = "peer_discovery_error" // peer discovery RPC failed
)
Dispatch rejection reason labels for caesium_dispatch_rejected_total.
Variables
var ErrOwnerBusy = errors.New("owner busy: retryable")
ErrOwnerBusy is returned by PostComplete when the owner answered 503 Service Unavailable: it could not apply the completion because of transient dqlite contention and is asking the worker to retry the identical request. This is distinct from a fence rejection (409), which is terminal. Callers should retry on ErrOwnerBusy and give up on any other error.
var ValidCompleteStatuses = map[string]bool{
string(run.TaskStatusSucceeded): true,
string(run.TaskStatusFailed): true,
string(run.TaskStatusCached): true,
}
ValidCompleteStatuses are the only task statuses workers may report. "skipped" is deliberately excluded: skipping is an owner-side DAG decision.
Functions
func ClientTLSConfig
func ClientTLSConfig(c MTLSConfig) (*tls.Config, error)
ClientTLSConfig builds the *tls.Config used when this node POSTs to a peer's internal endpoints (PostDispatch / PostComplete): it presents this node's certificate and verifies the peer's server certificate was signed by the CA.
Hostname verification is deliberately disabled: cluster peers are reached by dynamic pod IPs / node addresses that a long-lived certificate cannot enumerate in its SANs, so the built-in name check would reject valid peers. Peer identity instead rests on (a) the certificate being signed by the shared internal CA, verified here against the chain, and (b) the application-layer owner-generation + worker-node fence on every dispatch/complete. Go's InsecureSkipVerify cannot turn off the name check alone: it disables all standard verification of the server certificate, chain and hostname alike. The config therefore sets InsecureSkipVerify and re-implements chain verification in VerifyPeerCertificate, so that only the hostname check is actually skipped.
func ClientTLSConfigFromHolder
func ClientTLSConfigFromHolder(holder *dispatchpki.MaterialHolder) (*tls.Config, error)
func ConfigureInternalMTLS
func ConfigureInternalMTLS(clientTLS *tls.Config)
ConfigureInternalMTLS replaces the shared internal client with one that presents this node's client certificate and verifies peers against the configured CA. Called once at startup when run-owner mode is enabled, before any dispatch or completion POST is issued. Subsequent calls are no-ops. Peer internal endpoints are reached over https on the internal port (see DispatchLoopConfig.InternalPort).
func PostDispatch
func PostDispatch(ctx context.Context, targetURL, token string, req DispatchRequest) (bool, error)
PostDispatch sends a DispatchRequest to the target worker node and returns whether the worker accepted (202) or rejected (409). On rejection or network error, the caller should fall back to writing the task to the DB with claimed_by="" for ClaimNext recovery.
func ServerTLSConfig
func ServerTLSConfig(c MTLSConfig) (*tls.Config, error)
ServerTLSConfig builds the *tls.Config for the internal listener: it presents this node's certificate and requires + verifies a client certificate signed by the configured CA on every connection. A peer with no certificate, or one signed by an unknown CA, fails the TLS handshake before reaching a handler.
func ServerTLSConfigFromHolder
func ServerTLSConfigFromHolder(holder *dispatchpki.MaterialHolder) (*tls.Config, error)
func WarnIfNoToken
func WarnIfNoToken(token string)
WarnIfNoToken emits a startup warning when owner mode is on but the internal wakeup token is not set. Without the token, the dispatch and complete endpoints reject every request (the bearer-token check fails closed), so run-owner dispatch is silently inert while still adding lease overhead. The check is warn-only: unlike missing mTLS material (a hard startup error), a missing token can be fixed by setting it, without regenerating certificates.
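To see why a missing token makes dispatch inert rather than open, here is a hypothetical fail-closed bearer check next to a warn-only startup hook. Neither function is the package's code and the log wording is illustrative; the sketch assumes the header format `Authorization: Bearer <token>` and Go 1.20+ for strings.CutPrefix.

```go
package main

import (
	"crypto/subtle"
	"log"
	"strings"
)

// bearerAuthorized is a hypothetical fail-closed check: with no configured
// token every request is rejected.
func bearerAuthorized(expected, authHeader string) bool {
	if expected == "" {
		return false // fail closed: no token configured, nothing passes
	}
	got, ok := strings.CutPrefix(authHeader, "Bearer ")
	if !ok {
		return false
	}
	// Constant-time comparison avoids leaking the token through timing.
	return subtle.ConstantTimeCompare([]byte(got), []byte(expected)) == 1
}

// warnIfNoTokenSketch logs instead of aborting startup.
func warnIfNoTokenSketch(token string) {
	if token == "" {
		log.Printf("warning: run-owner mode is on but the internal wakeup token is empty; internal dispatch/complete will reject every request")
	}
}
```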
Types
type CompleteRequest
type CompleteRequest struct {
RunID uuid.UUID `json:"run_id"`
TaskID uuid.UUID `json:"task_id"`
OwnerGeneration int64 `json:"owner_generation"`
Attempt int `json:"attempt"`
WorkerNode string `json:"worker_node"`
Status string `json:"status"`
Result string `json:"result,omitempty"`
Outputs map[string]string `json:"outputs,omitempty"`
// BranchSelections carries the downstream branch names a `type: branch`
// task chose at runtime. The owner uses this to propagate `skipped` to the
// non-selected branches. Empty for non-branch tasks.
BranchSelections []string `json:"branch_selections,omitempty"`
Error string `json:"error,omitempty"`
}
CompleteRequest is the envelope sent by a worker back to the owner when a task execution finishes.
type CompleteResponse
type CompleteResponse struct {
// Accepted is true when the completion was applied.
Accepted bool `json:"accepted"`
Reason string `json:"reason,omitempty"`
}
CompleteResponse is the JSON body returned by /internal/complete.
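func PostComplete
func PostComplete(ctx context.Context, ownerURL, token string, req CompleteRequest) (*CompleteResponse, error)
PostComplete sends a CompleteRequest from a worker to the owner node. It returns ErrOwnerBusy when the owner answers 503, in which case the worker should retry the identical request.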
type DispatchLoop
type DispatchLoop struct {
// contains filtered or unexported fields
}
DispatchLoop is the per-node push-dispatch goroutine for Phase A2. Call Run(ctx) in a goroutine; it exits cleanly when ctx is cancelled.
func NewDispatchLoop
func NewDispatchLoop(cfg DispatchLoopConfig) *DispatchLoop
NewDispatchLoop constructs a DispatchLoop from cfg.
func (*DispatchLoop) Run
func (l *DispatchLoop) Run(ctx context.Context)
Run starts the polling loop. It blocks until ctx is cancelled.
type DispatchLoopConfig
type DispatchLoopConfig struct {
// NodeID is this node's canonical address (CAESIUM_NODE_ADDRESS). Used
// as the identity for OwnedRuns and included in the round-robin peer list.
NodeID string
// APIPort is the HTTP API port (CAESIUM_PORT). Used to build the dispatch
// URL from peer node addresses when InternalPort is unset (tests / non-mTLS).
APIPort int
// InternalPort is the dedicated internal mTLS listener port
// (CAESIUM_INTERNAL_PORT). When > 0, peer and owner base URLs are built as
// https://host:InternalPort so dispatch/complete traffic flows over the
// mutually-authenticated internal listener instead of the public API port.
InternalPort int
// Token is the CAESIUM_INTERNAL_WAKEUP_TOKEN bearer token.
Token string
// Interval is the polling tick interval (CAESIUM_RUN_OWNER_DISPATCH_INTERVAL).
Interval time.Duration
// BatchSize caps the number of tasks dispatched per tick per run
// (CAESIUM_RUN_OWNER_DISPATCH_BATCH).
BatchSize int
// Deadline is added to time.Now() to produce the DispatchRequest.Deadline
// (CAESIUM_RUN_OWNER_DISPATCH_DEADLINE).
Deadline time.Duration
// LeaseTTL is the run-lease TTL (CAESIUM_RUN_LEASE_TTL), used as the new
// expiry when this node takes over an expired lease in the failover sweep.
LeaseTTL time.Duration
// LeaseStore provides ownership queries.
LeaseStore OwnerReader
// Store provides pending-task queries.
Store TaskPendingReader
// Peers resolves the current peer list.
Peers PeerLister
// PeerBaseURL maps a raw peer node address (host:dqlitePort) to the base
// URL the dispatch loop POSTs to. Optional; tests override it to route
// multiple distinct peer node IDs to a single mux server. Production leaves
// it nil and the loop falls back to the default: https://host:InternalPort
// when InternalPort > 0, otherwise http://host:APIPort.
PeerBaseURL func(nodeAddr string) string
// OwnerManager, when set (CAESIUM_RUN_OWNER_IN_MEMORY=true), is the source of
// truth for ready tasks: the loop dispatches from the in-memory ready queue
// and records dispatches/recoveries on it, instead of polling the DB for
// pending tasks. Nil keeps the proven B2 DB-poll path.
OwnerManager *run.OwnerManager
}
DispatchLoopConfig holds all parameters for the dispatch loop goroutine.
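The interplay of InternalPort, APIPort and the PeerBaseURL override can be sketched as one helper. `peerBaseURL` is hypothetical, and the precedence it encodes (override first, then InternalPort, then APIPort) is read from the field comments above rather than from the implementation.

```go
package main

import (
	"net"
	"strconv"
)

// peerBaseURL derives the POST target from a "host:dqlitePort" node address.
// A non-nil override (like DispatchLoopConfig.PeerBaseURL in tests) wins.
func peerBaseURL(nodeAddr string, internalPort, apiPort int, override func(string) string) (string, error) {
	if override != nil {
		return override(nodeAddr), nil
	}
	host, _, err := net.SplitHostPort(nodeAddr) // drop the dqlite port
	if err != nil {
		return "", err
	}
	if internalPort > 0 {
		// mTLS listener: coordination traffic avoids the public API port.
		return "https://" + net.JoinHostPort(host, strconv.Itoa(internalPort)), nil
	}
	return "http://" + net.JoinHostPort(host, strconv.Itoa(apiPort)), nil
}
```

Using net.SplitHostPort and net.JoinHostPort rather than string slicing keeps IPv6 node addresses correctly bracketed.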
type DispatchRequest
type DispatchRequest struct {
RunID uuid.UUID `json:"run_id"`
TaskID uuid.UUID `json:"task_id"`
OwnerGeneration int64 `json:"owner_generation"`
Attempt int `json:"attempt"`
WorkerNode string `json:"worker_node"`
// OwnerBaseURL is the owner's own base URL (https://<owner-host>:<internalPort>
// when InternalPort is set, otherwise http://<owner-host>:<apiPort>). The
// receiving worker POSTs its task completion back to OwnerBaseURL +
// "/internal/complete" so the owner remains the single writer for its run's
// hot rows. Set by the dispatch loop from the owner's node address and port.
OwnerBaseURL string `json:"owner_base_url"`
Deadline time.Time `json:"deadline"`
}
DispatchRequest is the envelope pushed by the owner to a worker to ask it to execute a specific task.
type ErrorResponse
ErrorResponse is a structured 409 body with a rejection reason label.
type Handler
type Handler struct {
// contains filtered or unexported fields
}
Handler holds the dependencies needed to serve the dispatch and complete endpoints.
func NewHandler
NewHandler constructs a Handler. store is the run.Store; leaseStore is the run-lease store used to verify ownership; nodeID is this node's address; token is the CAESIUM_INTERNAL_WAKEUP_TOKEN value used for bearer-token auth.
func (*Handler) HandleComplete
func (h *Handler) HandleComplete(w http.ResponseWriter, r *http.Request)
HandleComplete handles POST /internal/complete.
Validation rules (any mismatch → 409):
- This node currently owns the run (run_leases.owner_node == self && !expired).
- The envelope's owner_generation matches the current lease generation.
- worker_node matches claimed_by on the task_runs row.
- The task is currently in "running" status.
- status ∈ {succeeded, failed, cached} — "skipped" is rejected.
On success, the owner applies the completion via the existing CompleteTaskClaimed / CacheHitTaskClaimed / FailTaskClaimed path and returns 200 with {"accepted": true}.
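The validation rules above can be read as a single fence function returning the 409 reason label. This is a hypothetical sketch: `leaseView`, `taskView` and `completionRejection` are illustrative, and the order in which rules are checked is an assumption, since the list does not fix one.

```go
package main

// Reason labels copied from the package's constants.
const (
	ReasonStaleGeneration = "stale_generation"
	ReasonWrongWorker     = "wrong_worker"
	ReasonInvalidStatus   = "invalid_status"
	ReasonTaskNotRunning  = "task_not_running"
	ReasonNotOwner        = "not_owner"
)

// Hypothetical snapshots of the run_leases and task_runs rows.
type leaseView struct {
	OwnerNode  string
	Generation int64
	Expired    bool
}

type taskView struct {
	ClaimedBy string
	Status    string
}

var validStatuses = map[string]bool{"succeeded": true, "failed": true, "cached": true}

// completionRejection returns "" when the completion may be applied, or the
// reason label of the first rule it violates.
func completionRejection(self string, lease leaseView, task taskView, ownerGen int64, worker, status string) string {
	switch {
	case lease.OwnerNode != self || lease.Expired:
		return ReasonNotOwner
	case ownerGen != lease.Generation:
		return ReasonStaleGeneration
	case worker != task.ClaimedBy:
		return ReasonWrongWorker
	case task.Status != "running":
		return ReasonTaskNotRunning
	case !validStatuses[status]:
		return ReasonInvalidStatus // includes "skipped"
	}
	return ""
}
```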
func (*Handler) HandleDispatch
func (h *Handler) HandleDispatch(w http.ResponseWriter, r *http.Request)
HandleDispatch handles POST /internal/dispatch.
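The worker accepts the dispatch by:
- Parsing and validating the envelope.
- Calling StartTaskClaimed to transition the task to "running".
- Handing the task to the local worker's execution pool via the WorkerSubmitter (if the worker cannot take it, the claim is rolled back).
- Returning 202 ACK.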
If the worker cannot accept (task already claimed, owner mismatch, etc.) it returns 409 and the owner falls back to writing the task to the DB with claimed_by="" for ClaimNext recovery.
func (*Handler) WithOwnerManager
func (h *Handler) WithOwnerManager(m *run.OwnerManager) *Handler
WithOwnerManager enables the in-memory advancement path: completions are applied to the owner's RunState and persisted as terminal-only rows, instead of the SQL-advancement path. Returns the handler for chaining.
func (*Handler) WithWorkerSubmitter
func (h *Handler) WithWorkerSubmitter(s WorkerSubmitter) *Handler
WithWorkerSubmitter wires the local worker's submit seam into the handler so accepted dispatches flow onto the worker's shared execution pool. Returns the handler for chaining at construction time.
type InboundDispatch
type InboundDispatch struct {
// Task is the full task_runs row to execute (image/command/engine/etc.).
Task *models.TaskRun
// OwnerBaseURL is the owner's API base URL; the worker POSTs its completion
// to OwnerBaseURL + "/internal/complete".
OwnerBaseURL string
// OwnerGeneration / Attempt / WorkerNode are the fencing fields the owner
// validates on the completion envelope.
OwnerGeneration int64
Attempt int
WorkerNode string
}
InboundDispatch is a task accepted by this node for execution plus the owner metadata it needs to report completion back to the owner. HandleDispatch builds one of these and hands it to the worker via the WorkerSubmitter.
type InternalServer
type InternalServer struct {
// contains filtered or unexported fields
}
InternalServer hosts the run-owner coordination endpoints (/internal/dispatch, /internal/complete) on a dedicated, mutually authenticated TLS listener, separate from the public API server, which stays plain HTTP behind the operator's own proxy. Node-to-node coordination traffic therefore never touches the public surface, and every peer is authenticated by client certificate at the TLS layer.
func NewInternalServer
func NewInternalServer(handler *Handler, addr string, tlsConfig *tls.Config) *InternalServer
NewInternalServer builds the internal mTLS server for handler, bound to addr (e.g. ":8443") with the supplied server TLS config (from ServerTLSConfig).
type MTLSConfig
MTLSConfig holds the file paths for the internal mutual-TLS material (CAESIUM_INTERNAL_MTLS_CA/CERT/KEY). The same key pair is this node's identity in both directions: the server certificate on its internal listener and the client certificate it presents when POSTing to a peer.
func (MTLSConfig) Configured
func (c MTLSConfig) Configured() bool
Configured reports whether all three material paths are set. Run-owner mode requires this to be true (enforced at startup).
type OwnerReader
type OwnerReader interface {
OwnedRunsWithGenerations(ctx context.Context, ownerNode string) (map[uuid.UUID]int64, error)
// AcquireExpiredLeases takes over leases whose owner let them expire,
// reassigning them to newOwner with an incremented generation. Used by the
// in-memory failover sweep so a peer recovers a dead owner's runs.
AcquireExpiredLeases(ctx context.Context, newOwner string, ttl time.Duration) (int64, error)
}
OwnerReader provides run-lease ownership queries used by the dispatch loop.
OwnedRunsWithGenerations returns owned runIDs mapped to their current lease generation in a single query — used per-tick to avoid an N+1 GetLease pattern as the owned set grows.
type PeerLister
PeerLister provides the current set of dispatch-eligible peer node addresses. The production implementation delegates to the dqlite cluster member list; tests inject a stub. Addresses are returned as "host:dqlitePort" strings, the same format as CAESIUM_NODE_ADDRESS / dqlite.Cluster results.
type PeerListerFunc
PeerListerFunc is a function-valued implementation of PeerLister.
func (PeerListerFunc) DispatchPeers
func (f PeerListerFunc) DispatchPeers(ctx context.Context) ([]string, error)
type TaskPendingReader
type TaskPendingReader interface {
PendingTasksForDispatch(ctx context.Context, runID uuid.UUID, limit int) ([]models.TaskRun, error)
}
TaskPendingReader provides pending-task queries used by the dispatch loop.
type WorkerSubmitter
type WorkerSubmitter interface {
SubmitDispatched(d InboundDispatch) error
}
WorkerSubmitter is the seam the dispatch handler uses to hand an accepted task to the local worker's execution pool. The worker implementation (worker.Worker.SubmitDispatched) enqueues the task onto its inbound channel for the Run loop to drain onto the shared pool. It returns an error when the worker cannot accept the task (inbound buffer full or worker not running) so HandleDispatch can roll back the claim and let the owner re-dispatch.
It is an interface so dispatch tests can inject a fake without standing up a real worker + pool.