persistence

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 20, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package persistence provides a BoltDB implementation of JobStore.

Package persistence provides an in-memory implementation of JobStore.

Package persistence defines the storage interface for GopherQueue.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BoltStore

type BoltStore struct {
	// contains filtered or unexported fields
}

BoltStore is a BoltDB-backed implementation of JobStore.

func NewBoltStore

func NewBoltStore(dataDir string) (*BoltStore, error)

NewBoltStore creates a new BoltDB job store.

func (*BoltStore) ClaimJob

func (s *BoltStore) ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)

ClaimJob atomically claims a job for a worker.

func (*BoltStore) Cleanup

func (s *BoltStore) Cleanup(ctx context.Context, retention time.Duration) (int64, error)

Cleanup removes old completed/failed jobs.

func (*BoltStore) Close

func (s *BoltStore) Close() error

Close releases resources.

func (*BoltStore) Count

func (s *BoltStore) Count(ctx context.Context, filter JobFilter) (int64, error)

Count returns the number of jobs matching the filter.

func (*BoltStore) Create

func (s *BoltStore) Create(ctx context.Context, job *core.Job) error

Create persists a new job.

func (*BoltStore) Delete

func (s *BoltStore) Delete(ctx context.Context, id uuid.UUID) error

Delete removes a job.

func (*BoltStore) Get

func (s *BoltStore) Get(ctx context.Context, id uuid.UUID) (*core.Job, error)

Get retrieves a job by ID.

func (*BoltStore) GetByIdempotencyKey

func (s *BoltStore) GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)

GetByIdempotencyKey retrieves a job by its idempotency key.

func (*BoltStore) GetCheckpoint

func (s *BoltStore) GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)

GetCheckpoint retrieves the last checkpoint for a job.

func (*BoltStore) GetDelayed

func (s *BoltStore) GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)

GetDelayed returns jobs whose scheduled time has passed.

func (*BoltStore) GetPending

func (s *BoltStore) GetPending(ctx context.Context, limit int) ([]*core.Job, error)

GetPending returns jobs ready for scheduling.

func (*BoltStore) GetResult

func (s *BoltStore) GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)

GetResult retrieves the result for a completed job.

func (*BoltStore) GetStuck

func (s *BoltStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)

GetStuck returns stuck jobs.

func (*BoltStore) List

func (s *BoltStore) List(ctx context.Context, filter JobFilter) ([]*core.Job, error)

List returns jobs matching the given filter.

func (*BoltStore) SaveCheckpoint

func (s *BoltStore) SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error

SaveCheckpoint stores a checkpoint for a running job.

func (*BoltStore) SaveResult

func (s *BoltStore) SaveResult(ctx context.Context, result *core.JobResult) error

SaveResult stores the result of a job execution.

func (*BoltStore) Stats

func (s *BoltStore) Stats(ctx context.Context) (*QueueStats, error)

Stats returns queue statistics.

func (*BoltStore) Update

func (s *BoltStore) Update(ctx context.Context, job *core.Job) error

Update atomically updates a job.

func (*BoltStore) UpdateState

func (s *BoltStore) UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error

UpdateState atomically updates only the job state.

type JobFilter

type JobFilter struct {
	// Filter by job types
	Types []string

	// Filter by states
	States []core.JobState

	// Filter by priorities
	Priorities []core.Priority

	// Filter by tags (all must match)
	Tags map[string]string

	// Filter by time range
	CreatedAfter  time.Time
	CreatedBefore time.Time
	UpdatedAfter  time.Time

	// Filter by correlation
	CorrelationID string

	// Pagination
	Cursor string
	Limit  int
	Order  ListOrder
}

JobFilter specifies criteria for listing jobs.

type JobStore

type JobStore interface {
	// Create persists a new job. Returns ErrJobAlreadyExists if
	// a job with the same idempotency key exists.
	Create(ctx context.Context, job *core.Job) error

	// Get retrieves a job by ID. Returns ErrJobNotFound if not found.
	Get(ctx context.Context, id uuid.UUID) (*core.Job, error)

	// GetByIdempotencyKey retrieves a job by its idempotency key.
	// Returns ErrJobNotFound if not found.
	GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)

	// Update atomically updates a job. The job must already exist.
	Update(ctx context.Context, job *core.Job) error

	// UpdateState atomically updates only the job state and updated_at.
	UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error

	// Delete removes a job. This is typically used for cleanup.
	Delete(ctx context.Context, id uuid.UUID) error

	// List returns jobs matching the given filter.
	List(ctx context.Context, filter JobFilter) ([]*core.Job, error)

	// Count returns the number of jobs matching the filter.
	Count(ctx context.Context, filter JobFilter) (int64, error)

	// GetPending returns jobs ready for scheduling, ordered by
	// priority (ascending) then created_at (ascending).
	GetPending(ctx context.Context, limit int) ([]*core.Job, error)

	// GetDelayed returns jobs whose scheduled time has passed.
	GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)

	// GetStuck returns jobs that have been running longer than
	// the visibility timeout and should be reclaimed.
	GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)

	// ClaimJob atomically transitions a job from scheduled to running
	// and assigns it to the specified worker. Returns false if the
	// job is no longer claimable.
	ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)

	// SaveResult stores the result of a job execution.
	SaveResult(ctx context.Context, result *core.JobResult) error

	// GetResult retrieves the result for a completed job.
	GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)

	// SaveCheckpoint stores a checkpoint for a running job.
	SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error

	// GetCheckpoint retrieves the last checkpoint for a job.
	GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)

	// Stats returns queue statistics.
	Stats(ctx context.Context) (*QueueStats, error)

	// Cleanup removes completed/failed jobs older than the retention period.
	Cleanup(ctx context.Context, retention time.Duration) (int64, error)

	// Close releases any resources held by the store.
	Close() error
}

