worker

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 7, 2026 | License: GPL-3.0 | Imports: 16 | Imported by: 0

Documentation

Overview

Package worker loads and runs polling workers that push items into named queues.

Workers are described by *.worker.yaml files. LoadDir reads all files from a directory and returns validated WorkerDefinitions. Definitions whose "type" field is not "http_poll" are rejected at load time.

Supported YAML schema (all fields except name, type, interval, url, and output.queue are optional):

name: my-poller
type: http_poll
interval: 5m
url: "https://api.example.com/items"
headers:
  Authorization: "Bearer {{env:MY_TOKEN}}"
output:
  queue: my-queue
  dedup_key: number
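The required-field and type rules above can be sketched as a small validation function. This is only an illustration: workerDef is a hypothetical stand-in for model.WorkerDefinition (whose fields are not shown here), and treating interval as a Go duration string is an assumption based on the "5m" example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// workerDef is a hypothetical stand-in for model.WorkerDefinition.
// The real type's field names are not documented on this page.
type workerDef struct {
	Name     string
	Type     string
	Interval string
	URL      string
	Headers  map[string]string // optional
	Queue    string            // output.queue
	DedupKey string            // output.dedup_key, optional
}

// validate checks the fields the schema lists as required and rejects
// any type other than "http_poll". All problems are reported together.
func validate(d workerDef) error {
	var errs []error
	if d.Name == "" {
		errs = append(errs, errors.New("name is required"))
	}
	if d.Type != "http_poll" {
		errs = append(errs, fmt.Errorf("unsupported type %q", d.Type))
	}
	// Assumption: interval uses Go duration syntax, as "5m" suggests.
	if _, err := time.ParseDuration(d.Interval); err != nil {
		errs = append(errs, fmt.Errorf("invalid interval %q: %w", d.Interval, err))
	}
	if d.URL == "" {
		errs = append(errs, errors.New("url is required"))
	}
	if d.Queue == "" {
		errs = append(errs, errors.New("output.queue is required"))
	}
	return errors.Join(errs...) // nil when errs is empty
}

func main() {
	d := workerDef{
		Name:     "my-poller",
		Type:     "http_poll",
		Interval: "5m",
		URL:      "https://api.example.com/items",
		Queue:    "my-queue",
		DedupKey: "number",
	}
	fmt.Println(validate(d)) // a fully populated http_poll definition passes

	d.Type = "cron"
	fmt.Println(validate(d)) // rejected: only http_poll is supported
}
```

The sketch does not handle the {{env:MY_TOKEN}} placeholder; how the package resolves it is not described here.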

Functions

func LoadDir

func LoadDir(dir string) ([]model.WorkerDefinition, error)

LoadDir reads all *.worker.yaml files from dir and returns validated WorkerDefinitions. If dir is empty or does not exist, an empty slice is returned without error. Individual files that fail to parse are collected and returned as a combined error.
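The error-collecting behaviour described above might look roughly like the following. This is not the package's LoadDir: loadFiles is a hypothetical helper that only reads raw file contents, and returning the successfully read files alongside the combined error is an assumption, since the documentation does not say what the slice contains when some files fail.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// loadFiles is a hypothetical sketch of the LoadDir contract. It returns
// the contents of every *.worker.yaml file in dir and joins per-file
// errors into a single error instead of stopping at the first failure.
func loadFiles(dir string) ([][]byte, error) {
	if dir == "" {
		return nil, nil // no directory configured: nothing to load
	}
	// Glob ignores I/O errors, so a missing directory yields no matches.
	// Note that glob metacharacters in dir itself would be interpreted.
	paths, err := filepath.Glob(filepath.Join(dir, "*.worker.yaml"))
	if err != nil {
		return nil, err
	}
	var (
		out  [][]byte
		errs []error
	)
	for _, p := range paths {
		data, err := os.ReadFile(p)
		if err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", p, err))
			continue // keep going so every bad file is reported
		}
		out = append(out, data)
	}
	return out, errors.Join(errs...) // nil error when nothing failed
}

func main() {
	// "workers" is a placeholder directory name.
	files, err := loadFiles("workers")
	fmt.Println(len(files), err)
}
```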

Types

type HTTPPollWorker

type HTTPPollWorker struct {
	// contains filtered or unexported fields
}

HTTPPollWorker polls an HTTP endpoint at a fixed interval, deduplicates responses by a configured key field, and pushes new items into a named queue.

Deduplication is in-memory; the seen set is initialized from the current queue contents on startup to avoid re-queuing items already pending.
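A minimal sketch of this kind of key-based deduplication follows. The names item, deduper, newDeduper, and filter are hypothetical and not part of the package; dropping items that lack the key field is also an assumption.

```go
package main

import "fmt"

// item is a decoded JSON object from the polled endpoint (hypothetical).
type item map[string]any

// deduper remembers which key values have already been queued.
type deduper struct {
	key  string
	seen map[string]struct{}
}

// newDeduper seeds the seen set from items already pending in the queue,
// so a restart does not re-queue them.
func newDeduper(key string, pending []item) *deduper {
	d := &deduper{key: key, seen: make(map[string]struct{})}
	for _, it := range pending {
		if k, ok := keyOf(it, key); ok {
			d.seen[k] = struct{}{}
		}
	}
	return d
}

// keyOf renders the dedup field as a string so that numeric and string
// keys can share one set.
func keyOf(it item, key string) (string, bool) {
	v, ok := it[key]
	if !ok {
		return "", false
	}
	return fmt.Sprint(v), true
}

// filter returns only the items whose key has not been seen before and
// records them as seen.
func (d *deduper) filter(items []item) []item {
	var fresh []item
	for _, it := range items {
		k, ok := keyOf(it, d.key)
		if !ok {
			continue // assumption: items without the key are skipped
		}
		if _, dup := d.seen[k]; dup {
			continue
		}
		d.seen[k] = struct{}{}
		fresh = append(fresh, it)
	}
	return fresh
}

func main() {
	// Item 1 is already pending in the queue at startup.
	d := newDeduper("number", []item{{"number": 1}})
	fresh := d.filter([]item{{"number": 1}, {"number": 2}, {"number": 2}})
	fmt.Println(len(fresh)) // prints 1: only the first item 2 is new
}
```

Because the set lives in memory only, items that have already left the queue before a restart would be treated as new again on the next poll.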

func (*HTTPPollWorker) Run

func (w *HTTPPollWorker) Run(ctx context.Context)

Run loops until ctx is cancelled, calling poll at each interval.
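A loop of this shape is commonly written with time.Ticker and a select on ctx.Done. The sketch below assumes that pattern; run and countPolls are hypothetical names, and whether the real worker polls once immediately at startup is not stated, so this version waits for the first tick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// run calls poll once per interval and returns when ctx is cancelled.
// interval must be positive; time.NewTicker panics otherwise.
func run(ctx context.Context, interval time.Duration, poll func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			poll()
		}
	}
}

// countPolls runs the loop for about totalMs milliseconds and reports
// how many times poll was invoked. It exists only to demonstrate run.
func countPolls(intervalMs, totalMs int) int {
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(totalMs)*time.Millisecond)
	defer cancel()

	n := 0
	run(ctx, time.Duration(intervalMs)*time.Millisecond, func() { n++ })
	return n
}

func main() {
	// The exact count depends on scheduling, so no output is promised.
	fmt.Println("polls:", countPolls(10, 100))
}
```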

type Supervisor

type Supervisor struct {
	// contains filtered or unexported fields
}

Supervisor manages a set of polling workers, one goroutine per worker. Create with NewSupervisor; call Start to launch all workers.

func NewSupervisor

func NewSupervisor(defs []model.WorkerDefinition, mgr *queue.Manager, log *logging.Logger) *Supervisor

NewSupervisor constructs a Supervisor from a set of WorkerDefinitions. Workers that fail to initialise are logged as warnings and skipped; the remaining workers are launched normally when Start is called.

func (*Supervisor) Drain

func (s *Supervisor) Drain()

Drain blocks until all worker goroutines have exited. It is safe to call Drain even if Start was never called.

func (*Supervisor) Start

func (s *Supervisor) Start(ctx context.Context)

Start launches one goroutine per worker. It returns immediately; the goroutines run until ctx is cancelled. Call Drain to wait for all workers to finish after the context is cancelled.
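The Start and Drain contract matches a standard sync.WaitGroup pattern, sketched below under that assumption. The supervisor, runner, and waitWorker types here are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runner is anything with a blocking Run method, like HTTPPollWorker.
type runner interface {
	Run(ctx context.Context)
}

// supervisor is a hypothetical stand-in for the package's Supervisor.
type supervisor struct {
	workers []runner
	wg      sync.WaitGroup
}

// Start launches one goroutine per worker and returns immediately.
func (s *supervisor) Start(ctx context.Context) {
	for _, w := range s.workers {
		s.wg.Add(1)
		go func(w runner) {
			defer s.wg.Done()
			w.Run(ctx)
		}(w)
	}
}

// Drain blocks until every worker goroutine has exited. With no
// goroutines started, Wait returns at once.
func (s *supervisor) Drain() { s.wg.Wait() }

// waitWorker blocks until ctx is cancelled, then records its exit.
type waitWorker struct{ exited *atomic.Int32 }

func (w waitWorker) Run(ctx context.Context) {
	<-ctx.Done()
	w.exited.Add(1)
}

// startAndDrain starts n workers, cancels them, waits for all of them,
// and returns how many reported exiting.
func startAndDrain(n int) int {
	var exited atomic.Int32
	s := &supervisor{}
	for i := 0; i < n; i++ {
		s.workers = append(s.workers, waitWorker{exited: &exited})
	}
	ctx, cancel := context.WithCancel(context.Background())
	s.Start(ctx)
	cancel()  // signal shutdown
	s.Drain() // wait for every goroutine to return
	return int(exited.Load())
}

func main() {
	fmt.Println(startAndDrain(3)) // prints 3
}
```

Cancelling the context only signals shutdown; it is the Drain call that guarantees no worker is still mid-poll when the caller proceeds.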

func (*Supervisor) Workers

func (s *Supervisor) Workers() []WorkerStatus

Workers returns a snapshot of all workers managed by the supervisor.

type WorkerStatus

type WorkerStatus struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Interval   string `json:"interval"`
	Queue      string `json:"queue"`
	LastPollAt int64  `json:"last_poll_at"` // Unix timestamp; 0 = never polled
	QueueLen   int    `json:"queue_len"`    // -1 if unavailable
}

WorkerStatus is a snapshot of a single worker's runtime state.
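Given the struct tags above, a status snapshot serialises to JSON as in the following sketch. The struct is copied locally so the example is self-contained; encode is a hypothetical helper, and the field values are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerStatus mirrors the exported struct documented above.
type WorkerStatus struct {
	Name       string `json:"name"`
	Type       string `json:"type"`
	Interval   string `json:"interval"`
	Queue      string `json:"queue"`
	LastPollAt int64  `json:"last_poll_at"` // Unix timestamp; 0 = never polled
	QueueLen   int    `json:"queue_len"`    // -1 if unavailable
}

// encode marshals a status snapshot, returning "" on failure.
func encode(ws WorkerStatus) string {
	b, err := json.Marshal(ws)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// A worker that has not polled yet and whose queue length is unknown.
	ws := WorkerStatus{
		Name:       "my-poller",
		Type:       "http_poll",
		Interval:   "5m",
		Queue:      "my-queue",
		LastPollAt: 0,
		QueueLen:   -1,
	}
	fmt.Println(encode(ws))
	// prints {"name":"my-poller","type":"http_poll","interval":"5m","queue":"my-queue","last_poll_at":0,"queue_len":-1}
}
```

Consumers should treat last_poll_at of 0 and queue_len of -1 as sentinel values rather than real measurements.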
