rate_envelope_queue

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2025 License: MIT Imports: 10 Imported by: 0

README

rate-envelope-queue

A lightweight, goroutine-safe wrapper around k8s.io/client-go/util/workqueue for managing tasks (envelopes) with a fixed worker pool, periodic scheduling, deadlines, hooks, and stamps (middleware). Adds a safe queue lifecycle (Start/Stop/Start), buffering before first start, and queue capacity limiting.

Under the hood it uses workqueue.TypedRateLimitingInterface. Deduplication happens by pointer to the element: repeated Add of the same pointer while it is in-flight is ignored.


Table of Contents


Features

Goroutine-safe in-memory queue for your service:

  • All public methods are safe to call from multiple goroutines. Send can be called concurrently. Multiple workers are supported via limit. Start/Stop are serialized internally.
  • This is not a distributed queue: there are no guarantees across processes/hosts. Ensure your hooks/invokers are thread-safe around shared state.

What you get:

  • A clear lifecycle FSM: init → running → stopping → stopped
  • Both one-off and periodic tasks
  • Middleware chain via Stamp
  • Hooks: before/after/failure/success
  • Capacity accounting (quota)
  • Graceful or fast stop (Drain / Stop) and restartable queues (Start() after Stop())

Error/Hook semantics:

  • ErrStopEnvelope — intentional stop of a specific envelope:
    • the envelope is forgotten, not rescheduled;
    • if raised in beforeHook/invoke, the afterHook still runs (within a time-bounded context); successHook does not run.
  • context.Canceled / context.DeadlineExceeded — not a success:
    • envelope is forgotten; periodic ones are rescheduled, one-off ones are not.
  • Any other error:
    • periodic → rescheduled (if queue is alive);
    • one-off → defer to failureHook decision (RetryNow / RetryAfter / Drop).
  • Each hook runs with its own timeout: a fraction frac=0.5 of the envelope's deadline, but at least hardHookLimit (800ms). Hook timeouts are derived from the task context tctx, so hooks never outlive the envelope deadline.

Concurrency controls (brief):

  • stateMu guards the FSM state (RLock read / Lock write)
  • lifecycleMu serializes Start/Stop/queue swap
  • queueMu guards the inner workqueue pointer
  • pendingMu guards the pre-start buffer
  • run is an atomic fast flag for “queue alive”
  • Capacity accounting is atomic via tryReserve/inc/dec/unreserve

Other highlights:

  • Worker pool via WithLimitOption(n)
  • Start/Stop/Start: tasks sent before first start are buffered and flushed on Start()
  • Periodic vs one-off: interval > 0 means periodic; interval == 0 means one-off
  • Deadlines: deadline > 0 bounds invoke time via context.WithTimeout in the worker
  • Stamps: both global (queue-level) and per-envelope (task-level), with predictable execution order
  • Panic safety: panics inside task are handled (Forget+Done) and logged with stack; worker keeps running
  • Prometheus metrics: use client-go workqueue metrics

Installation

go get github.com/PavelAgarkov/rate-envelope-queue

Recommended pins (compatible with this package):

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

Requires: Go 1.24+.


Quick Start

See full examples in examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Capacity scenarios (accounting correctness):

Drain + waiting=true — wait for all workers; all dec() happen; no remainder.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true — after wg.Wait() we subtract the “tail” (cur - pend), the counter converges.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

Unlimited capacityWithAllowedCapacityOption(0) removes admission limits; the currentCapacity metric still reflects actual occupancy.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

Minimal API sketch:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // one-off
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // periodic
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // restart if needed

Concepts & Contracts

Envelope
e, err := NewEnvelope(
    WithId(123), // optional, for logs
    WithType("my_task"), // optional, for logs
    WithScheduleModeInterval(time.Second), // 0 = one-off
    WithDeadline(500*time.Millisecond),    // 0 = no deadline
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // required
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // Drop by default
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Validation:

  • invoke is required; interval >= 0; deadline >= 0
  • For periodic: deadline must not exceed intervalErrAdditionEnvelopeToQueueBadIntervals

Special error:

  • ErrStopEnvelope — gracefully stops this envelope only (no reschedule)
Queue
q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = unlimited
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded on overflow
q.Stop()

Pre-start buffer. In init, Send() pushes envelopes into an internal buffer; on Start() they are flushed into the workqueue.

Stamps (middleware)
type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

Order: global stamps (outer) wrap per-envelope stamps (inner), then Envelope.invoke.

A sample LoggingStamp() is provided for demonstration.


Worker Behavior

Result / condition Queue action
invoke returns nil Forget; if interval > 0 and alive → AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; if periodic and alive → AddAfter(interval)
ErrStopEnvelope Forget; no reschedule
Error on periodic Forget; if alive → AddAfter(interval)
Error on one-off + failureHook Use decision: RetryNow / RetryAfter(d) / Drop
Panic in task Forget + Done + stack log; worker continues

“Queue is alive” = run == true, state is running, base context not done, and workqueue not shutting down.


Stop Modes

Waiting \ StopMode Drain (graceful) Stop (fast)
true Wait for workers; ShutDownWithDrain() Wait for workers; ShutDown()
false No wait; ShutDownWithDrain() Immediate stop; ShutDown()

After Stop() you can call Start() again: a fresh inner workqueue will be created.


Capacity Limiting

WithAllowedCapacityOption(cap) limits the total number of in-flight/queued/delayed items (including reschedules).
If the limit would be exceeded, Send() returns ErrAllowedQueueCapacityExceeded.
currentCapacity is updated on add, reschedule, and completion.

  • cap == 0unlimited admission; the currentCapacity metric still tracks actual occupancy.
  • Stop + waiting=false + StopMode=Stop — documented tail leakage in accounting. Use Drain or waiting=true for accurate capacity convergence.

Benchmarks

Command examples:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Numbers provided by the author (your CPU/env will vary):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

Metrics (Prometheus)

Workqueue metrics are enabled via blank import:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Your queue name (QueueConfig.Name) is included in workqueue metric labels (workqueue_*: adds, depth, work_duration, retries, etc.).


Examples

See the examples/ folder for runnable snippets covering one-off jobs, periodic schedules, combined modes, and dynamic dispatch.


License

MIT — see LICENSE.



rate-envelope-queue (Русская версия)

Лёгкая, потокобезопасная обёртка над k8s.io/client-go/util/workqueue для управления задачами (envelopes) с фиксированным пулом воркеров, периодическим планированием, дедлайнами, хуками и stamps (middleware). Добавляет безопасный жизненный цикл очереди (Start/Stop/Start), буферизацию задач до старта и ограничение ёмкости очереди.

В основе — workqueue.TypedRateLimitingInterface. Дедупликация происходит по указателю на элемент: повторный Add того же указателя пока он в обработке — игнорируется.


Содержание


Ключевые возможности

Потокобезопасная локальная очередь в памяти приложения:

  • Все публичные методы безопасны при вызовах из нескольких горутин. Send можно вызывать параллельно. Воркеров может быть несколько (limit). Вызовы Start/Stop сериализуются внутри.
  • Это не распределённая очередь: гарантий между разными процессами/узлами нет. Код хуков/инвокеров должен сам обеспечивать потокобезопасность при доступе к общим ресурсам.

Что внутри:

  • Прозрачный автомат состояний: init → running → stopping → stopped
  • Одноразовые и периодические задачи
  • Цепочка middleware через Stamp
  • Хуки: before/after/failure/success
  • Учёт ёмкости (quota)
  • Мягкий или быстрый останов (Drain / Stop) и повторный старт (Start() после Stop())

Семантика ошибок и хуков:

  • ErrStopEnvelope — намеренная остановка конкретного конверта:
    • конверт забывается, не перепланируется;
    • если ошибка возникла в beforeHook/invoke, afterHook всё равно вызовется (с ограниченным временем); successHook не вызывается.
  • context.Canceled / context.DeadlineExceeded — это не успех:
    • конверт забывается; периодический — перепланируется, одноразовый — нет.
  • Любая другая ошибка:
    • периодическая → перепланируется (если очередь «жива»);
    • одноразовая → решение через failureHook (RetryNow / RetryAfter / Drop).
  • Каждый хук выполняется с собственным таймаутом: доля frac=0.5 от deadline конверта, но не меньше hardHookLimit (800мс). Таймауты «висят» на tctx, т.е. хуки никогда не переживут дедлайн конверта.

Потокобезопасность (коротко):

  • stateMu — чтение/запись состояния
  • lifecycleMu — сериализация Start/Stop/смены очереди
  • queueMu — доступ к внутренней очереди
  • pendingMu — буфер задач до старта
  • run — атомарный флаг «жива ли очередь»
  • Учёт ёмкости — атомарные операции tryReserve/inc/dec/unreserve

Прочее:

  • Пул воркеров: WithLimitOption(n)
  • Start/Stop/Start: задачи, добавленные до первого Start(), буферизуются и переливаются в очередь при старте
  • Периодические / одноразовые: interval > 0 — периодическая; interval == 0 — одноразовая
  • Дедлайны: deadline > 0 ограничивает время invoke через context.WithTimeout
  • Stamps: глобальные и на уровне конверта, порядок выполнения предсказуем
  • Защита от паник: паника в задаче → Forget+Done и лог стека; воркер продолжает работу
  • Метрики Prometheus: из client-go workqueue

Установка

go get github.com/PavelAgarkov/rate-envelope-queue

Рекомендуемые версии:

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

Требования: Go 1.24+.


Быстрый старт

Смотрите каталог examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Сценарии ёмкости (корректность учёта):

Drain + waiting=true — дожидаемся всех воркеров; все dec() прошли; остатка нет.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true — после wg.Wait() снимается «хвост» (cur - pend), счётчик сходится.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

Безлимитная ёмкостьWithAllowedCapacityOption(0) убирает ограничение приёма; метрика currentCapacity отражает фактическую занятость.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

Мини‑пример:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // одноразовая
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // периодическая
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // при необходимости можно снова стартовать

Концепции и контракты

Envelope
e, err := NewEnvelope(
    WithId(123),                   // опционально, для логов
    WithType("my_task"),           // опционально, для логов
    WithScheduleModeInterval(time.Second), // 0 = одноразовая
    WithDeadline(500*time.Millisecond),    // 0 = без дедлайна
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // обязательно
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // по умолчанию Drop
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Валидация:

  • invoke обязателен; interval >= 0; deadline >= 0
  • Для периодических: deadline не должен превышать intervalErrAdditionEnvelopeToQueueBadIntervals

Спец‑ошибка:

  • ErrStopEnvelope — корректно прекращает только этот конверт (без перепланирования)
Очередь
q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = без лимита
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded при переполнении
q.Stop()

Буфер до старта. В состоянии init Send() складывает задачи во внутренний буфер; при Start() — они переливаются в workqueue.

Stamps (middleware)
type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

Порядок: сначала глобальные stamps (внешние), затем per‑envelope (внутренние), после — Envelope.invoke.

LoggingStamp() — пример для иллюстрации (не «серебряная пуля» для продакшена).


Поведение воркера

Событие / результат Действие очереди
invoke вернул nil Forget; если interval > 0 и очередь «жива» → AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; если периодическая и очередь «жива» → AddAfter(interval)
ErrStopEnvelope Forget; не перепланируем
Ошибка у периодической Forget; если очередь «жива» → AddAfter(interval)
Ошибка у одноразовой + failureHook Решение пользователя: RetryNow / RetryAfter(d) / Drop
Паника в задаче Forget + Done + лог стека; воркер продолжает работу

«Очередь жива» = run == true, state == running, базовый контекст не завершён и workqueue не в shutdown.


Режимы остановки

Waiting \ StopMode Drain (мягкая) Stop (жёсткая)
true Ждать воркеров; ShutDownWithDrain() Ждать воркеров; ShutDown()
false Без ожидания воркеров; ShutDownWithDrain() Мгновенный останов; ShutDown()

После Stop() можно вызывать Start() повторно: создаётся новый внутренний workqueue.


Ограничение ёмкости

WithAllowedCapacityOption(cap) ограничивает суммарное число элементов в системе (включая перепланированные).
При попытке превышения лимита Send() возвращает ErrAllowedQueueCapacityExceeded.
currentCapacity обновляется при добавлении, перепланировании и завершении обработки.

  • cap == 0безлимит по приёму; метрика currentCapacity отражает фактическую занятость.
  • Stop + waiting=false + StopMode=Stop — документированная утечка «хвоста» в учёте. Для точной сходимости используйте Drain или waiting=true.

Бенчмарки

Как запускать:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Цифры автора (зависят от CPU/окружения):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

Метрики (Prometheus)

Метрики workqueue активируются бланк‑импортом:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Имя очереди (QueueConfig.Name) попадает в лейблы метрик (workqueue_*: adds, depth, work_duration, retries и т.д.).


Примеры

Смотрите каталог examples/ — там есть готовые варианты для одноразовых задач, периодических расписаний, комбинированных сценариев и динамического диспатча.


Лицензия

MIT — см. LICENSE.

Documentation

Index

Constants

View Source
const (
	DestinationStateField                    = "type"
	PayloadAfterField                        = "after"
	DecisionStateRetryNow   destinationState = "retry_now"
	DecisionStateRetryAfter destinationState = "retry_after"
	DecisionStateDrop       destinationState = "drop"
)

Variables

View Source
var (
	ErrStopEnvelope                        = errors.New(fmt.Sprintf("%s: stop envelope", service))
	ErrEnvelopeQueueIsNotRunning           = errors.New(fmt.Sprintf("%s: regect envelope, queue is not running or init", service))
	ErrQueueIsTerminated                   = errors.New(fmt.Sprintf("%s: regect envelope, queue is terminated", service))
	ErrAdditionEnvelopeToQueueBadFields    = errors.New(fmt.Sprintf("%s: addition envelope to queue has bad fields", service))
	ErrAdditionEnvelopeToQueueBadIntervals = errors.New(fmt.Sprintf("%s: addition envelope to queue has bad intervals", service))
	ErrAllowedQueueCapacityExceeded        = errors.New(fmt.Sprintf("%s: allowed queue capacity exceeded", service))
)

Functions

func WithAfterHook

func WithAfterHook(hook Invoker) func(*Envelope)

func WithAllowedCapacityOption

func WithAllowedCapacityOption(capacity uint64) func(*RateEnvelopeQueue)

func WithBeforeHook

func WithBeforeHook(hook Invoker) func(*Envelope)

func WithDeadline

func WithDeadline(d time.Duration) func(*Envelope)

func WithFailureHook

func WithFailureHook(hook func(ctx context.Context, envelope *Envelope, err error) Decision) func(*Envelope)

func WithId

func WithId(id uint64) func(*Envelope)

func WithInvoke

func WithInvoke(invoke Invoker) func(*Envelope)

func WithLimitOption

func WithLimitOption(limit int) func(*RateEnvelopeQueue)

func WithLimiterOption

func WithLimiterOption(limiter workqueue.TypedRateLimiter[*Envelope]) func(*RateEnvelopeQueue)

func WithPayload

func WithPayload(p interface{}) func(*Envelope)

func WithScheduleModeInterval

func WithScheduleModeInterval(d time.Duration) func(*Envelope)

WithScheduleModeInterval 0 means run once, not a schedule

func WithStamps

func WithStamps(stamps ...Stamp) func(*RateEnvelopeQueue)

func WithStampsPerEnvelope

func WithStampsPerEnvelope(stamps ...Stamp) func(*Envelope)

func WithStopModeOption

func WithStopModeOption(mode StopMode) func(*RateEnvelopeQueue)

func WithSuccessHook

func WithSuccessHook(hook func(ctx context.Context, envelope *Envelope)) func(*Envelope)

func WithType

func WithType(t string) func(*Envelope)

func WithWaitingOption

func WithWaitingOption(waiting bool) func(*RateEnvelopeQueue)

Types

type Decision

type Decision interface {
	Payload() Payload
}

func DefaultOnceDecision

func DefaultOnceDecision() Decision

func RetryOnceAfterDecision

func RetryOnceAfterDecision(after time.Duration) Decision

func RetryOnceNowDecision

func RetryOnceNowDecision() Decision

type DefaultDecision

type DefaultDecision struct {
	// contains filtered or unexported fields
}

func (*DefaultDecision) Payload

func (d *DefaultDecision) Payload() Payload

type Envelope

type Envelope struct {
	// contains filtered or unexported fields
}

func NewDynamicEnvelope

func NewDynamicEnvelope(deadline time.Duration, invoke Invoker, payload interface{}) (*Envelope, error)

func NewEnvelope

func NewEnvelope(opt ...func(*Envelope)) (*Envelope, error)

func NewScheduleEnvelope

func NewScheduleEnvelope(interval, deadline time.Duration, invoke Invoker, payload interface{}) (*Envelope, error)

func (*Envelope) GetId

func (envelope *Envelope) GetId() uint64

func (*Envelope) GetPayload

func (envelope *Envelope) GetPayload() interface{}

func (*Envelope) GetStamps

func (envelope *Envelope) GetStamps() []Stamp

func (*Envelope) GetType

func (envelope *Envelope) GetType() string

func (*Envelope) UpdatePayload

func (envelope *Envelope) UpdatePayload(p interface{})

type Invoker

type Invoker func(ctx context.Context, envelope *Envelope) error

type Payload

type Payload map[string]interface{}

type QueueState

type QueueState int32

внутренний автомат состояний

const (
	StateInit     QueueState = iota // создана, ещё не стартовала; Add() — буферизуется
	StateRunning                    // работает; Add() — сразу в workqueue
	StateStopping                   // идёт останов; Add() — ошибка
	StateStopped                    // остановлена; Add() — ошибка; возможен повторный Start()
	StateTerminate
)

type RateEnvelopeQueue

type RateEnvelopeQueue struct {
	// contains filtered or unexported fields
}

func (*RateEnvelopeQueue) CurrentState

func (q *RateEnvelopeQueue) CurrentState() QueueState

func (*RateEnvelopeQueue) Send

func (q *RateEnvelopeQueue) Send(envelopes ...*Envelope) error

func (*RateEnvelopeQueue) Start

func (q *RateEnvelopeQueue) Start()

func (*RateEnvelopeQueue) Stop

func (q *RateEnvelopeQueue) Stop()

func (*RateEnvelopeQueue) Terminate

func (q *RateEnvelopeQueue) Terminate()

type RetryOnceDecision

type RetryOnceDecision struct {
	// contains filtered or unexported fields
}

func (*RetryOnceDecision) Payload

func (d *RetryOnceDecision) Payload() Payload

type SingleQueuePool

type SingleQueuePool interface {
	Send(envelopes ...*Envelope) error
	Start()
	Stop()
	Terminate()
	CurrentState() QueueState
}

func NewRateEnvelopeQueue

func NewRateEnvelopeQueue(base context.Context, name string, options ...func(*RateEnvelopeQueue)) SingleQueuePool

NewRateEnvelopeQueue по умолчанию workqueue теряет задачи в режиме AddAfter при любой остановке очереди. ---------------------------------------------------------------------------------- аккуратный режим остановки, прочитаем все что в очереди WithWaitingOption(true), WithStopModeOption(Drain), ---------------------------------------------------------------------------------- корректно, если нужен «почти drain», но без жёсткого ожидания всех воркеров WithWaitingOption(false), WithStopModeOption(Drain), ---------------------------------------------------------------------------------- корректно для «быстрого, но чистого» останова с ожиданием WithWaitingOption(true), WithStopModeOption(Stop), ---------------------------------------------------------------------------------- мгновернный останов без ожидания, все теряем WithWaitingOption(false), WithStopModeOption(Stop), ----------------------------------------------------------------------------------

func NewSimpleDrainQueue

func NewSimpleDrainQueue(base context.Context, queueName string, queueRate int) SingleQueuePool

func NewSimpleStopQueue

func NewSimpleStopQueue(base context.Context, queueName string, queueRate int) SingleQueuePool

type Stamp

type Stamp func(next Invoker) Invoker

func LoggingStamp

func LoggingStamp() Stamp

type StopMode

type StopMode string
const (
	Drain StopMode = "drain"
	Stop  StopMode = "stop"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL