zmqpubsub

v0.0.0-...-27c6326
Published: Sep 21, 2015 License: MIT Imports: 5 Imported by: 21

README

zmqpubsub

zmqpubsub is a simple Go pubsub implementation on top of ZeroMQ.

It abstracts the underlying ZeroMQ machinery to provide a Go-friendly API for the publish-subscribe messaging pattern.

Install

go get -tags zmq_3_x github.com/hpcloud/zmqpubsub

Usage

Broker

First, set up a broker:

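import "github.com/hpcloud/zmqpubsub"

// Broker holds the pubsub configuration shared by publishers and subscribers.
var Broker zmqpubsub.Broker

func init() {
	Broker.PubAddr = "tcp://127.0.0.1:4000" // publisher endpoint address
	Broker.SubAddr = "tcp://127.0.0.1:4001" // subscriber endpoint address
	Broker.BufferSize = 100                 // memory buffer size
}

func main() {
	...
	Broker.MustRun()
}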

The broker specifies the endpoint addresses that publishers (PubAddr) and subscribers (SubAddr) connect to.

Subscriber

Messages for a subscription are delivered on a Go channel, the Ch field of the Subscription. Thanks to Tomb, a subscription can be stopped at any time by calling Stop.

sub := Broker.Subscribe("")
defer sub.Stop()

for msg := range sub.Ch {
    fmt.Printf("%s => %s\n", msg.Key, msg.Value)
}
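To illustrate how a channel-backed subscription can be stopped at any time, here is a minimal sketch that uses only the standard library. It does not use Tomb or this package: the subscription type, its quit and done channels, and the subscribe helper are all hypothetical, and whether the real Subscription closes Ch on Stop is an assumption made here for the sake of the example.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for a channel-backed subscription.
type subscription struct {
	Ch   chan string   // channel to read messages from
	quit chan struct{} // closed by Stop to request shutdown
	done chan struct{} // closed once the forwarding goroutine has exited
}

// subscribe forwards messages from source to Ch until source is closed
// or Stop is called.
func subscribe(source <-chan string) *subscription {
	s := &subscription{
		Ch:   make(chan string),
		quit: make(chan struct{}),
		done: make(chan struct{}),
	}
	go func() {
		defer close(s.done) // runs last
		defer close(s.Ch)   // runs first, so readers see Ch closed before done
		for {
			select {
			case <-s.quit:
				return
			case m, ok := <-source:
				if !ok {
					return
				}
				// Delivery is also interruptible, so Stop never hangs
				// when nobody is reading Ch.
				select {
				case s.Ch <- m:
				case <-s.quit:
					return
				}
			}
		}
	}()
	return s
}

// Stop asks the forwarding goroutine to exit and waits for it.
// In this sketch it must be called at most once.
func (s *subscription) Stop() {
	close(s.quit)
	<-s.done
}

func main() {
	src := make(chan string, 3)
	src <- "a"
	src <- "b"
	src <- "c"
	sub := subscribe(src)
	fmt.Println(<-sub.Ch)
	sub.Stop()
	_, open := <-sub.Ch
	fmt.Println(open)
}
```

The second select is the important design choice: without it, a goroutine blocked on delivering a message could not observe the stop request.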

Publisher

The publisher side is equally simple. Note, however, that publishers are not thread-safe, so each publisher must be used only from the goroutine that created it.

pub := Broker.NewPublisherMust()
defer pub.Stop()

pub.MustPublish("key", "hello world")
pub.MustPublish("key", "hello universe")
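One way to respect the single-goroutine rule when several goroutines want to publish is to funnel their requests through a channel to one owner goroutine. The sketch below shows that pattern with the standard library only. The publisher interface, recorder, item, own, and fanIn are hypothetical names for illustration; recorder is a fake standing in for a real publisher, and with this package the factory passed to own would presumably call Broker.NewPublisher instead.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a local stand-in exposing the Publish and Stop methods
// documented for Publisher; it is not the package's own type.
type publisher interface {
	Publish(key string, value string) error
	Stop()
}

// recorder is a fake publisher for this sketch. It is not safe for
// concurrent use, which is exactly the constraint being worked around.
type recorder struct {
	sent    []string
	stopped bool
}

func (r *recorder) Publish(key string, value string) error {
	r.sent = append(r.sent, key+" "+value)
	return nil
}

func (r *recorder) Stop() { r.stopped = true }

// item is one queued publish request.
type item struct{ key, value string }

// own creates the publisher, publishes everything received on queue, and
// stops the publisher, all from the single goroutine that calls it.
// It keeps draining after an error so that producers never block forever.
func own(newPub func() publisher, queue <-chan item) error {
	pub := newPub()
	defer pub.Stop()
	var first error
	for it := range queue {
		if err := pub.Publish(it.key, it.value); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// fanIn starts n producer goroutines that each enqueue one item and
// returns what the single owner goroutine published.
func fanIn(n int) ([]string, error) {
	rec := &recorder{} // allocated here only so the sketch can inspect it
	queue := make(chan item)
	done := make(chan error, 1)
	go func() { done <- own(func() publisher { return rec }, queue) }()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			queue <- item{"key", "hello world"}
		}()
	}
	wg.Wait()
	close(queue) // lets own finish its range loop
	err := <-done
	return rec.sent, err
}

func main() {
	sent, err := fanIn(4)
	fmt.Println(len(sent), err)
}
```

Producers only ever touch the channel, so the publisher itself is never shared between goroutines.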

Example

To run the provided example:

go run -tags zmq_3_x example/pubsub.go

Documentation

Functions

func GetGlobalContext

func GetGlobalContext() (*zmq.Context, error)

GetGlobalContext returns a singleton zmq Context for the current Go process.

Types

type Broker

type Broker struct {
	PubAddr         string // Publisher Endpoint Address
	SubAddr         string // Subscriber Endpoint Address
	BufferSize      int    // Memory buffer size
	SubscribeFilter string
}

func (Broker) MustRun

func (z Broker) MustRun()

func (Broker) NewPublisher

func (z Broker) NewPublisher() (*Publisher, error)

func (Broker) NewPublisher2

func (z Broker) NewPublisher2(addr string) (*Publisher, error)

func (Broker) NewPublisherMust

func (z Broker) NewPublisherMust() *Publisher

func (Broker) Run

func (z Broker) Run() error

Run runs a broker for this pubsub configuration.

func (Broker) Subscribe

func (z Broker) Subscribe(filters ...string) *Subscription

Subscribe returns a subscription (channel) for given filters.
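ZeroMQ SUB sockets filter by prefix, which is why the empty filter used in the README receives every message. The sketch below models that rule with the standard library only. The matches function is a hypothetical helper and not part of this package; how Subscribe itself treats an empty filter list is not covered here.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether key passes at least one prefix filter.
// An empty filter is a prefix of every key, so it matches everything.
// With no filters at all nothing matches, mirroring a SUB socket that
// has no subscriptions.
func matches(key string, filters ...string) bool {
	for _, f := range filters {
		if strings.HasPrefix(key, f) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(matches("key", ""))      // empty filter
	fmt.Println(matches("key", "ke"))    // prefix of the key
	fmt.Println(matches("key", "other")) // unrelated filter
}
```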

type Forwarder

type Forwarder struct {
	// contains filtered or unexported fields
}

Forwarder is a ZeroMQ forwarder device acting as a broker between multiple publishers and multiple subscribers.

func NewForwarder

func NewForwarder(options Broker) (*Forwarder, error)

func (*Forwarder) Run

func (b *Forwarder) Run() error

type Message

type Message struct {
	Key   string
	Value string
}

Message represents a ZeroMQ message with two parts, Key and Value, separated by a single space. By convention, Key is the part matched against subscribe filters.
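The key/value convention can be sketched as a split at the first space. This is an illustration of the convention only, not the package's NewMessage: the message type and parseMessage helper are hypothetical, and treating a payload without a space as a key with an empty value is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// message mirrors the documented Message fields; it is a local stand-in.
type message struct {
	Key   string
	Value string
}

// parseMessage splits data at the first space only, so the value itself
// may contain spaces. A payload without a space becomes a key with an
// empty value (an assumption made for this sketch).
func parseMessage(data []byte) message {
	parts := strings.SplitN(string(data), " ", 2)
	if len(parts) == 2 {
		return message{Key: parts[0], Value: parts[1]}
	}
	return message{Key: parts[0]}
}

func main() {
	m := parseMessage([]byte("key hello world"))
	fmt.Printf("%s => %s\n", m.Key, m.Value) // prints "key => hello world"
}
```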

func NewMessage

func NewMessage(data []byte) Message

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is a publisher that is not safe for concurrent use. A given Publisher must be used by only a single goroutine.

func (*Publisher) MustPublish

func (p *Publisher) MustPublish(key string, value string)

func (*Publisher) Publish

func (p *Publisher) Publish(key string, value string) error

func (*Publisher) Stop

func (p *Publisher) Stop()

type Subscription

type Subscription struct {
	Ch chan Message // Channel to read messages from
	tomb.Tomb
	// contains filtered or unexported fields
}

Subscription provides a channel abstraction over ZeroMQ SUB sockets.

func (*Subscription) Stop

func (sub *Subscription) Stop() error

Stop stops this Subscription.