JobStore defines the persistence interface for job storage. Implementations must provide durable storage with ACID guarantees.

type ListOrder

type ListOrder string

ListOrder specifies the ordering for job lists.

const (
	OrderCreatedAsc  ListOrder = "created_asc"
	OrderCreatedDesc ListOrder = "created_desc"
	OrderUpdatedDesc ListOrder = "updated_desc"
	OrderPriorityAsc ListOrder = "priority_asc"
)

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

MemoryStore is an in-memory implementation of JobStore. This is intended for testing and development; use BoltStore for production.

func NewMemoryStore

func NewMemoryStore() *MemoryStore

NewMemoryStore creates a new in-memory job store.

func (*MemoryStore) ClaimJob

func (m *MemoryStore) ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)

ClaimJob atomically claims a job for a worker.

func (*MemoryStore) Cleanup

func (m *MemoryStore) Cleanup(ctx context.Context, retention time.Duration) (int64, error)

Cleanup removes old completed/failed jobs.

func (*MemoryStore) Close

func (m *MemoryStore) Close() error

Close releases resources.

func (*MemoryStore) Count

func (m *MemoryStore) Count(ctx context.Context, filter JobFilter) (int64, error)

Count returns the number of jobs matching the filter.

func (*MemoryStore) Create

func (m *MemoryStore) Create(ctx context.Context, job *core.Job) error

Create persists a new job.

func (*MemoryStore) Delete

func (m *MemoryStore) Delete(ctx context.Context, id uuid.UUID) error

Delete removes a job.

func (*MemoryStore) Get

func (m *MemoryStore) Get(ctx context.Context, id uuid.UUID) (*core.Job, error)

Get retrieves a job by ID.

func (*MemoryStore) GetByIdempotencyKey

func (m *MemoryStore) GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)

GetByIdempotencyKey retrieves a job by its idempotency key.

func (*MemoryStore) GetCheckpoint

func (m *MemoryStore) GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)

GetCheckpoint retrieves the last checkpoint for a job.

func (*MemoryStore) GetDelayed

func (m *MemoryStore) GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)

GetDelayed returns jobs whose scheduled time has passed.

func (*MemoryStore) GetPending

func (m *MemoryStore) GetPending(ctx context.Context, limit int) ([]*core.Job, error)

GetPending returns jobs ready for scheduling.

func (*MemoryStore) GetResult

func (m *MemoryStore) GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)

GetResult retrieves the result for a completed job.

func (*MemoryStore) GetStuck

func (m *MemoryStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)

GetStuck returns stuck jobs.

func (*MemoryStore) List

func (m *MemoryStore) List(ctx context.Context, filter JobFilter) ([]*core.Job, error)

List returns jobs matching the given filter.

func (*MemoryStore) SaveCheckpoint

func (m *MemoryStore) SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error

SaveCheckpoint stores a checkpoint for a running job.

func (*MemoryStore) SaveResult

func (m *MemoryStore) SaveResult(ctx context.Context, result *core.JobResult) error

SaveResult stores the result of a job execution.

func (*MemoryStore) Stats

func (m *MemoryStore) Stats(ctx context.Context) (*QueueStats, error)

Stats returns queue statistics.

func (*MemoryStore) Update

func (m *MemoryStore) Update(ctx context.Context, job *core.Job) error

Update atomically updates a job.

func (*MemoryStore) UpdateState

func (m *MemoryStore) UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error

UpdateState atomically updates only the job state.

type QueueStats

type QueueStats struct {
	// Current state counts
	Pending    int64 `json:"pending"`
	Scheduled  int64 `json:"scheduled"`
	Running    int64 `json:"running"`
	Delayed    int64 `json:"delayed"`
	Completed  int64 `json:"completed"`
	Failed     int64 `json:"failed"`
	DeadLetter int64 `json:"dead_letter"`
	Cancelled  int64 `json:"cancelled"`

	// Total jobs
	TotalJobs int64 `json:"total_jobs"`

	// Storage info
	StorageBytes int64 `json:"storage_bytes"`

	// Timestamp
	Timestamp time.Time `json:"timestamp"`
}

QueueStats contains aggregate queue statistics.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL