sasynq

package
v1.14.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2025 License: MIT Imports: 9 Imported by: 0

README

sasynq

sasynq is a wrapper around the excellent asynq library. It provides a simpler and more user-friendly SDK while remaining fully compatible with native asynq usage patterns. Its main features include:

  • Support for Redis Cluster and Sentinel for high availability and horizontal scalability.
  • Distributed task queues with support for priority queues, delayed queues, unique tasks (to prevent duplicate execution), and periodic task scheduling.
  • Built-in mechanisms for task retries (with customizable retry counts), timeouts, and deadlines.
  • Flexible scheduling for immediate, delayed, or specific-time execution.
  • Unified logging using zap.

sasynq streamlines asynchronous and distributed task processing in Go, helping you write clean and maintainable background job code quickly and safely.


Example of use

Queues

Defining Task Payloads and Handlers

Here’s how to define task payloads and handlers in sasynq:

// example/common/task.go
package common

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/hibiken/asynq"
	"github.com/go-dev-frame/sponge/pkg/sasynq"
)

// ----------------------------- Definition Method 1 ----------------------------------

// TypeEmailSend is the task type name under which email tasks are enqueued
// and their handler is registered.
const TypeEmailSend = "email:send"

// EmailPayload is the payload carried by TypeEmailSend tasks. It is
// serialized as JSON using the field names given in the struct tags.
type EmailPayload struct {
	UserID  int    `json:"user_id"`
	Message string `json:"message"`
}

// HandleEmailTask is a typed task handler (definition method 1): it receives
// the payload as an *EmailPayload instead of a raw task. It prints a
// completion line for the payload's user and always returns nil.
func HandleEmailTask(ctx context.Context, p *EmailPayload) error {
	userID := p.UserID
	fmt.Printf("[Email] Task for UserID %d completed successfully\n", userID)

	return nil
}

// ----------------------------- Definition Method  2 ----------------------------------

// TypeSMSSend is the task type name under which SMS tasks are enqueued and
// their handler is registered.
const TypeSMSSend = "sms:send"

// SMSPayload is the payload carried by TypeSMSSend tasks. It is serialized
// as JSON using the field names given in the struct tags.
type SMSPayload struct {
	UserID  int    `json:"user_id"`
	Message string `json:"message"`
}

// ProcessTask lets *SMSPayload be registered directly as the handler for
// TypeSMSSend tasks (definition method 2).
//
// The receiver is the single value handed over at registration time, not the
// task being processed, so its fields never reflect the enqueued payload.
// The payload is therefore decoded from t into a local value; decoding into
// the receiver would also make concurrent invocations share mutable state.
//
// It returns an error if the task payload is not valid JSON for SMSPayload.
func (p *SMSPayload) ProcessTask(ctx context.Context, t *asynq.Task) error {
	var payload SMSPayload
	if err := json.Unmarshal(t.Payload(), &payload); err != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", err)
	}
	fmt.Printf("[SMS] Task for UserID %d completed successfully\n", payload.UserID)
	return nil
}

// ----------------------------- Definition Method  3 ----------------------------------

// TypeMsgNotification is the task type name under which message-notification
// tasks are enqueued and their handler is registered.
const TypeMsgNotification = "msg:notification"

// MsgNotificationPayload is the payload carried by TypeMsgNotification tasks.
// It is serialized as JSON using the field names given in the struct tags.
type MsgNotificationPayload struct {
	UserID  int    `json:"user_id"`
	Message string `json:"message"`
}

// HandleMsgNotificationTask is a plain handler function (definition
// method 3): it receives the raw task and decodes the JSON payload itself.
// It returns a wrapped error when the payload cannot be decoded into a
// MsgNotificationPayload; otherwise it prints a completion line and returns nil.
func HandleMsgNotificationTask(ctx context.Context, t *asynq.Task) error {
	payload := new(MsgNotificationPayload)

	err := json.Unmarshal(t.Payload(), payload)
	if err != nil {
		return fmt.Errorf("failed to unmarshal payload: %w", err)
	}

	fmt.Printf("[MSG] Task for UserID %d completed successfully\n", payload.UserID)
	return nil
}

Producer Example

A producer enqueues tasks with various options like priority, delays, deadlines, and unique IDs.

// example/producer/main.go
package main

import (
	"fmt"
	"time"

	"github.com/go-dev-frame/sponge/pkg/sasynq"
	"example/common"
)

// runProducer enqueues four example tasks through client and prints one
// confirmation line per task:
//
//  1. an email task enqueued immediately on the "critical" queue,
//  2. an SMS task enqueued after a 5-second delay on the "default" queue,
//  3. a notification task scheduled 10 seconds from now on the "low" queue,
//  4. an email task built with NewTask and enqueued with a deadline and a
//     unique ID on the "low" queue.
//
// It stops at, and returns, the first error encountered.
func runProducer(client *sasynq.Client) error {
	// report prints the confirmation line shared by all four enqueue calls.
	report := func(taskType, id, queue string) {
		fmt.Printf("enqueued task: type=%s, id=%s, queue=%s\n", taskType, id, queue)
	}

	// 1. Immediate enqueue with critical priority.
	immediateEmail := &common.EmailPayload{
		UserID:  101,
		Message: "This is a message that is immediately queued, with critical priority",
	}
	_, immediateInfo, err := client.EnqueueNow(
		common.TypeEmailSend,
		immediateEmail,
		sasynq.WithQueue("critical"),
		sasynq.WithRetry(5),
	)
	if err != nil {
		return err
	}
	report(common.TypeEmailSend, immediateInfo.ID, immediateInfo.Queue)

	// 2. Enqueue after a 5-second delay.
	delayedSMS := &common.SMSPayload{
		UserID:  202,
		Message: "This is a message added to the queue after a 5-second delay, with default priority",
	}
	_, delayedInfo, err := client.EnqueueIn(
		5*time.Second,
		common.TypeSMSSend,
		delayedSMS,
		sasynq.WithQueue("default"),
		sasynq.WithRetry(3),
	)
	if err != nil {
		return err
	}
	report(common.TypeSMSSend, delayedInfo.ID, delayedInfo.Queue)

	// 3. Enqueue to run at a specific point in time.
	scheduledMsg := &common.MsgNotificationPayload{
		UserID:  303,
		Message: "This is a message scheduled to run at a specific time, with low priority",
	}
	runAt := time.Now().Add(10 * time.Second)
	_, scheduledInfo, err := client.EnqueueAt(
		runAt,
		common.TypeMsgNotification,
		scheduledMsg,
		sasynq.WithQueue("low"),
		sasynq.WithRetry(1),
	)
	if err != nil {
		return err
	}
	report(common.TypeMsgNotification, scheduledInfo.ID, scheduledInfo.Queue)

	// 4. Build the task explicitly with NewTask, then enqueue it.
	uniqueEmail := &common.EmailPayload{
		UserID:  404,
		Message: "This is a test message, with low priority, a 15-second deadline, and a unique ID",
	}
	emailTask, err := sasynq.NewTask(common.TypeEmailSend, uniqueEmail)
	if err != nil {
		return err
	}
	deadline := time.Now().Add(15 * time.Second)
	uniqueInfo, err := client.Enqueue(
		emailTask,
		sasynq.WithQueue("low"),
		sasynq.WithRetry(1),
		sasynq.WithDeadline(deadline),
		sasynq.WithUniqueID("unique-id-xxxx-xxxx"),
	)
	if err != nil {
		return err
	}
	report(common.TypeEmailSend, uniqueInfo.ID, uniqueInfo.Queue)

	return nil
}

// main connects a producer client to the local Redis instance, enqueues the
// example tasks, and prints a final confirmation line. It panics if any task
// cannot be enqueued.
func main() {
	cfg := sasynq.RedisConfig{
		Addr: "localhost:6379",
	}
	client := sasynq.NewClient(cfg)
	// Register the cleanup before doing any work: deferred calls still run
	// while a panic unwinds, so the client is closed on the failure path too.
	defer client.Close()

	if err := runProducer(client); err != nil {
		panic(err)
	}

	fmt.Println("All tasks enqueued.")
}

Consumer Example

A consumer server can register handlers in three different ways:

package main

import (
	"github.com/go-dev-frame/sponge/pkg/sasynq"
	"example/common"
)

// runConsumer builds a consumer server for redisCfg, attaches the logging
// middleware, registers one handler per task type (one for each of the three
// registration styles), and starts the server. The returned error is always
// nil.
func runConsumer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
	// The default server config uses the critical, default, and low queues.
	srv := sasynq.NewServer(redisCfg, sasynq.DefaultServerConfig())

	// Attach logging middleware.
	srv.Use(sasynq.LoggingMiddleware())

	// Method 1 (recommended): typed handler registered on the server's mux.
	emailHandler := sasynq.HandleFunc(common.HandleEmailTask)
	sasynq.RegisterTaskHandler(srv.Mux(), common.TypeEmailSend, emailHandler)

	// Method 2: register a struct that implements the handler interface.
	smsHandler := &common.SMSPayload{}
	srv.Register(common.TypeSMSSend, smsHandler)

	// Method 3: register a handler function directly.
	srv.RegisterFunc(common.TypeMsgNotification, common.HandleMsgNotificationTask)

	srv.Run()

	return srv, nil
}

func main() {
	cfg := sasynq.RedisConfig{
		Addr: "localhost:6379",
	}
	srv, err := runConsumer(cfg)
	if err != nil {
		panic(err)
	}
	srv.WaitShutdown()
}

Periodic Tasks

sasynq makes scheduling recurring tasks very simple.

package main

import (
	"context"
	"fmt"

	"github.com/go-dev-frame/sponge/pkg/sasynq"
)

// TypeScheduledGet is the task type name under which the periodic GET tasks
// are scheduled and their handler is registered.
const TypeScheduledGet = "scheduled:get"

// ScheduledGetPayload is the payload carried by TypeScheduledGet tasks. It is
// serialized as JSON using the field name given in the struct tag.
type ScheduledGetPayload struct {
	URL string `json:"url"`
}

// handleScheduledGetTask is the typed handler for TypeScheduledGet tasks. It
// prints a completion line for the payload's URL and always returns nil; no
// request is actually made.
func handleScheduledGetTask(ctx context.Context, p *ScheduledGetPayload) error {
	target := p.URL
	fmt.Printf("[Get] Task for URL %s completed successfully\n", target)

	return nil
}

// -----------------------------------------------------------------------

// registerSchedulerTasks registers two periodic TypeScheduledGet tasks on
// scheduler (one every 2 seconds, one every 3 seconds), printing the entry ID
// of each, and then starts the scheduler. It returns the first registration
// error without registering further tasks or starting the scheduler.
func registerSchedulerTasks(scheduler *sasynq.Scheduler) error {
	// Each entry pairs a cron spec with the URL placed in the task payload.
	jobs := []struct {
		cronSpec string
		url      string
	}{
		{cronSpec: "@every 2s", url: "https://google.com"},
		{cronSpec: "@every 3s", url: "https://bing.com"},
	}

	for _, job := range jobs {
		payload := &ScheduledGetPayload{URL: job.url}
		entryID, err := scheduler.RegisterTask(job.cronSpec, TypeScheduledGet, payload)
		if err != nil {
			return err
		}
		fmt.Printf("Registered periodic task with entry ID: %s\n", entryID)
	}

	scheduler.Run()

	return nil
}

// runServer builds a consumer server for redisCfg with the default server
// configuration and logging middleware, registers the typed handler for
// TypeScheduledGet tasks, and starts the server. The returned error is always
// nil.
func runServer(redisCfg sasynq.RedisConfig) (*sasynq.Server, error) {
	srv := sasynq.NewServer(redisCfg, sasynq.DefaultServerConfig())
	srv.Use(sasynq.LoggingMiddleware())

	// Register the handler that processes the scheduled tasks.
	getHandler := sasynq.HandleFunc(handleScheduledGetTask)
	sasynq.RegisterTaskHandler(srv.Mux(), TypeScheduledGet, getHandler)

	srv.Run()

	return srv, nil
}

// main registers the periodic tasks with a scheduler, starts a consumer
// server that processes them, and then keeps the process alive until the
// server is shut down. It panics if either step fails to start.
func main() {
	cfg := sasynq.RedisConfig{
		Addr: "localhost:6379",
	}

	scheduler := sasynq.NewScheduler(cfg)
	if err := registerSchedulerTasks(scheduler); err != nil {
		panic(err)
	}

	srv, err := runServer(cfg)
	if err != nil {
		panic(err)
	}

	// Run starts the server in a separate goroutine, so main must block here.
	// Calling Shutdown at this point would stop the server right after it
	// started, before any periodic task could be processed.
	srv.WaitShutdown()
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func LoggingMiddleware

func LoggingMiddleware(opts ...LoggerOption) func(next asynq.Handler) asynq.Handler

LoggingMiddleware logs information about each processed task.

func NewTask

func NewTask[P any](typeName string, payload P, opts ...asynq.Option) (*asynq.Task, error)

NewTask creates a new asynq.Task with a typed payload. It automatically marshals the payload into JSON.

func NewZapLogger

func NewZapLogger(l *zap.Logger) asynq.Logger

func RegisterTaskHandler

func RegisterTaskHandler[T any](mux *asynq.ServeMux, typeName string, handler TaskHandler[T])

RegisterTaskHandler registers a generic, type-safe task handler with the server's mux. It automatically unmarshals the JSON payload into the specified type.

func WithDeadline

func WithDeadline(t time.Time) asynq.Option

WithDeadline specifies the deadline for the task.

func WithQueue

func WithQueue(name string) asynq.Option

WithQueue specifies which queue the task should be sent to.

func WithRetry

func WithRetry(maxRetry int) asynq.Option

WithRetry specifies the max number of times the task will be retried.

func WithTimeout

func WithTimeout(timeout time.Duration) asynq.Option

WithTimeout specifies the timeout duration for the task.

func WithUniqueID

func WithUniqueID(id string) asynq.Option

WithUniqueID specifies that the task should be unique for a given period. If another task with the same unique ID is enqueued within the retention period, it will be rejected.

Types

type Client

type Client struct {
	*asynq.Client
}

Client is a wrapper around asynq.Client providing more convenient APIs.

func NewClient

func NewClient(cfg RedisConfig) *Client

NewClient creates a new producer client.

func (*Client) Enqueue

func (c *Client) Enqueue(task *asynq.Task, opts ...asynq.Option) (*asynq.TaskInfo, error)

Enqueue enqueues the given task to a queue.

func (*Client) EnqueueAt

func (c *Client) EnqueueAt(t time.Time, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueAt enqueues a task to be processed at a specific time.

func (*Client) EnqueueIn

func (c *Client) EnqueueIn(delay time.Duration, typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueIn enqueues a task to be processed after a specified delay.

func (*Client) EnqueueNow

func (c *Client) EnqueueNow(typeName string, payload any, opts ...asynq.Option) (*asynq.Task, *asynq.TaskInfo, error)

EnqueueNow enqueues a task for immediate processing. The payload parameter must be a value that json.Marshal can serialize.

type LoggerOption

type LoggerOption func(*loggerOptions)

LoggerOption is a functional option for configuring LoggingMiddleware.

func WithLogger

func WithLogger(l *zap.Logger) LoggerOption

WithLogger sets the logger to use for logging.

func WithMaxLength

func WithMaxLength(l int) LoggerOption

WithMaxLength sets the maximum length of the payload to log.

type RedisConfig

type RedisConfig struct {
	Mode RedisMode `yaml:"mode"`

	// For Single Mode
	Addr string `yaml:"addr"`

	// For Sentinel Mode
	SentinelAddrs []string `yaml:"sentinelAddrs"`
	MasterName    string   `yaml:"masterName"`

	// For Cluster Mode
	ClusterAddrs []string `yaml:"clusterAddrs"`

	// Common options
	Username string `yaml:"username"`
	Password string `yaml:"password"`
	DB       int    `yaml:"db"`
}

RedisConfig holds all configurations for connecting to Redis.

func (*RedisConfig) GetAsynqRedisConnOpt

func (c *RedisConfig) GetAsynqRedisConnOpt() asynq.RedisConnOpt

GetAsynqRedisConnOpt converts RedisConfig to asynq's RedisConnOpt interface. This is the core of the high-availability switching logic.

type RedisMode

type RedisMode string

RedisMode defines the Redis connection mode.

const (
	// RedisModeSingle uses a single Redis instance.
	RedisModeSingle RedisMode = "single"
	// RedisModeSentinel uses Redis Sentinel for high availability.
	RedisModeSentinel RedisMode = "sentinel"
	// RedisModeCluster uses a Redis Cluster for horizontal scaling.
	RedisModeCluster RedisMode = "cluster"
)

type Scheduler

type Scheduler struct {
	*asynq.Scheduler
}

Scheduler is a wrapper around asynq.Scheduler.

func NewScheduler

func NewScheduler(cfg RedisConfig, opts ...SchedulerOption) *Scheduler

NewScheduler creates a new periodic task scheduler.

func (*Scheduler) Register

func (s *Scheduler) Register(cronSpec string, task *asynq.Task, opts ...asynq.Option) (entryID string, err error)

Register adds a new periodic task.

func (*Scheduler) RegisterTask

func (s *Scheduler) RegisterTask(cronSpec string, typeName string, payload any, opts ...asynq.Option) (entryID string, err error)

RegisterTask adds a new periodic task with a given type name.

func (*Scheduler) Run

func (s *Scheduler) Run()

Run runs the asynq Scheduler in a separate goroutine

func (*Scheduler) Shutdown

func (s *Scheduler) Shutdown()

Shutdown shuts down the Scheduler.

func (*Scheduler) Unregister

func (s *Scheduler) Unregister(entryID string) error

Unregister removes a periodic task.

type SchedulerOption

type SchedulerOption func(*schedulerOptions)

SchedulerOption is a functional option for configuring a Scheduler created with NewScheduler.

func WithSchedulerLogLevel

func WithSchedulerLogLevel(level asynq.LogLevel) SchedulerOption

WithSchedulerLogLevel sets the log level for the scheduler.

func WithSchedulerLogger

func WithSchedulerLogger(l *zap.Logger) SchedulerOption

WithSchedulerLogger sets the logger for the scheduler.

func WithSchedulerOptions

func WithSchedulerOptions(opts *asynq.SchedulerOpts) SchedulerOption

WithSchedulerOptions sets the options for the scheduler.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is a wrapper around asynq.Server providing integrated features.

func NewServer

func NewServer(redisCfg RedisConfig, serverCfg ServerConfig) *Server

NewServer creates a new consumer server.

func (*Server) Mux

func (s *Server) Mux() *asynq.ServeMux

Mux returns the underlying ServeMux to register handlers.

func (*Server) Register

func (s *Server) Register(typeName string, handler asynq.Handler)

Register registers a task handler for the given task type name.

func (*Server) RegisterFunc

func (s *Server) RegisterFunc(typeName string, handlerFunc asynq.HandlerFunc)

RegisterFunc registers a task handler function for the given task type name.

func (*Server) Run

func (s *Server) Run()

Run runs the asynq server in a separate goroutine

func (*Server) Shutdown

func (s *Server) Shutdown()

Shutdown shuts down the server.

func (*Server) Use

func (s *Server) Use(middlewares ...asynq.MiddlewareFunc)

Use adds middleware to the server's handler chain.

func (*Server) WaitShutdown

func (s *Server) WaitShutdown()

WaitShutdown waits for an interrupt signal and then gracefully shuts down the server.

type ServerConfig

type ServerConfig struct {
	*asynq.Config
}

ServerConfig holds configurations for the asynq server.

func DefaultServerConfig

func DefaultServerConfig(l ...*zap.Logger) ServerConfig

DefaultServerConfig returns a default server configuration.

type TaskHandleFunc

type TaskHandleFunc[T any] func(ctx context.Context, payload T) error

TaskHandleFunc is a function adapter for TaskHandler.

func (TaskHandleFunc[T]) Handle

func (f TaskHandleFunc[T]) Handle(ctx context.Context, payload T) error

Handle calls the wrapped function.

type TaskHandler

type TaskHandler[T any] interface {
	Handle(ctx context.Context, payload T) error
}

TaskHandler is a generic interface for handling a task with a specific payload type.

func HandleFunc

func HandleFunc[T any](f func(ctx context.Context, payloadType T) error) TaskHandler[T]

HandleFunc creates a TaskHandler from a function.

type ZapLogger

type ZapLogger struct {
	// contains filtered or unexported fields
}

func (*ZapLogger) Debug

func (l *ZapLogger) Debug(args ...interface{})

func (*ZapLogger) Error

func (l *ZapLogger) Error(args ...interface{})

func (*ZapLogger) Fatal

func (l *ZapLogger) Fatal(args ...interface{})

func (*ZapLogger) Info

func (l *ZapLogger) Info(args ...interface{})

func (*ZapLogger) Warn

func (l *ZapLogger) Warn(args ...interface{})

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL