Documentation ¶
Overview ¶
Package worker implements the task execution layer for metalog.
The Prefetcher batch-claims tasks from the database queue using SELECT ... FOR UPDATE SKIP LOCKED and feeds them into a buffered channel. Multiple Core goroutines consume tasks from the channel, execute archive creation via the Archiver interface, and report results back through the TaskCompleter.
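The flow described above can be sketched as a single producer feeding a buffered channel that several goroutines drain. This is a minimal illustration of the pattern, not the package's implementation: `task`, `claimFunc`, `prefetch`, and `runPipeline` are hypothetical names, the claim function stands in for the database query, and the demo cancels the context itself once its fake queue is empty.

```go
package main

import (
	"context"
	"sync"
	"sync/atomic"
	"time"
)

// task is a hypothetical stand-in for *taskqueue.Task.
type task struct{ ID int }

// claimFunc is a hypothetical stand-in for one batch claim against the queue.
// It returns up to batchSize tasks, or nothing when no task is ready.
type claimFunc func(ctx context.Context, batchSize int) []task

// prefetch claims batches and feeds them into out until ctx is canceled,
// then closes out so that consumers can finish and exit.
func prefetch(ctx context.Context, claim claimFunc, batchSize int, poll time.Duration, out chan<- task) {
	defer close(out)
	for ctx.Err() == nil {
		batch := claim(ctx, batchSize)
		for _, t := range batch {
			select {
			case out <- t:
			case <-ctx.Done():
				return
			}
		}
		if len(batch) == 0 {
			// Nothing to claim: wait one poll interval or stop on cancel.
			select {
			case <-time.After(poll):
			case <-ctx.Done():
				return
			}
		}
	}
}

// runPipeline pushes total fake tasks through the pipeline and returns
// how many were processed by the worker goroutines.
func runPipeline(total, batchSize, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	next := 0 // only touched by the prefetch goroutine
	claim := func(_ context.Context, n int) []task {
		if next >= total {
			cancel() // demo only: stop once the fake queue is empty
			return nil
		}
		var batch []task
		for ; n > 0 && next < total; n-- {
			batch = append(batch, task{ID: next})
			next++
		}
		return batch
	}

	tasks := make(chan task, batchSize)
	go prefetch(ctx, claim, batchSize, 10*time.Millisecond, tasks)

	var processed atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range tasks { // ends when prefetch closes the channel
				processed.Add(1)
			}
		}()
	}
	wg.Wait()
	return int(processed.Load())
}
```

Calling `runPipeline(10, 3, 4)` claims the ten fake tasks in batches of three and spreads them over four goroutines. Sizing the channel buffer to the batch size is a choice made for this sketch only.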
Types ¶
type Archiver ¶
type Archiver interface {
	CreateArchive(ctx context.Context,
		irBackend string, irBuckets []string, irPaths []string,
		archiveBackend, archiveBucket, archivePath string,
	) (int64, error)
	// DeleteArchive removes an archive from object storage. Used for
	// orphan cleanup when archive creation fails after a partial upload.
	DeleteArchive(ctx context.Context, backend, bucket, path string) error
}
Archiver creates archives from IR files and deletes archives orphaned by a failed creation.
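As a rough illustration of the orphan-cleanup contract, the sketch below pairs an in-memory fake with a helper that deletes the target object whenever creation fails. Everything beyond the interface itself is an assumption: `memArchiver`, `createWithCleanup`, and `demoCleanup` are hypothetical names, and the `int64` result is treated as a byte count, which the source does not state.

```go
package main

import (
	"context"
	"errors"
	"sync"
)

// Archiver mirrors the interface documented above.
type Archiver interface {
	CreateArchive(ctx context.Context,
		irBackend string, irBuckets []string, irPaths []string,
		archiveBackend, archiveBucket, archivePath string,
	) (int64, error)
	DeleteArchive(ctx context.Context, backend, bucket, path string) error
}

// memArchiver is a hypothetical in-memory Archiver for tests.
type memArchiver struct {
	mu         sync.Mutex
	objects    map[string]int64 // "backend/bucket/path" -> size (assumed bytes)
	failCreate bool             // simulate a failure after a partial upload
}

func objectKey(backend, bucket, path string) string {
	return backend + "/" + bucket + "/" + path
}

func (m *memArchiver) CreateArchive(ctx context.Context,
	irBackend string, irBuckets []string, irPaths []string,
	archiveBackend, archiveBucket, archivePath string,
) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	k := objectKey(archiveBackend, archiveBucket, archivePath)
	if m.failCreate {
		m.objects[k] = 1 // leave a partial object behind
		return 0, errors.New("simulated upload failure")
	}
	size := int64(len(irPaths)) * 100 // arbitrary fake size
	m.objects[k] = size
	return size, nil
}

func (m *memArchiver) DeleteArchive(ctx context.Context, backend, bucket, path string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.objects, objectKey(backend, bucket, path))
	return nil
}

// createWithCleanup calls CreateArchive and, on failure, removes whatever
// was written to the target path. A real caller might prefer a fresh
// context for the cleanup in case ctx is already canceled.
func createWithCleanup(ctx context.Context, a Archiver,
	irBackend string, irBuckets, irPaths []string,
	backend, bucket, path string,
) (int64, error) {
	size, err := a.CreateArchive(ctx, irBackend, irBuckets, irPaths, backend, bucket, path)
	if err != nil {
		if delErr := a.DeleteArchive(ctx, backend, bucket, path); delErr != nil {
			return 0, errors.Join(err, delErr)
		}
		return 0, err
	}
	return size, nil
}

// demoCleanup runs one creation against the fake and reports the returned
// size, the number of objects left in storage, and the error, if any.
func demoCleanup(fail bool) (int64, int, error) {
	a := &memArchiver{objects: map[string]int64{}, failCreate: fail}
	size, err := createWithCleanup(context.Background(), a,
		"s3", []string{"ir-bucket"}, []string{"a.ir", "b.ir"},
		"s3", "archive-bucket", "out/archive-1")
	return size, len(a.objects), err
}
```

With `fail` set, the helper returns the creation error and leaves no object behind; without it, one object remains at the target path.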
type Core ¶
type Core struct {
	// contains filtered or unexported fields
}
Core is the worker task execution loop.
func NewCore ¶
func NewCore(taskQueue TaskCompleter, archiveCreator Archiver, prefetcher *Prefetcher, log *zap.Logger) *Core
NewCore creates a worker Core.
type Prefetcher ¶
type Prefetcher struct {
	// contains filtered or unexported fields
}
Prefetcher batch-claims tasks from the database and feeds them into a channel.
func NewPrefetcher ¶
func NewPrefetcher(tq TaskClaimer, workerID string, batchSize int, log *zap.Logger) *Prefetcher
NewPrefetcher creates a Prefetcher.
func (*Prefetcher) Done ¶
func (pf *Prefetcher) Done() <-chan struct{}
Done returns a channel that is closed when the prefetcher's Run loop exits. Workers can select on this to detect when no more tasks will arrive.
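One way a worker might select on this channel is sketched below. The `source` interface and `fakeSource` type are hypothetical stand-ins that only copy the shape of `Tasks` and `Done`, `Task` stands in for `taskqueue.Task`, and draining buffered tasks after Done fires is a design choice of this sketch rather than documented behavior of Core.

```go
package main

// Task is a hypothetical stand-in for taskqueue.Task.
type Task struct{ ID int }

// source is the subset of the Prefetcher surface a worker needs here.
type source interface {
	Tasks() <-chan *Task
	Done() <-chan struct{}
}

// fakeSource is a hypothetical test double with the same method shapes.
type fakeSource struct {
	tasks chan *Task
	done  chan struct{}
}

func (f *fakeSource) Tasks() <-chan *Task   { return f.tasks }
func (f *fakeSource) Done() <-chan struct{} { return f.done }

// drain handles whatever is still buffered, without blocking.
func drain(tasks <-chan *Task, handle func(*Task)) {
	for {
		select {
		case t, ok := <-tasks:
			if !ok {
				return // channel closed and empty
			}
			handle(t)
		default:
			return // nothing buffered right now
		}
	}
}

// consume handles tasks until the task channel is closed or Done fires.
// When Done fires it drains buffered tasks first so none are dropped.
func consume(src source, handle func(*Task)) {
	for {
		select {
		case t, ok := <-src.Tasks():
			if !ok {
				return
			}
			handle(t)
		case <-src.Done():
			drain(src.Tasks(), handle)
			return
		}
	}
}

// demoConsume buffers three tasks, signals Done, and counts handled tasks.
func demoConsume() int {
	src := &fakeSource{tasks: make(chan *Task, 3), done: make(chan struct{})}
	for i := 0; i < 3; i++ {
		src.tasks <- &Task{ID: i}
	}
	close(src.done) // no more tasks will arrive
	handled := 0
	consume(src, func(*Task) { handled++ })
	return handled
}
```

Because `select` picks at random among ready cases, the Done branch can win while tasks are still buffered, which is why the sketch drains before returning.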
func (*Prefetcher) Run ¶
func (pf *Prefetcher) Run(ctx context.Context)
Run polls for tasks until ctx is canceled, then closes the task channel.
func (*Prefetcher) SetMeter ¶
func (pf *Prefetcher) SetMeter(m metric.Meter)
SetMeter configures OpenTelemetry metrics. Must be called before Run.
func (*Prefetcher) Tasks ¶
func (pf *Prefetcher) Tasks() <-chan *taskqueue.Task
Tasks returns the channel from which workers consume tasks.
type TaskClaimer ¶
type TaskClaimer interface {
	ClaimTasks(ctx context.Context, tableName string, workerID string, batchSize int) ([]*taskqueue.Task, error)
}
TaskClaimer claims tasks from a task store. Extracted as an interface so the Prefetcher can be unit-tested with a mock instead of a real database.
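A mock along these lines could look like the following sketch. `mockClaimer` and `demoClaim` are hypothetical, `Task` is a local stand-in for `taskqueue.Task`, and the table name and worker ID are made-up values; only the `ClaimTasks` signature is taken from the interface above.

```go
package main

import "context"

// Task is a hypothetical stand-in for taskqueue.Task.
type Task struct{ ID int64 }

// TaskClaimer mirrors the interface documented above, using the local Task.
type TaskClaimer interface {
	ClaimTasks(ctx context.Context, tableName string, workerID string, batchSize int) ([]*Task, error)
}

// mockClaimer hands out tasks from an in-memory slice and records calls.
type mockClaimer struct {
	pending      []*Task
	err          error // when set, every claim fails with this error
	calls        int
	lastWorkerID string
}

// Compile-time check that the mock satisfies the interface.
var _ TaskClaimer = (*mockClaimer)(nil)

func (m *mockClaimer) ClaimTasks(ctx context.Context, tableName string, workerID string, batchSize int) ([]*Task, error) {
	m.calls++
	m.lastWorkerID = workerID
	if m.err != nil {
		return nil, m.err
	}
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	n := batchSize
	if n < 0 {
		n = 0
	}
	if n > len(m.pending) {
		n = len(m.pending)
	}
	batch := m.pending[:n]
	m.pending = m.pending[n:]
	return batch, nil
}

// demoClaim claims three batches of up to three tasks from a queue of five
// and returns the size of each batch.
func demoClaim() (first, second, third int, err error) {
	m := &mockClaimer{}
	for i := int64(1); i <= 5; i++ {
		m.pending = append(m.pending, &Task{ID: i})
	}
	ctx := context.Background()
	sizes := make([]int, 0, 3)
	for i := 0; i < 3; i++ {
		// "tasks" and "worker-1" are illustrative values only.
		batch, claimErr := m.ClaimTasks(ctx, "tasks", "worker-1", 3)
		if claimErr != nil {
			return 0, 0, 0, claimErr
		}
		sizes = append(sizes, len(batch))
	}
	return sizes[0], sizes[1], sizes[2], nil
}
```

The mock is not safe for concurrent use, which is enough for a single polling loop; a test with several claimers would need a mutex around `pending`.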