Documentation ¶
Overview ¶
Package eventbus provides a simple event bus implementation.
The event bus provided by this package supports asynchronous and synchronous publishing and a flexible event matching mechanism. Each handler registered with Bus.Subscribe has its own event queue used to store events and its own goroutine to process events. Each handler processes events in the order they are published. When an asynchronous event is published on the bus and the event can't be stored in the event queue of a handler because its queue is full, the event is dropped for this handler and a Dropped event is generated by the bus. A slow handler doesn't impact other handlers when publishing asynchronous events.
Example ¶
package main

import (
	"fmt"
	"time"

	"github.com/montag451/go-eventbus"
)

type ProcessStarted struct {
	Pid int
}

func (ProcessStarted) Name() eventbus.EventName {
	return "process.started"
}

type ProcessSignaled struct {
	Pid    int
	Signal int
}

func (ProcessSignaled) Name() eventbus.EventName {
	return "process.signaled"
}

type ProcessExited struct {
	Pid      int
	ExitCode int
}

func (ProcessExited) Name() eventbus.EventName {
	return "process.exited"
}

func main() {
	closed := make(chan struct{})
	b := eventbus.New(eventbus.WithClosedHandler(func() {
		close(closed)
	}))
	b.Subscribe(eventbus.WildcardPattern("process.*"), func(e eventbus.Event, t time.Time) {
		switch e := e.(type) {
		case ProcessStarted:
			fmt.Printf("Process %d started\n", e.Pid)
		case ProcessSignaled:
			fmt.Printf("Process %d has been signaled with signal %d\n", e.Pid, e.Signal)
		case ProcessExited:
			fmt.Printf("Process %d exited with code %d\n", e.Pid, e.ExitCode)
		}
	})
	b.Publish(ProcessStarted{12000})
	b.PublishSync(ProcessSignaled{12000, 9})
	b.PublishAsync(ProcessExited{12000, 0})
	b.Close()
	<-closed
}
Output:
Process 12000 started
Process 12000 has been signaled with signal 9
Process 12000 exited with code 0
Index ¶
- Constants
- Variables
- type Bus
- func (b *Bus) Close()
- func (b *Bus) HasSubscribers(name EventName) (bool, error)
- func (b *Bus) Publish(e Event) error
- func (b *Bus) PublishAsync(e Event) error
- func (b *Bus) PublishSync(e Event) error
- func (b *Bus) Subscribe(p EventNamePattern, fn HandlerFunc, options ...SubscribeOption) (*Handler, error)
- func (b *Bus) Unsubscribe(h *Handler) error
- type BusOption
- type Dropped
- type Event
- type EventName
- type EventNamePattern
- type Handler
- type HandlerFunc
- type SubscribeOption
Constants ¶
const DroppedEventName = EventName("_bus.dropped")
DroppedEventName is the name of the Dropped event.
Variables ¶
var ErrBusClosed = errors.New("bus is closed")
ErrBusClosed is the error returned by all bus methods (except Bus.Close) if called on a closed bus. It is the only error returned by this package.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus represents an event bus. A Bus is safe for use by multiple goroutines simultaneously.
func (*Bus) Close ¶
func (b *Bus) Close()
Close closes the event bus and starts draining, in the background, the event queues of all handlers that have not been registered with the WithNoDrain option. When all the handlers have been drained, the callback set with WithClosedHandler is called.
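The close-then-drain behaviour described for Close can be pictured with plain channels. The following is a minimal standard-library sketch, not the package's implementation: the handler type, newHandler, closeAndDrain and drainedCount are hypothetical names introduced only for illustration.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for a subscription: a bounded
// queue plus a channel that is closed once the queue has been drained.
type handler struct {
	queue chan string
	done  chan struct{}
}

// newHandler starts a goroutine that processes queued events in order
// and signals on done when the queue is closed and empty.
func newHandler(size int, fn func(string)) *handler {
	h := &handler{queue: make(chan string, size), done: make(chan struct{})}
	go func() {
		defer close(h.done)
		for e := range h.queue {
			fn(e)
		}
	}()
	return h
}

// closeAndDrain stops accepting events, then waits in the background
// for every handler to finish its pending events before calling onClosed.
func closeAndDrain(handlers []*handler, onClosed func()) {
	for _, h := range handlers {
		close(h.queue)
	}
	go func() {
		for _, h := range handlers {
			<-h.done
		}
		onClosed()
	}()
}

// drainedCount queues the given number of events, closes the handler
// and reports how many events were processed once onClosed has fired.
func drainedCount(events int) int {
	processed := 0
	h := newHandler(events, func(string) { processed++ })
	for i := 0; i < events; i++ {
		h.queue <- "process.exited"
	}
	closed := make(chan struct{})
	closeAndDrain([]*handler{h}, func() { close(closed) })
	<-closed
	return processed
}

func main() {
	fmt.Println(drainedCount(3)) // prints 3
}
```

In this sketch the callback fires only after every pending event has been handled, which is why the example in the overview waits on a channel closed from the WithClosedHandler callback.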
func (*Bus) HasSubscribers ¶
func (b *Bus) HasSubscribers(name EventName) (bool, error)
HasSubscribers returns true if the given event name has subscribers; otherwise, it returns false.
func (*Bus) Publish ¶
func (b *Bus) Publish(e Event) error
Publish publishes an event on the bus and returns when the event has been put in the event queue of all the handlers subscribed to the event.
func (*Bus) PublishAsync ¶
func (b *Bus) PublishAsync(e Event) error
PublishAsync publishes an event asynchronously. It returns as soon as the event has been put in the event queue of all the handlers subscribed to the event. If the event queue of a handler is full, the event is dropped for this handler and a Dropped event is generated.
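One common way to get this drop-when-full behaviour in Go is a non-blocking send on a buffered channel. The sketch below uses only the standard library and assumes nothing about how the package is actually implemented; tryEnqueue and publishAsync are hypothetical helper names.

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send on a handler's bounded queue.
// It reports false when the queue is full, i.e. the event would be dropped.
func tryEnqueue(queue chan string, event string) bool {
	select {
	case queue <- event:
		return true
	default:
		return false
	}
}

// publishAsync offers the event to every queue and returns how many
// handlers missed it because their queue was full.
func publishAsync(queues []chan string, event string) (dropped int) {
	for _, q := range queues {
		if !tryEnqueue(q, event) {
			dropped++
		}
	}
	return dropped
}

func main() {
	fast := make(chan string, 2) // room for two pending events
	slow := make(chan string, 1) // room for a single pending event
	queues := []chan string{fast, slow}

	fmt.Println(publishAsync(queues, "process.started")) // prints 0
	fmt.Println(publishAsync(queues, "process.exited"))  // prints 1
}
```

Because the send never blocks, a handler whose queue is full only loses its own copy of the event; the other handlers are unaffected.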
func (*Bus) PublishSync ¶
func (b *Bus) PublishSync(e Event) error
PublishSync publishes an event synchronously. It returns when the event has been processed by all the handlers subscribed to the event.
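Waiting for every handler to finish can be sketched with a sync.WaitGroup attached to each queued event. This is an illustration under stated assumptions rather than the package's code: job, startHandler, publishSync and processedAfterSync are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// job pairs an event with a callback invoked after it has been processed.
type job struct {
	event string
	done  func()
}

// startHandler creates a bounded queue and a goroutine that processes
// its jobs in order, signalling completion of each one.
func startHandler(size int, fn func(string)) chan<- job {
	queue := make(chan job, size)
	go func() {
		for j := range queue {
			fn(j.event)
			j.done()
		}
	}()
	return queue
}

// publishSync enqueues the event for every handler and blocks until
// all of them have processed it.
func publishSync(queues []chan<- job, event string) {
	var wg sync.WaitGroup
	wg.Add(len(queues))
	for _, q := range queues {
		q <- job{event: event, done: wg.Done}
	}
	wg.Wait()
}

// processedAfterSync reports how many handlers had processed the event
// by the time publishSync returned.
func processedAfterSync(handlers int) int64 {
	var processed int64
	queues := make([]chan<- job, 0, handlers)
	for i := 0; i < handlers; i++ {
		queues = append(queues, startHandler(1, func(string) {
			atomic.AddInt64(&processed, 1)
		}))
	}
	publishSync(queues, "process.signaled")
	for _, q := range queues {
		close(q) // let the handler goroutines exit
	}
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println(processedAfterSync(3)) // prints 3
}
```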
func (*Bus) Subscribe ¶
func (b *Bus) Subscribe(p EventNamePattern, fn HandlerFunc, options ...SubscribeOption) (*Handler, error)
Subscribe subscribes to all events matching the given event name pattern. It returns a Handler instance representing the subscription.
func (*Bus) Unsubscribe ¶
func (b *Bus) Unsubscribe(h *Handler) error
Unsubscribe unsubscribes the given handler from all events matching the handler pattern and starts draining, in the background, the handler event queue if the given handler has not been registered with the WithNoDrain option. When the handler has been drained, the callback set with WithUnsubscribedHandler is called.
type BusOption ¶
type BusOption func(*busOptions)
BusOption configures a Bus as returned by New.
func WithClosedHandler ¶
func WithClosedHandler(f func()) BusOption
WithClosedHandler sets the handler that will be called after the bus has been closed and all subscribed handlers have been drained.
type Dropped ¶
Dropped is the event published internally by the bus to signal that an event has been dropped. This event occurs only when an event is published asynchronously and the handler queue is full. Dropped events can themselves be dropped (no Dropped event is generated in this case) if a handler queue is full.
type Event ¶
type Event interface {
Name() EventName
}
Event is the interface implemented by all events that are published on the bus.
type EventName ¶
type EventName string
EventName represents the name of an event. It implements EventNamePattern so it can be used as a pattern in a call to Bus.Subscribe.
type EventNamePattern ¶
EventNamePattern is the interface implemented by all event patterns.
func RegexPattern ¶
func RegexPattern(re *regexp.Regexp) EventNamePattern
RegexPattern returns a pattern to match against event names. It matches all events that match the given regex.
func WildcardPattern ¶
func WildcardPattern(pattern string) EventNamePattern
WildcardPattern returns a pattern to match against event names. Only '*' has a special meaning in a pattern: it matches any string, including the empty string.
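The matching rule for '*' can be reproduced with the standard regexp package. The helper below, wildcardToRegexp, is a hypothetical name and only one possible way to implement such a rule; it is not taken from the package.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// wildcardToRegexp converts a wildcard pattern into an anchored regular
// expression in which '*' matches any string, including the empty one.
// Every other character is matched literally.
func wildcardToRegexp(pattern string) *regexp.Regexp {
	quoted := regexp.QuoteMeta(pattern)            // escape all metacharacters
	expr := strings.ReplaceAll(quoted, `\*`, `.*`) // re-enable '*' as "any string"
	return regexp.MustCompile(`(?s)^` + expr + `$`)
}

func main() {
	re := wildcardToRegexp("process.*")
	fmt.Println(re.MatchString("process.started")) // prints true
	fmt.Println(re.MatchString("process."))        // prints true: '*' matches the empty string
	fmt.Println(re.MatchString("processor"))       // prints false: the '.' is literal
}
```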
type Handler ¶
type Handler struct {
// contains filtered or unexported fields
}
Handler represents a subscription to some events.
func (*Handler) Pattern ¶
func (h *Handler) Pattern() EventNamePattern
Pattern returns the handler pattern.
func (*Handler) PendingEvents ¶
PendingEvents returns the number of events in the handler queue.
type HandlerFunc ¶
HandlerFunc is the type of the function called by the bus to process events.
The e argument is the event to process. The t argument is the time at which the event was generated.
type SubscribeOption ¶
type SubscribeOption func(*subscribeOptions)
SubscribeOption configures a Handler as returned by Bus.Subscribe.
func WithCallOnce ¶
func WithCallOnce() SubscribeOption
WithCallOnce ensures that the handler will be called only once.
func WithNoDrain ¶
func WithNoDrain() SubscribeOption
WithNoDrain prevents Bus.Close and Bus.Unsubscribe from draining the handler event queue.
func WithQueueSize ¶
func WithQueueSize(size int) SubscribeOption
WithQueueSize sets the queue size of the handler.
func WithUnsubscribedHandler ¶
func WithUnsubscribedHandler(f func()) SubscribeOption
WithUnsubscribedHandler sets the handler that will be called after the handler has been unsubscribed and drained.
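The SubscribeOption functions above follow Go's functional options pattern: each option is a function that mutates an unexported settings struct. The sketch below shows the general pattern only. The settings struct, its fields and the default queue size of 16 are hypothetical and do not reflect the package's actual internals.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the unexported
// subscribeOptions struct; its fields are assumptions.
type settings struct {
	queueSize int
	callOnce  bool
	noDrain   bool
}

// option mirrors the shape of SubscribeOption: a function that
// modifies the settings in place.
type option func(*settings)

func withQueueSize(size int) option {
	return func(s *settings) { s.queueSize = size }
}

func withCallOnce() option {
	return func(s *settings) { s.callOnce = true }
}

func withNoDrain() option {
	return func(s *settings) { s.noDrain = true }
}

// apply starts from defaults and applies each option in order.
func apply(opts ...option) settings {
	s := settings{queueSize: 16} // hypothetical default, for illustration only
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	s := apply(withQueueSize(8), withCallOnce())
	fmt.Printf("%+v\n", s) // prints {queueSize:8 callOnce:true noDrain:false}
}
```

The pattern keeps the Subscribe signature stable: new options can be added later without changing existing call sites.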