eventqueue

package
v0.5.1
Warning

This package is not in the latest version of its module.

Published: Aug 30, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

README

PagerDuty Agent: Eventqueue Package

An in-memory event queue for use on top of the eventsapi package.

Features include:

  • Ensuring ordering on a per-routing-key basis.
  • Handling back-pressure.

For example usage see:

Documentation

Index

Constants

const DefaultBufferSize = 1000
const MaxRetryTimeout = 30 * time.Second

Variables

var DefaultProcessor = EventProcessor
var ErrAPIError = errors.New("An API error was encountered while processing events.")
var ErrJobStopped = errors.New("Job stopped while retrying.")

Functions

func EventProcessor

func EventProcessor(job Job, stop chan bool)

EventProcessor is a Job processor for use by an EventQueue specifically designed to send and receive from the PagerDuty Events V1 or V2 API.

It accepts a Job containing an EventContainer.
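The following is a minimal sketch of a function with the same shape as a Processor, `func(Job, chan bool)`. It does not use this package: `job`, `response`, `retryingProcessor`, and `runOnce` are hypothetical stand-ins, and the retry-with-capped-backoff and stop-channel behavior is only an assumption inferred from the names MaxRetryTimeout and ErrJobStopped, not a description of what EventProcessor actually does.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// response and job are simplified stand-ins for eventqueue.Response and
// eventqueue.Job; the real types carry eventsapi values and a logger.
type response struct{ Error error }

type job struct {
	Payload      string
	ResponseChan chan<- response
}

// errJobStopped plays the role of eventqueue.ErrJobStopped in this sketch.
var errJobStopped = errors.New("job stopped while retrying")

// retryingProcessor returns a function shaped like a Processor. It calls send
// until it succeeds, doubling the wait between attempts up to maxWait, and
// gives up with errJobStopped if the stop channel fires while it is waiting.
func retryingProcessor(send func(string) error, maxWait time.Duration) func(job, chan bool) {
	return func(j job, stop chan bool) {
		wait := time.Millisecond
		for {
			if err := send(j.Payload); err == nil {
				j.ResponseChan <- response{}
				return
			}
			select {
			case <-stop:
				j.ResponseChan <- response{Error: errJobStopped}
				return
			case <-time.After(wait):
			}
			if wait *= 2; wait > maxWait {
				wait = maxWait
			}
		}
	}
}

// runOnce processes one job whose send fails the given number of times, then
// succeeds. It returns the number of attempts made and the reported error.
func runOnce(failures int) (attempts int, err error) {
	send := func(string) error {
		attempts++
		if attempts <= failures {
			return errors.New("temporary failure")
		}
		return nil
	}
	respChan := make(chan response, 1)
	process := retryingProcessor(send, 4*time.Millisecond)
	process(job{Payload: "event", ResponseChan: respChan}, make(chan bool))
	return attempts, (<-respChan).Error
}

func main() {
	attempts, err := runOnce(2)
	fmt.Println(attempts, err)
}
```

Because the processor runs synchronously inside a worker, a design like this keeps later events for the same routing key waiting until the current one succeeds or the job is stopped.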

Types

type ErrBufferOverflow

type ErrBufferOverflow struct {
	// contains filtered or unexported fields
}

func (*ErrBufferOverflow) Error

func (e *ErrBufferOverflow) Error() string
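Since Error is defined on the pointer receiver, callers would typically detect this error with `errors.As` and a `*ErrBufferOverflow` target. The sketch below illustrates that pattern with a hypothetical stand-in type (`errBufferOverflow`) and a hypothetical non-blocking `tryEnqueue` helper. Whether Enqueue actually returns this error when a buffer is full is an assumption here; the documentation above does not say.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferOverflow is a stand-in for eventqueue.ErrBufferOverflow, whose
// real fields are unexported and not shown in this documentation.
type errBufferOverflow struct{ key string }

func (e *errBufferOverflow) Error() string {
	return fmt.Sprintf("buffer overflow for routing key %q", e.key)
}

// tryEnqueue attempts a non-blocking send into a bounded buffer and reports
// an overflow error instead of blocking when the buffer is full.
func tryEnqueue(buf chan string, key, event string) error {
	select {
	case buf <- event:
		return nil
	default:
		return &errBufferOverflow{key: key}
	}
}

// isOverflow reports whether err is, or wraps, an overflow error.
func isOverflow(err error) bool {
	var overflow *errBufferOverflow
	return errors.As(err, &overflow)
}

func main() {
	buf := make(chan string, 1) // capacity 1 so the second send overflows
	fmt.Println(tryEnqueue(buf, "key-1", "first"))

	err := tryEnqueue(buf, "key-1", "second")
	fmt.Println(isOverflow(err), err)
}
```

A caller that sees an overflow can apply its own back-pressure policy, such as dropping the event, retrying later, or signaling upstream to slow down.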

type EventQueue

type EventQueue struct {
	Processor Processor
	// contains filtered or unexported fields
}

An EventQueue is a basic thread-safe queue for processing PagerDuty events.

Each EventQueue is internally composed of several individual queues segmented by routing key, ensuring that events are processed in order on a per-routing-key basis. Each of these queues has a single dedicated worker.
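The per-key layout described above can be sketched as follows. This is not the package's implementation: `keyedQueue`, `enqueue`, `shutdown`, and `processInOrder` are hypothetical names, and events are plain strings. It only illustrates why one buffered channel and one worker per routing key gives in-order processing within a key while different keys proceed independently.

```go
package main

import (
	"fmt"
	"sync"
)

// keyedQueue keeps one buffered channel and one worker goroutine per key.
type keyedQueue struct {
	mu      sync.Mutex
	queues  map[string]chan string
	wg      sync.WaitGroup
	handle  func(key, event string)
	bufSize int
}

func newKeyedQueue(bufSize int, handle func(key, event string)) *keyedQueue {
	return &keyedQueue{queues: make(map[string]chan string), handle: handle, bufSize: bufSize}
}

// enqueue routes the event to its key's queue, starting that key's single
// worker on first use. The send blocks while the key's buffer is full.
func (q *keyedQueue) enqueue(key, event string) {
	q.mu.Lock()
	ch, ok := q.queues[key]
	if !ok {
		ch = make(chan string, q.bufSize)
		q.queues[key] = ch
		q.wg.Add(1)
		go func() {
			defer q.wg.Done()
			for ev := range ch { // one worker per key, so FIFO order is kept
				q.handle(key, ev)
			}
		}()
	}
	q.mu.Unlock()
	ch <- event
}

// shutdown closes every per-key queue and blocks until the workers drain.
// It must not be called concurrently with enqueue in this simplified sketch.
func (q *keyedQueue) shutdown() {
	q.mu.Lock()
	for _, ch := range q.queues {
		close(ch)
	}
	q.mu.Unlock()
	q.wg.Wait()
}

// processInOrder feeds (key, event) pairs through a keyedQueue and returns
// the order in which each key's events were handled.
func processInOrder(events [][2]string) map[string][]string {
	var mu sync.Mutex
	seen := make(map[string][]string)
	q := newKeyedQueue(4, func(key, ev string) {
		mu.Lock()
		seen[key] = append(seen[key], ev)
		mu.Unlock()
	})
	for _, e := range events {
		q.enqueue(e[0], e[1])
	}
	q.shutdown()
	return seen
}

func main() {
	got := processInOrder([][2]string{{"A", "a1"}, {"B", "b1"}, {"A", "a2"}, {"A", "a3"}, {"B", "b2"}})
	fmt.Println(got["A"], got["B"])
}
```

Ordering across different keys is deliberately not guaranteed in this layout; only events sharing a key are serialized.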

All responses occur through a single user-provided channel when enqueuing events.

EventQueues also have a configurable, synchronous processor. By default, this processor sends events to PagerDuty's Events API.

Example usage:

    queue := eventqueue.NewEventQueue()

    event := eventsapi.EventV2{
        RoutingKey:  "NN2EZIJQPVVQF3KIKN2MDMSUV7E6GLFN",
        EventAction: "trigger",
        Payload: eventsapi.PayloadV2{
            Summary:  "Test summary",
            Source:   "Test source",
            Severity: "Error",
        },
    }

    rawEvent, err := json.Marshal(event)
    if err != nil {
        log.Fatal(err)
    }

    eventContainer := eventsapi.EventContainer{
        EventVersion: event.Version(),
        EventData:    rawEvent,
    }

    respChan := make(chan eventqueue.Response)

    // A non-nil return value means the event was never queued.
    if err := queue.Enqueue(&eventContainer, respChan); err != nil {
        log.Fatal(err)
    }

    // Asynchronous errors arrive in the Error field of the Response.
    resp := <-respChan
    if resp.Error != nil {
        log.Println(resp.Error)
    }

    // When you're done with the queue.
    queue.Shutdown()

func NewEventQueue

func NewEventQueue() *EventQueue

NewEventQueue initializes a new default EventQueue.

func (*EventQueue) Enqueue

func (q *EventQueue) Enqueue(eventContainer *eventsapi.EventContainer, respChan chan<- Response) error

Enqueue a PagerDuty event for processing.

Accepts an event and a channel over which to communicate responses. Errors come in two flavors: synchronous errors (e.g. the event is invalid and never queued), which are returned directly by Enqueue, and asynchronous errors (e.g. a server error), which are delivered in the Error field of the Response sent over the channel.
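The two error paths can be told apart as in the sketch below. It uses hypothetical stand-ins rather than this package: `response` mirrors only the shape of Response, `enqueue` imitates the documented contract of Enqueue, and the validation rule and the "fail" trigger are invented purely for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// response mirrors the shape of eventqueue.Response: a payload plus an
// optional asynchronous error. Status stands in for eventsapi.Response.
type response struct {
	Status string
	Error  error
}

// errInvalidEvent is a made-up validation error for this sketch.
var errInvalidEvent = errors.New("invalid event")

// enqueue imitates the documented contract: invalid events are rejected
// synchronously and nothing is ever sent on respChan for them; accepted
// events produce exactly one response on respChan later.
func enqueue(event string, respChan chan<- response) error {
	if event == "" {
		return errInvalidEvent
	}
	go func() {
		if event == "fail" { // pretend the server rejected this event
			respChan <- response{Error: errors.New("server error")}
			return
		}
		respChan <- response{Status: "success"}
	}()
	return nil
}

// send enqueues one event and reports which path the outcome took.
func send(event string) string {
	respChan := make(chan response, 1)
	if err := enqueue(event, respChan); err != nil {
		// Do not read from respChan here: the event was never queued.
		return "sync error: " + err.Error()
	}
	resp := <-respChan
	if resp.Error != nil {
		return "async error: " + resp.Error.Error()
	}
	return "ok: " + resp.Status
}

func main() {
	fmt.Println(send(""))
	fmt.Println(send("fail"))
	fmt.Println(send("trigger"))
}
```

The key point for callers is to check the return value before waiting on the channel; waiting after a synchronous failure would block forever in a design like this one.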

func (*EventQueue) Shutdown

func (q *EventQueue) Shutdown()

Shutdown the queue and all associated workers.

There may be a blocking delay while any running workers or processors attempt to complete their current tasks.

type Job

type Job struct {
	EventContainer *eventsapi.EventContainer
	ResponseChan   chan<- Response
	Logger         *zap.SugaredLogger
}

type Processor

type Processor func(Job, chan bool)

type Response

type Response struct {
	Response eventsapi.Response
	Error    error
}
