qsync

package module
v2.0.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2026 License: MIT Imports: 12 Imported by: 0

README

Простая и эффективная распределенная очередь задач в Go

GitHub go.mod Go version GitHub License Go Report Card GitHub top language GitHub code size in bytes GitHub Release

Qsync — это библиотека Go для постановки задач в очередь и их асинхронной обработки с помощью обработчиков.

Краткий обзор работы:

  • Клиент ставит задачи в очередь.
  • Сервер извлекает задачи из очередей и запускает рабочую процедуру для каждой задачи.
  • Задачи обрабатываются одновременно несколькими работниками.

Очереди задач используются как механизм распределения работы между несколькими машинами. Система может состоять из нескольких рабочих серверов, обеспечивая высокую доступность и горизонтальное масштабирование.

Установка

go get -u github.com/alnovi/qsync/v2

Функции

  • Гарантировано хотя бы одно выполнение задачи.
  • Планирование задач.
  • Повторные попытки неудачных задач.
  • Автоматическое восстановление задач в случае сбоя работника.
  • Очереди с взвешенным приоритетом.
  • Низкая задержка при добавлении задачи, поскольку запись в Redis выполняется быстро.
  • Дедупликация задач с использованием уникальной опции.
  • Разрешить тайм-аут и крайний срок для каждой задачи.
  • Совместимость с кластером Redis

Использование

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/alnovi/qsync/v2"
    "github.com/redis/go-redis/v9"
)

func main() {
    queue, err := qsync.New(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
    must(err, "failed to initialize queue")

    task := qsync.NewTask("reindex-file", []byte("task payload"),
        qsync.WithRetry(3),
        qsync.WithDelay(30 * time.Second),
        qsync.WithRetryDelay(5 * time.Second),
    )

	client := queue.NewClient()
    err = client.Enqueue(context.Background(), qsync.Default, task)
    must(err, "failed enqueue task")

	mux := qsync.NewMux()
	err = mux.HandleFunc("reindex-file", handle)
	must(err, "failed attach handle")
	
	server, err := queue.NewServer(mux)
	must(err, "failed to initialize server")

	err = server.Start(context.Background())
	must(err, "failed to start server")
	defer server.Stop(context.Background())
	
    time.Sleep(time.Minute)
}

func handle(ctx context.Context, task *qsync.TaskInfo) error {
	fmt.Println(task.Id, string(task.Payload))
	return nil
}

func must(err error, msg string) {
    if err != nil {
        panic(fmt.Errorf("%s: %w", msg, err))
    }
}

Documentation

Index

Constants

View Source
const (
	Critical = "critical"
	Default  = "default"
	Lower    = "lower"
)

Variables

View Source
var (
	ErrHandlerOverlap  = errors.New("handler overlap")
	ErrHandlerNotFound = errors.New("handler not found")
)
View Source
var (
	ErrMuxIsEmpty    = errors.New("mux is empty")
	ErrMatrixIsEmpty = errors.New("matrix is empty")
)
View Source
var (
	ErrTaskIsNil       = errors.New("task is nil")
	ErrTaskTypeIsEmpty = errors.New("task type is empty")
	ErrTaskIsExists    = errors.New("task is exists")
	ErrTaskFailEncode  = errors.New("fail encode task")
	ErrTaskIsDeadline  = errors.New("task is deadline")
)

Functions

This section is empty.

Types

type Client

type Client interface {
	Enqueue(ctx context.Context, queue string, task *Task) error
}

type HandleFunc

type HandleFunc func(ctx context.Context, task *TaskInfo) error

type Handler

type Handler interface {
	ProcessTask(ctx context.Context, task *TaskInfo) error
}

type Logger

type Logger interface {
	Debug(msg string, args ...any)
	Info(msg string, args ...any)
	Warn(msg string, args ...any)
	Error(msg string, args ...any)
}

type Metrics added in v2.0.5

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics added in v2.0.5

func NewMetrics(enabled bool, opts ...MetricsOption) *Metrics

func (*Metrics) QueueDequeueErrInc added in v2.0.5

func (m *Metrics) QueueDequeueErrInc(queue string)

func (*Metrics) QueueDequeueOkInc added in v2.0.5

func (m *Metrics) QueueDequeueOkInc(queue string)

func (*Metrics) QueueEnqueueErrInc added in v2.0.5

func (m *Metrics) QueueEnqueueErrInc(queue, task string)

func (*Metrics) QueueEnqueueOkInc added in v2.0.5

func (m *Metrics) QueueEnqueueOkInc(queue, task string)

func (*Metrics) TaskProcessErrInc added in v2.0.5

func (m *Metrics) TaskProcessErrInc(queue string, task *taskMessage)

func (*Metrics) TaskProcessExpiredInc added in v2.0.5

func (m *Metrics) TaskProcessExpiredInc(queue string, task *taskMessage)

func (*Metrics) TaskProcessOkInc added in v2.0.5

func (m *Metrics) TaskProcessOkInc(queue string, task *taskMessage)

type MetricsOption added in v2.0.5

type MetricsOption func(*Metrics)

func WithEnabled added in v2.0.5

func WithEnabled(enabled bool) MetricsOption

func WithNamespace added in v2.0.5

func WithNamespace(namespace string) MetricsOption

func WithRegister added in v2.0.7

func WithRegister(register prometheus.Registerer) MetricsOption

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

func NewMux

func NewMux() *Mux

func (*Mux) Handle

func (m *Mux) Handle(pattern string) (HandleFunc, error)

func (*Mux) HandleFunc

func (m *Mux) HandleFunc(pattern string, handler HandleFunc) error

func (*Mux) Handler

func (m *Mux) Handler(pattern string, handler Handler) error

type Option

type Option func(q *Qsync) error

func WithLogger

func WithLogger(logger Logger) Option

func WithMetrics added in v2.0.5

func WithMetrics(opts ...MetricsOption) Option

func WithPrefix

func WithPrefix(prefix string) Option

type Qsync

type Qsync struct {
	// contains filtered or unexported fields
}

func New

func New(client redis.UniversalClient, opts ...Option) (*Qsync, error)

func (*Qsync) NewClient

func (q *Qsync) NewClient() Client

func (*Qsync) NewServer

func (q *Qsync) NewServer(mux *Mux, opts ...ServerOption) (Server, error)

func (*Qsync) Ping

func (q *Qsync) Ping(ctx context.Context) error

type Server

type Server interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

type ServerOption

type ServerOption func(s *server) error

func WithContext

func WithContext(fn func() context.Context) ServerOption

func WithErrorHandler

func WithErrorHandler(fn func(error, *TaskInfo)) ServerOption

func WithMatrix

func WithMatrix(matrix map[string]int) ServerOption

func WithServerLogger

func WithServerLogger(logger *slog.Logger) ServerOption

func WithWait added in v2.0.9

func WithWait(wait time.Duration) ServerOption

type Task

type Task struct {
	// contains filtered or unexported fields
}

func NewTask

func NewTask(typename string, payload []byte, opts ...TaskOption) *Task

type TaskInfo

type TaskInfo struct {
	Id      string
	Type    string
	Payload []byte
	Retry   int
	Retried int
}

type TaskOption

type TaskOption func(*Task)

func WithDeadline

func WithDeadline(deadline time.Time) TaskOption

func WithDelay

func WithDelay(delay time.Duration) TaskOption

func WithId

func WithId(id string) TaskOption

func WithProcessAt

func WithProcessAt(processAt time.Time) TaskOption

func WithRetry

func WithRetry(retry int) TaskOption

func WithRetryDelay

func WithRetryDelay(delay time.Duration) TaskOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL