queue

package
v1.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MessageOrderDesc = "desc"
	MessageOrderAsc  = "asc"
)

Variables

View Source
var (
	ErrLeaseNotFound  = errors.New("lease not found")
	ErrLeaseExpired   = errors.New("lease expired")
	ErrQueueFull      = errors.New("queue full")
	ErrMemoryPressure = errors.New("memory pressure")
	ErrEnvelopeExists = errors.New("envelope already exists")
)

Functions

func IDMutationTouchesManagedRoute

func IDMutationTouchesManagedRoute(store Store, ids []string, allowedStates map[State]struct{}, managedRoutes map[string]struct{}) (bool, error)

IDMutationTouchesManagedRoute reports whether the ID-scoped mutation request targets at least one managed route item in one of the allowed states.

Types

type AttemptListRequest

type AttemptListRequest struct {
	Route   string
	Target  string
	EventID string
	Outcome AttemptOutcome
	Limit   int
	Before  time.Time
}

type AttemptListResponse

type AttemptListResponse struct {
	Items []DeliveryAttempt
}

type AttemptOutcome

type AttemptOutcome string
const (
	AttemptOutcomeAcked AttemptOutcome = "acked"
	AttemptOutcomeRetry AttemptOutcome = "retry"
	AttemptOutcomeDead  AttemptOutcome = "dead"
)

type BacklogBucket

type BacklogBucket struct {
	Route  string
	Target string
	Queued int

	OldestQueuedReceivedAt time.Time
	EarliestQueuedNextRun  time.Time
	OldestQueuedAge        time.Duration
	ReadyLag               time.Duration
}

type BacklogTrendListRequest

type BacklogTrendListRequest struct {
	Route  string
	Target string
	Since  time.Time
	Until  time.Time
	Limit  int
}

type BacklogTrendListResponse

type BacklogTrendListResponse struct {
	Items     []BacklogTrendSample
	Truncated bool
}

type BacklogTrendOperatorAction

type BacklogTrendOperatorAction struct {
	ID              string
	Severity        string
	AlertRoutingKey string
	Summary         string
	Playbook        string
	Alerts          []string
	MCPTools        []string
	AdminEndpoints  []string
}

func (BacklogTrendOperatorAction) Map

type BacklogTrendSample

type BacklogTrendSample struct {
	CapturedAt time.Time
	Queued     int
	Leased     int
	Dead       int
}

type BacklogTrendSignalConfig

type BacklogTrendSignalConfig struct {
	Window                  time.Duration
	ExpectedCaptureInterval time.Duration
	StaleGraceFactor        int

	SustainedGrowthConsecutive int
	SustainedGrowthMinSamples  int
	SustainedGrowthMinDelta    int

	RecentSurgeMinTotal int
	RecentSurgeMinDelta int
	RecentSurgePercent  int

	DeadShareHighMinTotal int
	DeadShareHighPercent  int

	QueuedPressureMinTotal         int
	QueuedPressurePercent          int
	QueuedPressureLeasedMultiplier int
}

func DefaultBacklogTrendSignalConfig

func DefaultBacklogTrendSignalConfig() BacklogTrendSignalConfig

type BacklogTrendSignalOptions

type BacklogTrendSignalOptions struct {
	Now                     time.Time
	Window                  time.Duration
	ExpectedCaptureInterval time.Duration
	Config                  BacklogTrendSignalConfig
}

type BacklogTrendSignals

type BacklogTrendSignals struct {
	Status                  string
	Window                  time.Duration
	ExpectedCaptureInterval time.Duration
	Since                   time.Time
	Until                   time.Time
	SampleCount             int
	Truncated               bool

	LatestCapturedAt time.Time
	LatestQueued     int
	LatestLeased     int
	LatestDead       int
	LatestTotal      int

	BaselineTotal        int
	DeltaTotal           int
	DeltaPercent         int
	GrowthRatePerMinute  float64
	ConsecutiveIncreases int

	SustainedGrowth bool
	RecentSurge     bool

	DeadSharePercent   int
	DeadShareHigh      bool
	QueuedSharePercent int
	QueuedPressure     bool

	FreshnessSeconds int
	SamplingStale    bool
	ActiveAlerts     []string
}

func AnalyzeBacklogTrendSignals

func AnalyzeBacklogTrendSignals(samples []BacklogTrendSample, truncated bool, opts BacklogTrendSignalOptions) BacklogTrendSignals

func (BacklogTrendSignals) Map

func (s BacklogTrendSignals) Map() map[string]any

func (BacklogTrendSignals) OperatorActions

func (s BacklogTrendSignals) OperatorActions() []BacklogTrendOperatorAction

type BacklogTrendStore

type BacklogTrendStore interface {
	CaptureBacklogTrendSample(at time.Time) error
	ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
}

BacklogTrendStore is an optional extension implemented by stores that can persist and query backlog trend snapshots.

type BatchEnqueuer

type BatchEnqueuer interface {
	EnqueueBatch(items []Envelope) (int, error)
}

BatchEnqueuer is an optional extension for transactional batch enqueue. When supported, all items are committed atomically (all-or-nothing).

type DeadDeleteRequest

type DeadDeleteRequest struct {
	IDs []string
}

type DeadDeleteResponse

type DeadDeleteResponse struct {
	Deleted int
}

type DeadListRequest

type DeadListRequest struct {
	Route          string
	Limit          int
	Before         time.Time
	IncludePayload bool
	IncludeHeaders bool
	IncludeTrace   bool
}

type DeadListResponse

type DeadListResponse struct {
	Items []Envelope
}

type DeadRequeueRequest

type DeadRequeueRequest struct {
	IDs []string
}

type DeadRequeueResponse

type DeadRequeueResponse struct {
	Requeued int
}

type DeliveryAttempt

type DeliveryAttempt struct {
	ID         string
	EventID    string
	Route      string
	Target     string
	Attempt    int
	StatusCode int
	Error      string
	Outcome    AttemptOutcome
	DeadReason string
	CreatedAt  time.Time
}

type DequeueRequest

type DequeueRequest struct {
	Route       string
	Target      string
	Batch       int
	MaxWait     time.Duration
	LeaseTTL    time.Duration
	Now         time.Time
	AllowLeased bool
}

type DequeueResponse

type DequeueResponse struct {
	Items []Envelope
}

type Envelope

type Envelope struct {
	ID            string
	Route         string
	Target        string
	State         State
	ReceivedAt    time.Time
	Attempt       int
	NextRunAt     time.Time
	Payload       []byte
	Headers       map[string]string
	Trace         map[string]string
	DeadReason    string
	SchemaVersion int
	LeaseID       string
	LeaseUntil    time.Time
}

type HistogramBucket added in v1.1.0

type HistogramBucket struct {
	Le    float64
	Count int64
}

type HistogramSnapshot added in v1.1.0

type HistogramSnapshot struct {
	Buckets []HistogramBucket
	Count   int64
	Sum     float64
}

type LeaseBatchConflict added in v1.4.0

type LeaseBatchConflict struct {
	LeaseID string
	Expired bool
}

type LeaseBatchResult added in v1.4.0

type LeaseBatchResult struct {
	Succeeded int
	Conflicts []LeaseBatchConflict
}

type LeaseBatchStore added in v1.4.0

type LeaseBatchStore interface {
	AckBatch(leaseIDs []string) (LeaseBatchResult, error)
	NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
	MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
}

LeaseBatchStore is an optional extension for batched lease operations. Implementations should process the whole batch in one store transaction where possible and return per-lease conflicts for not-found/expired leases.

type MemoryOption

type MemoryOption func(*MemoryStore)

func WithDLQRetention

func WithDLQRetention(maxAge time.Duration, maxDepth int) MemoryOption

func WithDeliveredRetention

func WithDeliveredRetention(maxAge time.Duration) MemoryOption

func WithMemoryPressureLimits added in v1.2.0

func WithMemoryPressureLimits(retainedItems int, retainedBytes int64) MemoryOption

WithMemoryPressureLimits overrides memory pressure limits for the in-memory backend. Positive values enable explicit thresholds.

func WithNowFunc

func WithNowFunc(now func() time.Time) MemoryOption

func WithQueueLimits

func WithQueueLimits(maxDepth int, dropPolicy string) MemoryOption

func WithQueueRetention

func WithQueueRetention(maxAge, pruneInterval time.Duration) MemoryOption

type MemoryPressureRuntimeMetrics added in v1.2.0

type MemoryPressureRuntimeMetrics struct {
	Active             bool
	Reason             string
	RetainedItems      int64
	RetainedBytes      int64
	RetainedItemLimit  int64
	RetainedBytesLimit int64
	RejectTotal        int64
}

type MemoryRuntimeMetrics added in v1.2.0

type MemoryRuntimeMetrics struct {
	ItemsByState           map[State]int64
	RetainedBytesByState   map[State]int64
	RetainedBytesTotal     int64
	EvictionsTotalByReason map[string]int64
	Pressure               MemoryPressureRuntimeMetrics
}

type MemoryStore

type MemoryStore struct {
	// contains filtered or unexported fields
}

func NewMemoryStore

func NewMemoryStore(opts ...MemoryOption) *MemoryStore

func (*MemoryStore) Ack

func (s *MemoryStore) Ack(leaseID string) error

func (*MemoryStore) AckBatch added in v1.4.0

func (s *MemoryStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)

func (*MemoryStore) CancelMessages

func (s *MemoryStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)

func (*MemoryStore) CancelMessagesByFilter

func (s *MemoryStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)

func (*MemoryStore) CaptureBacklogTrendSample

func (s *MemoryStore) CaptureBacklogTrendSample(at time.Time) error

func (*MemoryStore) DeleteDead

func (s *MemoryStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)

func (*MemoryStore) Dequeue

func (s *MemoryStore) Dequeue(req DequeueRequest) (DequeueResponse, error)

func (*MemoryStore) Enqueue

func (s *MemoryStore) Enqueue(env Envelope) error

func (*MemoryStore) EnqueueBatch

func (s *MemoryStore) EnqueueBatch(items []Envelope) (int, error)

EnqueueBatch atomically enqueues all items or none (all-or-nothing). Returns the number of items enqueued (0 or len(items)) and an error.

func (*MemoryStore) Extend

func (s *MemoryStore) Extend(leaseID string, extendBy time.Duration) error

func (*MemoryStore) ListAttempts

func (s *MemoryStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)

func (*MemoryStore) ListBacklogTrend

func (*MemoryStore) ListDead

func (s *MemoryStore) ListDead(req DeadListRequest) (DeadListResponse, error)

func (*MemoryStore) ListMessages

func (s *MemoryStore) ListMessages(req MessageListRequest) (MessageListResponse, error)

func (*MemoryStore) LookupMessages

func (s *MemoryStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)

func (*MemoryStore) MarkDead

func (s *MemoryStore) MarkDead(leaseID string, reason string) error

func (*MemoryStore) MarkDeadBatch added in v1.4.0

func (s *MemoryStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)

func (*MemoryStore) Nack

func (s *MemoryStore) Nack(leaseID string, delay time.Duration) error

func (*MemoryStore) NackBatch added in v1.4.0

func (s *MemoryStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)

func (*MemoryStore) RecordAttempt

func (s *MemoryStore) RecordAttempt(attempt DeliveryAttempt) error

func (*MemoryStore) RequeueDead

func (*MemoryStore) RequeueMessages

func (*MemoryStore) RequeueMessagesByFilter

func (s *MemoryStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)

func (*MemoryStore) ResumeMessages

func (s *MemoryStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)

func (*MemoryStore) ResumeMessagesByFilter

func (s *MemoryStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)

func (*MemoryStore) RuntimeMetrics added in v1.2.0

func (s *MemoryStore) RuntimeMetrics() StoreRuntimeMetrics

func (*MemoryStore) Stats

func (s *MemoryStore) Stats() (Stats, error)

type MessageCancelRequest

type MessageCancelRequest struct {
	IDs []string
}

type MessageCancelResponse

type MessageCancelResponse struct {
	Canceled    int
	Matched     int
	PreviewOnly bool
}

type MessageListRequest

type MessageListRequest struct {
	Route          string
	Target         string
	State          State
	Order          string
	Limit          int
	Before         time.Time
	IncludePayload bool
	IncludeHeaders bool
	IncludeTrace   bool
}

type MessageListResponse

type MessageListResponse struct {
	Items []Envelope
}

type MessageLookupItem

type MessageLookupItem struct {
	ID    string
	Route string
	State State
}

type MessageLookupRequest

type MessageLookupRequest struct {
	IDs []string
}

type MessageLookupResponse

type MessageLookupResponse struct {
	Items []MessageLookupItem
}

type MessageManageFilterRequest

type MessageManageFilterRequest struct {
	Route       string
	Target      string
	State       State
	Limit       int
	Before      time.Time
	PreviewOnly bool
}

type MessageRequeueRequest

type MessageRequeueRequest struct {
	IDs []string
}

type MessageRequeueResponse

type MessageRequeueResponse struct {
	Requeued    int
	Matched     int
	PreviewOnly bool
}

type MessageResumeRequest

type MessageResumeRequest struct {
	IDs []string
}

type MessageResumeResponse

type MessageResumeResponse struct {
	Resumed     int
	Matched     int
	PreviewOnly bool
}

type PostgresOption added in v1.5.0

type PostgresOption func(*PostgresStore)

func WithPostgresDLQRetention added in v1.5.0

func WithPostgresDLQRetention(maxAge time.Duration, maxDepth int) PostgresOption

func WithPostgresDeliveredRetention added in v1.5.0

func WithPostgresDeliveredRetention(maxAge time.Duration) PostgresOption

func WithPostgresNowFunc added in v1.5.0

func WithPostgresNowFunc(now func() time.Time) PostgresOption

func WithPostgresPollInterval added in v1.5.0

func WithPostgresPollInterval(d time.Duration) PostgresOption

func WithPostgresQueueLimits added in v1.5.0

func WithPostgresQueueLimits(maxDepth int, dropPolicy string) PostgresOption

func WithPostgresRetention added in v1.5.0

func WithPostgresRetention(maxAge, pruneInterval time.Duration) PostgresOption

type PostgresStore added in v1.5.0

type PostgresStore struct {
	// contains filtered or unexported fields
}

func NewPostgresStore added in v1.5.0

func NewPostgresStore(dsn string, opts ...PostgresOption) (*PostgresStore, error)

func (*PostgresStore) Ack added in v1.5.0

func (s *PostgresStore) Ack(leaseID string) error

func (*PostgresStore) AckBatch added in v1.5.0

func (s *PostgresStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)

func (*PostgresStore) CancelMessages added in v1.5.0

func (*PostgresStore) CancelMessagesByFilter added in v1.5.0

func (s *PostgresStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)

func (*PostgresStore) CaptureBacklogTrendSample added in v1.5.1

func (s *PostgresStore) CaptureBacklogTrendSample(at time.Time) error

func (*PostgresStore) Close added in v1.5.0

func (s *PostgresStore) Close() error

func (*PostgresStore) DeleteDead added in v1.5.0

func (*PostgresStore) Dequeue added in v1.5.0

func (*PostgresStore) Enqueue added in v1.5.0

func (s *PostgresStore) Enqueue(env Envelope) error

func (*PostgresStore) Extend added in v1.5.0

func (s *PostgresStore) Extend(leaseID string, extendBy time.Duration) error

func (*PostgresStore) ListAttempts added in v1.5.0

func (*PostgresStore) ListBacklogTrend added in v1.5.1

func (*PostgresStore) ListDead added in v1.5.0

func (*PostgresStore) ListMessages added in v1.5.0

func (*PostgresStore) LookupMessages added in v1.5.0

func (*PostgresStore) MarkDead added in v1.5.0

func (s *PostgresStore) MarkDead(leaseID string, reason string) error

func (*PostgresStore) MarkDeadBatch added in v1.5.0

func (s *PostgresStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)

func (*PostgresStore) Nack added in v1.5.0

func (s *PostgresStore) Nack(leaseID string, delay time.Duration) error

func (*PostgresStore) NackBatch added in v1.5.0

func (s *PostgresStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)

func (*PostgresStore) RecordAttempt added in v1.5.0

func (s *PostgresStore) RecordAttempt(attempt DeliveryAttempt) error

func (*PostgresStore) RequeueDead added in v1.5.0

func (*PostgresStore) RequeueMessages added in v1.5.0

func (*PostgresStore) RequeueMessagesByFilter added in v1.5.0

func (s *PostgresStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)

func (*PostgresStore) ResumeMessages added in v1.5.0

func (*PostgresStore) ResumeMessagesByFilter added in v1.5.0

func (s *PostgresStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)

func (*PostgresStore) RuntimeMetrics added in v1.5.0

func (s *PostgresStore) RuntimeMetrics() StoreRuntimeMetrics

func (*PostgresStore) Stats added in v1.5.0

func (s *PostgresStore) Stats() (Stats, error)

type RuntimeMetricsProvider added in v1.1.0

type RuntimeMetricsProvider interface {
	RuntimeMetrics() StoreRuntimeMetrics
}

type SQLiteOption

type SQLiteOption func(*SQLiteStore)

func WithSQLiteCheckpointInterval added in v1.1.0

func WithSQLiteCheckpointInterval(d time.Duration) SQLiteOption

func WithSQLiteDLQRetention

func WithSQLiteDLQRetention(maxAge time.Duration, maxDepth int) SQLiteOption

func WithSQLiteDeliveredRetention

func WithSQLiteDeliveredRetention(maxAge time.Duration) SQLiteOption

func WithSQLiteNowFunc

func WithSQLiteNowFunc(now func() time.Time) SQLiteOption

func WithSQLitePollInterval

func WithSQLitePollInterval(d time.Duration) SQLiteOption

func WithSQLiteQueueLimits

func WithSQLiteQueueLimits(maxDepth int, dropPolicy string) SQLiteOption

func WithSQLiteRetention

func WithSQLiteRetention(maxAge, pruneInterval time.Duration) SQLiteOption

type SQLiteRuntimeMetrics added in v1.1.0

type SQLiteRuntimeMetrics struct {
	WriteDurationSeconds      HistogramSnapshot
	DequeueDurationSeconds    HistogramSnapshot
	CheckpointDurationSeconds HistogramSnapshot
	BusyTotal                 int64
	RetryTotal                int64
	TxCommitTotal             int64
	TxRollbackTotal           int64
	CheckpointTotal           int64
	CheckpointErrorTotal      int64
}

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

func NewSQLiteStore

func NewSQLiteStore(dbPath string, opts ...SQLiteOption) (*SQLiteStore, error)

func (*SQLiteStore) Ack

func (s *SQLiteStore) Ack(leaseID string) error

func (*SQLiteStore) AckBatch added in v1.4.0

func (s *SQLiteStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)

func (*SQLiteStore) CancelMessages

func (s *SQLiteStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)

func (*SQLiteStore) CancelMessagesByFilter

func (s *SQLiteStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)

func (*SQLiteStore) CaptureBacklogTrendSample

func (s *SQLiteStore) CaptureBacklogTrendSample(at time.Time) error

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

func (*SQLiteStore) DeleteDead

func (s *SQLiteStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)

func (*SQLiteStore) Dequeue

func (s *SQLiteStore) Dequeue(req DequeueRequest) (DequeueResponse, error)

func (*SQLiteStore) Enqueue

func (s *SQLiteStore) Enqueue(env Envelope) error

func (*SQLiteStore) EnqueueBatch

func (s *SQLiteStore) EnqueueBatch(items []Envelope) (int, error)

EnqueueBatch atomically enqueues all items or none (all-or-nothing) in a single transaction. Returns the number of items enqueued and an error.

func (*SQLiteStore) Extend

func (s *SQLiteStore) Extend(leaseID string, extendBy time.Duration) error

func (*SQLiteStore) ListAttempts

func (s *SQLiteStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)

func (*SQLiteStore) ListBacklogTrend

func (*SQLiteStore) ListDead

func (s *SQLiteStore) ListDead(req DeadListRequest) (DeadListResponse, error)

func (*SQLiteStore) ListMessages

func (s *SQLiteStore) ListMessages(req MessageListRequest) (MessageListResponse, error)

func (*SQLiteStore) LookupMessages

func (s *SQLiteStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)

func (*SQLiteStore) MarkDead

func (s *SQLiteStore) MarkDead(leaseID string, reason string) error

func (*SQLiteStore) MarkDeadBatch added in v1.4.0

func (s *SQLiteStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)

func (*SQLiteStore) Nack

func (s *SQLiteStore) Nack(leaseID string, delay time.Duration) error

func (*SQLiteStore) NackBatch added in v1.4.0

func (s *SQLiteStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)

func (*SQLiteStore) RecordAttempt

func (s *SQLiteStore) RecordAttempt(attempt DeliveryAttempt) error

func (*SQLiteStore) RequeueDead

func (*SQLiteStore) RequeueMessages

func (*SQLiteStore) RequeueMessagesByFilter

func (s *SQLiteStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)

func (*SQLiteStore) ResumeMessages

func (s *SQLiteStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)

func (*SQLiteStore) ResumeMessagesByFilter

func (s *SQLiteStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)

func (*SQLiteStore) RuntimeMetrics added in v1.1.0

func (s *SQLiteStore) RuntimeMetrics() StoreRuntimeMetrics

func (*SQLiteStore) Stats

func (s *SQLiteStore) Stats() (Stats, error)

type State

type State string
const (
	StateQueued    State = "queued"
	StateLeased    State = "leased"
	StateDelivered State = "delivered"
	StateDead      State = "dead"
	StateCanceled  State = "canceled"
)

type Stats

type Stats struct {
	Total   int
	ByState map[State]int

	OldestQueuedReceivedAt time.Time
	EarliestQueuedNextRun  time.Time
	OldestQueuedAge        time.Duration
	ReadyLag               time.Duration

	TopQueued []BacklogBucket
}

type Store

type Store interface {
	Enqueue(env Envelope) error
	Dequeue(req DequeueRequest) (DequeueResponse, error)
	Ack(leaseID string) error
	Nack(leaseID string, delay time.Duration) error
	Extend(leaseID string, extendBy time.Duration) error
	MarkDead(leaseID string, reason string) error
	ListDead(req DeadListRequest) (DeadListResponse, error)
	RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
	DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
	ListMessages(req MessageListRequest) (MessageListResponse, error)
	LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
	CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
	RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
	ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
	CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
	RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
	ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
	Stats() (Stats, error)
	RecordAttempt(attempt DeliveryAttempt) error
	ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
}

type StoreCommonRuntimeMetrics added in v1.5.0

type StoreCommonRuntimeMetrics struct {
	OperationDurationSeconds []StoreOperationDurationRuntimeMetric
	OperationTotal           []StoreOperationCounterRuntimeMetric
	ErrorsTotal              []StoreOperationErrorRuntimeMetric
}

type StoreOperationCounterRuntimeMetric added in v1.5.0

type StoreOperationCounterRuntimeMetric struct {
	Operation string
	Total     int64
}

type StoreOperationDurationRuntimeMetric added in v1.5.0

type StoreOperationDurationRuntimeMetric struct {
	Operation       string
	DurationSeconds HistogramSnapshot
}

type StoreOperationErrorRuntimeMetric added in v1.5.0

type StoreOperationErrorRuntimeMetric struct {
	Operation string
	Kind      string
	Total     int64
}

type StoreRuntimeMetrics added in v1.1.0

type StoreRuntimeMetrics struct {
	Backend string
	Common  StoreCommonRuntimeMetrics
	SQLite  *SQLiteRuntimeMetrics
	Memory  *MemoryRuntimeMetrics
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL