eventbus

package module
v0.0.0-...-24f718a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2023 License: MIT Imports: 5 Imported by: 0

README

Description

The eventbus package provides a simple event bus for Go. It features:

  • async/sync publishing
  • flexible event matching mechanism (exact match, wildcard, regex, ...)

Usage

See eventbus_example_test.go for an example of how to use this package

Documentation

You can find the documentation of the package here

Documentation

Overview

Package eventbus provides a simple event bus implementation.

The event bus provided by this package supports asynchronous and synchronous publishing and a flexible event matching mechanism. Each handler registered with Bus.Subscribe has its own event queue used to store events and its own goroutine to process events. Each handler processes events in the order they are published. When an asynchronous event is published on the bus and the event can't be stored in the event queue of a handler because its queue is full, the event is dropped for this handler and a Dropped event is generated by the bus. A slow handler doesn't impact other handlers when publishing asynchronous events.

Example
package main

import (
	"fmt"
	"time"

	"github.com/montag451/go-eventbus"
)

type ProcessStarted struct {
	Pid int
}

func (ProcessStarted) Name() eventbus.EventName {
	return "process.started"
}

type ProcessSignaled struct {
	Pid    int
	Signal int
}

func (ProcessSignaled) Name() eventbus.EventName {
	return "process.signaled"
}

type ProcessExited struct {
	Pid      int
	ExitCode int
}

func (ProcessExited) Name() eventbus.EventName {
	return "process.exited"
}

func main() {
	closed := make(chan struct{})
	b := eventbus.New(eventbus.WithClosedHandler(func() {
		close(closed)
	}))
	b.Subscribe(eventbus.WildcardPattern("process.*"), func(e eventbus.Event, t time.Time) {
		switch e := e.(type) {
		case ProcessStarted:
			fmt.Printf("Process %d started\n", e.Pid)
		case ProcessSignaled:
			fmt.Printf("Process %d has been signaled with signal %d\n", e.Pid, e.Signal)
		case ProcessExited:
			fmt.Printf("Process %d exited with code %d\n", e.Pid, e.ExitCode)
		}
	})
	b.Publish(ProcessStarted{12000})
	b.PublishSync(ProcessSignaled{12000, 9})
	b.PublishAsync(ProcessExited{12000, 0})
	b.Close()
	<-closed
}
Output:

Process 12000 started
Process 12000 has been signaled with signal 9
Process 12000 exited with code 0

Index

Examples

Constants

View Source
const DroppedEventName = EventName("_bus.dropped")

DroppedEventName is the name of the Dropped event

Variables

View Source
var ErrBusClosed = errors.New("bus is closed")

ErrBusClosed is the error returned by all bus methods (except Bus.Close) if called on a closed bus. It is the only error returned by this package.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus represents an event bus. A Bus is safe for use by multiple goroutines simultaneously.

func New

func New(options ...BusOption) *Bus

New creates a new event bus, ready to be used.

func (*Bus) Close

func (b *Bus) Close()

Close closes the event bus and starts draining, in the background, the event queue of all handlers that has not been registered with the WithNoDrain option. When all the handlers have been drained, the callback set with WithClosedHandler is called.

func (*Bus) HasSubscribers

func (b *Bus) HasSubscribers(name EventName) (bool, error)

HasSubscribers returns true if the given event name has subscribers otherwise it returns false.

func (*Bus) Publish

func (b *Bus) Publish(e Event) error

Publish publishes an event on the bus and returns when the event has been put in the event queue of all the handlers subscribed to the event.

func (*Bus) PublishAsync

func (b *Bus) PublishAsync(e Event) error

PublishAsync publishes an event asynchronously. It returns as soon as the event has been put in the event queue of all the handlers subscribed to the event. If the event queue of a handler is full, the event is dropped for this handler and a Dropped event is generated.

func (*Bus) PublishSync

func (b *Bus) PublishSync(e Event) error

PublishSync publishes an event synchronously. It returns when the event has been processed by all the handlers subscribed to the event.

func (*Bus) Subscribe

func (b *Bus) Subscribe(p EventNamePattern, fn HandlerFunc, options ...SubscribeOption) (*Handler, error)

Subscribe subscribes to all events matching the given event name pattern. It returns a Handler instance representing the subscription.

func (*Bus) Unsubscribe

func (b *Bus) Unsubscribe(h *Handler) error

Unsubscribe unsubscribes the given handler for all events matching the handler pattern and starts draining, in the background, the handler event queue if the given handler has not been registered with the WithNoDrain option. When the handler has been drained, the callback set with WithUnsubscribedHandler is called.

type BusOption

type BusOption func(*busOptions)

BusOption configures a Bus as returned by New.

func WithClosedHandler

func WithClosedHandler(f func()) BusOption

WithClosedHandler sets the handler that will be called after the bus has been closed and all subscribed handlers have been drained.

type Dropped

type Dropped struct {
	Handler   *Handler
	EventTime time.Time
	Event     Event
}

Dropped is the event published internally by the bus to signal that an event has been dropped. This event occurs only when an event is published asynchronously and the handler queue is full. Dropped events can themselves be dropped (no Dropped event is generated in this case) if a handler queue is full.

func (Dropped) Name

func (Dropped) Name() EventName

type Event

type Event interface {
	Name() EventName
}

Event is the interface implemented by all events that are published on the bus.

type EventName

type EventName string

EventName represents the name of an event. It implements EventNamePattern so it can be used as a pattern in a call to Bus.Subscribe.

func (EventName) Match

func (p EventName) Match(n EventName) bool

type EventNamePattern

type EventNamePattern interface {
	Match(n EventName) bool
}

EventNamePattern is the interface implemented by all event patterns.

func RegexPattern

func RegexPattern(re *regexp.Regexp) EventNamePattern

RegexPattern returns a pattern to match against event names. It matches all events that match the given regex.

func WildcardPattern

func WildcardPattern(pattern string) EventNamePattern

WildcardPattern returns a pattern to match against event names. Only '*' has a special meaning in a pattern, it matches any string, including the empty string.

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

Handler represents a subscription to some events.

func (*Handler) Name

func (h *Handler) Name() string

Name returns the handler name.

func (*Handler) Pattern

func (h *Handler) Pattern() EventNamePattern

Pattern returns the handler pattern.

func (*Handler) PendingEvents

func (h *Handler) PendingEvents() int

PendingEvents returns the number of events in the handler queue.

func (*Handler) QueueSize

func (h *Handler) QueueSize() int

QueueSize returns the handler queue size.

type HandlerFunc

type HandlerFunc func(e Event, t time.Time)

HandlerFunc is the type of the function called by the bus to process events.

The e argument is the event to process. The t argument is the time when the event has been generated.

type SubscribeOption

type SubscribeOption func(*subscribeOptions)

SubscribeOption configures a Handler as returned by Bus.Subscribe.

func WithCallOnce

func WithCallOnce() SubscribeOption

WithCallOnce ensures that the handler will be called only once

func WithName

func WithName(name string) SubscribeOption

WithName sets the name of the handler.

func WithNoDrain

func WithNoDrain() SubscribeOption

WithNoDrain prevents Bus.Close and Bus.Unsubscribe to drain the handler event queue.

func WithQueueSize

func WithQueueSize(size int) SubscribeOption

WithQueueSize sets the queue size of the handler.

func WithUnsubscribedHandler

func WithUnsubscribedHandler(f func()) SubscribeOption

WithUnsubscribedHandler sets the handler that will be called after the handler has been unsubscribed and drained.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL