messagebus

package module
v1.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 10, 2019 License: MIT Imports: 4 Imported by: 35

README

Vardius - message-bus

Build Status Go Report Card codecov license

Go simple async message bus.

ABOUT

Contributors:

Want to contribute ? Feel free to send pull requests!

Have problems, bugs, feature ideas? We are using the github issue tracker to manage them.

HOW TO USE

  1. GoDoc

Benchmark

CPU: 3,3 GHz Intel Core i7

RAM: 16 GB 2133 MHz LPDDR3

➜  message-bus git:(master) ✗ go test -bench=. -cpu=4 -benchmem
goos: darwin
goarch: amd64
BenchmarkBus-4                   3000000               534 ns/op              56 B/op          3 allocs/op
BenchmarkBusParallel-4           5000000               313 ns/op              48 B/op          2 allocs/op
BenchmarkBus100-4                 100000             14651 ns/op              56 B/op          3 allocs/op
BenchmarkBus100Parallel-4         300000             14130 ns/op              48 B/op          2 allocs/op
BenchmarkBus1000-4                 10000            159269 ns/op              56 B/op          3 allocs/op
BenchmarkBus1000Parallel-4         10000            142578 ns/op              48 B/op          2 allocs/op
BenchmarkBusNumCPU-4             1000000              1155 ns/op              56 B/op          3 allocs/op
BenchmarkBusNumCPUParallel-4     2000000               774 ns/op              48 B/op          2 allocs/op
PASS
ok      message-bus    23.125s

Basic example

package main

import (
    "fmt"

    "github.com/vardius/message-bus"
)

func main() {
    queueSize := 100
    bus := messagebus.New(queueSize)

    var wg sync.WaitGroup
    wg.Add(2)

    bus.Subscribe("topic", func(v bool) {
        defer wg.Done()
        fmt.Println(v)
    })

    bus.Subscribe("topic", func(v bool) {
        defer wg.Done()
        fmt.Println(v)
    })

    // Publish block only when the buffer of one of the subscribers is full.
    // change the buffer size altering queueSize when creating new messagebus
    bus.Publish("topic", true)
    wg.Wait()
}

License

This package is released under the MIT license. See the complete license in the package:

LICENSE

Documentation

Overview

Package messagebus provides simple async message publisher

Example
package main

import (
	"fmt"
	"sync"

	messagebus "github.com/vardius/message-bus"
)

func main() {
	queueSize := 100
	bus := messagebus.New(queueSize)

	var wg sync.WaitGroup
	wg.Add(2)

	bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s1", v)
	})

	bus.Subscribe("topic", func(v bool) {
		defer wg.Done()
		fmt.Println("s2", v)
	})

	// Publish block only when the buffer of one of the subscribers is full.
	// change the buffer size altering queueSize when creating new messagebus
	bus.Publish("topic", true)
	wg.Wait()

}
Output:

s1 true
s2 true
Example (Second)
package main

import (
	"fmt"

	messagebus "github.com/vardius/message-bus"
)

func main() {
	queueSize := 2
	subscribersAmount := 3

	ch := make(chan int, queueSize)
	defer close(ch)

	bus := messagebus.New(queueSize)

	for i := 0; i < subscribersAmount; i++ {
		bus.Subscribe("topic", func(i int, out chan<- int) { out <- i })
	}

	go func() {
		for n := 0; n < queueSize; n++ {
			bus.Publish("topic", n, ch)
		}
	}()

	var sum = 0
	for sum < (subscribersAmount * queueSize) {
		select {
		case <-ch:
			sum++
		}
	}

	fmt.Println(sum)
}
Output:

6

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MessageBus

type MessageBus interface {
	Publish(topic string, args ...interface{})
	Close(topic string)
	Subscribe(topic string, fn interface{}) error
	Unsubscribe(topic string, fn interface{}) error
}

MessageBus implements publish/subscribe messaging paradigm

func New

func New(handlerQueueSize int) MessageBus

New creates new MessageBus handlerQueueSize sets buffered channel length per subscriber

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL