ebus

package module
v0.1.0 Latest
Published: Sep 18, 2024 License: MIT Imports: 4 Imported by: 0

README

ebus

A generic, thread-safe, high-performance event bus for Go. Zero allocations during pub/sub.

Please note:

  • Publishers will be blocked until all subscribers of the event are done. Use background workers for any slow work.
  • Any variable that is published in an event is NOT safe for a subscriber to keep after return.
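
The second note can be honoured by copying the value inside the subscriber instead of storing the pointer. The following is a minimal, standalone sketch that does not import ebus: `userLog`, `subscriber` and `snapshot` are hypothetical names, and the direct calls to `sub` in `main` only stand in for a publisher that reuses one variable between events.

```go
package main

import (
	"fmt"
	"sync"
)

// User mirrors the domain type used in the usage examples.
type User struct {
	Name string
}

// userLog is a hypothetical collector that outlives each subscriber call.
type userLog struct {
	mu    sync.Mutex
	users []User
}

// subscriber returns a func(*User), the shape a typed subscriber has.
// It stores a copy of the value and never retains the pointer.
func (l *userLog) subscriber() func(*User) {
	return func(u *User) {
		l.mu.Lock()
		defer l.mu.Unlock()
		l.users = append(l.users, *u) // copy the struct, do not keep u
	}
}

// snapshot returns a copy of everything collected so far.
func (l *userLog) snapshot() []User {
	l.mu.Lock()
	defer l.mu.Unlock()
	return append([]User(nil), l.users...)
}

func main() {
	collected := &userLog{}
	sub := collected.subscriber()

	// Stand-in for a publisher that reuses the same variable.
	u := User{Name: "John Doe"}
	sub(&u)
	u.Name = "Jane Doe"
	sub(&u)

	fmt.Println(collected.snapshot()) // [{John Doe} {Jane Doe}]
}
```

Had the subscriber stored `u` itself, both entries would end up pointing at the same, later-modified value.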

Install

go get github.com/webmafia/ebus

Usage

Only events

Use this when only a bare event, without any payload, is needed.

const (
    _  ebus.Event = iota

    MyEvent
)

bus := ebus.NewEventBus()

// Subscribe for MyEvent
sub := bus.Sub(MyEvent, func() {
    fmt.Println("received event")
})

// Publish MyEvent
bus.Pub(MyEvent)

// Unsubscribe for MyEvent
bus.Unsub(MyEvent, sub)
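
Events + variable types

This is a powerful way to combine events and variable types: the same event can carry different types, and each subscriber receives only the type it subscribed for.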

const (
    _  ebus.Event = iota

    Created // A generic event that "something was created"
)

// These types usually belong to the domain
type (
    User struct{
        Name string
    }

    Order struct{
        Id int
    }
)

bus := ebus.NewEventBus()

// Subscribe for created users
ebus.Sub(bus, Created, func(u *User) {
    fmt.Println("user", u.Name, "was created")

    // Note that it's NOT safe to keep `u` after return
})

// Subscribe for created orders
ebus.Sub(bus, Created, func(o *Order) {
    fmt.Println("order", o.Id, "was created")

    // Note that it's NOT safe to keep `o` after return
})

// Publish that a user was created
ebus.Pub(bus, Created, &User{
    Name: "John Doe",
})

// Publish that an order was created
ebus.Pub(bus, Created, &Order{
    Id: 123456,
})

Background workers

Because publishers are blocked until all subscribers of the event are done, a subscriber should finish quickly. Move any slow work to a background goroutine, for example by forwarding events to a buffered channel:

const (
    _  ebus.Event = iota

    Created // A generic event that "something was created"
)

type Order struct{
    Id int
}

bus := ebus.NewEventBus()

// Create a buffered channel
ch := make(chan Order, 8)

// Worker: process orders until the channel is closed
go func(ch <-chan Order) {
    for o := range ch {
        // Do any slow work, e.g. synchronize the order to a third-party API
        doAnySlowWork(o)
    }
}(ch)

// Subscribe for created orders and forward them to the channel
ebus.SubToChan(bus, Created, ch)

// Publish that an order was created
ebus.Pub(bus, Created, &Order{
    Id: 123456,
})

// Close channel when done
close(ch)
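
Closing the channel stops the worker loop, but it does not wait for orders that are still buffered. One way to wait for them is sketched below using only the standard library. `startWorker` is a hypothetical helper, not part of ebus, and the direct channel sends stand in for events forwarded by a subscriber.

```go
package main

import (
	"fmt"
	"sync"
)

type Order struct {
	Id int
}

// startWorker launches a goroutine that drains ch until it is closed.
// The returned function blocks until that goroutine has finished.
func startWorker(ch <-chan Order, handle func(Order)) (wait func()) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for o := range ch {
			handle(o)
		}
	}()
	return wg.Wait
}

func main() {
	ch := make(chan Order, 8)

	var processed []int
	wait := startWorker(ch, func(o Order) {
		processed = append(processed, o.Id)
	})

	// Stand-in for events forwarded to the channel.
	ch <- Order{Id: 1}
	ch <- Order{Id: 2}

	close(ch) // no more orders will be sent
	wait()    // block until the buffered orders are handled

	fmt.Println(processed) // [1 2]
}
```

The channel should only be closed once nothing can send to it any more, since a send on a closed channel panics.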

Benchmark

This gives you an idea of the performance, but your mileage may vary. Do your own benchmark.

goos: darwin
goarch: arm64
pkg: github.com/webmafia/ebus
cpu: Apple M1 Pro
BenchmarkEventBus/01_subscribers-10                   65841814     17.590 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/02_subscribers-10                   60300748     20.120 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/04_subscribers-10                   47710713     25.010 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/08_subscribers-10                   34022806     35.360 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/16_subscribers-10                   16668036     72.400 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/32_subscribers-10                   10365789    115.000 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus/64_subscribers-10                    5991422    200.300 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/01_subscribers-10        279230622      4.252 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/02_subscribers-10        259206556      4.594 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/04_subscribers-10        191008094      5.891 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/08_subscribers-10        148670781      8.141 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/16_subscribers-10         91247522     14.100 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/32_subscribers-10         62021378     19.390 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Parallell/64_subscribers-10         41622828     28.570 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/01_subscribers-10               69984446     16.860 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/02_subscribers-10               60236678     19.920 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/04_subscribers-10               45867614     26.180 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/08_subscribers-10               31973780     37.550 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/16_subscribers-10               16673738     71.790 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/32_subscribers-10               10464192    114.600 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var/64_subscribers-10                5991487    199.800 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/01_subscribers-10    262930784      4.562 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/02_subscribers-10    262294675      4.904 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/04_subscribers-10    192289729      6.090 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/08_subscribers-10    144890751      8.254 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/16_subscribers-10     90990375     13.760 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/32_subscribers-10     62902694     18.760 ns/op    0 B/op    0 allocs/op
BenchmarkEventBus_Var_Parallell/64_subscribers-10     41338268     28.810 ns/op    0 B/op    0 allocs/op
PASS
ok      github.com/webmafia/ebus       39.693s
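
For a quick measurement on your own hardware, a publish call can be timed with `testing.Benchmark` from the standard library, without `go test`. This is only a sketch: `benchPublish` is a hypothetical helper, and the closure in `main` is a stand-in for a real publish call with four subscribers.

```go
package main

import (
	"fmt"
	"testing"
)

// benchPublish runs publish b.N times and reports timing and allocations.
func benchPublish(publish func()) testing.BenchmarkResult {
	return testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for i := 0; i < b.N; i++ {
			publish()
		}
	})
}

func main() {
	// Stand-in for a bus with four subscribers; swap in a real publish call.
	calls := 0
	subs := make([]func(), 4)
	for i := range subs {
		subs[i] = func() { calls++ }
	}

	res := benchPublish(func() {
		for _, fn := range subs {
			fn()
		}
	})

	// Prints iterations, ns/op, B/op and allocs/op; numbers vary per machine.
	fmt.Println(res.String(), res.MemString(), "subscriber calls:", calls)
}
```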

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func Pub

func Pub[T any](eb *EventBus, event Event, val *T)

Publish an event with a variable. Function will block until all subscribers are done. Subscribers must subscribe for the specific variable type.

func Sub

func Sub[T any](eb *EventBus, event Event, fn func(*T)) func(*T)

Subscribe for an event with a variable. Any publisher will be blocked until all subscribers are done, so please keep your subscriber fast and run anything slow in e.g. a background worker. Subscribers should NOT keep the variable after return. Publishers must publish the specific variable type.

func SubToChan

func SubToChan[T any](eb *EventBus, event Event, ch chan<- T) func(*T)

Convenience function for subscribing for an event and pushing to a channel, so that slow subscribers can do their work in a goroutine. If the channel is full, any publisher will block. Publishers must publish the specific variable type.
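
If blocking the publisher on a full channel is not acceptable, a hand-written subscriber can drop the value instead. The sketch below is not part of ebus: `forwardOrDrop` is a hypothetical helper that returns a `func(*T)`, the shape accepted by Sub, and it is called directly here in place of a real publish.

```go
package main

import "fmt"

type Order struct {
	Id int
}

// forwardOrDrop returns a subscriber that copies the value into ch when
// there is room. When ch is full it calls onDrop (if non-nil) and returns
// immediately instead of blocking.
func forwardOrDrop[T any](ch chan<- T, onDrop func(T)) func(*T) {
	return func(v *T) {
		select {
		case ch <- *v: // copy the value; the pointer is not retained
		default:
			if onDrop != nil {
				onDrop(*v)
			}
		}
	}
}

func main() {
	ch := make(chan Order, 1)
	dropped := 0
	sub := forwardOrDrop(ch, func(Order) { dropped++ })

	sub(&Order{Id: 1}) // fits in the buffer
	sub(&Order{Id: 2}) // buffer is full: dropped, no blocking

	fmt.Println((<-ch).Id, dropped) // 1 1
}
```

Dropping trades completeness for publisher latency, so it only suits events that are safe to lose.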

func Unsub

func Unsub[T any](eb *EventBus, event Event, fn func(*T)) (unsubscribed bool)

Unsubscribe an event. Returns whether there was a subscription or not.

Types

type Event

type Event uint32

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

A thread-safe event bus. The zero EventBus is empty and ready for use. An EventBus must not be copied after first use.

bus := NewEventBus()

// Either send only event
bus.Pub(123)

// Or also send variable
ebus.Pub(bus, 123, &myVar)

Example

const MyEvent Event = 1

eb := NewEventBus()

subA := func(v *int) {
	fmt.Println("Subscriber A:", *v)
}

subB := func(v *int) {
	fmt.Println("Subscriber B:", *v)
}

subC := func(v *int) {
	fmt.Println("Subscriber C:", *v)
}

Sub(eb, MyEvent, subA)
Sub(eb, MyEvent, subB)
Sub(eb, MyEvent, subC)

fmt.Println("Subscribers:", eb.Subscribers())

i := 123

Pub(eb, MyEvent, &i)

fmt.Println("--------")
Unsub(eb, MyEvent, subB)
fmt.Println("Subscribers:", eb.Subscribers())
Pub(eb, MyEvent, &i)

Output:

Subscribers: 3
Subscriber C: 123
Subscriber B: 123
Subscriber A: 123
--------
Subscribers: 2
Subscriber C: 123
Subscriber A: 123

func NewEventBus

func NewEventBus() *EventBus

Convenience function to create a new event bus.

func (*EventBus) Pub

func (eb *EventBus) Pub(event Event)

Publish an event. Function will block until all subscribers are done. Also consider the function:

ebus.Pub(bus, 123, &myVar)

func (*EventBus) Sub

func (eb *EventBus) Sub(event Event, fn func()) func()

Subscribe for an event. Any publisher will be blocked until all subscribers are done, so please keep your subscriber fast and run anything slow in e.g. a background worker. Also consider the function:

ebus.Sub(bus, 123, func(myVar *myType) { ... })

func (*EventBus) Subscribers

func (eb *EventBus) Subscribers() (n int64)

Returns the number of current subscribers.

func (*EventBus) Unsub

func (eb *EventBus) Unsub(event Event, fn func()) (unsubscribed bool)

Unsubscribe an event. Returns whether there was a subscription or not. Also consider the function:

ebus.Unsub(bus, 123, mySubscriber)
