task

package
v0.0.0-...-e13c845 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var WithMaxRetryAlias = abstract.WithMaxRetry
View Source
var WithTimeoutAlias = abstract.WithTimeout
View Source
var WithUniqueAlias = abstract.WithUnique

Functions

func MustMarshal

func MustMarshal(v any) []byte

func MustUnmarshal

func MustUnmarshal(data []byte, v any) error

Types

type AsynqAdapter

type AsynqAdapter struct {
	// contains filtered or unexported fields
}

func (*AsynqAdapter) Enqueue

func (q *AsynqAdapter) Enqueue(task *QueueTask, opts ...TaskOption) error

func (*AsynqAdapter) EnqueueAt

func (q *AsynqAdapter) EnqueueAt(task *QueueTask, at time.Time, opts ...TaskOption) error

func (*AsynqAdapter) EnqueueDelayed

func (q *AsynqAdapter) EnqueueDelayed(task *QueueTask, delay time.Duration, opts ...TaskOption) error

func (*AsynqAdapter) Name

func (q *AsynqAdapter) Name() string

func (*AsynqAdapter) RegisterHandler

func (q *AsynqAdapter) RegisterHandler(taskType string, handler TaskHandler) error

func (*AsynqAdapter) Start

func (q *AsynqAdapter) Start(ctx context.Context) error

func (*AsynqAdapter) Stats

func (q *AsynqAdapter) Stats() QueueStats

func (*AsynqAdapter) Stop

func (q *AsynqAdapter) Stop(ctx context.Context) error

type AsynqConfig

type AsynqConfig struct {
	RedisAddr     string
	RedisPassword string
	RedisDB       int
	Concurrency   int
	Queues        map[string]int
}

type AsynqSchedulerAdapter

type AsynqSchedulerAdapter struct {
	// contains filtered or unexported fields
}

func (*AsynqSchedulerAdapter) AddIntervalJob

func (s *AsynqSchedulerAdapter) AddIntervalJob(interval time.Duration, name string, handler JobHandler) error

func (*AsynqSchedulerAdapter) AddJob

func (s *AsynqSchedulerAdapter) AddJob(cronExpr string, name string, handler JobHandler) error

func (*AsynqSchedulerAdapter) Jobs

func (s *AsynqSchedulerAdapter) Jobs() []JobInfo

func (*AsynqSchedulerAdapter) RemoveJob

func (s *AsynqSchedulerAdapter) RemoveJob(name string) error

func (*AsynqSchedulerAdapter) RunJob

func (s *AsynqSchedulerAdapter) RunJob(name string) error

func (*AsynqSchedulerAdapter) Start

func (s *AsynqSchedulerAdapter) Start() error

func (*AsynqSchedulerAdapter) Stop

type CronScheduler

type CronScheduler interface {
	AddJob(cronExpr string, name string, handler JobHandler) error
	AddIntervalJob(interval time.Duration, name string, handler JobHandler) error
	RemoveJob(name string) error
	RunJob(name string) error
	Start() error
	Stop(ctx context.Context) error
	Jobs() []JobInfo
}

func NewAsynqScheduler

func NewAsynqScheduler(redisAddr string) CronScheduler

func NewMemoryCronScheduler

func NewMemoryCronScheduler() CronScheduler

func NewRobfigCronScheduler

func NewRobfigCronScheduler(opts ...RobfigCronOption) CronScheduler

type CronSchedulerAlias

type CronSchedulerAlias = abstract.CronScheduler

type JobHandler

type JobHandler func(ctx context.Context) error

type JobHandlerAlias

type JobHandlerAlias = abstract.JobHandler

type JobInfo

type JobInfo struct {
	Name       string
	Schedule   string
	NextRun    time.Time
	LastRun    time.Time
	Running    bool
	RunCount   int64
	ErrorCount int64
}

type QueueConfig

type QueueConfig struct {
	Workers     int
	MaxRetry    int
	Concurrency int
	Timeout     time.Duration
	Queues      map[string]int
}

type QueueConfigAlias

type QueueConfigAlias = abstract.QueueConfig

type QueueFactory

type QueueFactory interface {
	CreateQueue(name string, opts QueueConfig) (TaskQueue, error)
	CreateScheduler(opts SchedulerConfig) (CronScheduler, error)
}

func NewMemoryQueueFactory

func NewMemoryQueueFactory() QueueFactory

type QueueFactoryAlias

type QueueFactoryAlias = abstract.QueueFactory

type QueueStats

type QueueStats struct {
	Name        string
	Pending     int64
	Active      int64
	Scheduled   int64
	Retry       int64
	Processed   int64
	Failed      int64
	ProcessedAt time.Time
}

type QueueTask

type QueueTask struct {
	ID        string
	Type      string
	Payload   []byte
	Priority  int
	Queue     string
	Metadata  map[string]string
	CreatedAt time.Time
}

type ResultStore

type ResultStore interface {
	Set(ctx context.Context, taskID string, result *TaskResult, ttl time.Duration) error
	Get(ctx context.Context, taskID string) (*TaskResult, error)
	Delete(ctx context.Context, taskID string) error
}

type ResultStoreAlias

type ResultStoreAlias = abstract.ResultStore

type RobfigCronAdapter

type RobfigCronAdapter struct {
	// contains filtered or unexported fields
}

func (*RobfigCronAdapter) AddIntervalJob

func (s *RobfigCronAdapter) AddIntervalJob(interval time.Duration, name string, handler JobHandler) error

func (*RobfigCronAdapter) AddJob

func (s *RobfigCronAdapter) AddJob(cronExpr string, name string, handler JobHandler) error

func (*RobfigCronAdapter) Jobs

func (s *RobfigCronAdapter) Jobs() []JobInfo

func (*RobfigCronAdapter) RemoveJob

func (s *RobfigCronAdapter) RemoveJob(name string) error

func (*RobfigCronAdapter) RunJob

func (s *RobfigCronAdapter) RunJob(name string) error

func (*RobfigCronAdapter) Start

func (s *RobfigCronAdapter) Start() error

func (*RobfigCronAdapter) Stop

func (s *RobfigCronAdapter) Stop(ctx context.Context) error

type RobfigCronOption

type RobfigCronOption func(*robfigCronOptions)

func WithCronLogger

func WithCronLogger(logger cron.Logger) RobfigCronOption

func WithSeconds

func WithSeconds() RobfigCronOption

func WithTimezone

func WithTimezone(tz *time.Location) RobfigCronOption

type SchedulerConfig

type SchedulerConfig struct {
	Timezone    string
	Concurrency int
	Timeout     time.Duration
}

type SchedulerConfigAlias

type SchedulerConfigAlias = abstract.SchedulerConfig

type TaskHandler

type TaskHandler func(ctx context.Context, task *QueueTask) error

type TaskHandlerAlias

type TaskHandlerAlias = abstract.TaskHandler

type TaskOption

type TaskOption func(*TaskOptions)

func WithMaxRetry

func WithMaxRetry(n int) TaskOption

func WithTimeout

func WithTimeout(d time.Duration) TaskOption

func WithUnique

func WithUnique() TaskOption

type TaskOptionAlias

type TaskOptionAlias = abstract.TaskOption

type TaskOptions

type TaskOptions struct {
	MaxRetry  int
	Timeout   time.Duration
	Unique    bool
	Retention time.Duration
}

type TaskOptionsAlias

type TaskOptionsAlias = abstract.TaskOptions

type TaskQueue

type TaskQueue interface {
	Enqueue(task *QueueTask, opts ...TaskOption) error
	EnqueueDelayed(task *QueueTask, delay time.Duration, opts ...TaskOption) error
	EnqueueAt(task *QueueTask, at time.Time, opts ...TaskOption) error
	RegisterHandler(taskType string, handler TaskHandler) error
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	Stats() QueueStats
	Name() string
}

func NewAsynqQueue

func NewAsynqQueue(name string, config AsynqConfig) TaskQueue

func NewMemoryTaskQueue

func NewMemoryTaskQueue(name string, workers int, bufferSize int) TaskQueue

type TaskQueueAlias

type TaskQueueAlias = abstract.TaskQueue

type TaskResult

type TaskResult struct {
	TaskID string
	Status string
	Result []byte
	Error  string
	At     time.Time
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL