redis

package
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2025 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HeartbeatData

type HeartbeatData struct {
	WorkerID    string    `redis:"worker_id"`
	StartedAt   time.Time `redis:"started_at"`
	HeartbeatAt time.Time `redis:"heartbeat_at"`
	Queues      []byte    `redis:"queues"`
	PID         int       `redis:"pid"`
	MemoryUsage float64   `redis:"memory_usage"`
	CPUUsage    float64   `redis:"cpu_usage"`
}

type Heartbeater

type Heartbeater struct {
	// contains filtered or unexported fields
}

func NewHeartBeater

func NewHeartBeater(rc redis.UniversalClient, opts ...OptFunc) *Heartbeater

func (*Heartbeater) LastHeartbeats

func (s *Heartbeater) LastHeartbeats(ctx context.Context) ([]taskqueue.HeartbeatData, error)

func (*Heartbeater) RemoveHeartbeat

func (s *Heartbeater) RemoveHeartbeat(ctx context.Context, workerID string) error

func (*Heartbeater) SendHeartbeat

func (s *Heartbeater) SendHeartbeat(ctx context.Context, data taskqueue.HeartbeatData) error

type Job

type Job struct {
	ID            string    `redis:"id"`
	QueueName     string    `redis:"queue_name"`
	Payload       []byte    `redis:"payload"`
	CreatedAt     time.Time `redis:"created_at"`
	StartedAt     time.Time `redis:"started_at"`
	UpdatedAt     time.Time `redis:"updated_at"`
	Attempts      int       `redis:"attempts"`
	FailureReason string    `redis:"failure_reason"`
	Status        JobStatus `redis:"status"`
	ProcessedBy   string    `redis:"processed_by"`
}

type JobStatus

type JobStatus int8

func (JobStatus) MarshalBinary

func (j JobStatus) MarshalBinary() (data []byte, err error)

func (*JobStatus) ScanRedis

func (j *JobStatus) ScanRedis(s string) error

type JobStorage

type JobStorage interface {
	CreateOrUpdate(ctx context.Context, job *taskqueue.Job) error
	GetJob(ctx context.Context, jobID string) (*taskqueue.Job, error)
	DeleteJob(ctx context.Context, jobID string) error
	UpdateJobStatus(ctx context.Context, jobID string, status taskqueue.JobStatus) error
}

type MetricsBackend

type MetricsBackend struct {
	// contains filtered or unexported fields
}

func NewMetricsBackend

func NewMetricsBackend(client redis.UniversalClient, opts ...OptFunc) *MetricsBackend

func (*MetricsBackend) GaugeValue

func (*MetricsBackend) IncrementCounter

func (m *MetricsBackend) IncrementCounter(ctx context.Context, mt taskqueue.Metric, count int, ts time.Time) error

func (*MetricsBackend) QueryRangeCounterValues

func (m *MetricsBackend) QueryRangeCounterValues(ctx context.Context, mt taskqueue.Metric, start, end time.Time) (taskqueue.MetricRangeValue, error)

func (*MetricsBackend) QueryRangeGaugeValues

func (m *MetricsBackend) QueryRangeGaugeValues(ctx context.Context, mt taskqueue.Metric, start, end time.Time) (taskqueue.MetricRangeValue, error)

func (*MetricsBackend) RecordGauge

func (m *MetricsBackend) RecordGauge(ctx context.Context, mt taskqueue.Metric, value float64, ts time.Time) error

type OptFunc

type OptFunc func(*Options)

func WithCompletedJobTTL

func WithCompletedJobTTL(completedJobTTL time.Duration) OptFunc

func WithNamespace

func WithNamespace(namespace string) OptFunc

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue(client redis.UniversalClient, opts ...OptFunc) *Queue

func (*Queue) Ack

func (q *Queue) Ack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.AckOptions) error

func (*Queue) DeleteJobFromDeadQueue

func (q *Queue) DeleteJobFromDeadQueue(ctx context.Context, queueName string, jobID string) error

func (*Queue) Dequeue

func (q *Queue) Dequeue(ctx context.Context, opts *taskqueue.DequeueOptions, count int) ([]*taskqueue.Job, error)

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, job *taskqueue.Job, opts *taskqueue.EnqueueOptions) error

func (*Queue) ListDeadQueues

func (q *Queue) ListDeadQueues(ctx context.Context) ([]*taskqueue.QueueInfo, error)

func (*Queue) ListPendingQueues

func (q *Queue) ListPendingQueues(ctx context.Context) ([]*taskqueue.QueueInfo, error)

func (*Queue) Nack

func (q *Queue) Nack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.NackOptions) error

func (*Queue) PageDeadQueue

func (q *Queue) PageDeadQueue(ctx context.Context, queueName string, p taskqueue.Pagination) (*taskqueue.QueueDetails, error)

func (*Queue) PagePendingQueue

func (q *Queue) PagePendingQueue(ctx context.Context, queueName string, p taskqueue.Pagination) (*taskqueue.QueueDetails, error)

func (*Queue) PausePendingQueue

func (q *Queue) PausePendingQueue(ctx context.Context, queueName string) error

func (*Queue) ResumePendingQueue

func (q *Queue) ResumePendingQueue(ctx context.Context, queueName string) error

type QueueWithExternalJobStorage

type QueueWithExternalJobStorage struct {
	// contains filtered or unexported fields
}

func NewQueueWithExternalJobStorage

func NewQueueWithExternalJobStorage(qclient redis.UniversalClient, s JobStorage, opts ...OptFunc) *QueueWithExternalJobStorage

func (*QueueWithExternalJobStorage) Ack

func (*QueueWithExternalJobStorage) DeleteJobFromDeadQueue

func (q *QueueWithExternalJobStorage) DeleteJobFromDeadQueue(ctx context.Context, queueName string, jobID string) error

func (*QueueWithExternalJobStorage) Dequeue

func (*QueueWithExternalJobStorage) Enqueue

func (*QueueWithExternalJobStorage) ListDeadQueues

func (q *QueueWithExternalJobStorage) ListDeadQueues(ctx context.Context) ([]*taskqueue.QueueInfo, error)

func (*QueueWithExternalJobStorage) ListPendingQueues

func (q *QueueWithExternalJobStorage) ListPendingQueues(ctx context.Context) ([]*taskqueue.QueueInfo, error)

func (*QueueWithExternalJobStorage) Nack

func (*QueueWithExternalJobStorage) PageDeadQueue

func (*QueueWithExternalJobStorage) PagePendingQueue

func (*QueueWithExternalJobStorage) PausePendingQueue

func (q *QueueWithExternalJobStorage) PausePendingQueue(ctx context.Context, queueName string) error

func (*QueueWithExternalJobStorage) ResumePendingQueue

func (q *QueueWithExternalJobStorage) ResumePendingQueue(ctx context.Context, queueName string) error

type Store

type Store struct {
	// contains filtered or unexported fields
}

func NewStore

func NewStore(client redis.UniversalClient, opts ...OptFunc) *Store

func (*Store) CreateOrUpdate

func (s *Store) CreateOrUpdate(ctx context.Context, job *taskqueue.Job) error

func (*Store) DeleteJob

func (s *Store) DeleteJob(ctx context.Context, jobID string) error

func (*Store) GetJob

func (s *Store) GetJob(ctx context.Context, jobID string) (*taskqueue.Job, error)

func (*Store) UpdateJobStatus

func (s *Store) UpdateJobStatus(ctx context.Context, jobID string, status taskqueue.JobStatus) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL