Documentation ¶
Overview ¶
Package bus is a minimalist event/message bus implementation for internal communication.
The package requires a unique id generator to assign ids to events. You can write your own function to generate unique ids or use a package that provides unique id generation functionality.
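As a minimal sketch of the "write your own function" option, the generator below combines a fixed prefix with a process-local atomic counter. It assumes only that the bus accepts a `func() string`, as the `bus.Next = m.Next` assignment in the configuration example suggests. `newCounterIDGenerator` is a hypothetical helper, not part of the package, and its ids are unique only within a single process.

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// newCounterIDGenerator returns a func() string that yields a new id on each
// call. It is a hypothetical helper, not part of the bus package, and the ids
// are unique only within the current process.
func newCounterIDGenerator(prefix string) func() string {
	var counter uint64
	return func() string {
		// atomic increment keeps the generator safe for concurrent callers
		n := atomic.AddUint64(&counter, 1)
		return prefix + "-" + strconv.FormatUint(n, 10)
	}
}

func main() {
	next := newCounterIDGenerator("evt")
	fmt.Println(next()) // evt-1
	fmt.Println(next()) // evt-2
}
```

A counter like this resets on restart, so a time-based generator such as `monoton` is likely the safer choice when ids must stay unique across process lifetimes.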
The `bus` package respects the software design choices of the packages/projects that use it. It supports both the singleton and the dependency injection approach to initializing a `bus` instance.
Here is a sample initialization using the `monoton` id generator:
Example code for configuration:
import (
	"github.com/mustafaturan/bus/v2"
	"github.com/mustafaturan/monoton/v2"
	"github.com/mustafaturan/monoton/v2/sequencer"
)

func NewBus() *bus.Bus {
	// configure id generator (it doesn't have to be monoton)
	node := uint64(1)
	initialTime := uint64(1577865600000) // set 2020-01-01 PST as start
	m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
	if err != nil {
		panic(err)
	}

	// init an id generator
	var idGenerator bus.Next = m.Next

	// create a new bus instance
	b, err := bus.NewBus(idGenerator)
	if err != nil {
		panic(err)
	}

	// maybe register topics in here
	b.RegisterTopics("order.received", "order.fulfilled")

	return b
}
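The two initialization styles mentioned above can be sketched without the bus package itself. In the sketch below `messageBus` is a local stand-in for `*bus.Bus`, and every name is hypothetical. It only illustrates the difference between a lazily created shared instance and an explicitly injected one.

```go
package main

import (
	"fmt"
	"sync"
)

// messageBus is a local stand-in for *bus.Bus so the sketch compiles alone.
type messageBus struct{ name string }

func newMessageBus() *messageBus { return &messageBus{name: "app-bus"} }

// Singleton style: one lazily created, package-level instance.
var (
	defaultBus  *messageBus
	defaultOnce sync.Once
)

// sharedBus creates the instance on first use and returns it afterwards.
func sharedBus() *messageBus {
	defaultOnce.Do(func() { defaultBus = newMessageBus() })
	return defaultBus
}

// Dependency injection style: the consumer receives its bus explicitly.
type orderService struct{ bus *messageBus }

func newOrderService(b *messageBus) *orderService { return &orderService{bus: b} }

func main() {
	fmt.Println(sharedBus() == sharedBus()) // true: same instance every time
	svc := newOrderService(newMessageBus())
	fmt.Println(svc.bus.name)
}
```

Injection tends to make tests easier because each test can build its own bus, while the singleton keeps call sites shorter.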
Register Topics ¶
To emit events to the topics, topic names should be registered first:
Example code:
// register topics
b.RegisterTopics("order.received", "order.fulfilled")
// ...
Register Handlers ¶
To receive topic events, you need to register handlers. A handler requires two values: a `Handle` function and a topic `Matcher` regex pattern.
Example code:
handler := bus.Handler{
	Handle: func(ctx context.Context, e bus.Event) {
		// do something
		// NOTE: Highly recommended to process the event in an async way
	},
	Matcher: ".*", // matches all topics
}
b.RegisterHandler("a unique key for the handler", handler)
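To get a feel for how a `Matcher` pattern selects topics, the sketch below checks a few patterns against topic names with the standard `regexp` package. It assumes the bus applies the pattern with Go's usual unanchored regular-expression matching, which this page does not state. `matchingTopics` is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// matchingTopics returns the topics matched by pattern. Hypothetical helper
// for experimenting with Matcher patterns; it is not part of the bus package.
func matchingTopics(pattern string, topics []string) ([]string, error) {
	re, err := regexp.Compile(pattern)
	if err != nil {
		return nil, err
	}
	var matched []string
	for _, t := range topics {
		if re.MatchString(t) {
			matched = append(matched, t)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"order.received", "order.fulfilled", "user.created"}
	for _, p := range []string{".*", `^order\..*$`, `\.created$`} {
		m, err := matchingTopics(p, topics)
		if err != nil {
			fmt.Println("invalid pattern:", err)
			continue
		}
		fmt.Printf("%q matches %v\n", p, m)
	}
}
```

Anchoring a pattern with `^` and `$` and escaping the dots avoids accidental matches on topics that merely contain the same characters.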
Emit Event ¶
Example code:
// if txID val is blank, bus package generates one using the id generator
ctx := context.Background()
ctx = context.WithValue(ctx, bus.CtxKeyTxID, "a-transaction-id")

// event topic name (must be registered before)
topic := "order.received"

// interface{} data for event
order := make(map[string]string)
order["orderID"] = "123456"
order["orderAmount"] = "112.20"
order["currency"] = "USD"

// emit the event
err := b.Emit(ctx, topic, order)
if err != nil {
	// report the err
	fmt.Println(err)
}

// emit an event with opts (err already exists, so assign with = instead of :=)
err = b.EmitWithOpts(ctx, topic, order, bus.WithTxID("tx-id-val"))
if err != nil {
	// report the err
	fmt.Println(err)
}
Processing Events ¶
When an event is emitted, the topic handlers receive the event synchronously. It is highly recommended to process events asynchronously. The package leaves the choice of concurrency abstractions to the packages/projects that use it, depending on their use cases. Each handler receives the same event as a `bus.Event` struct.
Index ¶
- Constants
- type Bus
- func (b *Bus) DeregisterHandler(key string)
- func (b *Bus) DeregisterTopics(topics ...string)
- func (b *Bus) Emit(ctx context.Context, topic string, data interface{}) error
- func (b *Bus) EmitWithOpts(ctx context.Context, topic string, data interface{}, opts ...EventOption) error
- func (b *Bus) HandlerKeys() []string
- func (b *Bus) HandlerTopicSubscriptions(handlerKey string) []string
- func (b *Bus) RegisterHandler(key string, h Handler)
- func (b *Bus) RegisterTopics(topics ...string)
- func (b *Bus) TopicHandlerKeys(topic string) []string
- func (b *Bus) Topics() []string
- type Event
- type EventOption
- type Handler
- type IDGenerator
- type Next
Constants ¶
const (
	// CtxKeyTxID tx id context key
	CtxKeyTxID = ctxKey(116)

	// CtxKeySource source context key
	CtxKeySource = ctxKey(117)

	// Version syncs with package version
	Version = "3.0.3"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is a message bus
func (*Bus) DeregisterHandler ¶
DeregisterHandler deletes handler from the registry
func (*Bus) DeregisterTopics ¶
DeregisterTopics deletes topic
func (*Bus) Emit ¶
Emit initializes a new event and delivers it to the interested handlers with sync safety
func (*Bus) EmitWithOpts ¶
func (b *Bus) EmitWithOpts(ctx context.Context, topic string, data interface{}, opts ...EventOption) error
EmitWithOpts initializes a new event and delivers it to the interested handlers with sync safety and options
func (*Bus) HandlerKeys ¶
HandlerKeys returns list of registered handler keys
func (*Bus) HandlerTopicSubscriptions ¶
HandlerTopicSubscriptions returns all topic subscriptions of the handler
func (*Bus) RegisterHandler ¶
RegisterHandler registers (or re-registers) the handler in the registry
func (*Bus) RegisterTopics ¶
RegisterTopics registers topics and fulfills handlers
func (*Bus) TopicHandlerKeys ¶
TopicHandlerKeys returns all handlers for the topic
type Event ¶
type Event struct {
	ID         string      // identifier
	TxID       string      // transaction identifier
	Topic      string      // topic name
	Source     string      // source of the event
	OccurredAt time.Time   // creation time in nanoseconds
	Data       interface{} // actual event data
}
Event is a data structure for any logs
type EventOption ¶
EventOption is a function type to mutate event fields
func WithOccurredAt ¶
func WithOccurredAt(time time.Time) EventOption
WithOccurredAt returns an option to set event's occurredAt field
func WithSource ¶
func WithSource(source string) EventOption
WithSource returns an option to set event's source field
func WithTxID ¶
func WithTxID(txID string) EventOption
WithTxID returns an option to set event's txID field
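The `With...` helpers above follow what is commonly called the functional options pattern. As a rough sketch of how such options can be applied, the code below uses local stand-ins (`event`, `option`, `withTxID`, `withSource`, `newEvent`). The real `EventOption` definition is not shown on this page and may differ in shape.

```go
package main

import "fmt"

// event and option are local stand-ins; the real definitions live in the bus
// package and may differ in shape.
type event struct {
	TxID   string
	Source string
}

// option mutates an event that is still being built.
type option func(*event)

func withTxID(txID string) option     { return func(e *event) { e.TxID = txID } }
func withSource(source string) option { return func(e *event) { e.Source = source } }

// newEvent builds an event with defaults, then applies opts in order, so a
// later option overrides an earlier one that sets the same field.
func newEvent(opts ...option) event {
	e := event{TxID: "generated-id", Source: "unknown"}
	for _, opt := range opts {
		opt(&e)
	}
	return e
}

func main() {
	fmt.Printf("%+v\n", newEvent())
	fmt.Printf("%+v\n", newEvent(withTxID("tx-1"), withSource("checkout")))
}
```

The appeal of this design is that new optional fields can be added later without changing the signature of the emitting function.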
type Handler ¶
type Handler struct {
	// handler func to process events
	Handle func(ctx context.Context, e Event)

	// topic matcher as regex pattern
	Matcher string
	// contains filtered or unexported fields
}
Handler is a receiver for events whose topics match the given regex pattern
type IDGenerator ¶
type IDGenerator interface {
Generate() string
}
IDGenerator is a sequential unique id generator interface