deterbus

Version: v1.2.0
Published: Dec 3, 2023 License: MIT Imports: 4 Imported by: 0

README

deterbus

deterbus is a deterministic event bus for Go. It differs from other event bus implementations in the following ways:

  • There is a single event queue for all topics. Events are processed one-at-a-time.
  • Compile-time checks ensure your events send the types of arguments your handlers expect.
  • All subscribers receive all events sent to the topic they listen to.
  • Subscription and Unsubscription are events under the hood!
  • An event doesn't finish processing until after all subscribed handlers have returned.
  • 100% test coverage.

If you add a subscriber while the queue is not empty, it won't receive events that were queued before the subscriber was added. Similarly, when you unsubscribe, you won't miss events that were already queued ahead of the unsubscription. This ordering also prevents a lot of headaches when you add or remove handlers from within a handler callback.
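To make the ordering guarantee concrete, here is a minimal sketch of the idea, assuming a single FIFO queue in which subscribe, unsubscribe, and publish are all queued actions. The names `miniBus`, `drain`, and `deliveries` are hypothetical and exist only for this illustration; this is not deterbus's implementation, and it runs on a single goroutine for simplicity.

```go
package main

import "fmt"

// miniBus is a hypothetical stand-in: one FIFO queue of actions shared by
// every kind of operation. It is NOT how deterbus is implemented.
type miniBus struct {
	queue    []func()
	handlers map[int]func(string)
	nextID   int
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: map[int]func(string){}}
}

// subscribe queues the registration instead of applying it immediately.
func (b *miniBus) subscribe(h func(string)) int {
	id := b.nextID
	b.nextID++
	b.queue = append(b.queue, func() { b.handlers[id] = h })
	return id
}

// unsubscribe queues the removal, so earlier events are still delivered.
func (b *miniBus) unsubscribe(id int) {
	b.queue = append(b.queue, func() { delete(b.handlers, id) })
}

// publish queues delivery to whichever handlers are registered at the
// moment the event is processed.
func (b *miniBus) publish(msg string) {
	b.queue = append(b.queue, func() {
		for _, h := range b.handlers {
			h(msg)
		}
	})
}

// drain processes queued actions one at a time, in order.
func (b *miniBus) drain() {
	for len(b.queue) > 0 {
		next := b.queue[0]
		b.queue = b.queue[1:]
		next()
	}
}

// deliveries returns the messages the subscriber actually received.
func deliveries() []string {
	b := newMiniBus()
	var got []string
	b.publish("early") // queued before the subscription: not delivered
	id := b.subscribe(func(m string) { got = append(got, m) })
	b.publish("during") // queued between subscribe and unsubscribe: delivered
	b.unsubscribe(id)
	b.publish("late") // queued after the unsubscription: not delivered
	b.drain()
	return got
}

func main() {
	fmt.Println(deliveries())
}
```

Because a handler that subscribes or unsubscribes only appends to the queue, the set of handlers never changes while an event is being delivered in this sketch.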

Example

package deterbus_test

import (
	"context"
	"testing"

	"github.com/erinpentecost/deterbus"
	"github.com/stretchr/testify/require"
)

func TestExample(t *testing.T) {
	// Create a new Bus. We'll want to cancel the context we pass in eventually so we
	// don't leak a goroutine.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	b := deterbus.NewBus(ctx)

	// Create a new topic for tracking changes to our bank account
	// so we can maintain a balance. We specify the type of the argument
	// we are going to send along with the event with the help of generics.
	accountChange := deterbus.NewTopic[int](b)

	// Create our handler.
	// Callbacks are invoked one-at-a-time, so we don't need a mutex to protect
	// the `balance` variable here.
	balance := 0
	handlerFn := func(_ context.Context, delta int) {
		balance += delta
	}

	// Register a handler on the topic.
	// Subscription is an asynchronous call. It actually just does a Publish with an
	// internal "subscribe" event. We can Wait() for that to finish or close the
	// callback with the returned CallbackManager if we ever need to.
	callbackManager := accountChange.Subscribe(handlerFn)

	// Send a new event onto the bus. We're going to wait until all callbacks have finished.
	accountChange.Publish(context.TODO(), 99).Wait()

	require.Equal(t, 99, balance)

	// Now we'll schedule the callback to be removed.
	callbackManager.Unsubscribe()

	// Let's see what happens when we try to withdraw a million dollars now.
	accountChange.Publish(context.TODO(), -1000000).Wait()

	// There's no handler, so the event was dropped and not processed by anything.
	require.Equal(t, 99, balance)
}

Documentation

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus just controls when topic handlers are invoked.

func NewBus (added in v1.2.0)

func NewBus(ctx context.Context) *Bus
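The README example cancels the context passed to NewBus so that a goroutine is not leaked. As a rough illustration of why that matters, the sketch below shows a generic context-bound worker. `startWorker` and `runAndStop` are hypothetical names, and the worker's behavior is an assumption about how such a goroutine typically works, not a description of deterbus internals.

```go
package main

import (
	"context"
	"fmt"
)

// startWorker is a hypothetical helper: it runs queued jobs one at a time
// until ctx is cancelled. The returned channel is closed when the
// goroutine has exited.
func startWorker(ctx context.Context, jobs <-chan func()) <-chan struct{} {
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		for {
			select {
			case <-ctx.Done():
				return // without cancellation this goroutine would live forever
			case job := <-jobs:
				job()
			}
		}
	}()
	return stopped
}

// runAndStop processes one job, cancels the context, waits for the worker
// to exit, and returns how many jobs ran.
func runAndStop() int {
	ctx, cancel := context.WithCancel(context.Background())
	jobs := make(chan func())
	stopped := startWorker(ctx, jobs)

	count := 0
	processed := make(chan struct{})
	jobs <- func() {
		count++
		close(processed)
	}
	<-processed // the job has finished

	cancel()
	<-stopped // the worker goroutine has exited
	return count
}

func main() {
	fmt.Println(runAndStop())
}
```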

type Callback (added in v1.2.0)

type Callback[T any] func(context.Context, T)

Callbacks are hooked up to Topics via Subscription. They are called for each Event that is Published to the Topic.

type CallbackManager (added in v1.2.0)

type CallbackManager interface {
	// Wait will block until the publish is queued.
	Wait()
	// Unsubscribe will queue an event to unsubscribe the callback.
	// The Wait function returned by Unsubscribe will block until that
	// removal event is queued.
	//
	// Note that the callback may still be invoked even after
	// CallbackManager.Unsubscribe()() is called if there is a backlog for
	// this topic.
	Unsubscribe() (Wait func())
}

CallbackManager contains utility functions for managing a callback.

type Topic

type Topic[T any] struct {
	// contains filtered or unexported fields
}

Topic is what users interact with. Each one watches the bus to know when it must run.

func NewTopic (added in v1.1.0)

func NewTopic[T any](bus *Bus) *Topic[T]

NewTopic creates a new topic on an event bus.
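The type parameter on NewTopic is what allows the compiler to check that published values match what handlers expect. The sketch below illustrates that general technique with a hypothetical `typedTopic` type; it is a simplified, synchronous stand-in written for this example and does not reflect how deterbus's Topic is built.

```go
package main

import "fmt"

// typedTopic is a hypothetical stand-in used only for illustration;
// it is not deterbus's Topic type.
type typedTopic[T any] struct {
	handlers []func(T)
}

// subscribe only accepts handlers whose argument type matches T.
func (t *typedTopic[T]) subscribe(h func(T)) {
	t.handlers = append(t.handlers, h)
}

// publish only accepts a value of type T and passes it to every handler.
func (t *typedTopic[T]) publish(arg T) {
	for _, h := range t.handlers {
		h(arg)
	}
}

// applyDeltas publishes each delta to an int topic and returns the
// resulting balance.
func applyDeltas(deltas ...int) int {
	topic := &typedTopic[int]{}
	balance := 0
	topic.subscribe(func(delta int) { balance += delta })
	for _, d := range deltas {
		topic.publish(d)
	}
	// topic.publish("ten") // rejected by the compiler: a string is not an int
	return balance
}

func main() {
	fmt.Println(applyDeltas(99, -9))
}
```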

func (*Topic[T]) Publish

func (t *Topic[T]) Publish(ctx context.Context, arg T) Waiter

Publish submits an event to the queue. When processed, it will be sent to every Callback that is Subscribed to this Topic.

func (*Topic[T]) Subscribe

func (t *Topic[T]) Subscribe(callback Callback[T]) CallbackManager

Subscribe will invoke callback for each event Published to this topic.

type Waiter (added in v1.2.0)

type Waiter interface {
	Wait()
}

Waiter can be used to signal when all Subscribers have finished processing an event.
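One common way to build a value that satisfies a `Wait()` interface like this is a channel that is closed once all handlers have returned. The sketch below shows that pattern under stated assumptions: `doneWaiter`, `publishAsync`, and `total` are hypothetical names, and nothing here is taken from deterbus's source.

```go
package main

import "fmt"

// doneWaiter is a hypothetical Waiter backed by a channel that is closed
// when processing has finished.
type doneWaiter struct {
	done chan struct{}
}

// Wait blocks until the done channel is closed.
func (w doneWaiter) Wait() {
	<-w.done
}

// publishAsync runs the handlers one after another on a separate goroutine
// and returns a waiter that unblocks after the last handler returns.
func publishAsync(arg int, handlers ...func(int)) doneWaiter {
	w := doneWaiter{done: make(chan struct{})}
	go func() {
		defer close(w.done)
		for _, h := range handlers {
			h(arg)
		}
	}()
	return w
}

// total publishes one value to two handlers and reads the result only
// after Wait returns, so no mutex is needed around sum.
func total() int {
	sum := 0
	publishAsync(5,
		func(n int) { sum += n },
		func(n int) { sum += n * 2 },
	).Wait()
	return sum
}

func main() {
	fmt.Println(total())
}
```

Closing the channel, rather than sending on it, lets any number of callers block on the same Wait without coordinating with each other.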
