buses

package
v0.0.0-...-d5b48cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2018 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Dispatcher  = "dispatcher"
	Provisioner = "provisioner"
)

Variables

View Source
var ErrUnknownChannel = errors.New("unknown channel")

ErrUnknownChannel is returned when a message is received by a bus for a channel that does not exist.

Functions

This section is empty.

Types

type BusReference

type BusReference struct {
	Name      string
	Namespace string
}

func (*BusReference) String

func (r *BusReference) String() string

type ChannelReference

type ChannelReference struct {
	Name      string
	Namespace string
}

func (*ChannelReference) String

func (r *ChannelReference) String() string

type DispatchDefaults

type DispatchDefaults struct {
	Namespace string
	ReplyTo   string
}

DispatchDefaults provides default parameter values used when dispatching a message.

type Message

type Message struct {

	// Headers provide metadata about the message payload. All header keys
	// should be lowercase.
	Headers map[string]string

	// Payload is the raw binary content of the message. The payload format is
	// often described by the 'content-type' header.
	Payload []byte
}

Message represents an chunk of data within a bus. The message contains both a map of string headers and a binary payload.

A message may represent a CloudEvent.

type MessageDispatcher

type MessageDispatcher struct {
	// contains filtered or unexported fields
}

MessageDispatcher dispatches messages to a destination over HTTP.

func NewMessageDispatcher

func NewMessageDispatcher() *MessageDispatcher

NewMessageDispatcher creates a new message dispatcher that can dispatch messages to HTTP destinations.

func (*MessageDispatcher) DispatchMessage

func (d *MessageDispatcher) DispatchMessage(message *Message, destination string, defaults DispatchDefaults) error

DispatchMessage dispatches a message to a destination over HTTP.

The destination and replyTo are DNS names. For names with a single label, the default namespace is used to expand it into a fully qualified name within the cluster.

type MessageReceiver

type MessageReceiver struct {
	// contains filtered or unexported fields
}

MessageReceiver starts a server to receive new messages for the bus. The new message is emitted via the receiver function.

func NewMessageReceiver

func NewMessageReceiver(receiverFunc func(*ChannelReference, *Message) error) *MessageReceiver

NewMessageReceiver creates a message receiver passing new messages to the receiverFunc.

func (*MessageReceiver) HandleRequest

func (r *MessageReceiver) HandleRequest(res http.ResponseWriter, req *http.Request)

HandleRequest is an http Handler function. The request is converted to a Message and emitted to the receiver func.

The response status codes:

202 - the message was sent to subscibers
404 - the request was for an unknown channel
500 - an error occured processing the request

func (*MessageReceiver) Run

func (r *MessageReceiver) Run(stopCh <-chan struct{})

Run starts receiving messages for the receiver.

Only HTTP POST requests to the root path (/) are accepted. If other paths or methods are needed, use the HandleRequest method directly with another HTTP server.

This method will block until a message is received on the stop channel.

type Monitor

type Monitor struct {
	// contains filtered or unexported fields
}

Monitor is a utility mix-in intended to be used by Bus authors to easily write provisioners and dispatchers for buses. Bus provisioners are responsible for managing the storage asset(s) that back a channel. Bus dispatchers are responsible for dispatching events on the Channel to the Channel's Subscriptions. Monitor handles setting up informers that watch a Bus, its Channels, and their Subscriptions and allows Bus authors to register event handler functions to be called when Provision/Unprovision and Subscribe/Unsubscribe happen.

func NewMonitor

func NewMonitor(
	component, masterURL, kubeconfig string,
	handler MonitorEventHandlerFuncs,
) *Monitor

NewMonitor creates a monitor for a bus given:

component: the name of the component this monitor should use in created k8s events masterURL: the URL of the API server the monitor should communicate with kubeconfig: the path of a kubeconfig file to create a client connection to the masterURL with handler: a MonitorEventHandlerFuncs with handler functions for the monitor to call

func (*Monitor) Channel

func (m *Monitor) Channel(name string, namespace string) *channelsv1alpha1.Channel

Channel returns the provisioned Channel with the given name and namespace, or nil if such a Channel hasn't been provisioned.

func (*Monitor) RequeueSubscription

func (m *Monitor) RequeueSubscription(subscription *channelsv1alpha1.Subscription)

func (*Monitor) Run

func (m *Monitor) Run(busNamespace, busName string, threadiness int, stopCh <-chan struct{}) error

Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until stopCh is closed, at which point it will shutdown the workqueue and wait for workers to finish processing their current work items.

func (*Monitor) Subscription

func (m *Monitor) Subscription(name string, namespace string) *channelsv1alpha1.Subscription

Subscription returns the provisioned Subscription with the given name and namespace, or nil if such a Subscription hasn't been provisioned.

func (*Monitor) Subscriptions

func (m *Monitor) Subscriptions(channelName string, namespace string) *[]channelsv1alpha1.SubscriptionSpec

Subscriptions returns a slice of SubscriptionSpecs for the Channel with the given name and namespace, or nil if the Channel hasn't been provisioned.

func (*Monitor) WaitForCacheSync

func (m *Monitor) WaitForCacheSync(stopCh <-chan struct{}) error

WaitForCacheSync blocks returning until the monitor's informers have synchronized. It returns an error if the caches cannot sync.

type MonitorEventHandlerFuncs

type MonitorEventHandlerFuncs struct {
	// BusFunc is invoked when the Bus requires sync.
	BusFunc func(bus channelsv1alpha1.GenericBus) error

	// ProvisionFunc is invoked when a new Channel should be provisioned or when
	// the attributes change.
	ProvisionFunc func(channel *channelsv1alpha1.Channel, parameters ResolvedParameters) error

	// UnprovisionFunc in invoked when a Channel should be deleted.
	UnprovisionFunc func(channel *channelsv1alpha1.Channel) error

	// SubscribeFunc is invoked when a new Subscription should be set up or when
	// the attributes change.
	SubscribeFunc func(subscription *channelsv1alpha1.Subscription, parameters ResolvedParameters) error

	// UnsubscribeFunc is invoked when a Subscription should be deleted.
	UnsubscribeFunc func(subscription *channelsv1alpha1.Subscription) error
}

MonitorEventHandlerFuncs is a set of handler functions that are called when a bus requires sync, channels are provisioned/unprovisioned, or a subscription is created or deleted, or if one of the relevant resources is changed.

type ResolvedParameters

type ResolvedParameters = map[string]string

Directories

Path Synopsis
kafka

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL