persistentqueue

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 30, 2021 License: Apache-2.0 Imports: 10 Imported by: 0

README

PagerDuty Agent: Persistentqueue Package

Provides on-disk persistence for an underlying event queue from the eventqueue package.

This persistence is primarily leveraged during startup to ensure that any pending events from a previous shutdown are still processed and to provide queue analysis.

For example usage see:

Documentation

Index

Constants

View Source
const StatusError = "error"
View Source
const StatusPending = "pending"
View Source
const StatusSuccess = "success"

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	ID           int    `storm:"id,increment"`
	Key          string `storm:"index"`
	RoutingKey   string `storm:"index"`
	Status       string `storm:"index"`
	Event        *eventsapi.EventContainer
	ResponseBody []byte
	CreatedAt    time.Time `storm:"index"`
	UpdatedAt    time.Time `storm:"index"`
}

Event represents an queued or processed event.

func FindEventByKey

func FindEventByKey(db storm.Node, key string) (*Event, error)

func NewEvent

func NewEvent(eventContainer *eventsapi.EventContainer) (*Event, error)

func (*Event) Create

func (e *Event) Create(db storm.Node) error

Create an event within the specified Queue.

Main convenience is ensuring that CreatedAt and UpdatedAt are set.

func (*Event) Update

func (e *Event) Update(db storm.Node) error

Update an event within the specified Queue.

Main convenience is ensuring that UpdatedAt is updated.

type EventQueue

type EventQueue interface {
	Enqueue(*eventsapi.EventContainer, chan<- eventqueue.Response) error
	Shutdown()
}

type Option

type Option func(*PersistentQueue)

func WithEventQueue

func WithEventQueue(eq EventQueue) Option

func WithFile

func WithFile(path string) Option

type PersistentQueue

type PersistentQueue struct {
	DB         *storm.DB
	Events     storm.Node
	EventQueue EventQueue
	// contains filtered or unexported fields
}

func NewPersistentQueue

func NewPersistentQueue(options ...Option) *PersistentQueue

func (*PersistentQueue) Enqueue

func (q *PersistentQueue) Enqueue(eventContainer *eventsapi.EventContainer) (string, error)

Enqueue adds an event to the persistent queue for processing.

Returns the event record's key along with any synchronous errors.

Only synchronous errors (e.g. invalid event) are supported as there are cases where we might not have a per-event response channel (e.g. processing a backlog).

func (*PersistentQueue) Retry

func (q *PersistentQueue) Retry(routingKey string) (int, error)

Retries events that are in an error state, either for an routing key or for all events in error if none is provided.

func (*PersistentQueue) Shutdown

func (q *PersistentQueue) Shutdown() error

Stop a `PersistentQueue`, performing any necessary cleanup.

func (*PersistentQueue) Start

func (q *PersistentQueue) Start() error

func (*PersistentQueue) Status

func (q *PersistentQueue) Status(routingKey string) ([]StatusItem, error)

Returns aggregate stats per routing key for pending and enqueued events.

type StatusItem

type StatusItem struct {
	RoutingKey string `json:"routing_key"`
	Pending    int    `json:"pending"`
	Success    int    `json:"success"`
	Error      int    `json:"error"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL