pubsub

package module
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 21, 2020 License: BSD-3-Clause Imports: 4 Imported by: 1

README

GoDoc

Pubsub - A fast concurrent in memory publish subscribe event bus for golang

Pubsub is an in memory event bus, its goals are to be simple yet powerful intended to be used in highly concurrent environments. While there are already quite a few different implementations in go (see below), all proved to be not a good fit in one way or another.

The implementations tries to be scalable by building on a lock free hashtable and a custom scalable list of subscriptions. Currently golang's sync.Map is used, since there is a bug in the lockfree hashtable.

To avoid dead looks and interference across unrelated topics no locks will be held while calling client code, this also allows callbacks to unsubscribe them selves.~~

Examples


bus := pubsub.NewBus()

// subscribe with callback
sub := bus.Subscribe("topic", func(ev MyEvent) {
   ...
})

// publish
bus.Publish("topic", MyEvent{})

// unsubscribe
bus.Unsubscribe(sub)

// subscribe for at most one callback
sub := bus.SubscribeOnce("topic", func(ev MyEvent) {
   ...
})

// unsubscribe from handler if condition is met
sub := bus.SubscribeOnce("topic", func(ev MyEvent, sub *pubsub.Subscription) {
   if (dontcareanymore) {
      bus.Unsubscribe(sub)
   }
   ...
})

// subscribe by sending events to a channel
ch := make(chan MyEvent, 10)
sub := bus.SubscribeChan("topic", ch, pubsub.CloseOnUnsubscribe)
...
// Unsubscribe will also close the channel in a concurrency safe manner
bus.Unsubscribe(sub)


// Block until an event is received or context is cancelled
ctx context.Context = ...
ev, ok := bus.SubscribeOnceWait("topic", ctx.Done())
if ok { // event was received

}

Similar projects

Here is a selection of some existing similar projects

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

The event Bus. Should never be copied!

func NewBus

func NewBus() *Bus

Create a new event bus with a default table size of 512.

func NewSizedBus

func NewSizedBus(size int) *Bus

Create a new event bus with a specific hash table size

func (*Bus) Publish

func (b *Bus) Publish(t Topic, event Event)

Publish an event under a specific topic.

func (*Bus) Subscribe

func (b *Bus) Subscribe(t Topic, cb Callback) *Subscription

Subscribe to a topic with a callback, the callback may be invoked multiple times. The callback will executed in the context of the publishing goroutine, and thus should not block or panic. Callback invocations can overlap if there are overlapping publications to the topic. Use one of the Async variants if any of this is undesirable.

func (*Bus) SubscribeAsync

func (b *Bus) SubscribeAsync(t Topic, cb Callback) *Subscription

Subscribe to a topic with a callback. The callback may be invoked multiple times, each invocation in its own goroutine. Callback invocations can overlap independent of overlapping publishers.

func (*Bus) SubscribeAsyncOnce

func (b *Bus) SubscribeAsyncOnce(t Topic, cb Callback) *Subscription

Subscribe to a topic with a callback. The callback will be invoked in its own goroutine and at most once.

func (*Bus) SubscribeChan

func (b *Bus) SubscribeChan(t Topic, ch WritableChan, close CloseFlag) *Subscription

Subscribe to a topic with a channel. On publish the event will be written to the channel, if the channel is full the event will be dropped.

func (*Bus) SubscribeChanOnce

func (b *Bus) SubscribeChanOnce(t Topic, ch WritableChan, close CloseFlag) *Subscription

Subscribe to a topic with a channel. On publish the event will be written to the channel, if the channel is full the event will be dropped. At most one event will be written to the channel.

close: If close is true the channel will be closed when unsubscribing.

func (*Bus) SubscribeOnce

func (b *Bus) SubscribeOnce(t Topic, cb Callback) *Subscription

Subscribe to a topic with a callback, the handler may be called at most once. See Subscribe(...) for more details.

func (*Bus) SubscribeOnceWait

func (b *Bus) SubscribeOnceWait(t Topic, abort <-chan struct{}) (event Event, ok bool)

Subscribe to a topic with a channel and wait until either an event is published or the abort channel is written to / closed. abort can be a nil channel in which case this method blocks indefinitely until an event is published. Returns the event and a flag indicating whether an event was received.

func (*Bus) Unsubscribe

func (b *Bus) Unsubscribe(sub *Subscription) bool

Unsubscribe from the bus Returns true if the subscription was active and is now unsubscribed Returns false if the subscription was already unsubscribed

type Callback

type Callback = interface{}

A callback func to consume an event For an event of type T the following signatures are allowed:

1. func() Ignore the event if you not interested in the event payload itself

2. func(event T) Process the event and its payload

3. func(event T, subscription *Subscription) Additionally get a pointer to the subscription of this handler, useful to unsubscribe itself if a certain condition is met

type CloseFlag

type CloseFlag bool

whether a channel subscription should close the target channel when unsubscribing

const CloseOnUnsubscribe CloseFlag = true

Close the channel when unsubscribing

const KeepOpen CloseFlag = false

Don't close the channel when unsubscribing

type Event

type Event = interface{}

The event to be published

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

A handle to a bus subscription

func (*Subscription) CallbackArity

func (s *Subscription) CallbackArity() int

Get the number of arguments of the callback attached to this subscription. If the subscription does not have a callback return -1.

func (*Subscription) Subscribed

func (s *Subscription) Subscribed() bool

Test if this subscription is currently subscribed

type Topic

type Topic = string

A simple string used to describe a topic

type WritableChan

type WritableChan = interface{}

A writable chan: chan T or chan<- T

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL