Documentation ¶
Overview ¶
Package messagebus provides a simple asynchronous message publisher.
Example ¶
package main

import (
	"fmt"
	"log"
	"sync"

	messagebus "github.com/vardius/message-bus"
)

// main subscribes two handlers to the same topic, publishes one message and
// waits until both handlers have printed it.
func main() {
	queueSize := 100
	bus := messagebus.New(queueSize)

	// One Done per handler; each handler is expected to run exactly once.
	var wg sync.WaitGroup
	wg.Add(2)

	// Subscribe returns an error. If a handler were not registered, its
	// wg.Done would never run and wg.Wait below would block forever, so exit
	// immediately instead of discarding the error.
	if err := bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s1", v)
	}); err != nil {
		log.Fatal(err)
	}

	if err := bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s2", v)
	}); err != nil {
		log.Fatal(err)
	}

	// Publish blocks only when the buffer of one of the subscribers is full.
	// Change the buffer size by altering queueSize when creating the bus.
	bus.Publish("topic", true)

	wg.Wait()
}
Output: s1 true s2 true
Example (Second) ¶
package main

import (
	"fmt"
	"log"

	messagebus "github.com/vardius/message-bus"
)

// main registers several subscribers on one topic, publishes queueSize
// messages from a separate goroutine and counts the values the handlers
// forward to a shared channel.
func main() {
	queueSize := 2
	subscribersAmount := 3

	ch := make(chan int, queueSize)
	defer close(ch)

	bus := messagebus.New(queueSize)

	for i := 0; i < subscribersAmount; i++ {
		// The handler parameter is named v so it does not shadow the loop
		// index. A failed subscription would leave the receive loop below
		// waiting for values that never arrive, so exit on error.
		err := bus.Subscribe("topic", func(v int, out chan<- int) { out <- v })
		if err != nil {
			log.Fatal(err)
		}
	}

	// Publish from a separate goroutine so main can drain ch concurrently.
	go func() {
		for n := 0; n < queueSize; n++ {
			bus.Publish("topic", n, ch)
		}
	}()

	// The example expects one value per (subscriber, message) pair.
	want := subscribersAmount * queueSize
	sum := 0
	for sum < want {
		<-ch // plain receive; a single-case select adds nothing
		sum++
	}

	fmt.Println(sum)
}
Output: 6
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MessageBus ¶
// MessageBus is the interface for publishing arguments to, and managing the
// handlers subscribed on, named topics.
type MessageBus interface {
	// Publish publishes the arguments to the subscribers of the given topic.
	// It blocks only when the buffer of one of the subscribers is full.
	Publish(topic string, args ...interface{})
	// Close unsubscribes all handlers from the given topic.
	Close(topic string)
	// Subscribe subscribes the handler fn to the given topic.
	Subscribe(topic string, fn interface{}) error
	// Unsubscribe unsubscribes the handler fn from the given topic.
	Unsubscribe(topic string, fn interface{}) error
}
MessageBus implements the publish/subscribe messaging paradigm.
func New ¶
func New(handlerQueueSize int) MessageBus
New creates a new MessageBus. handlerQueueSize sets the buffered channel length per subscriber.
Click to show internal directories.
Click to hide internal directories.