worker

package
v0.0.0-...-ae8c3b3
Warning

This package is not in the latest version of its module.

Published: Mar 27, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package worker implements the task execution layer for metalog.

The Prefetcher batch-claims tasks from the database queue using SELECT ... FOR UPDATE SKIP LOCKED and feeds them into a buffered channel. Multiple Core goroutines consume tasks from the channel, execute archive creation via the Archiver interface, and report results back through the TaskCompleter.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Archiver

type Archiver interface {
	CreateArchive(ctx context.Context,
		irBackend string, irBuckets []string, irPaths []string,
		archiveBackend, archiveBucket, archivePath string,
	) (int64, error)

	// DeleteArchive removes an archive from object storage. Used for
	// orphan cleanup when archive creation fails after a partial upload.
	DeleteArchive(ctx context.Context, backend, bucket, path string) error
}

Archiver creates archives from IR files.

type Core

type Core struct {
	// contains filtered or unexported fields
}

Core is the worker task execution loop.

func NewCore

func NewCore(
	taskQueue TaskCompleter,
	archiveCreator Archiver,
	prefetcher *Prefetcher,
	log *zap.Logger,
) *Core

NewCore creates a worker Core.

func (*Core) Run

func (c *Core) Run(ctx context.Context)

Run processes tasks from the prefetcher until ctx is canceled.

func (*Core) SetMeter

func (c *Core) SetMeter(m metric.Meter)

SetMeter configures OpenTelemetry metrics for the Core. It must be called before Run.

type Prefetcher

type Prefetcher struct {
	// contains filtered or unexported fields
}

Prefetcher batch-claims tasks from the database and feeds them into a channel.

func NewPrefetcher

func NewPrefetcher(tq TaskClaimer, workerID string, batchSize int, log *zap.Logger) *Prefetcher

NewPrefetcher creates a Prefetcher.

func (*Prefetcher) Done

func (pf *Prefetcher) Done() <-chan struct{}

Done returns a channel that is closed when the prefetcher's Run loop exits. Workers can select on this to detect when no more tasks will arrive.
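A consumer that selects on both channels might look like the following sketch. It does not import the real package: `Task` and `fakePrefetcher` are hypothetical stand-ins that expose only the Tasks/Done surface, and the order in which the fake closes its channels (tasks first, then done) is an assumption made so the drain loop cannot block.

```go
package main

import "fmt"

// Task is a hypothetical stand-in for taskqueue.Task.
type Task struct{ ID int64 }

// fakePrefetcher mimics only the Tasks/Done surface documented above.
type fakePrefetcher struct {
	tasks chan *Task
	done  chan struct{}
}

func newFakePrefetcher(buf int) *fakePrefetcher {
	return &fakePrefetcher{tasks: make(chan *Task, buf), done: make(chan struct{})}
}

func (p *fakePrefetcher) Tasks() <-chan *Task   { return p.tasks }
func (p *fakePrefetcher) Done() <-chan struct{} { return p.done }

// run emits the given tasks, then closes the task channel and, last, done.
func (p *fakePrefetcher) run(items []*Task) {
	defer close(p.done)  // deferred first, so it runs last
	defer close(p.tasks) // runs first
	for _, t := range items {
		p.tasks <- t
	}
}

// consume handles tasks until the task channel is closed or done fires.
// It returns the number of tasks handled.
func consume(tasks <-chan *Task, done <-chan struct{}) int {
	handled := 0
	for {
		select {
		case _, ok := <-tasks:
			if !ok {
				return handled // channel closed and empty
			}
			handled++
		case <-done:
			// No more tasks will arrive; drain what is still buffered.
			for range tasks {
				handled++
			}
			return handled
		}
	}
}

func main() {
	p := newFakePrefetcher(2)
	go p.run([]*Task{{ID: 1}, {ID: 2}, {ID: 3}})
	fmt.Println(consume(p.Tasks(), p.Done())) // prints 3
}
```

Draining after done fires is a design choice of this sketch; a worker that prefers to stop immediately could return from the done case without reading the remaining tasks.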

func (*Prefetcher) Run

func (pf *Prefetcher) Run(ctx context.Context)

Run polls for tasks until ctx is canceled, then closes the task channel.

func (*Prefetcher) SetMeter

func (pf *Prefetcher) SetMeter(m metric.Meter)

SetMeter configures OpenTelemetry metrics for the Prefetcher. It must be called before Run.

func (*Prefetcher) Tasks

func (pf *Prefetcher) Tasks() <-chan *taskqueue.Task

Tasks returns the channel from which workers consume tasks.

type TaskClaimer

type TaskClaimer interface {
	ClaimTasks(ctx context.Context, tableName string, workerID string, batchSize int) ([]*taskqueue.Task, error)
}

TaskClaimer claims tasks from a task store. Extracted as an interface so the Prefetcher can be unit-tested with a mock instead of a real database.

type TaskCompleter

type TaskCompleter interface {
	CompleteTask(ctx context.Context, taskID int64, output []byte) (int64, error)
	FailTask(ctx context.Context, taskID int64) (int64, error)
}

TaskCompleter records the outcome of a task by marking it completed or failed.
