Documentation ¶
Overview ¶
Package worker loads and runs polling workers that push items into named queues.
Workers are described by *.worker.yaml files. LoadDir reads every *.worker.yaml file in a directory and returns validated WorkerDefinitions. Definitions whose "type" field is not "http_poll" are rejected at load time.
Supported YAML schema (name, type, interval, url, and output.queue are required; all other fields are optional):
name: my-poller
type: http_poll
interval: 5m
url: "https://api.example.com/items"
headers:
  Authorization: "Bearer {{env:MY_TOKEN}}"
output:
  queue: my-queue
  dedup_key: number
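The {{env:MY_TOKEN}} placeholder in the example suggests that header values can reference environment variables. The documentation does not show how the substitution is performed, so the following is only a minimal sketch, assuming each {{env:NAME}} is replaced by the value of NAME and that unset variables expand to an empty string. The names expandEnv and envPlaceholder are hypothetical and not part of the package API.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
)

// envPlaceholder matches {{env:NAME}}, where NAME looks like a typical
// environment variable identifier. The exact grammar is an assumption.
var envPlaceholder = regexp.MustCompile(`\{\{env:([A-Za-z_][A-Za-z0-9_]*)\}\}`)

// expandEnv is a hypothetical helper. It replaces every {{env:NAME}}
// occurrence in s with the result of lookup(NAME).
func expandEnv(s string, lookup func(string) string) string {
	return envPlaceholder.ReplaceAllStringFunc(s, func(m string) string {
		name := envPlaceholder.FindStringSubmatch(m)[1]
		return lookup(name)
	})
}

func main() {
	// os.Getenv returns "" for unset variables, so an unset MY_TOKEN
	// yields "Bearer " in this sketch.
	fmt.Println(expandEnv("Bearer {{env:MY_TOKEN}}", os.Getenv))
}
```

Passing the lookup function as a parameter keeps the helper easy to test without touching the process environment.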
Functions ¶
func LoadDir ¶
func LoadDir(dir string) ([]model.WorkerDefinition, error)
LoadDir reads all *.worker.yaml files from dir and returns validated WorkerDefinitions. If dir is empty or does not exist, an empty slice is returned without error. Individual files that fail to parse are collected and returned as a combined error.
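The contract described for LoadDir (missing directory tolerated, per-file failures collected into one error) can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the package's implementation: loadDir and runDemo are hypothetical names, the parse callback stands in for the real YAML decoding and validation, "dir is empty" is read as covering both an empty path and a directory with no matching files, and returning the successfully loaded files alongside the combined error is a guess. errors.Join requires Go 1.20 or later.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
)

// loadDir is a hypothetical stand-in for LoadDir. It returns the paths of
// the files that parsed successfully and a combined error for the rest.
func loadDir(dir string, parse func(path string, data []byte) error) ([]string, error) {
	loaded := []string{}
	if dir == "" {
		return loaded, nil
	}
	entries, err := os.ReadDir(dir)
	if errors.Is(err, fs.ErrNotExist) {
		return loaded, nil // a missing directory is not an error
	}
	if err != nil {
		return nil, err
	}
	var errs []error
	for _, e := range entries {
		if e.IsDir() || !strings.HasSuffix(e.Name(), ".worker.yaml") {
			continue
		}
		path := filepath.Join(dir, e.Name())
		data, err := os.ReadFile(path)
		if err == nil {
			err = parse(path, data)
		}
		if err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", e.Name(), err))
			continue
		}
		loaded = append(loaded, path)
	}
	// errors.Join returns nil when errs is empty.
	return loaded, errors.Join(errs...)
}

// runDemo writes one valid file, one invalid file, and one unrelated file
// to a temporary directory, then reports how many loaded and whether an
// error was returned.
func runDemo() (int, bool) {
	dir, err := os.MkdirTemp("", "workers")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	files := map[string]string{
		"good.worker.yaml": "type: http_poll\n",
		"bad.worker.yaml":  "type: cron\n",
		"notes.txt":        "ignored\n",
	}
	for name, body := range files {
		if err := os.WriteFile(filepath.Join(dir, name), []byte(body), 0o600); err != nil {
			panic(err)
		}
	}
	// Toy validation: only the type check from the overview is modelled.
	parse := func(_ string, data []byte) error {
		if !strings.Contains(string(data), "type: http_poll") {
			return errors.New(`type must be "http_poll"`)
		}
		return nil
	}
	loaded, err := loadDir(dir, parse)
	return len(loaded), err != nil
}

func main() {
	n, failed := runDemo()
	fmt.Println(n, failed) // prints "1 true"
}
```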
Types ¶
type HTTPPollWorker ¶
type HTTPPollWorker struct {
	// contains filtered or unexported fields
}
HTTPPollWorker polls an HTTP endpoint at a fixed interval, deduplicates responses by a configured key field, and pushes new items into a named queue.
Deduplication is in-memory; the seen set is initialized from the current queue contents on startup to avoid re-queuing items already pending.
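The deduplication behaviour described above can be sketched as a set keyed by the configured field and seeded from the items already pending in the queue. The types item and seenSet and the functions newSeenSet and filterNew are hypothetical; how the real worker handles items that lack the key field is not documented, and passing them through is an assumption made here.

```go
package main

import "fmt"

// item is a decoded JSON object from the polled response.
type item map[string]any

// seenSet is a hypothetical in-memory set of dedup key values,
// stored in their string form.
type seenSet map[string]struct{}

// newSeenSet seeds the set from items already pending in the queue, so
// that a restart does not re-queue them.
func newSeenSet(pending []item, key string) seenSet {
	s := seenSet{}
	for _, it := range pending {
		if v, ok := it[key]; ok {
			s[fmt.Sprint(v)] = struct{}{}
		}
	}
	return s
}

// filterNew returns the polled items whose key has not been seen before
// and records them as seen. Items without the key are passed through
// (an assumption, see the lead-in).
func (s seenSet) filterNew(polled []item, key string) []item {
	var fresh []item
	for _, it := range polled {
		v, ok := it[key]
		if !ok {
			fresh = append(fresh, it)
			continue
		}
		k := fmt.Sprint(v)
		if _, dup := s[k]; dup {
			continue
		}
		s[k] = struct{}{}
		fresh = append(fresh, it)
	}
	return fresh
}

func main() {
	seen := newSeenSet([]item{{"number": 1}}, "number")
	polled := []item{{"number": 1}, {"number": 2}, {"number": 2}}
	fresh := seen.filterNew(polled, "number")
	fmt.Println(len(fresh)) // prints 1: only the first {"number": 2} is new
}
```

Because the set lives only in memory, it is lost on restart, which is why seeding it from the queue contents matters.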
func (*HTTPPollWorker) Run ¶
func (w *HTTPPollWorker) Run(ctx context.Context)
Run loops until ctx is cancelled, calling poll at each interval.
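A loop of this shape is commonly written with time.Ticker and a select on ctx.Done. The sketch below shows that pattern only; runLoop and countPolls are hypothetical names, and whether the real Run also polls once immediately on startup is not stated in the documentation (this sketch waits one interval before the first poll).

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop calls poll once per interval until ctx is cancelled.
func runLoop(ctx context.Context, interval time.Duration, poll func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			poll()
		}
	}
}

// countPolls runs the loop for totalMs milliseconds with the given
// interval and reports how many times poll was called.
func countPolls(intervalMs, totalMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(totalMs)*time.Millisecond)
	defer cancel()
	n := 0
	// poll runs on the same goroutine as runLoop, so n needs no locking.
	runLoop(ctx, time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	return n
}

func main() {
	// The exact count depends on scheduling, so no fixed output is claimed.
	fmt.Println(countPolls(5, 200))
}
```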
type Supervisor ¶
type Supervisor struct {
	// contains filtered or unexported fields
}
Supervisor manages a set of polling workers, one goroutine per worker. Create with NewSupervisor; call Start to launch all workers.
func NewSupervisor ¶
func NewSupervisor(defs []model.WorkerDefinition, mgr *queue.Manager, log *logging.Logger) *Supervisor
NewSupervisor constructs a Supervisor from a set of WorkerDefinitions. Workers that fail to initialise are logged as warnings and skipped; the remaining workers are launched normally when Start is called.
func (*Supervisor) Drain ¶
func (s *Supervisor) Drain()
Drain blocks until all worker goroutines have exited. It is safe to call Drain even if Start was never called.
func (*Supervisor) Start ¶
func (s *Supervisor) Start(ctx context.Context)
Start launches one goroutine per worker. It returns immediately; the goroutines run until ctx is cancelled. Call Drain to wait for all workers to finish after the context is cancelled.
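The Start and Drain lifecycle maps naturally onto sync.WaitGroup: Start adds one count per goroutine, and Drain waits for the count to reach zero. The following is a sketch of that pattern, not the Supervisor's actual code; miniSupervisor, runner, blockingWorker, and demo are hypothetical names. It requires Go 1.19 or later for atomic.Int32.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runner is the minimal behaviour the sketch needs from a worker.
type runner interface{ Run(ctx context.Context) }

// miniSupervisor is a hypothetical, stripped-down supervisor.
type miniSupervisor struct {
	workers []runner
	wg      sync.WaitGroup
}

// Start launches one goroutine per worker and returns immediately.
func (s *miniSupervisor) Start(ctx context.Context) {
	for _, w := range s.workers {
		s.wg.Add(1)
		go func(w runner) {
			defer s.wg.Done()
			w.Run(ctx)
		}(w)
	}
}

// Drain blocks until every worker goroutine has exited. With a zero
// WaitGroup it returns at once, so it is safe without a prior Start.
func (s *miniSupervisor) Drain() { s.wg.Wait() }

// blockingWorker runs until ctx is cancelled, then records its exit.
type blockingWorker struct{ exited *atomic.Int32 }

func (b blockingWorker) Run(ctx context.Context) {
	<-ctx.Done()
	b.exited.Add(1)
}

// demo starts n workers, cancels the context, drains, and returns how
// many workers had exited by the time Drain returned.
func demo(n int) int {
	var exited atomic.Int32
	s := &miniSupervisor{}
	for i := 0; i < n; i++ {
		s.workers = append(s.workers, blockingWorker{&exited})
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.Start(ctx)
	cancel()
	s.Drain()
	return int(exited.Load())
}

func main() {
	fmt.Println(demo(3)) // prints 3
}
```

Cancelling the context before calling Drain matters: calling Drain while the context is still live would block until something else cancels it.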
func (*Supervisor) Workers ¶
func (s *Supervisor) Workers() []WorkerStatus
Workers returns a snapshot of all workers managed by the supervisor.
type WorkerStatus ¶
type WorkerStatus struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Interval   string `json:"interval"`
	Queue      string `json:"queue"`
	LastPollAt int64  `json:"last_poll_at"` // Unix timestamp; 0 = never polled
	QueueLen   int    `json:"queue_len"`    // -1 if unavailable
}
WorkerStatus is a snapshot of a single worker's runtime state.
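The struct tags determine the JSON field names a client would see if a snapshot is serialised. The sketch below redeclares WorkerStatus locally, copied from the definition above, so that it runs on its own; the encode helper and the sample values are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerStatus is a local copy of the documented struct, redeclared so
// this example is self-contained.
type WorkerStatus struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Interval   string `json:"interval"`
	Queue      string `json:"queue"`
	LastPollAt int64  `json:"last_poll_at"` // Unix timestamp; 0 = never polled
	QueueLen   int    `json:"queue_len"`    // -1 if unavailable
}

// encode is a hypothetical helper that returns the JSON form of ws.
func encode(ws WorkerStatus) string {
	b, err := json.Marshal(ws)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// A worker that has not polled yet and whose queue length is unknown.
	fmt.Println(encode(WorkerStatus{
		Name:       "my-poller",
		Type:       "http_poll",
		Interval:   "5m",
		Queue:      "my-queue",
		LastPollAt: 0,
		QueueLen:   -1,
	}))
	// prints {"name":"my-poller","type":"http_poll","interval":"5m","queue":"my-queue","last_poll_at":0,"queue_len":-1}
}
```

Consumers should treat last_poll_at of 0 and queue_len of -1 as sentinel values rather than real measurements.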