rtreq

package module
v0.0.0-...-8d70db6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2017 License: MIT Imports: 15 Imported by: 0

README

ZMQ REQ/ROUTER Template

This package is a template for designing asynchronous ZMQ servers with the REQ/ROUTER pattern.

Quickstart

Install the server/client system using go get as follows:

$ go get github.com/bbengfort/rtreq/...

You should then have the rtreq command installed on your system:

$ rtreq --help

You can run the server as follows:

$ rtreq serve

And send messages from the client as:

$ rtreq send "hello world"

Note the various arguments you can pass to both serve and send to configure the setup. Run benchmarks with the bench command:

$ rtreq bench

The primary comparison is REQ/REP vs REQ/ROUTER sockets.

Documentation

Index

Constants

View Source
const (
	Trace uint8 = iota
	Debug
	Info
	Status
	Warn
	Silent
)

Levels for implementing the debug and trace message functionality.

View Source
const DefaultNWorkers = 16

DefaultNWorkers is the number of workers allocated to handle clients.

View Source
const IPCAddr = "ipc://workers.ipc"

IPCAddr is the in process communcation socket for workers.

Variables

View Source
var (
	ErrNotImplemented = errors.New("functionality not implemented yet")
)

Standard errors for primary operations.

Functions

func LogLevel

func LogLevel() string

LogLevel returns a string representation of the current level

func SetLogLevel

func SetLogLevel(level uint8)

SetLogLevel modifies the log level for messages at runtime. Ensures that the highest level that can be set is the trace level.

Types

type Client

type Client struct {
	Transporter
	// contains filtered or unexported fields
}

Client communicates a server.

func NewClient

func NewClient(addr, name string, context *zmq.Context) (c *Client, err error)

NewClient creates a new rtreq.Client. If context is nil, it also creates a context that will be managed by the sever.

func (*Client) Access

func (c *Client) Access(done chan<- bool, echan chan<- error, retries int, timeout time.Duration)

Access sends a request to the server and waits for a response, measuring the latency of the message send to get throughput benchmarks.

func (*Client) Benchmark

func (c *Client) Benchmark(duration time.Duration, results string, retries int, timeout time.Duration, nClients int) error

Benchmark the throughput in terms of messages per second to the zmqnet.

func (*Client) Connect

func (c *Client) Connect() (err error)

Connect to the remote peer

func (*Client) Reset

func (c *Client) Reset() error

Reset the socket by setting the linger to 0, closing it, then reconnecting.

func (*Client) Results

func (c *Client) Results(path string, data map[string]interface{}) error

Results saves the throughput to disk

func (*Client) Send

func (c *Client) Send(message string, retries int, timeout time.Duration) error

Send a message to the remote peer in a safe fashion, specifying the # of retries and the timeout to wait on.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error wraps other library errors and provides easier logging.

func WrapError

func WrapError(format string, err error, a ...interface{}) *Error

WrapError creates a new wrapped error message with the format string.

func (*Error) Error

func (e *Error) Error() string

Error prefixes the message to the internal error string

func (*Error) String

func (e *Error) String() string

String returns the error message

type Metrics

type Metrics struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Metrics tracks the measurable statistics of the system over time from the perspective of the local replica. Many stats are simply counters, other statistics perform online computations of the distribution of values.

func (*Metrics) Accesses

func (m *Metrics) Accesses() uint64

Accesses returns the total number of accesses to the replica.

func (*Metrics) Append

func (m *Metrics) Append(o *Metrics)

Append another metrics' data to the current metrics

func (*Metrics) ClientMean

func (m *Metrics) ClientMean() float64

ClientMean returns the average number of accesses per client.

func (*Metrics) Complete

func (m *Metrics) Complete()

Complete an access and set the finished time.

func (*Metrics) Duration

func (m *Metrics) Duration() time.Duration

Duration computes the amount of time during which accesses were received.

func (*Metrics) Increment

func (m *Metrics) Increment(client string)

Increment the access metrics and set the started time.

func (*Metrics) Init

func (m *Metrics) Init()

Init the metrics

func (*Metrics) NClients

func (m *Metrics) NClients() uint64

NClients returns the number of clients accessing the replica

func (*Metrics) Serialize

func (m *Metrics) Serialize(extra map[string]interface{}) map[string]interface{}

Serialize the data structure to a map

func (*Metrics) String

func (m *Metrics) String() string

String returns a quick summary of the access metrics

func (*Metrics) Throughput

func (m *Metrics) Throughput() (throughput float64)

Throughput computes the number of messages per second.

func (*Metrics) Write

func (m *Metrics) Write(path string, extra map[string]interface{}) error

Write the metrics to the path, appending the JSON as a line to the file.

type RepServer

type RepServer struct {
	Transporter
}

RepServer responds to requests from other peers using a REP socket.

func (*RepServer) Run

func (s *RepServer) Run() (err error)

Run the server and listen for messages

func (*RepServer) Shutdown

func (s *RepServer) Shutdown(path string) error

Shutdown the server and print the metrics out

type RouterServer

type RouterServer struct {
	sync.Mutex
	Transporter
	// contains filtered or unexported fields
}

RouterServer responds to requests from other peers using a ROUTER socket.

func (*RouterServer) Close

func (s *RouterServer) Close() (err error)

Close the socket and clean up the connections.

func (*RouterServer) Run

func (s *RouterServer) Run() (err error)

Run the server and listen for messages

func (*RouterServer) SetWorkers

func (s *RouterServer) SetWorkers(n int)

SetWorkers specifies the number of workers, if n is 0 uses DefaultNWorkers

func (*RouterServer) Shutdown

func (s *RouterServer) Shutdown(path string) error

Shutdown the server and print the metrics out

type Server

type Server interface {
	Init(addr, name string, context *zmq.Context)
	Run() error
	Shutdown(path string) error
}

Server represents a transporter that can respond to requests from peers.

func NewServer

func NewServer(addr, name string, sync bool, nWorkers int, context *zmq.Context) (s Server, err error)

NewServer creates a new rtreq.Server. If context is nil, it also creates a context that will be managed by the sever.

type Transporter

type Transporter struct {
	// contains filtered or unexported fields
}

Transporter is a wrapper around a zmq.Socket object that is accessed by a single host, either remote or local. Both clients and servers are transporters.

The primary role of the transporter is to send and receive messages defined as protocol buffers. They can wrap any type of ZMQ object and its up to the primary classes to instantiate the socket correctly.

func (*Transporter) Close

func (t *Transporter) Close() error

Close the socket and clean up the connections.

func (*Transporter) Init

func (t *Transporter) Init(addr, name string, context *zmq.Context)

Init the transporter with the specified host and any other internal data.

func (*Transporter) Shutdown

func (t *Transporter) Shutdown() error

Shutdown the ZMQ context permanently (should only be called once).

type Worker

type Worker struct {
	Transporter
}

Worker connects to an inprocess socket and handle client messages in parallel without sharing state. Workers have all the benefits of other transporters, but maintain local sockets.

func (*Worker) Init

func (w *Worker) Init(name string, context *zmq.Context)

Init the worker and connect it.

func (*Worker) Run

func (w *Worker) Run() error

Run the worker to listen for messages and respond to them.

Directories

Path Synopsis
cmd
Package msg is a generated protocol buffer package.
Package msg is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL