Documentation
¶
Overview ¶
Package persistence defines the storage interface (JobStore) for GopherQueue and provides two implementations of it: a BoltDB-backed store (BoltStore) and an in-memory store (MemoryStore).
Index ¶
- type BoltStore
- func (s *BoltStore) ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)
- func (s *BoltStore) Cleanup(ctx context.Context, retention time.Duration) (int64, error)
- func (s *BoltStore) Close() error
- func (s *BoltStore) Count(ctx context.Context, filter JobFilter) (int64, error)
- func (s *BoltStore) Create(ctx context.Context, job *core.Job) error
- func (s *BoltStore) Delete(ctx context.Context, id uuid.UUID) error
- func (s *BoltStore) Get(ctx context.Context, id uuid.UUID) (*core.Job, error)
- func (s *BoltStore) GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)
- func (s *BoltStore) GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)
- func (s *BoltStore) GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)
- func (s *BoltStore) GetPending(ctx context.Context, limit int) ([]*core.Job, error)
- func (s *BoltStore) GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)
- func (s *BoltStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)
- func (s *BoltStore) List(ctx context.Context, filter JobFilter) ([]*core.Job, error)
- func (s *BoltStore) SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error
- func (s *BoltStore) SaveResult(ctx context.Context, result *core.JobResult) error
- func (s *BoltStore) Stats(ctx context.Context) (*QueueStats, error)
- func (s *BoltStore) Update(ctx context.Context, job *core.Job) error
- func (s *BoltStore) UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error
- type JobFilter
- type JobStore
- type ListOrder
- type MemoryStore
- func (m *MemoryStore) ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)
- func (m *MemoryStore) Cleanup(ctx context.Context, retention time.Duration) (int64, error)
- func (m *MemoryStore) Close() error
- func (m *MemoryStore) Count(ctx context.Context, filter JobFilter) (int64, error)
- func (m *MemoryStore) Create(ctx context.Context, job *core.Job) error
- func (m *MemoryStore) Delete(ctx context.Context, id uuid.UUID) error
- func (m *MemoryStore) Get(ctx context.Context, id uuid.UUID) (*core.Job, error)
- func (m *MemoryStore) GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)
- func (m *MemoryStore) GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)
- func (m *MemoryStore) GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)
- func (m *MemoryStore) GetPending(ctx context.Context, limit int) ([]*core.Job, error)
- func (m *MemoryStore) GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)
- func (m *MemoryStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)
- func (m *MemoryStore) List(ctx context.Context, filter JobFilter) ([]*core.Job, error)
- func (m *MemoryStore) SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error
- func (m *MemoryStore) SaveResult(ctx context.Context, result *core.JobResult) error
- func (m *MemoryStore) Stats(ctx context.Context) (*QueueStats, error)
- func (m *MemoryStore) Update(ctx context.Context, job *core.Job) error
- func (m *MemoryStore) UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error
- type QueueStats
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BoltStore ¶
// BoltStore is a BoltDB-backed implementation of JobStore.
// Instances are created with NewBoltStore; all of its state is held in
// unexported fields.
type BoltStore struct {
// contains filtered or unexported fields
}
BoltStore is a BoltDB-backed implementation of JobStore.
func NewBoltStore ¶
NewBoltStore creates a new BoltDB job store.
func (*BoltStore) GetByIdempotencyKey ¶
GetByIdempotencyKey retrieves a job by its idempotency key.
func (*BoltStore) GetCheckpoint ¶
GetCheckpoint retrieves the last checkpoint for a job.
func (*BoltStore) GetDelayed ¶
GetDelayed returns jobs whose scheduled time has passed.
func (*BoltStore) GetPending ¶
GetPending returns jobs ready for scheduling.
func (*BoltStore) GetStuck ¶
func (s *BoltStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)
GetStuck returns jobs that have been running longer than visibilityTimeout and should be reclaimed.
func (*BoltStore) SaveCheckpoint ¶
SaveCheckpoint stores a checkpoint for a running job.
func (*BoltStore) SaveResult ¶
SaveResult stores the result of a job execution.
func (*BoltStore) Stats ¶
func (s *BoltStore) Stats(ctx context.Context) (*QueueStats, error)
Stats returns queue statistics.
type JobFilter ¶
// JobFilter specifies criteria for listing jobs.
// It is the filter argument accepted by JobStore.List and JobStore.Count.
//
// NOTE(review): how an unset (zero-valued) field is treated is not visible
// here — presumably it means "do not filter on this field"; confirm against
// the store implementations.
type JobFilter struct {
// Filter by job types
Types []string
// Filter by states
States []core.JobState
// Filter by priorities
Priorities []core.Priority
// Filter by tags (all must match)
Tags map[string]string
// Filter by time range
// NOTE(review): whether these bounds are inclusive or exclusive is not
// specified here — confirm. There is no UpdatedBefore counterpart to
// UpdatedAfter.
CreatedAfter time.Time
CreatedBefore time.Time
UpdatedAfter time.Time
// Filter by correlation
CorrelationID string
// Pagination
// NOTE(review): the Cursor format, the meaning of Limit <= 0, and the
// default Order are not specified here — confirm before relying on them.
Cursor string
Limit int
Order ListOrder
}
JobFilter specifies criteria for listing jobs.
type JobStore ¶
// JobStore defines the persistence interface for job storage.
// BoltStore and MemoryStore in this package each expose every method below.
//
// NOTE(review): ErrJobAlreadyExists and ErrJobNotFound are not exported by
// this package (its Variables section is empty); presumably they are declared
// in package core — confirm.
type JobStore interface {
// Create persists a new job. Returns ErrJobAlreadyExists if
// a job with the same idempotency key exists.
Create(ctx context.Context, job *core.Job) error
// Get retrieves a job by ID. Returns ErrJobNotFound if not found.
Get(ctx context.Context, id uuid.UUID) (*core.Job, error)
// GetByIdempotencyKey retrieves a job by its idempotency key.
// Returns ErrJobNotFound if not found.
GetByIdempotencyKey(ctx context.Context, key string) (*core.Job, error)
// Update atomically updates a job. The job must already exist.
// NOTE(review): the error returned for a missing job is not stated here —
// presumably ErrJobNotFound; confirm.
Update(ctx context.Context, job *core.Job) error
// UpdateState atomically updates only the job state and updated_at.
UpdateState(ctx context.Context, id uuid.UUID, state core.JobState) error
// Delete removes a job. This is typically used for cleanup.
Delete(ctx context.Context, id uuid.UUID) error
// List returns jobs matching the given filter.
List(ctx context.Context, filter JobFilter) ([]*core.Job, error)
// Count returns the number of jobs matching the filter.
Count(ctx context.Context, filter JobFilter) (int64, error)
// GetPending returns jobs ready for scheduling, ordered by
// priority (ascending) then created_at (ascending).
// NOTE(review): "priority (ascending)" presumably means lower numeric
// core.Priority values are returned first — confirm against core.Priority.
GetPending(ctx context.Context, limit int) ([]*core.Job, error)
// GetDelayed returns jobs whose scheduled time has passed.
// NOTE(review): presumably "passed" is judged relative to the now argument
// and limit caps the number of jobs returned — confirm.
GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)
// GetStuck returns jobs that have been running longer than
// the visibility timeout and should be reclaimed.
GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)
// ClaimJob atomically transitions a job from scheduled to running
// and assigns it to the specified worker. Returns false if the
// job is no longer claimable.
ClaimJob(ctx context.Context, id uuid.UUID, workerID string) (bool, error)
// SaveResult stores the result of a job execution.
SaveResult(ctx context.Context, result *core.JobResult) error
// GetResult retrieves the result for a completed job.
// NOTE(review): behavior when no result has been saved is not stated
// here — confirm.
GetResult(ctx context.Context, jobID uuid.UUID) (*core.JobResult, error)
// SaveCheckpoint stores a checkpoint for a running job.
SaveCheckpoint(ctx context.Context, jobID uuid.UUID, data []byte) error
// GetCheckpoint retrieves the last checkpoint for a job.
GetCheckpoint(ctx context.Context, jobID uuid.UUID) ([]byte, error)
// Stats returns queue statistics.
Stats(ctx context.Context) (*QueueStats, error)
// Cleanup removes completed/failed jobs older than the retention period.
// NOTE(review): the int64 result is presumably the number of jobs
// removed — confirm.
Cleanup(ctx context.Context, retention time.Duration) (int64, error)
// Close releases any resources held by the store.
Close() error
}
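JobStore defines the persistence interface for job storage. Implementations intended for production use must provide durable storage with ACID guarantees; MemoryStore is a non-durable implementation intended only for testing and development.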
type MemoryStore ¶
// MemoryStore is an in-memory implementation of JobStore, intended for
// testing and development; use BoltStore for production.
// Instances are created with NewMemoryStore; all of its state is held in
// unexported fields.
type MemoryStore struct {
// contains filtered or unexported fields
}
MemoryStore is an in-memory implementation of JobStore. This is intended for testing and development; use BoltStore for production.
func NewMemoryStore ¶
func NewMemoryStore() *MemoryStore
NewMemoryStore creates a new in-memory job store.
func (*MemoryStore) GetByIdempotencyKey ¶
GetByIdempotencyKey retrieves a job by its idempotency key.
func (*MemoryStore) GetCheckpoint ¶
GetCheckpoint retrieves the last checkpoint for a job.
func (*MemoryStore) GetDelayed ¶
func (m *MemoryStore) GetDelayed(ctx context.Context, now time.Time, limit int) ([]*core.Job, error)
GetDelayed returns jobs whose scheduled time has passed.
func (*MemoryStore) GetPending ¶
GetPending returns jobs ready for scheduling.
func (*MemoryStore) GetStuck ¶
func (m *MemoryStore) GetStuck(ctx context.Context, visibilityTimeout time.Duration) ([]*core.Job, error)
GetStuck returns jobs that have been running longer than visibilityTimeout and should be reclaimed.
func (*MemoryStore) SaveCheckpoint ¶
SaveCheckpoint stores a checkpoint for a running job.
func (*MemoryStore) SaveResult ¶
SaveResult stores the result of a job execution.
func (*MemoryStore) Stats ¶
func (m *MemoryStore) Stats(ctx context.Context) (*QueueStats, error)
Stats returns queue statistics.
func (*MemoryStore) UpdateState ¶
UpdateState atomically updates only the job state (and updated_at, per the JobStore contract).
type QueueStats ¶
// QueueStats contains aggregate queue statistics.
// It is the value returned by JobStore.Stats and carries JSON tags on every
// field.
type QueueStats struct {
// Current state counts
// NOTE(review): each field presumably counts the jobs currently in the
// like-named core.JobState — confirm against the core package.
Pending int64 `json:"pending"`
Scheduled int64 `json:"scheduled"`
Running int64 `json:"running"`
Delayed int64 `json:"delayed"`
Completed int64 `json:"completed"`
Failed int64 `json:"failed"`
DeadLetter int64 `json:"dead_letter"`
Cancelled int64 `json:"cancelled"`
// Total jobs
// NOTE(review): presumably the sum of the per-state counts above — confirm.
TotalJobs int64 `json:"total_jobs"`
// Storage info
// NOTE(review): presumably the store's size in bytes; what MemoryStore
// reports here is not visible — confirm.
StorageBytes int64 `json:"storage_bytes"`
// Timestamp
// NOTE(review): presumably the time at which the statistics were
// collected — confirm.
Timestamp time.Time `json:"timestamp"`
}
QueueStats contains aggregate queue statistics.