EventBus

Package EventBus is a small, lightweight event bus with async support for Go.

Installation

Make sure that Go is installed on your computer. Type the following command in your terminal:

go get github.com/asaskevich/EventBus

After that, the package is ready to use.

Import package in your project

Add the following line to your *.go file:

import "github.com/asaskevich/EventBus"

If you find the name EventBus too long, you can import the package under a shorter alias:

import (
	evbus "github.com/asaskevich/EventBus"
)
Example
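package main

import (
	"fmt"

	"github.com/asaskevich/EventBus"
)

// calculator prints the sum of its two arguments.
func calculator(a int, b int) {
	fmt.Printf("%d\n", a+b)
}

func main() {
	bus := EventBus.New()
	bus.Subscribe("main:calculator", calculator)
	bus.Publish("main:calculator", 20, 40) // calls calculator(20, 40)
	bus.Unsubscribe("main:calculator", calculator)
}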
Implemented methods
  • New()
  • Subscribe()
  • SubscribeOnce()
  • HasCallback()
  • Unsubscribe()
  • Publish()
  • SubscribeAsync()
  • SubscribeOnceAsync()
  • WaitAsync()
New()

New returns new EventBus with empty handlers.

bus := EventBus.New()
Subscribe(topic string, fn interface{}) error

Subscribe to a topic. Returns error if fn is not a function.

func Handler() { ... }
...
bus.Subscribe("topic:handler", Handler)
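
Because fn is typed as interface{}, the compiler cannot reject a non-function argument, so the check has to happen at run time. The sketch below shows one way such a check can be written with the standard reflect package. The helper name validateHandler is hypothetical and this is not necessarily how the package implements it.

```go
package main

import (
	"fmt"
	"reflect"
)

// validateHandler is a hypothetical helper: it returns an error unless fn is a function.
func validateHandler(fn interface{}) error {
	// reflect.TypeOf(nil) returns a nil Type, so nil is rejected before Kind is called.
	if fn == nil || reflect.TypeOf(fn).Kind() != reflect.Func {
		return fmt.Errorf("%T is not a function", fn)
	}
	return nil
}

func main() {
	fmt.Println(validateHandler(func() {}))         // a function is accepted
	fmt.Println(validateHandler("not a function")) // a string is rejected
}
```

On the calling side, this means the error returned by Subscribe is worth checking rather than discarding.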
SubscribeOnce(topic string, fn interface{}) error

Subscribe to a topic once. Handler will be removed after executing. Returns error if fn is not a function.

func HelloWorld() { ... }
...
bus.SubscribeOnce("topic:handler", HelloWorld)
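
To illustrate the "removed after executing" behavior, here is a minimal single-goroutine sketch. The onceBus type and its methods are hypothetical stand-ins written for this example. They are not the package's implementation and they ignore locking.

```go
package main

import "fmt"

// handler pairs a callback with a flag saying whether it should run only once.
type handler struct {
	fn   func()
	once bool
}

// onceBus is a hypothetical, non-concurrent stand-in for an event bus.
type onceBus struct {
	handlers map[string][]handler
}

func newOnceBus() *onceBus {
	return &onceBus{handlers: make(map[string][]handler)}
}

func (b *onceBus) subscribe(topic string, fn func(), once bool) {
	b.handlers[topic] = append(b.handlers[topic], handler{fn: fn, once: once})
}

// publish drops the once-handlers from the topic, then runs every handler
// that was registered when publish was called.
func (b *onceBus) publish(topic string) {
	current := b.handlers[topic]
	kept := make([]handler, 0, len(current))
	for _, h := range current {
		if !h.once {
			kept = append(kept, h)
		}
	}
	b.handlers[topic] = kept
	for _, h := range current {
		h.fn()
	}
}

func main() {
	bus := newOnceBus()
	calls := 0
	bus.subscribe("topic:handler", func() { calls++ }, true)
	bus.publish("topic:handler")
	bus.publish("topic:handler") // no handler is left, so nothing runs
	fmt.Println(calls)           // prints 1
}
```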
Unsubscribe(topic string, fn interface{}) error

Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

bus.Unsubscribe("topic:handler", HelloWorld)
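
Unsubscribe has to find the handler that was registered earlier, but Go function values cannot be compared with ==. One common workaround, sketched here with a hypothetical helper named sameFunc, is to compare the code pointers reported by reflect. The reflect documentation notes that this pointer is not guaranteed to identify a function uniquely (closures created from the same literal share it, for example), so treat this as an approximation rather than the package's definitive method.

```go
package main

import (
	"fmt"
	"reflect"
)

func first()  { fmt.Println("first") }
func second() { fmt.Println("second") }

// sameFunc is a hypothetical helper: it reports whether a and b are functions
// with the same type and the same underlying code pointer.
func sameFunc(a, b interface{}) bool {
	va, vb := reflect.ValueOf(a), reflect.ValueOf(b)
	if va.Kind() != reflect.Func || vb.Kind() != reflect.Func {
		return false
	}
	return va.Type() == vb.Type() && va.Pointer() == vb.Pointer()
}

func main() {
	fmt.Println(sameFunc(first, first))  // same function on both sides
	fmt.Println(sameFunc(first, second)) // two different functions
}
```

This is also why the same function value must be passed to Unsubscribe as was passed to Subscribe.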
HasCallback(topic string) bool

Returns true if any callback is subscribed to the topic.

Publish(topic string, args ...interface{})

Publish executes the callbacks defined for a topic. Any additional arguments are passed to the callback.

func Handler(str string) { ... }
...
bus.Subscribe("topic:handler", Handler)
...
bus.Publish("topic:handler", "Hello, World!")
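
Since Publish takes its arguments as ...interface{}, the argument list is only matched against the handler's signature at run time. The following sketch shows how arguments can be forwarded to an arbitrary function with reflect.Value.Call. The helper name call and its error handling are assumptions made for illustration. The package itself may behave differently on a mismatch (reflect.Value.Call panics if it is given the wrong number or types of arguments).

```go
package main

import (
	"fmt"
	"reflect"
)

// call is a hypothetical helper: it invokes fn with args and returns an error
// instead of panicking when the arguments do not fit the function's signature.
func call(fn interface{}, args ...interface{}) error {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return fmt.Errorf("%T is not a function", fn)
	}
	t := v.Type()
	// Variadic handlers are left out to keep the sketch short.
	if t.IsVariadic() || t.NumIn() != len(args) {
		return fmt.Errorf("handler takes %d arguments, got %d", t.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		if arg == nil {
			in[i] = reflect.Zero(t.In(i)) // a nil argument becomes the parameter's zero value
			continue
		}
		in[i] = reflect.ValueOf(arg)
		if !in[i].Type().AssignableTo(t.In(i)) {
			return fmt.Errorf("argument %d has type %s, want %s", i, in[i].Type(), t.In(i))
		}
	}
	v.Call(in)
	return nil
}

func main() {
	handler := func(str string) { fmt.Println(str) }
	if err := call(handler, "Hello, World!"); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

In practice, keep the arguments passed to Publish in the same order and of the same types as the handler's parameters.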
SubscribeAsync(topic string, fn interface{}, transactional bool) error

Subscribe to a topic with an asynchronous callback. Returns error if fn is not a function.

package main

import (
	"fmt"
	"time"

	"github.com/asaskevich/EventBus"
)

// slowCalculator simulates a long-running handler.
func slowCalculator(a, b int) {
	time.Sleep(3 * time.Second)
	fmt.Printf("%d\n", a+b)
}

func main() {
	bus := EventBus.New()
	// false: callbacks for this topic may run concurrently.
	bus.SubscribeAsync("main:slow_calculator", slowCalculator, false)

	bus.Publish("main:slow_calculator", 20, 60)

	fmt.Println("start: do some stuff while waiting for a result")
	fmt.Println("end: do some stuff while waiting for a result")

	bus.WaitAsync() // wait for all async callbacks to complete

	fmt.Println("do some stuff after waiting for result")
}

The transactional argument determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false).
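
The difference between serial and concurrent execution can be sketched with the standard library alone. In the example below, asyncHandler and publishAll are hypothetical names. The mutex stands in for whatever the package uses to serialize transactional callbacks, and the sync.WaitGroup plays the role that WaitAsync plays for a real bus. It is an illustration under those assumptions, not the package's code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// asyncHandler is a hypothetical handler record, not the package's internal type.
type asyncHandler struct {
	fn            func(n int)
	transactional bool
	mu            sync.Mutex // held while a transactional callback runs
}

// publishAll starts one goroutine per value and waits for all of them,
// much as a series of Publish calls followed by WaitAsync would.
func publishAll(h *asyncHandler, values ...int) {
	var wg sync.WaitGroup
	for _, v := range values {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			if h.transactional {
				h.mu.Lock() // serialize: only one callback runs at a time
				defer h.mu.Unlock()
			}
			h.fn(n)
		}(v)
	}
	wg.Wait()
}

func main() {
	running, maxRunning := 0, 0
	h := &asyncHandler{transactional: true}
	h.fn = func(n int) {
		// Plain variables are safe here only because the mutex serializes the calls.
		running++
		if running > maxRunning {
			maxRunning = running
		}
		time.Sleep(time.Millisecond)
		running--
	}
	publishAll(h, 1, 2, 3, 4, 5)
	fmt.Println("max callbacks running at once:", maxRunning) // prints 1 when transactional
}
```

With transactional set to false the callbacks may overlap, so a handler that touches shared state would need its own synchronization.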

SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync works like SubscribeOnce, except that the callback is executed asynchronously. Returns error if fn is not a function.

WaitAsync()

WaitAsync waits for all async callbacks to complete.

Cross Process Events

Cross-process events work with two RPC services:

  • a client service to listen to remotely published events from a server
  • a server service to listen to client subscriptions

server.go

func main() {
    server := NewServer(":2010", "/_server_bus_", New())
    server.Start()
    // ...
    server.EventBus().Publish("main:calculator", 4, 6)
    // ...
    server.Stop()
}

client.go

func main() {
    client := NewClient(":2015", "/_client_bus_", New())
    client.Start()
    client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_")
    // ...
    client.Stop()
}
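
The push half of this arrangement can be sketched with the standard net/rpc package. The example below registers a service whose method is reachable as "ClientService.PushEvent" and calls it over an in-memory pipe instead of a real network address. The Event type, the received channel and the pushOverPipe helper are assumptions made for this sketch. The package's own types and transport may differ.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
)

// Event is a hypothetical payload for this sketch.
type Event struct {
	Topic string
	Args  []int
}

// ClientService receives events pushed by a remote publisher.
type ClientService struct {
	received chan Event
}

// PushEvent has the shape net/rpc requires: two arguments, the second a
// pointer, and a single error result.
func (s *ClientService) PushEvent(arg *Event, reply *bool) error {
	s.received <- *arg
	*reply = true
	return nil
}

// pushOverPipe connects a subscriber and a publisher through net.Pipe,
// pushes one event, and returns what the subscriber received.
func pushOverPipe(ev Event) (Event, error) {
	service := &ClientService{received: make(chan Event, 1)}
	server := rpc.NewServer()
	if err := server.Register(service); err != nil {
		return Event{}, err
	}
	subscriberConn, publisherConn := net.Pipe()
	go server.ServeConn(subscriberConn) // returns once the connection is closed

	client := rpc.NewClient(publisherConn)
	defer client.Close()

	var ok bool
	if err := client.Call("ClientService.PushEvent", &ev, &ok); err != nil {
		return Event{}, err
	}
	return <-service.received, nil
}

func main() {
	got, err := pushOverPipe(Event{Topic: "main:calculator", Args: []int{4, 6}})
	if err != nil {
		fmt.Println("push failed:", err)
		return
	}
	fmt.Println(got.Topic, got.Args) // prints: main:calculator [4 6]
}
```

In the full arrangement the client would first register itself with the server, so that the server knows where to push events for a given topic.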
Notes

Documentation is available at godoc.org. Full code coverage information is available at EventBus on gocover.io.

Support

If you have a contribution for the package, feel free to open a pull request or an issue.

Special thanks to contributors

Documentation

Constants

const (
	// PublishService - Client service method
	PublishService = "ClientService.PushEvent"
)
const (
	// RegisterService - Server subscribe service method
	RegisterService = "ServerService.Register"
)

Types

type Bus

type Bus interface {
	BusController
	BusSubscriber
	BusPublisher
}

    Bus encompasses the complete bus behavior (subscribe, publish, control).
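
Splitting the bus into small interfaces lets calling code depend only on the part it uses. The sketch below declares a local Publisher interface with the same method set as BusPublisher, so that it compiles on its own. The recorder type and the announce function are hypothetical names used for illustration.

```go
package main

import "fmt"

// Publisher mirrors the method set of BusPublisher. It is declared locally
// so that this sketch has no external dependencies.
type Publisher interface {
	Publish(topic string, args ...interface{})
}

// recorder is a hypothetical test double that implements only Publish.
type recorder struct {
	topics []string
}

func (r *recorder) Publish(topic string, args ...interface{}) {
	r.topics = append(r.topics, topic)
}

// announce depends only on the publishing side of a bus.
func announce(p Publisher, user string) {
	p.Publish("user:created", user)
}

func main() {
	r := &recorder{}
	announce(r, "alice")
	fmt.Println(r.topics) // prints [user:created]
}
```

A value returned by New satisfies all three narrow interfaces, so it can be passed to a function like announce without any conversion.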

    func New

    func New() Bus

      New returns new EventBus with empty handlers.

      type BusController

      type BusController interface {
      	HasCallback(topic string) bool
      	WaitAsync()
      }

        BusController defines bus control behavior (checking handler's presence, synchronization)

        type BusPublisher

        type BusPublisher interface {
        	Publish(topic string, args ...interface{})
        }

          BusPublisher defines publishing-related bus behavior

          type BusSubscriber

          type BusSubscriber interface {
          	Subscribe(topic string, fn interface{}) error
          	SubscribeAsync(topic string, fn interface{}, transactional bool) error
          	SubscribeOnce(topic string, fn interface{}) error
          	SubscribeOnceAsync(topic string, fn interface{}) error
          	Unsubscribe(topic string, handler interface{}) error
          }

            BusSubscriber defines subscription-related bus behavior

            type Client

            type Client struct {
            	// contains filtered or unexported fields
            }

              Client - object capable of subscribing to a remote event bus

              func NewClient

              func NewClient(address, path string, eventBus Bus) *Client

                NewClient - create a client object with the address and server path

                func (*Client) EventBus

                func (client *Client) EventBus() Bus

                  EventBus - returns the underlying event bus

                  func (*Client) Start

                  func (client *Client) Start() error

                    Start - starts the client service to listen to remote events

                    func (*Client) Stop

                    func (client *Client) Stop()

                      Stop - signal for the service to stop serving

                      func (*Client) Subscribe

                      func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string)

                        Subscribe subscribes to a topic in a remote event bus

                        func (*Client) SubscribeOnce

                        func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string)

                          SubscribeOnce subscribes once to a topic in a remote event bus

                          type ClientArg

                          type ClientArg struct {
                          	Args  []interface{}
                          	Topic string
                          }

                            ClientArg - object containing event for client to publish locally

                            type ClientService

                            type ClientService struct {
                            	// contains filtered or unexported fields
                            }

                              ClientService - service object listening to events published in a remote event bus

                              func (*ClientService) PushEvent

                              func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error

                                PushEvent - exported service for listening to remote events

                                type EventBus

                                type EventBus struct {
                                	// contains filtered or unexported fields
                                }

                                  EventBus - box for handlers and callbacks.

                                  func (*EventBus) HasCallback

                                  func (bus *EventBus) HasCallback(topic string) bool

                                    HasCallback returns true if any callback is subscribed to the topic.

                                    func (*EventBus) Publish

                                    func (bus *EventBus) Publish(topic string, args ...interface{})

                                      Publish executes the callbacks defined for a topic. Any additional arguments are passed to the callback.

                                      func (*EventBus) Subscribe

                                      func (bus *EventBus) Subscribe(topic string, fn interface{}) error

                                        Subscribe subscribes to a topic. Returns error if `fn` is not a function.

                                        func (*EventBus) SubscribeAsync

                                        func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error

                                          SubscribeAsync subscribes to a topic with an asynchronous callback. Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false). Returns error if `fn` is not a function.

                                          func (*EventBus) SubscribeOnce

                                          func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error

                                            SubscribeOnce subscribes to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function.

                                            func (*EventBus) SubscribeOnceAsync

                                            func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error

                                              SubscribeOnceAsync subscribes to a topic once with an asynchronous callback. Handler will be removed after executing. Returns error if `fn` is not a function.

                                              func (*EventBus) Unsubscribe

                                              func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error

                                                Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

                                                func (*EventBus) WaitAsync

                                                func (bus *EventBus) WaitAsync()

                                                  WaitAsync waits for all async callbacks to complete

                                                  type NetworkBus

                                                  type NetworkBus struct {
                                                  	*Client
                                                  	*Server
                                                  	// contains filtered or unexported fields
                                                  }

                                                    NetworkBus - object capable of subscribing to remote event buses, in addition to remote event buses subscribing to its local event bus. Composed of a server and a client.

                                                    func NewNetworkBus

                                                    func NewNetworkBus(address, path string) *NetworkBus

                                                      NewNetworkBus - returns a new network bus object at the server address and path

                                                      func (*NetworkBus) EventBus

                                                      func (networkBus *NetworkBus) EventBus() Bus

                                                        EventBus - returns wrapped event bus

                                                        func (*NetworkBus) Start

                                                        func (networkBus *NetworkBus) Start() error

                                                          Start - helper method to serve a network bus service

                                                          func (*NetworkBus) Stop

                                                          func (networkBus *NetworkBus) Stop()

                                                            Stop - signal for the service to stop serving

                                                            type NetworkBusService

                                                            type NetworkBusService struct {
                                                            	// contains filtered or unexported fields
                                                            }

                                                              NetworkBusService - object capable of serving the network bus

                                                              type Server

                                                              type Server struct {
                                                              	// contains filtered or unexported fields
                                                              }

                                                                Server - object capable of being subscribed to by remote handlers

                                                                func NewServer

                                                                func NewServer(address, path string, eventBus Bus) *Server

                                                                  NewServer - create a new Server at the address and path

                                                                  func (*Server) EventBus

                                                                  func (server *Server) EventBus() Bus

                                                                    EventBus - returns wrapped event bus

                                                                    func (*Server) HasClientSubscribed

                                                                    func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool

                                                                      HasClientSubscribed - True if a client subscribed to this server with the same topic

                                                                      func (*Server) Start

                                                                      func (server *Server) Start() error

                                                                        Start - starts a service for remote clients to subscribe to events

                                                                        func (*Server) Stop

                                                                        func (server *Server) Stop()

                                                                          Stop - signal for the service to stop serving

                                                                          type ServerService

                                                                          type ServerService struct {
                                                                          	// contains filtered or unexported fields
                                                                          }

                                                                            ServerService - service object to listen to remote subscriptions

                                                                            func (*ServerService) Register

                                                                            func (service *ServerService) Register(arg *SubscribeArg, success *bool) error

                                                                              Register - registers a remote handler to this event bus for a remote subscription. A given client address only needs to subscribe once; the event will be republished in the local event bus.

                                                                              type SubscribeArg

                                                                              type SubscribeArg struct {
                                                                              	ClientAddr    string
                                                                              	ClientPath    string
                                                                              	ServiceMethod string
                                                                              	SubscribeType SubscribeType
                                                                              	Topic         string
                                                                              }

                                                                                SubscribeArg - object to hold subscribe arguments from remote event handlers

                                                                                type SubscribeType

                                                                                type SubscribeType int

                                                                                  SubscribeType - how the client intends to subscribe

                                                                                  const (
                                                                                  	// Subscribe - subscribe to all events
                                                                                  	Subscribe SubscribeType = iota
                                                                                  	// SubscribeOnce - subscribe to only one event
                                                                                  	SubscribeOnce
                                                                                  )
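
Because the constants are declared with iota, Subscribe has the value 0 and SubscribeOnce the value 1. The sketch below redeclares the type locally so that it compiles on its own, and adds a String method to make the values readable when logged. The String method is a hypothetical addition and is not listed in the package documentation above.

```go
package main

import "fmt"

// SubscribeType is redeclared locally so that this sketch is self-contained.
type SubscribeType int

const (
	Subscribe     SubscribeType = iota // 0: subscribe to all events
	SubscribeOnce                      // 1: subscribe to only one event
)

// String is a hypothetical addition that makes the values readable in logs.
func (t SubscribeType) String() string {
	switch t {
	case Subscribe:
		return "Subscribe"
	case SubscribeOnce:
		return "SubscribeOnce"
	default:
		return fmt.Sprintf("SubscribeType(%d)", int(t))
	}
}

func main() {
	fmt.Println(int(Subscribe), Subscribe)         // prints: 0 Subscribe
	fmt.Println(int(SubscribeOnce), SubscribeOnce) // prints: 1 SubscribeOnce
}
```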