messaging

package
v5.10.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2019 License: MIT, MIT, MIT Imports: 5 Imported by: 88

Documentation

Overview

Package messaging provides the means of coordination between the different components of the Sensu backend.

Index

Constants

View Source
const (
	// TopicEvent is the topic for events that have been written to Etcd and
	// normalized by eventd.
	TopicEvent = "sensu:event"

	// TopicKeepalive is the topic for keepalive events.
	TopicKeepalive = "sensu:keepalive"

	// TopicEventRaw is the Session -> Eventd channel -- for raw events directly
	// from agents, subscribe to this.
	TopicEventRaw = "sensu:event-raw"

	// TopicSubscriptions is the topic prefix for each subscription
	TopicSubscriptions = "sensu:check"

	// TopicTessen is the topic prefix for tessen api events to Tessend.
	TopicTessen = "sensu:tessen"

	// TopicTessenMetric is the topic prefix for tessen api metrics to Tessend.
	TopicTessenMetric = "sensu:tessen-metric"
)

Variables

This section is empty.

Functions

func SubscriptionTopic

func SubscriptionTopic(namespace, sub string) string

SubscriptionTopic is a helper to determine the proper topic name for a subscription based on the namespace

Types

type MessageBus

type MessageBus interface {
	daemon.Daemon

	// Subscribe allows a consumer to subscribe to a topic,
	// binding a specific Subscriber to the topic. Topic messages
	// are delivered to the subscriber as type `interface{}`.
	Subscribe(topic string, consumer string, subscriber Subscriber) (Subscription, error)

	// Publish sends a message to a topic.
	Publish(topic string, message interface{}) error
}

MessageBus is the interface to the internal messaging system.

The MessageBus is a simple implementation of Event Sourcing where you have one or more producers publishing events and multiple consumers receiving all of the events produced. We've adopted AMQPs "topic" concept allowing the bus to route multiple types of messages.

Consumers should be careful to send buffered channels to the MessageBus in the Subscribe() method, as Subscribe attempts a non-blocking send to the provided channel. If there is no receiver / or if the receiver is not ready for the message, _the message will be lost_. Events published to the bus are fanned out linearly to all (i.e. ordered) to all subscribers.

type Subscriber

type Subscriber interface {

	// Receiver returns the channel a subscriber uses to receive messages.
	Receiver() chan<- interface{}
}

A Subscriber receives messages via a channel.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

A Subscription is a cancellable subscription to a WizardTopic.

func (Subscription) Cancel

func (t Subscription) Cancel() error

Cancel a WizardSubscription.

type WizardBus

type WizardBus struct {
	// contains filtered or unexported fields
}

WizardBus is a message bus.

For every topic, WizardBus creates a new goroutine responsible for fanning messages out to each subscriber for a given topic. Any type can be passed across a WizardTopic and it is up to the consumers/producers to coordinate around a particular topic type. Care should be taken not to send multiple message types over a single topic, however, as we do not want to introduce a dependency on reflection to determine the type of the received interface{}.

func NewWizardBus

func NewWizardBus(cfg WizardBusConfig, opts ...WizardOption) (*WizardBus, error)

NewWizardBus creates a new WizardBus.

func (*WizardBus) Err

func (b *WizardBus) Err() <-chan error

Err ...

func (*WizardBus) Name

func (b *WizardBus) Name() string

Name returns the daemon name

func (*WizardBus) Publish

func (b *WizardBus) Publish(topic string, msg interface{}) error

Publish publishes a message to a topic. If the topic does not exist, this is a noop.

func (*WizardBus) Start

func (b *WizardBus) Start() error

Start ...

func (*WizardBus) Stop

func (b *WizardBus) Stop() error

Stop ...

func (*WizardBus) Subscribe

func (b *WizardBus) Subscribe(topic string, consumer string, sub Subscriber) (Subscription, error)

Subscribe to a WizardBus topic. This function locks the WizardBus mutex (RW), fetches the appropriate WizardTopic (or creates it if missing), unlocks the WizardBus mutex, locks the WizardTopic's mutex (RW), adds the consumer channel to the WizardTopic's bindings, and unlocks the WizardTopics mutex.

WARNING:

Messages received over a topic should be considered IMMUTABLE by consumers. Modifying received messages will introduce data races. While these _may_ be detected by the Golang race detector, this is not always the case and is only exacerbated by the fact that we test each package individually.

type WizardBusConfig

type WizardBusConfig struct{}

WizardBusConfig configures a WizardBus

type WizardOption

type WizardOption func(*WizardBus) error

WizardOption is a functional option.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL