bus

Version: v1.0.2
Warning

This package is not in the latest version of its module.

Published: Feb 5, 2020 License: Apache-2.0 Imports: 5 Imported by: 2

README

🔊 Bus


Bus is a minimalist event/message bus implementation for internal communication. It is heavily inspired by my event_bus package for the Elixir language.

API

The method names and arities/args are now stable. No changes should be expected in the package for version 1.x.x other than bug fixes.

Installation

Via go packages: go get github.com/mustafaturan/bus

Usage

Configure

The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.
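
As a minimal sketch of the "write your own function" option: the documented `bus.Next` type is `func() string`, so any function of that shape can serve as a generator. The helper name `newCounterIDGenerator` and the `prefix-N` id format below are hypothetical, and the ids are unique only within a single process.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// newCounterIDGenerator is a hypothetical helper (not part of the bus
// package). It returns a func() string, the same shape as the documented
// bus.Next type, that yields a new id on every call.
func newCounterIDGenerator(prefix string) func() string {
	var counter uint64
	return func() string {
		// the atomic increment keeps ids unique across goroutines
		n := atomic.AddUint64(&counter, 1)
		return prefix + "-" + strconv.FormatUint(n, 10)
	}
}

func main() {
	next := newCounterIDGenerator("evt")
	fmt.Println(next()) // evt-1
	fmt.Println(next()) // evt-2
}
```

A generator like this could presumably be passed to the bus as `bus.Next(next)`; for ids that must stay unique across processes or restarts, a dedicated id package is the safer choice.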

The bus package respects the software design choices of the packages/projects that use it. It supports both singleton and dependency injection to initialize a bus instance.

Hint: Check the demo project for the singleton configuration.
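
The two styles can be sketched roughly as follows. Everything in this block is a stand-in written for illustration: `Publisher`, `fakeBus`, `Shared`, and `OrderService` are hypothetical names, and the one-method interface is deliberately simpler than the real `Emit` signature.

```go
package main

import (
	"fmt"
	"sync"
)

// Publisher is a hypothetical stand-in for the part of a bus a service needs.
type Publisher interface {
	Publish(topic string) string
}

// fakeBus is a stand-in implementation used only in this sketch.
type fakeBus struct{ name string }

func (f *fakeBus) Publish(topic string) string { return f.name + ":" + topic }

// Singleton style: one shared instance, created lazily exactly once.
var (
	once     sync.Once
	instance *fakeBus
)

func Shared() *fakeBus {
	once.Do(func() { instance = &fakeBus{name: "shared"} })
	return instance
}

// Dependency injection style: the caller passes the instance in.
type OrderService struct{ bus Publisher }

func NewOrderService(b Publisher) *OrderService { return &OrderService{bus: b} }

func (s *OrderService) Receive() string { return s.bus.Publish("order.received") }

func main() {
	fmt.Println(Shared() == Shared())                               // true
	fmt.Println(NewOrderService(Shared()).Receive())                // shared:order.received
	fmt.Println(NewOrderService(&fakeBus{name: "test"}).Receive()) // test:order.received
}
```

The injected form tends to be easier to test, since a fake can be swapped in; the singleton form saves passing the instance around.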

Here is a sample initialization using the monoton id generator:

import (
    "github.com/mustafaturan/bus"
    "github.com/mustafaturan/monoton"
    "github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
    // configure id generator (it doesn't have to be monoton)
    node        := uint64(1)
    initialTime := uint64(0)
    monoton.Configure(sequencer.NewMillisecond(), node, initialTime)

    // init an id generator
    var idGenerator bus.Next = monoton.Next

    // create a new bus instance
    b, err := bus.NewBus(idGenerator)
    if err != nil {
        panic(err)
    }

    // maybe register topics in here
    b.RegisterTopics("order.received", "order.fulfilled")

    return b
}
Register Event Topics

To emit events to the topics, topic names need to be registered first:

// register topics
b.RegisterTopics("order.received", "order.fulfilled")
Register Event Handlers

To receive topic events, you need to register handlers. A handler requires two values: a Handle function and a topic Matcher regex pattern.

handler := bus.Handler{
    Handle: func(e *bus.Event) {
        // do something
        // NOTE: Highly recommended to process the event in an async way
    },
    Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", &handler)
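
To get a feel for which topics a given Matcher pattern would select, the standard `regexp` package can be used on its own. This is only a sketch: `matchTopics` is a hypothetical helper, and it assumes plain, unanchored regexp matching, which may not be exactly how the package evaluates patterns.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchTopics returns the topics that the regex pattern matches.
// Illustration only; it is not the package's own matching code.
func matchTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"order.received", "order.fulfilled", "user.created"}
	for _, p := range []string{".*", `^order\..*`, `\.created$`} {
		m, err := matchTopics(p, topics)
		fmt.Println(p, m, err)
	}
}
```

Note that an unescaped `.` matches any character, so `order.received` used as a pattern would also match `orderXreceived`; escaping the dot avoids that.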
Emit Events
// if txID val is blank, bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "some-transaction-id-if-exists")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"]     = "123456"
order["orderAmount"] = "112.20"
order["currency"]    = "USD"

// emit the event
event, err := b.Emit(ctx, topic, order)

if err != nil {
    // report the err
    fmt.Println(err)
}

// if the caller needs the event, a reference to it is returned by the
// `Emit` call.
fmt.Println(event)
Processing Events

When an event is emitted, the topic handlers receive the event synchronously. It is highly recommended to process events asynchronously. The package leaves the choice of concurrency abstractions to the packages/projects that use it, depending on their use cases. Each handler receives the same event as a reference to a bus.Event struct:

// Event data structure
type Event struct {
	ID         string      // identifier
	TxID       string      // transaction identifier
	Topic      string      // topic name
	Data       interface{} // actual event data
	OccurredAt int64       // creation time in nanoseconds
}
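
One possible way to follow the asynchronous-processing recommendation is to have the handle function only enqueue the event and let a worker goroutine do the real work. The sketch below uses a trimmed local `Event` type as a stand-in for `bus.Event`; `asyncHandle` is a hypothetical helper, and a buffered channel with a single worker is just one of several reasonable designs.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a trimmed, local stand-in for the bus.Event struct.
type Event struct {
	ID    string
	Topic string
}

// asyncHandle wraps a slow processing function. The returned handle function
// only enqueues the event and returns; a worker goroutine does the real work.
// Calling stop closes the queue and waits for the worker to drain it.
func asyncHandle(buffer int, process func(*Event)) (handle func(*Event), stop func()) {
	queue := make(chan *Event, buffer)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for e := range queue {
			process(e)
		}
	}()
	handle = func(e *Event) {
		queue <- e // blocks only when the buffer is full
	}
	stop = func() {
		close(queue)
		wg.Wait()
	}
	return handle, stop
}

func main() {
	var seen []string
	handle, stop := asyncHandle(16, func(e *Event) { seen = append(seen, e.Topic) })
	handle(&Event{ID: "1", Topic: "order.received"})
	handle(&Event{ID: "2", Topic: "order.fulfilled"})
	stop() // once stop returns, the worker has processed everything
	fmt.Println(seen) // [order.received order.fulfilled]
}
```

Because every handler receives the same event reference, a worker like this should treat the event as read-only.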
Sample Project

A demo project with three consumers: a counter consumer that increments a counter for each event topic, a printer consumer that prints all events, and a calculator consumer that sums amounts.

Benchmarks
BenchmarkEmit-4   	 5983903	       200 ns/op	     104 B/op	       2 allocs/op

Contributing

All contributors should follow the Contributing Guidelines before creating pull requests.

Credits

Mustafa Turan

License

Apache License 2.0

Copyright (c) 2020 Mustafa Turan

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Documentation

Overview

Package bus is a minimalist event/message bus implementation for internal communication.

The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.

The `bus` package respects the software design choices of the packages/projects that use it. It supports both singleton and dependency injection to initialize a `bus` instance.

Here is a sample initialization using the `monoton` id generator:

Example code for configuration:

import (
	"github.com/mustafaturan/bus"
	"github.com/mustafaturan/monoton"
	"github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
	// configure id generator (it doesn't have to be monoton)
	node        := uint64(1)
	initialTime := uint64(0)
	monoton.Configure(sequencer.NewMillisecond(), node, initialTime)

	// init an id generator
	var idGenerator bus.Next = monoton.Next

	// create a new bus instance
	b, err := bus.NewBus(idGenerator)
	if err != nil {
		panic(err)
	}

	// maybe register topics in here
	b.RegisterTopics("order.received", "order.fulfilled")

	return b
}

Register Topics

To emit events to the topics, topic names should be registered first:

Example code:

// register topics
b.RegisterTopics("order.received", "order.fulfilled")
// ...

Register Handlers

To receive topic events, you need to register handlers. A handler requires two values: a `Handle` function and a topic `Matcher` regex pattern.

Example code:

handler := bus.Handler{
	Handle: func(e *bus.Event) {
		// do something
		// NOTE: Highly recommended to process the event in an async way
	},
	Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", &handler)

Emit Event

Example code:

// if txID val is blank, bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "a-transaction-id")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"]     = "123456"
order["orderAmount"] = "112.20"
order["currency"]    = "USD"

// emit the event
event, err := b.Emit(ctx, topic, order)

if err != nil {
	// report the err
	fmt.Println(err)
}

// if the caller needs to do anything with the event, a reference to it is
// also returned by the `Emit` call.
fmt.Println(event)

Processing Events

When an event is emitted, the topic handlers receive the event synchronously. It is highly recommended to process events asynchronously. The package leaves the choice of concurrency abstractions to the packages/projects that use it, depending on their use cases. Each handler receives the same event as a reference to a `bus.Event` struct.

Index

Constants

const (
	// CtxKeyTxID tx id context key
	CtxKeyTxID = ctxKey("bus.txID")

	// Version syncs with package version
	Version = "1.0.2"
)
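
The `ctxKey("bus.txID")` expression suggests a dedicated key type for context values. The sketch below shows why such a typed key is useful; `ctxKey` is redeclared locally with an assumed underlying type of `string`, and `keyTxID`, `otherKey`, `newTxContext`, and `txIDFrom` are hypothetical names that are not part of the package.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey mirrors the unexported key type implied by `ctxKey("bus.txID")`;
// its underlying type is assumed to be string here.
type ctxKey string

// otherKey stands for a key type defined by some unrelated package.
type otherKey string

const keyTxID = ctxKey("bus.txID")

// newTxContext stores a transaction id under the typed key.
func newTxContext(txID string) context.Context {
	return context.WithValue(context.Background(), keyTxID, txID)
}

// txIDFrom reads the transaction id and reports whether a non-blank one is set.
func txIDFrom(ctx context.Context) (string, bool) {
	v, ok := ctx.Value(keyTxID).(string)
	return v, ok && v != ""
}

func main() {
	id, ok := txIDFrom(newTxContext("tx-42"))
	fmt.Println(id, ok) // tx-42 true

	// A key of another type with the same text is a different key, so the
	// typed lookup does not see it.
	other := context.WithValue(context.Background(), otherKey("bus.txID"), "tx-42")
	_, ok = txIDFrom(other)
	fmt.Println(ok) // false
}
```

Context keys are compared by both type and value, which is why values stored by unrelated packages cannot collide with a key of a dedicated type.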

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus added in v1.0.0

type Bus struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Bus is a message bus

func NewBus added in v1.0.0

func NewBus(g IDGenerator) (*Bus, error)

NewBus inits a new bus

func (*Bus) DeregisterHandler added in v1.0.0

func (b *Bus) DeregisterHandler(key string)

DeregisterHandler deletes handler from the registry

func (*Bus) DeregisterTopics added in v1.0.0

func (b *Bus) DeregisterTopics(topicNames ...string)

DeregisterTopics deletes the given topics

func (*Bus) Emit added in v1.0.0

func (b *Bus) Emit(ctx context.Context, topicName string, data interface{}) (*Event, error)

Emit inits a new event and delivers it to the interested handlers

func (*Bus) HandlerKeys added in v1.0.0

func (b *Bus) HandlerKeys() []string

HandlerKeys returns list of registered handler keys

func (*Bus) HandlerTopicSubscriptions added in v1.0.0

func (b *Bus) HandlerTopicSubscriptions(handlerKey string) []string

HandlerTopicSubscriptions returns all topic subscriptions of the handler

func (*Bus) RegisterHandler added in v1.0.0

func (b *Bus) RegisterHandler(key string, h *Handler)

RegisterHandler registers or re-registers the handler in the registry

func (*Bus) RegisterTopics added in v1.0.0

func (b *Bus) RegisterTopics(topicNames ...string)

RegisterTopics registers topics and fulfills handlers

func (*Bus) TopicHandlers added in v1.0.0

func (b *Bus) TopicHandlers(topicName string) []*Handler

TopicHandlers returns all handlers for the topic

func (*Bus) Topics added in v1.0.0

func (b *Bus) Topics() []string

Topics lists all the registered topics

type Event

type Event struct {
	ID         string      // identifier
	TxID       string      // transaction identifier
	Topic      string      // topic name
	Data       interface{} // actual event data
	OccurredAt int64       // creation time in nanoseconds
	// contains filtered or unexported fields
}

Event is a data structure for any logs
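
Since `OccurredAt` is an `int64` in nanoseconds, it can be turned into a `time.Time` for display or comparison. The sketch below assumes the value counts nanoseconds since the Unix epoch, which the field comment does not state explicitly; `occurredAtTime` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// occurredAtTime converts a nanosecond timestamp to a time.Time in UTC.
// Assumption: the value counts nanoseconds since the Unix epoch.
func occurredAtTime(ns int64) time.Time {
	return time.Unix(0, ns).UTC()
}

func main() {
	ns := int64(1580860800000000000)
	fmt.Println(occurredAtTime(ns).Format(time.RFC3339)) // 2020-02-05T00:00:00Z
}
```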

func (*Event) Context added in v1.0.0

func (e *Event) Context() context.Context

Context returns event's context

type Handler

type Handler struct {
	Handle  func(e *Event) // handler func to process events
	Matcher string         // topic matcher as regex pattern
}

Handler is a receiver for event reference with the given regex pattern

type IDGenerator added in v1.0.0

type IDGenerator interface {
	Generate() string
}

IDGenerator is a sequential unique id generator interface

type Next added in v1.0.0

type Next func() string

Next is a sequential unique id generator func type

func (Next) Generate added in v1.0.0

func (n Next) Generate() string

Generate is an implementation of IDGenerator for bus.Next fn type
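
The relationship between `IDGenerator` and `Next` follows a common Go pattern: a function type gets a method so that a plain function can satisfy an interface. The sketch below redeclares both types locally so it compiles on its own; the body of `Generate` is an assumption about how the documented method behaves, and `fixed` and `describe` are hypothetical names added for comparison.

```go
package main

import "fmt"

// Local copies of the two documented declarations, so the sketch compiles
// without importing the package.
type IDGenerator interface {
	Generate() string
}

type Next func() string

// Generate calls the underlying function. This body is an assumption about
// how the documented method behaves.
func (n Next) Generate() string { return n() }

// fixed is a hypothetical struct-based generator, shown for comparison.
type fixed struct{ id string }

func (f fixed) Generate() string { return f.id }

// describe accepts anything that satisfies IDGenerator.
func describe(g IDGenerator) string { return "id=" + g.Generate() }

func main() {
	fmt.Println(describe(Next(func() string { return "from-func" }))) // id=from-func
	fmt.Println(describe(fixed{id: "from-struct"}))                    // id=from-struct
}
```

This is the same idea as `http.HandlerFunc` in the standard library: callers with a simple function do not need to declare a struct just to satisfy the interface.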
