eventbus

package
v0.5.0 Latest
Warning

This package is not in the latest version of its module.

Published: Jul 12, 2019 License: MIT Imports: 4 Imported by: 0

Documentation

Overview

Package eventbus provides two message models: pub/sub and send/reply. A topic may have multiple subscribers, but it must have only one replier.

Create a new EventBus:

var bus = New()

Get a global default EventBus:

var bus = Default()

Subscriber:

func handler(a int, b int, out *int) {
	(*out) = a + b
}

bus.Subscribe("topic", handler)

Or subscribe so that the handler is triggered asynchronously:

bus.SubscribeAsync("async:topic", handler, false)

Publisher:

var out int
bus.Publish("topic", 10, 13, &out)
fmt.Print(out) // 23

var out2 int
bus.Publish("async:topic", 10, 13, &out2)
fmt.Print(out2) // 0, as the subscriber is triggered asynchronously

Replier:

func worker(a int, b int, out chan<- int) {
	out <- a + b
}

bus.Reply("task:add", worker, false)

Sender:

var c = make(chan int)
bus.Send("task:add", 11, 11, c)
fmt.Print(<-c) // 22, the replier is triggered asynchronously
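
The pub/sub flow above can be pictured with a small, self-contained sketch. The `miniBus` type and its `subscribe` and `publish` methods are hypothetical stand-ins written for illustration; they assume reflection-based dispatch and are not the package's implementation, which may differ.

```go
package main

import (
	"fmt"
	"reflect"
)

// miniBus is a hypothetical stand-in for illustration, not the package's EventBus.
type miniBus struct {
	handlers map[string][]reflect.Value
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[string][]reflect.Value)}
}

// subscribe registers fn for topic and rejects values that are not functions.
func (b *miniBus) subscribe(topic string, fn interface{}) error {
	v := reflect.ValueOf(fn)
	if v.Kind() != reflect.Func {
		return fmt.Errorf("%T is not a function", fn)
	}
	b.handlers[topic] = append(b.handlers[topic], v)
	return nil
}

// publish calls every handler of topic synchronously, passing args through.
func (b *miniBus) publish(topic string, args ...interface{}) {
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	for _, h := range b.handlers[topic] {
		h.Call(in)
	}
}

// add writes its result through the out pointer, like the handler above.
func add(a int, b int, out *int) {
	*out = a + b
}

func main() {
	bus := newMiniBus()
	if err := bus.subscribe("topic", add); err != nil {
		panic(err)
	}
	var out int
	bus.publish("topic", 10, 13, &out)
	fmt.Println(out) // 23
}
```

Because the sketch calls handlers synchronously, `out` is already set when `publish` returns; an asynchronous subscription gives no such guarantee.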


Constants

const (
	// TopicSetDebugLevel is topic for changing debug level
	TopicSetDebugLevel = "rpc:setdebuglevel"
	// TopicUpdateNetworkID is topic for updating network id
	TopicUpdateNetworkID = "rpc:updatenetworkid"
	// TopicGetNetworkID is topic for querying network id
	TopicGetNetworkID = "rpc:getnetworkid"
	// TopicGetAddressBook is topic for listing p2p peer status
	TopicGetAddressBook = "rpc:getaddressbook"
	// TopicP2PPeerAddr is an event topic for a new peer addr being found or a peer addr being updated
	TopicP2PPeerAddr = "p2p:peeraddr"
	// TopicP2PAddPeer is an event topic for adding a peer addr to the peer store
	TopicP2PAddPeer = "p2p:addpeer"
	// TopicConnEvent is an event topic for score-update events
	TopicConnEvent = "p2p:connevent"

	// TopicChainUpdate is topic for notifying that the chain is updated,
	// either chain reorg, or chain extended.
	TopicChainUpdate = "chain:update"

	// TopicUtxoUpdate is topic for notifying that chain utxo is changed
	TopicUtxoUpdate = "chain:utxoupdate"

	// TopicGetDatabaseKeys is topic for getting the keys of a specified storage
	TopicGetDatabaseKeys = "rpc:database:keys"
	// TopicGetDatabaseValue is topic for getting the value of a specified key
	TopicGetDatabaseValue = "rpc:database:get"

	// TopicRPCSendNewBlock is topic for sending new block to explorer
	TopicRPCSendNewBlock = "rpc:newblock:send"
	// TopicRPCSendNewLog is topic for sending new log to explorer
	TopicRPCSendNewLog = "rpc:newlog:send"

	// TopicMiners is topic for replying current miners
	TopicMiners = "dpos:miners"
	// TopicCheckMiner is topic for checking ts with miner
	TopicCheckMiner = "dpos:checkminer"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

Bus encompasses the global (subscribe, publish, control) bus behavior.

func Default

func Default() Bus

Default returns the default EventBus.

func New

func New() Bus

New returns new EventBus with empty handlers.

type BusController

type BusController interface {
	HasSubscriber(topic string) bool
	HasReplier(topic string) bool
	WaitAsync()
}

BusController defines bus control behavior (checking handler's presence, synchronization)

type BusEvent

type BusEvent int64

BusEvent represents an event transferred over the bus.

const (
	// ConnTimeOutEvent indicates that a connection timed out.
	ConnTimeOutEvent BusEvent = iota

	// BadBlockEvent indicates that processing a new block returned an error.
	BadBlockEvent

	// BadTxEvent indicates that processing a new tx returned an error.
	BadTxEvent

	// SyncMsgEvent indicates that a sync message was received.
	SyncMsgEvent

	// HeartBeatEvent indicates that a heartbeat was received.
	HeartBeatEvent

	// NoHeartBeatEvent indicates that no heartbeat has been received for a long time.
	NoHeartBeatEvent

	// ConnUnsteadinessEvent indicates that a connection is unsteady.
	ConnUnsteadinessEvent

	// NewBlockEvent indicates a new block.
	NewBlockEvent

	// NewTxEvent indicates a new tx.
	NewTxEvent

	// PeerConnEvent indicates that a peer connected.
	PeerConnEvent

	// PeerDisconnEvent indicates that a peer disconnected.
	PeerDisconnEvent
)
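
Since the constants are declared with `iota`, they take consecutive values starting at 0 in declaration order. The sketch below mirrors only the first three of them under a hypothetical local type `event`; the `String` method is an addition for illustration and is not part of the documented API.

```go
package main

import "fmt"

// event mirrors BusEvent for illustration only; it is not the package's type.
type event int64

const (
	connTimeOut event = iota // 0
	badBlock                 // 1
	badTx                    // 2
)

// String returns a readable name. This method is hypothetical; the package
// documentation above does not list a String method on BusEvent.
func (e event) String() string {
	switch e {
	case connTimeOut:
		return "ConnTimeOut"
	case badBlock:
		return "BadBlock"
	case badTx:
		return "BadTx"
	default:
		return fmt.Sprintf("event(%d)", int64(e))
	}
}

func main() {
	fmt.Println(badTx, int64(badTx)) // BadTx 2
}
```

Inserting a new constant in the middle of such a block shifts every later value, which matters if the numbers are ever stored or sent over the wire.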

type BusPublisher

type BusPublisher interface {
	Publish(topic string, args ...interface{})
}

BusPublisher defines publishing-related bus behavior

type BusSubscriber

type BusSubscriber interface {
	Subscribe(topic string, fn interface{}) error
	SubscribeUniq(topic string, fn interface{}) error
	SubscribeAsync(topic string, fn interface{}, transactional bool) error
	SubscribeOnce(topic string, fn interface{}) error
	SubscribeOnceAsync(topic string, fn interface{}) error
	Unsubscribe(topic string, handler interface{}) error
}

BusSubscriber defines subscription-related bus behavior

type EventBus

type EventBus struct {
	// contains filtered or unexported fields
}

EventBus - box for handlers and callbacks.

func (*EventBus) HasReplier

func (bus *EventBus) HasReplier(topic string) bool

HasReplier returns true if a replier exists for the topic.

func (*EventBus) HasSubscriber

func (bus *EventBus) HasSubscriber(topic string) bool

HasSubscriber returns true if any callback is subscribed to the topic.

func (*EventBus) Publish

func (bus *EventBus) Publish(topic string, args ...interface{})

Publish executes the callbacks defined for a topic. Any additional arguments are passed on to the callbacks.

func (*EventBus) Reply

func (bus *EventBus) Reply(topic string, fn interface{}, transactional bool) error

Reply receives send-reply messages on a topic. There should be only one function receiving messages on a given topic. Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false). Returns an error if `fn` is not a function.

func (*EventBus) Send

func (bus *EventBus) Send(topic string, args ...interface{})

Send sends a send-reply message on a topic to the replier.
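
The send/reply model can be sketched with a map that holds at most one replier per topic and a channel that carries the result back. Everything here (`replyBus`, `worker`, `reply`, `send`) is hypothetical. Rejecting a second replier and reporting whether a replier was found are assumptions of this sketch, not documented behavior of `Reply` or `Send`.

```go
package main

import (
	"errors"
	"fmt"
)

// worker is the handler signature used by this sketch only.
type worker func(a, b int, out chan<- int)

// replyBus is a hypothetical stand-in that keeps one replier per topic.
type replyBus struct {
	repliers map[string]worker
}

func newReplyBus() *replyBus {
	return &replyBus{repliers: make(map[string]worker)}
}

// reply registers fn for topic. Rejecting a second replier is an assumption
// made here to enforce the one-replier rule.
func (r *replyBus) reply(topic string, fn worker) error {
	if _, exists := r.repliers[topic]; exists {
		return errors.New("topic already has a replier")
	}
	r.repliers[topic] = fn
	return nil
}

// send starts the replier in its own goroutine and reports whether one exists.
func (r *replyBus) send(topic string, a, b int, out chan<- int) bool {
	fn, ok := r.repliers[topic]
	if !ok {
		return false
	}
	go fn(a, b, out)
	return true
}

// addWorker sends the sum on the reply channel.
func addWorker(a, b int, out chan<- int) {
	out <- a + b
}

func main() {
	bus := newReplyBus()
	if err := bus.reply("task:add", addWorker); err != nil {
		panic(err)
	}
	c := make(chan int)
	bus.send("task:add", 11, 11, c)
	fmt.Println(<-c) // 22
}
```

The unbuffered channel makes the sender block on `<-c` until the replier has produced a value, so the result is available even though the replier runs asynchronously.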

func (*EventBus) StopReply

func (bus *EventBus) StopReply(topic string, fn interface{}) error

StopReply removes the replier callback defined for a topic. Returns an error if no callback is receiving on the topic.

func (*EventBus) Subscribe

func (bus *EventBus) Subscribe(topic string, fn interface{}) error

Subscribe subscribes to a topic. Returns error if `fn` is not a function.

func (*EventBus) SubscribeAsync

func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error

SubscribeAsync subscribes to a topic with an asynchronous callback. Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently (false). Returns an error if `fn` is not a function.
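
One way to picture the serial versus concurrent distinction is a per-handler lock that is taken only when the handler is transactional. The sketch below is an assumption about how such a flag could work, not the package's implementation; `asyncHandler`, `dispatch`, and `maxOverlap` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// asyncHandler is a hypothetical stand-in for an asynchronous subscription.
type asyncHandler struct {
	fn            func(int)
	transactional bool
	mu            sync.Mutex
}

// dispatch runs fn in a new goroutine. A transactional handler holds a lock
// for the duration of the call, so its invocations never overlap.
func (h *asyncHandler) dispatch(wg *sync.WaitGroup, arg int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		if h.transactional {
			h.mu.Lock()
			defer h.mu.Unlock()
		}
		h.fn(arg)
	}()
}

// maxOverlap reports the highest number of calls that ran at the same time.
func maxOverlap(transactional bool, calls int) int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	h := &asyncHandler{transactional: transactional}
	h.fn = func(int) {
		mu.Lock()
		running++
		if running > peak {
			peak = running
		}
		mu.Unlock()
		time.Sleep(time.Millisecond) // simulate work
		mu.Lock()
		running--
		mu.Unlock()
	}
	for i := 0; i < calls; i++ {
		h.dispatch(&wg, i)
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println(maxOverlap(true, 8)) // 1
}
```

With `transactional` set to false the peak depends on scheduling and is usually greater than 1. Note that the lock in this sketch prevents overlap but does not guarantee that calls run in publish order.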

func (*EventBus) SubscribeOnce

func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error

SubscribeOnce subscribes to a topic once. The handler is removed after it has executed. Returns an error if `fn` is not a function.
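
The one-shot behavior can be sketched as a bus that drops its handlers as it publishes. `onceBus` and its methods are hypothetical, and the sketch treats every handler as a one-shot for brevity.

```go
package main

import "fmt"

// onceBus is a hypothetical stand-in in which every handler is one-shot.
type onceBus struct {
	handlers map[string][]func(string)
}

func newOnceBus() *onceBus {
	return &onceBus{handlers: make(map[string][]func(string))}
}

// subscribeOnce registers fn to receive the next message on topic only.
func (b *onceBus) subscribeOnce(topic string, fn func(string)) {
	b.handlers[topic] = append(b.handlers[topic], fn)
}

// publish removes the topic's handlers before calling them, so a handler
// cannot be triggered a second time by a later publish.
func (b *onceBus) publish(topic string, msg string) {
	hs := b.handlers[topic]
	delete(b.handlers, topic)
	for _, h := range hs {
		h(msg)
	}
}

// countCalls publishes n times and returns how often the handler ran.
func countCalls(n int) int {
	bus := newOnceBus()
	calls := 0
	bus.subscribeOnce("topic", func(string) { calls++ })
	for i := 0; i < n; i++ {
		bus.publish("topic", "hello")
	}
	return calls
}

func main() {
	fmt.Println(countCalls(3)) // 1
}
```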

func (*EventBus) SubscribeOnceAsync

func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error

SubscribeOnceAsync subscribes to a topic once with an asynchronous callback. The handler is removed after it has executed. Returns an error if `fn` is not a function.

func (*EventBus) SubscribeUniq added in v0.3.0

func (bus *EventBus) SubscribeUniq(topic string, fn interface{}) error

SubscribeUniq subscribes to a topic uniquely.

func (*EventBus) Unsubscribe

func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error

Unsubscribe removes callback defined for a topic. Returns error if there are no callbacks subscribed to the topic.

func (*EventBus) WaitAsync

func (bus *EventBus) WaitAsync()

WaitAsync waits for all async callbacks to complete.
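
Waiting for asynchronous callbacks is commonly built on `sync.WaitGroup`. The sketch below assumes that approach; `asyncBus`, `publishAsync`, and `waitAsync` are hypothetical names, and the package may implement `WaitAsync` differently.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncBus is a hypothetical stand-in that tracks running callbacks.
type asyncBus struct {
	wg sync.WaitGroup
}

// publishAsync runs fn in its own goroutine and records it in the WaitGroup.
func (b *asyncBus) publishAsync(fn func()) {
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		fn()
	}()
}

// waitAsync blocks until every callback started so far has returned.
func (b *asyncBus) waitAsync() {
	b.wg.Wait()
}

// sumAsync computes x + y in an async callback and waits for the result.
func sumAsync(x, y int) int {
	var bus asyncBus
	var out int
	bus.publishAsync(func() { out = x + y })
	bus.waitAsync() // without this call, out could still be 0 here
	return out
}

func main() {
	fmt.Println(sumAsync(10, 13)) // 23
}
```

Reading `out` only after `waitAsync` returns is what makes the result reliable; without the wait, an asynchronously written value may not be set yet.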

type MsgReplier

type MsgReplier interface {
	Reply(topic string, fn interface{}, transactional bool) error
	StopReply(topic string, fn interface{}) error
}

MsgReplier defines worker behavior for messages sent by a sender.

type MsgSender

type MsgSender interface {
	Send(topic string, args ...interface{})
}

MsgSender sends a message to the replier, which should reply to the message.
