mangos

v1.4.0
Published: Jun 23, 2018 License: Apache-2.0 Imports: 11 Imported by: 0

README

mangos

Package mangos is an implementation in pure Go of the SP ("Scalability Protocols") messaging system. It makes heavy use of Go channels internally, and it can operate on systems that lack support for cgo.

NOTE: The repository has moved from github.com/go-mangos/mangos. Please import using nanomsg.org/go-mangos. Also, be advised that the master branch of this repository may contain breaking changes. Therefore, consider using a tag, such as v1, to ensure that you have the latest stable version.

The reference implementation of the SP protocols is available as nanomsg™; there is also an effort to implement an improved and more capable version of nanomsg called NNG™.

The design is intended to make it easy to add new transports with almost trivial effort, as well as new topologies ("protocols" in SP terminology.)

At present, all of the Req/Rep, Pub/Sub, Pair, Bus, Push/Pull, and Surveyor/Respondent patterns are supported.

Additionally, there is an experimental new pattern called STAR available. This pattern is like Bus, except that the messages are delivered not just to immediate peers, but to all members of the topology. Developers must be careful not to create cycles in their network when using this pattern, otherwise infinite loops can occur.

Supported transports include TCP, inproc, IPC, WebSocket, WebSocket/TLS and TLS. Use addresses of the form "tls+tcp://<host>:<port>" to access TLS. Note that ipc:// is not supported on Windows (by either this or the reference implementation). Forcing the local TCP port in Dial is not supported yet (this is rarely useful).

Basic interoperability with nanomsg and NNG has been verified (you can do so yourself with nanocat and macat) for all protocols and transports that NNG and nanomsg support. Additionally there are a number of projects that use the two products together.

There is a third party experimental QUIC transport available at quic-mangos. (An RFE to make this transport official exists.)

If you find this useful, I would appreciate knowing about it. I can be reached via my email address, garrett -at- damore -dot- org

Installing

Using go get

$ go get -u nanomsg.org/go-mangos

After this command mangos is ready to use. Its source will be in:

$GOPATH/src/nanomsg.org/go-mangos

You can use go get -u -a to update all installed packages.

Documentation

For docs, see http://godoc.org/nanomsg.org/go-mangos or run:

$ godoc nanomsg.org/go-mangos

Testing

This package supports internal self tests, which can be run in the idiomatic Go way. (Note that most of the tests are in a test subdirectory.)

$ go test nanomsg.org/go-mangos/...

There are also internal benchmarks available:

$ go test -bench=. nanomsg.org/go-mangos/test

Commercial Support

Staysail Systems, Inc. offers commercial support for mangos.

Examples

Some examples are posted in the directories under examples/ in this project.

These examples are rewrites (in Go) of Tim Dysinger's Getting Started with Nanomsg.

godoc in the example directories will yield information about how to run each example program.

Enjoy!

Copyright 2018 The Mangos Authors

mangos™, Nanomsg™ and NNG™ are trademarks of Garrett D'Amore.

Documentation

Overview

Package mangos provides a pure Go implementation of the Scalability Protocols. These are more familiarly known as "nanomsg", the C-based software package that is also their reference implementation.

These protocols facilitate the rapid creation of applications which rely on multiple participants in sometimes complex communications topologies, including Request/Reply, Publish/Subscribe, Push/Pull, Surveyor/Respondent, etc.

For more information, see www.nanomsg.org.

Constants

const (
	// OptionRaw is used to enable RAW mode processing.  The details of
	// how this varies from normal mode vary from protocol to protocol.
	// RAW mode corresponds to AF_SP_RAW in the C variant, and must be
	// used with Devices.  In particular, RAW mode sockets are completely
	// stateless -- any state between recv/send messages is included in
	// the message headers.  Protocol names starting with "X" default
	// to the RAW mode of the same protocol without the leading "X".
	// The value passed is a bool.
	OptionRaw = "RAW"

	// OptionRecvDeadline is the time until the next Recv times out.  The
	// value is a time.Duration.  Zero value may be passed to indicate that
	// no timeout should be applied.  A negative value indicates a
	// non-blocking operation.  By default there is no timeout.
	OptionRecvDeadline = "RECV-DEADLINE"

	// OptionSendDeadline is the time until the next Send times out.  The
	// value is a time.Duration.  Zero value may be passed to indicate that
	// no timeout should be applied.  A negative value indicates a
	// non-blocking operation.  By default there is no timeout.
	OptionSendDeadline = "SEND-DEADLINE"

	// OptionRetryTime is used by REQ.  The argument is a time.Duration.
	// When a request has not been replied to within the given duration,
	// the request will automatically be resent to an available peer.
	// This value should be longer than the maximum possible processing
	// and transport time.  The value zero indicates that no automatic
	// retries should be sent.  The default value is one minute.
	//
	// Note that changing this option is only guaranteed to affect requests
	// sent after the option is set.  Changing the value while a request
	// is outstanding may not have the desired effect.
	OptionRetryTime = "RETRY-TIME"

	// OptionSubscribe is used by SUB/XSUB.  The argument is a []byte.
	// The application will receive messages that start with this prefix.
	// Multiple subscriptions may be in effect on a given socket.  The
	// application will not receive messages that do not match any current
	// subscriptions.  (If there are no subscriptions for a SUB/XSUB
	// socket, then the application will not receive any messages.  An
	// empty prefix can be used to subscribe to all messages.)
	OptionSubscribe = "SUBSCRIBE"

	// OptionUnsubscribe is used by SUB/XSUB.  The argument is a []byte,
	// representing a previously established subscription, which will be
	// removed from the socket.
	OptionUnsubscribe = "UNSUBSCRIBE"

	// OptionSurveyTime is used to indicate the deadline for survey
	// responses, when used with a SURVEYOR socket.  Messages arriving
	// after this will be discarded.  Additionally, this will set the
	// OptionRecvDeadline when starting the survey, so that attempts to
	// receive messages fail with ErrRecvTimeout when the survey is
	// concluded.  The value is a time.Duration.  Zero can be passed to
	// indicate an infinite time.  Default is 1 second.
	OptionSurveyTime = "SURVEY-TIME"

	// OptionTLSConfig is used to supply TLS configuration details. It
	// can be set using the ListenOptions or DialOptions.
	// The parameter is a tls.Config pointer.
	OptionTLSConfig = "TLS-CONFIG"

	// OptionWriteQLen is used to set the size, in messages, of the write
	// queue channel. By default, it's 128. This option cannot be set if
	// Dial or Listen has been called on the socket.
	OptionWriteQLen = "WRITEQ-LEN"

	// OptionReadQLen is used to set the size, in messages, of the read
	// queue channel. By default, it's 128. This option cannot be set if
	// Dial or Listen has been called on the socket.
	OptionReadQLen = "READQ-LEN"

	// OptionKeepAlive is used to set TCP KeepAlive.  Value is a boolean.
	// Default is true.
	OptionKeepAlive = "KEEPALIVE"

	// OptionNoDelay is used to configure Nagle -- when true messages are
	// sent as soon as possible, otherwise some buffering may occur.
	// Value is a boolean.  Default is true.
	OptionNoDelay = "NO-DELAY"

	// OptionLinger is used to set the linger property.  This is the amount
	// of time to wait for send queues to drain when Close() is called.
	// Close() may block for up to this long if there is unsent data, but
	// will return as soon as all data is delivered to the transport.
	// Value is a time.Duration.  Default is one second.
	OptionLinger = "LINGER"

	// OptionTTL is used to set the maximum time-to-live for messages.
	// Note that not all protocols can honor this at this time, but for
	// those that do, if a message traverses more than this many devices,
	// it will be dropped.  This is used to provide protection against
	// loops in the topology.  The default is protocol specific.
	OptionTTL = "TTL"

	// OptionMaxRecvSize supplies the maximum receive size for inbound
	// messages.  This option exists because the wire protocol allows
	// the sender to specify the size of the incoming message, and
	// if the size were overly large, a bad remote actor could perform a
	// remote Denial-Of-Service by requesting ridiculously large message
	// sizes and then stalling on send.  The default value is 1MB.
	//
	// A value of 0 removes the limit, but should not be used unless
	// absolutely sure that the peer is trustworthy.
	//
	// Not all transports honor this limit.  For example, this limit
	// makes no sense when used with inproc.
	//
	// Note that the size includes any Protocol specific header.  It is
	// better to pick a value that is a little too big, than too small.
	//
	// This option is only intended to prevent gross abuse of the system,
	// and is not a substitute for proper application message verification.
	OptionMaxRecvSize = "MAX-RCV-SIZE"

	// OptionReconnectTime is the initial interval used for connection
	// attempts.  If a connection attempt does not succeed, then the socket
	// will wait this long before trying again.  An optional exponential
	// backoff may cause this value to grow.  See OptionMaxReconnectTime
	// for more details.  This is a time.Duration whose default value is
	// 100msec.  This option must be set before starting any dialers.
	OptionReconnectTime = "RECONNECT-TIME"

	// OptionMaxReconnectTime is the maximum value of the time between
	// connection attempts, when an exponential backoff is used.  If this
	// value is zero, then exponential backoff is disabled, otherwise
	// the value to wait between attempts is doubled until it hits this
	// limit.  This value is a time.Duration, with initial value 0.
	// This option must be set before starting any dialers.
	OptionMaxReconnectTime = "MAX-RECONNECT-TIME"

	// OptionBestEffort enables non-blocking send operations on the
	// socket. Normally (for some socket types), a socket will block if
	// there are no receivers, or the receivers are unable to keep up
	// with the sender. (Multicast sockets types like Bus or Star do not
	// behave this way.)  If this option is set, instead of blocking, the
	// message will be silently discarded.  The value is a boolean, and
	// defaults to false.
	OptionBestEffort = "BEST-EFFORT"
)
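The interaction between OptionReconnectTime and OptionMaxReconnectTime described in the comments above can be sketched as follows. This is a minimal illustration using only the standard library; `nextDelay` is a hypothetical helper, not a function exported by mangos, and the one-second cap is an arbitrary example value.

```go
package main

import (
	"fmt"
	"time"
)

// nextDelay is a hypothetical helper (not part of mangos). It returns the
// wait before the following connection attempt: unchanged when max is zero
// (backoff disabled), otherwise doubled and capped at max.
func nextDelay(cur, max time.Duration) time.Duration {
	if max == 0 {
		return cur
	}
	next := cur * 2
	if next > max {
		next = max
	}
	return next
}

func main() {
	delay := 100 * time.Millisecond // documented default for OptionReconnectTime
	max := time.Second              // example value for OptionMaxReconnectTime
	for attempt := 1; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, delay)
		delay = nextDelay(delay, max)
	}
}
```

With the cap left at its documented initial value of zero, every attempt would wait the same 100 milliseconds.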
const (
	PortActionAdd = iota
	PortActionRemove
)

PortAction values.

const (
	// PropLocalAddr expresses a local address.  For dialers, this is
	// the (often random) address that was locally bound.  For listeners,
	// it is usually the service address.  The value is a net.Addr.
	PropLocalAddr = "LOCAL-ADDR"

	// PropRemoteAddr expresses a remote address.  For dialers, this is
	// the service address.  For listeners, it's the address of the far
	// end dialer.  The value is a net.Addr.
	PropRemoteAddr = "REMOTE-ADDR"

	// PropTLSConnState is used to supply TLS connection details. The
	// value is a tls.ConnectionState.  It is only valid when TLS is used.
	PropTLSConnState = "TLS-STATE"

	// PropHTTPRequest conveys an *http.Request.  This property only exists
	// for websocket connections.
	PropHTTPRequest = "HTTP-REQUEST"
)
const (
	ProtoPair       = (1 * 16)
	ProtoPub        = (2 * 16)
	ProtoSub        = (2 * 16) + 1
	ProtoReq        = (3 * 16)
	ProtoRep        = (3 * 16) + 1
	ProtoPush       = (5 * 16)
	ProtoPull       = (5 * 16) + 1
	ProtoSurveyor   = (6 * 16) + 2
	ProtoRespondent = (6 * 16) + 3
	ProtoBus        = (7 * 16)

	ProtoStar = (100 * 16)
)

Useful constants for protocol numbers. Note that the major protocol number is stored in the upper 12 bits, and the minor (subprotocol) is located in the bottom 4 bits.
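As a small worked example of that layout, the sketch below splits a few of the protocol numbers listed above into their major and minor parts. `splitProto` is a hypothetical helper introduced for illustration; it is not part of the mangos API.

```go
package main

import "fmt"

// Protocol numbers copied from the constants above.
const (
	ProtoSub        = (2 * 16) + 1
	ProtoRespondent = (6 * 16) + 3
	ProtoStar       = (100 * 16)
)

// splitProto is a hypothetical helper (not part of mangos) that separates a
// protocol number into its major part (upper 12 bits) and its minor part
// (lower 4 bits).
func splitProto(n uint16) (major, minor uint16) {
	return n >> 4, n & 0x0f
}

func main() {
	for _, n := range []uint16{ProtoSub, ProtoRespondent, ProtoStar} {
		major, minor := splitProto(n)
		fmt.Printf("number=%d major=%d minor=%d\n", n, major, minor)
	}
}
```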

Variables

var (
	ErrBadAddr     = errors.New("invalid address")
	ErrBadHeader   = errors.New("invalid header received")
	ErrBadVersion  = errors.New("invalid protocol version")
	ErrTooShort    = errors.New("message is too short")
	ErrTooLong     = errors.New("message is too long")
	ErrClosed      = errors.New("connection closed")
	ErrConnRefused = errors.New("connection refused")
	ErrSendTimeout = errors.New("send time out")
	ErrRecvTimeout = errors.New("receive time out")
	ErrProtoState  = errors.New("incorrect protocol state")
	ErrProtoOp     = errors.New("invalid operation for protocol")
	ErrBadTran     = errors.New("invalid or unsupported transport")
	ErrBadProto    = errors.New("invalid or unsupported protocol")
	ErrPipeFull    = errors.New("pipe full")
	ErrPipeEmpty   = errors.New("pipe empty")
	ErrBadOption   = errors.New("invalid or unsupported option")
	ErrBadValue    = errors.New("invalid option value")
	ErrGarbled     = errors.New("message garbled")
	ErrAddrInUse   = errors.New("address in use")
	ErrBadProperty = errors.New("invalid property name")
	ErrTLSNoConfig = errors.New("missing TLS configuration")
	ErrTLSNoCert   = errors.New("missing TLS certificates")
)

Various error codes.

Functions

func Device

func Device(s1 Socket, s2 Socket) error

Device is used to create a forwarding loop between two sockets. If the same socket is listed twice (or either socket is nil), then a loopback device is established instead. Note that the single socket case is only valid for protocols where the underlying protocol can peer with itself (e.g. PAIR, or BUS, but not REQ/REP or PUB/SUB!) Both sockets will be placed into RAW mode.

If the plumbing is successful, nil will be returned. Two goroutines will be established to forward messages in each direction. If either socket returns an error on receive or send, the goroutine doing the forwarding will exit. This means that closing either socket will generally cause the goroutines to exit. Apart from closing the socket(s), no further operations should be performed against the sockets.
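The forwarding arrangement can be pictured with a toy model. The sketch below is not the mangos implementation: `msgSocket`, `chanSocket`, `forward` and `device` are hypothetical names, and the sockets are plain in-memory channels. It only shows the shape described above, one goroutine per direction, each exiting on the first error.

```go
package main

import (
	"errors"
	"fmt"
)

// msgSocket is a hypothetical stand-in for the parts of a socket that a
// forwarding loop needs; it is not the mangos Socket interface.
type msgSocket interface {
	RecvMsg() ([]byte, error)
	SendMsg([]byte) error
}

// chanSocket is a toy in-memory socket used only for this sketch.
type chanSocket struct {
	in  chan []byte // messages arriving from peers
	out chan []byte // messages sent to peers
}

var errClosed = errors.New("connection closed")

func (s *chanSocket) RecvMsg() ([]byte, error) {
	m, ok := <-s.in
	if !ok {
		return nil, errClosed
	}
	return m, nil
}

func (s *chanSocket) SendMsg(m []byte) error {
	s.out <- m
	return nil
}

// forward copies messages from one socket to the other until either side
// reports an error, then exits.
func forward(from, to msgSocket) {
	for {
		m, err := from.RecvMsg()
		if err != nil {
			return
		}
		if to.SendMsg(m) != nil {
			return
		}
	}
}

// device starts one forwarding goroutine per direction.
func device(s1, s2 msgSocket) {
	go forward(s1, s2)
	go forward(s2, s1)
}

func main() {
	a := &chanSocket{in: make(chan []byte, 1), out: make(chan []byte, 1)}
	b := &chanSocket{in: make(chan []byte, 1), out: make(chan []byte, 1)}
	device(a, b)

	a.in <- []byte("hello")     // a message arrives on a
	fmt.Printf("%s\n", <-b.out) // and leaves through b

	// Closing the inbound side makes RecvMsg fail, which ends the goroutines.
	close(a.in)
	close(b.in)
}
```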

func DrainChannel added in v1.2.0

func DrainChannel(ch chan<- *Message, expire time.Time) bool

DrainChannel waits for the channel of Messages to finish emptying (draining) for up to the expiration. It returns true if the drain completed (the channel is empty), false otherwise.
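One way to wait for a queue to empty is to poll its length until the deadline. The sketch below does this for a channel of ints; `drain` and `drainWithin` are hypothetical helpers, and the 10 millisecond polling interval is an assumption made for the example rather than a documented property of DrainChannel.

```go
package main

import (
	"fmt"
	"time"
)

// drain is a hypothetical, simplified analogue of DrainChannel for a channel
// of ints. It polls the queue length until the queue is empty or the
// deadline has passed.
func drain(ch chan<- int, expire time.Time) bool {
	for {
		if len(ch) == 0 {
			return true
		}
		now := time.Now()
		if now.After(expire) {
			return false
		}
		// Sleep for the remaining time, capped at 10ms between polls.
		d := expire.Sub(now)
		if d > 10*time.Millisecond {
			d = 10 * time.Millisecond
		}
		time.Sleep(d)
	}
}

// drainWithin wraps drain with a relative timeout given in milliseconds.
func drainWithin(ch chan<- int, ms int) bool {
	return drain(ch, time.Now().Add(time.Duration(ms)*time.Millisecond))
}

func main() {
	q := make(chan int, 4)
	q <- 1
	q <- 2

	// A consumer empties the queue shortly after we start waiting.
	go func() {
		time.Sleep(20 * time.Millisecond)
		<-q
		<-q
	}()
	fmt.Println("drained:", drainWithin(q, 1000))

	// Nobody reads this message, so the wait runs into the deadline.
	q <- 3
	fmt.Println("drained:", drainWithin(q, 30))
}
```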

func NullRecv added in v1.2.0

func NullRecv(ep Endpoint)

NullRecv simply loops, receiving and discarding messages, until the Endpoint returns a nil message. This allows the Endpoint to notice a dropped connection. It is intended for use by Protocols that are write only -- it lets them become aware of a loss of connectivity even when they have no data to send.

func ProtocolName

func ProtocolName(number uint16) string

ProtocolName returns the name corresponding to a given protocol number. This is useful for transports like WebSocket, which use a text name rather than the number in the handshake.

func ResolveTCPAddr added in v1.2.0

func ResolveTCPAddr(addr string) (*net.TCPAddr, error)

ResolveTCPAddr is like net.ResolveTCPAddr, but it handles the wildcard used in nanomsg URLs, replacing it with an empty string to indicate that all local interfaces be used.
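A minimal sketch of that behaviour, assuming the wildcard is a leading "*" as in the nanomsg address "tcp://*:5555" (with the scheme already removed). `resolveWildcard` is a hypothetical name; only `net.ResolveTCPAddr` and `strings.TrimPrefix` from the standard library are used.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// resolveWildcard is a sketch of the behaviour described above. It assumes
// the wildcard is a leading "*", which is dropped so that the host part is
// empty and all local interfaces are selected.
func resolveWildcard(addr string) (*net.TCPAddr, error) {
	addr = strings.TrimPrefix(addr, "*")
	return net.ResolveTCPAddr("tcp", addr)
}

func main() {
	a, err := resolveWildcard("*:5555")
	if err != nil {
		fmt.Println("resolve failed:", err)
		return
	}
	fmt.Println("port:", a.Port)
}
```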

func StripScheme

func StripScheme(t Transport, addr string) (string, error)

StripScheme removes the leading scheme (such as "http://") from an address string. This is mostly a utility for benefit of transport providers.
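The idea can be sketched with a plain scheme string in place of a Transport. In this hypothetical `stripScheme`, a mismatch returns the address unchanged together with an error; both the simplified signature and that error behaviour are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errBadTran mirrors the message of ErrBadTran from the variables above.
var errBadTran = errors.New("invalid or unsupported transport")

// stripScheme is a hypothetical variant of StripScheme that takes the scheme
// as a string instead of a Transport. It removes the "scheme://" prefix and
// reports an error when the address uses a different scheme.
func stripScheme(scheme, addr string) (string, error) {
	prefix := scheme + "://"
	if !strings.HasPrefix(addr, prefix) {
		return addr, errBadTran
	}
	return addr[len(prefix):], nil
}

func main() {
	for _, addr := range []string{"tcp://127.0.0.1:40899", "ipc:///tmp/sock"} {
		rest, err := stripScheme("tcp", addr)
		fmt.Printf("%q -> %q, err=%v\n", addr, rest, err)
	}
}
```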

func ValidPeers

func ValidPeers(p1, p2 Protocol) bool

ValidPeers returns true if the two sockets are capable of peering to one another. For example, REQ can peer with REP, but not with BUS.
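A plausible reading of this check is that each side's protocol number must equal the other side's peer number. The sketch below models that with a hypothetical `proto` struct instead of the Protocol interface; the peer assignments (REQ with REP, BUS with itself) follow the descriptions on this page.

```go
package main

import "fmt"

// proto is a hypothetical value type holding what the Protocol interface
// exposes through Number() and PeerNumber().
type proto struct {
	name       string
	number     uint16
	peerNumber uint16
}

// Numbers follow the protocol constants listed earlier.
var (
	req = proto{"req", 3 * 16, 3*16 + 1}
	rep = proto{"rep", 3*16 + 1, 3 * 16}
	bus = proto{"bus", 7 * 16, 7 * 16}
)

// validPeers reports whether each protocol names the other as its peer.
func validPeers(p1, p2 proto) bool {
	return p1.number == p2.peerNumber && p2.number == p1.peerNumber
}

func main() {
	fmt.Println("req/rep:", validPeers(req, rep))
	fmt.Println("req/bus:", validPeers(req, bus))
	fmt.Println("bus/bus:", validPeers(bus, bus))
}
```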

Types

type CondTimed

type CondTimed struct {
	sync.Cond
}

CondTimed is a condition variable (like sync.Cond) that includes a timeout.

func (*CondTimed) WaitAbsTimeout

func (cv *CondTimed) WaitAbsTimeout(when time.Time) bool

WaitAbsTimeout is like WaitRelTimeout, but expires on an absolute time instead of a relative one.

func (*CondTimed) WaitRelTimeout

func (cv *CondTimed) WaitRelTimeout(when time.Duration) bool

WaitRelTimeout is like Wait, but it times out. The fact that it timed out can be determined by checking the return value. True indicates that it woke up without a timeout (signaled another way), whereas false indicates a timeout occurred.
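A timed wait of this kind can be built on sync.Cond by arming a timer that broadcasts on expiry. The sketch below shows one such construction; `timedCond`, `waitRel` and `waitDemo` are hypothetical names, and this is an illustration of the technique rather than a copy of the package's code. As with sync.Cond.Wait, the caller must hold the lock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timedCond is a hypothetical re-creation of the idea behind CondTimed.
type timedCond struct {
	sync.Cond
}

// waitRel waits like sync.Cond.Wait but gives up after d. The caller must
// hold tc.L. It returns true if woken before the timer fired, false if the
// wake-up came from the timer.
func (tc *timedCond) waitRel(d time.Duration) bool {
	timer := time.AfterFunc(d, func() {
		tc.L.Lock()
		tc.Broadcast()
		tc.L.Unlock()
	})
	tc.Wait()
	return timer.Stop()
}

// waitDemo waits up to timeoutMs milliseconds. If signalMs is not negative,
// another goroutine broadcasts after that many milliseconds.
func waitDemo(timeoutMs, signalMs int) bool {
	var tc timedCond
	tc.L = &sync.Mutex{}

	// Take the lock first so the signaller cannot broadcast before we wait.
	tc.L.Lock()
	defer tc.L.Unlock()

	if signalMs >= 0 {
		go func() {
			time.Sleep(time.Duration(signalMs) * time.Millisecond)
			tc.L.Lock()
			tc.Broadcast()
			tc.L.Unlock()
		}()
	}
	return tc.waitRel(time.Duration(timeoutMs) * time.Millisecond)
}

func main() {
	fmt.Println("signalled before timeout:", waitDemo(2000, 5))
	fmt.Println("signalled before timeout:", waitDemo(20, -1))
}
```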

type Dialer

type Dialer interface {
	// Close closes the dialer, and removes it from any active socket.
	// Further operations on the Dialer will return ErrClosed.
	Close() error

	// Dial starts connecting on the address.  If a connection fails,
	// it will restart.
	Dial() error

	// Address returns the string (full URL) of the Dialer.
	Address() string

	// SetOption sets an option on the Dialer. Setting options
	// can only be done before Dial() has been called.
	SetOption(name string, value interface{}) error

	// GetOption gets an option value from the Dialer.
	GetOption(name string) (interface{}, error)
}

Dialer is an interface to the underlying dialer for a transport and address.

type Endpoint

type Endpoint interface {
	// GetID returns a unique 31-bit value associated with the Endpoint.
	// The value is unique for a given socket, at a given time.
	GetID() uint32

	// Close does what you think.
	Close() error

	// SendMsg sends a message.  On success it returns nil. This is a
	// blocking call.
	SendMsg(*Message) error

	// RecvMsg receives a message.  It blocks until the message is
	// received.  On error, the pipe is closed and nil is returned.
	RecvMsg() *Message
}

Endpoint represents the handle that a Protocol implementation has to the underlying stream transport. It can be thought of as one side of a TCP, IPC, or other type of connection.

type Listener

type Listener interface {
	// Close closes the listener, and removes it from any active socket.
	// Further operations on the Listener will return ErrClosed.
	Close() error

	// Listen starts listening for new connections on the address.
	Listen() error

	// Address returns the string (full URL) of the Listener.
	Address() string

	// SetOption sets an option on the Listener. Setting options
	// can only be done before Listen() has been called.
	SetOption(name string, value interface{}) error

	// GetOption gets an option value from the Listener.
	GetOption(name string) (interface{}, error)
}

Listener is an interface to the underlying listener for a transport and address.

type Message

type Message struct {
	// Header carries any protocol (SP) specific header.  Applications
	// should not modify or use this unless they are using Raw mode.
	// No user data may be placed here.
	Header []byte

	// Body carries the body of the message.  This can also be thought
	// of as the message "payload".
	Body []byte

	// Port may be set on message receipt, to indicate the Port from
	// which the Message was received.  There are no guarantees that the
	// Port is still active, and applications should only use this for
	// informational purposes.
	Port Port
	// contains filtered or unexported fields
}

Message encapsulates the messages that we exchange back and forth. The meaning of the Header and Body fields, and where the splits occur, will vary depending on the protocol. Note however that any headers applied by transport layers (including TCP/ethernet headers, and SP protocol independent length headers), are *not* included in the Header.

func NewMessage

func NewMessage(sz int) *Message

NewMessage is the supported way to obtain a new Message. This makes use of a "cache" which greatly reduces the load on the garbage collector.

func (*Message) Dup

func (m *Message) Dup() *Message

Dup creates a "duplicate" message. What it really does is simply increment the reference count on the message. Note that since the underlying message is actually shared, consumers must take care not to modify the message. (We might revise this API in the future to add a copy-on-write facility, but for now modification is neither needed nor supported.) Applications should *NOT* make use of this function -- it is intended for Protocol, Transport and internal use only.

func (*Message) Expired added in v1.2.0

func (m *Message) Expired() bool

Expired returns true if the message has "expired". This is used by transport implementations to discard messages that have been stuck in the write queue for too long, and should be discarded rather than delivered across the transport. This is only used on the TX path, there is no sense of "expiration" on the RX path.

func (*Message) Free

func (m *Message) Free()

Free decrements the reference count on a message, and releases its resources if no further references remain. While this is not strictly necessary thanks to GC, doing so allows for the resources to be recycled without engaging GC. This can have rather substantial benefits for performance.
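The life cycle formed by NewMessage, Dup and Free can be illustrated with a small reference-counted buffer backed by sync.Pool. Everything in this sketch (`buf`, `newBuf`, `dup`, `free`, the `recycled` counter) is hypothetical and only models the idea; it does not reproduce the Message internals.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// buf is a hypothetical reference-counted buffer; it is not the mangos
// Message type.
type buf struct {
	Body []byte
	refs int32
}

// pool recycles buffers so that steady-state traffic allocates little.
var pool = sync.Pool{New: func() interface{} { return &buf{} }}

// recycled counts buffers returned to the pool (for demonstration only).
var recycled int32

// newBuf obtains a buffer with room for sz bytes and one reference.
func newBuf(sz int) *buf {
	b := pool.Get().(*buf)
	if cap(b.Body) < sz {
		b.Body = make([]byte, 0, sz)
	}
	b.Body = b.Body[:0]
	b.refs = 1
	return b
}

// dup adds a reference; the data is shared, so holders must not modify it.
func (b *buf) dup() *buf {
	atomic.AddInt32(&b.refs, 1)
	return b
}

// free drops a reference and recycles the buffer when none remain.
func (b *buf) free() {
	if atomic.AddInt32(&b.refs, -1) == 0 {
		atomic.AddInt32(&recycled, 1)
		pool.Put(b)
	}
}

func main() {
	m := newBuf(64)
	m.Body = append(m.Body, "payload"...)
	d := m.dup() // second holder of the same data

	m.free()
	fmt.Println("recycled after first free:", atomic.LoadInt32(&recycled))
	d.free()
	fmt.Println("recycled after second free:", atomic.LoadInt32(&recycled))
}
```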

type Pipe

type Pipe interface {

	// Send sends a complete message.  In the event of a partial send,
	// the Pipe will be closed, and an error is returned.  For reasons
	// of efficiency, we allow the message to be sent in a scatter/gather
	// list.
	Send(*Message) error

	// Recv receives a complete message.  In the event that a
	// complete message could not be received, an error is returned
	// to the caller and the Pipe is closed.
	//
	// To mitigate Denial-of-Service attacks, we limit the max message
	// size to 1M.
	Recv() (*Message, error)

	// Close closes the underlying transport.  Further operations on
	// the Pipe will result in errors.  Note that messages that are
	// queued in transport buffers may still be received by the remote
	// peer.
	Close() error

	// LocalProtocol returns the 16-bit SP protocol number used by the
	// local side.  This will normally be sent to the peer during
	// connection establishment.
	LocalProtocol() uint16

	// RemoteProtocol returns the 16-bit SP protocol number used by the
	// remote side.  This will normally be received from the peer during
	// connection establishment.
	RemoteProtocol() uint16

	// IsOpen returns true if the underlying connection is open.
	IsOpen() bool

	// GetProp returns an arbitrary transport specific property.
	// These are like options, but are read-only and specific to a single
	// connection.  If the property doesn't exist, then ErrBadProperty
	// should be returned.
	GetProp(string) (interface{}, error)
}

Pipe behaves like a full-duplex message-oriented connection between two peers. Callers may call operations on a Pipe simultaneously from different goroutines. (These are different from net.Conn because they provide message oriented semantics.)

Pipe is only intended for use by transport implementors, and should not be directly used in applications.

func NewConnPipe

func NewConnPipe(c net.Conn, sock Socket, props ...interface{}) (Pipe, error)

NewConnPipe allocates a new Pipe using the supplied net.Conn, and initializes it. It performs the handshake required at the SP layer, only returning the Pipe once the SP layer negotiation is complete.

Stream oriented transports can utilize this to implement a Transport. The implementation will also need to implement PipeDialer, PipeAccepter, and the Transport enclosing structure. Using this layered interface, the implementation needn't bother concerning itself with passing actual SP messages once the lower layer connection is established.

func NewConnPipeIPC

func NewConnPipeIPC(c net.Conn, sock Socket, props ...interface{}) (Pipe, error)

NewConnPipeIPC allocates a new Pipe using the IPC exchange protocol.

type PipeDialer

type PipeDialer interface {
	// Dial is used to initiate a connection to a remote peer.
	Dial() (Pipe, error)

	// SetOption sets a local option on the dialer.
	// ErrBadOption can be returned for unrecognized options.
	// ErrBadValue can be returned for incorrect value types.
	SetOption(name string, value interface{}) error

	// GetOption gets a local option from the dialer.
	// ErrBadOption can be returned for unrecognized options.
	GetOption(name string) (value interface{}, err error)
}

PipeDialer represents the client side of a connection. Clients initiate the connection.

PipeDialer is only intended for use by transport implementors, and should not be directly used in applications.

type PipeListener

type PipeListener interface {

	// Listen actually begins listening on the interface.  It is
	// called just prior to the Accept() routine normally. It is
	// the socket equivalent of bind()+listen().
	Listen() error

	// Accept completes the server side of a connection.  Once the
	// connection is established and initial handshaking is complete,
	// the resulting connection is returned to the client.
	Accept() (Pipe, error)

	// Close ceases any listening activity, and will specifically close
	// any underlying file descriptor.  Once this is done, the only way
	// to resume listening is to create a new Server instance.  Presumably
	// this function is only called when the last reference to the server
	// is about to go away.  Established connections are unaffected.
	Close() error

	// SetOption sets a local option on the listener.
	// ErrBadOption can be returned for unrecognized options.
	// ErrBadValue can be returned for incorrect value types.
	SetOption(name string, value interface{}) error

	// GetOption gets a local option from the listener.
	// ErrBadOption can be returned for unrecognized options.
	GetOption(name string) (value interface{}, err error)

	// Address gets the local address.  The value may not be meaningful
	// until Listen() has been called.
	Address() string
}

PipeListener represents the server side of a connection. Servers respond to a connection request from clients.

PipeListener is only intended for use by transport implementors, and should not be directly used in applications.

type Port

type Port interface {

	// Address returns the address (URL form) associated with the port.
	// This matches the string passed to Dial() or Listen().
	Address() string

	// GetProp returns an arbitrary property.  The details will vary
	// for different transport types.
	GetProp(name string) (interface{}, error)

	// IsOpen determines whether this is open or not.
	IsOpen() bool

	// Close closes the Conn.  This does a disconnect, or something similar.
	// Note that if a dialer is present and active, it will redial.
	Close() error

	// IsServer returns true if the connection is from a server (Listen).
	IsServer() bool

	// IsClient returns true if the connection is from a client (Dial).
	IsClient() bool

	// LocalProtocol returns the local protocol number.
	LocalProtocol() uint16

	// RemoteProtocol returns the remote protocol number.
	RemoteProtocol() uint16

	// Dialer returns the dialer for this Port, or nil if a server.
	Dialer() Dialer

	// Listener returns the listener for this Port, or nil if a client.
	Listener() Listener
}

Port represents the high level interface to a low level communications channel. There is one of these associated with a given TCP connection, for example. This interface is intended for application use.

Note that applications cannot send or receive data on a Port directly.

type PortAction

type PortAction int

PortAction determines whether the action on a Port is addition or removal.

type PortHook

type PortHook func(PortAction, Port) bool

PortHook is a function that is called when a port is added or removed to or from a Socket. In the case of PortActionAdd, the function may return false to indicate that the port should not be added.
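As an illustration, a hook might admit only ports whose address uses the TLS scheme. The sketch below uses hypothetical stand-ins (`portAction`, `port`, `fakePort`, `tlsOnly`) so that it runs without the library; a real hook would receive the Port interface described above. Returning true for removals is an assumption, since the text only gives meaning to the return value for PortActionAdd.

```go
package main

import (
	"fmt"
	"strings"
)

// portAction and its values mirror PortAction, PortActionAdd and
// PortActionRemove; they are redeclared here to keep the sketch standalone.
type portAction int

const (
	portActionAdd portAction = iota
	portActionRemove
)

// port is a hypothetical stand-in exposing only the Address method of Port.
type port interface {
	Address() string
}

// fakePort is a toy port whose address is the string itself.
type fakePort string

func (p fakePort) Address() string { return string(p) }

// tlsOnly is an example hook: on addition it accepts only ports whose
// address uses the tls+tcp scheme. Removals are always acknowledged.
func tlsOnly(action portAction, p port) bool {
	if action != portActionAdd {
		return true
	}
	return strings.HasPrefix(p.Address(), "tls+tcp://")
}

func main() {
	for _, addr := range []string{"tls+tcp://127.0.0.1:5555", "tcp://127.0.0.1:5556"} {
		fmt.Printf("%s accepted: %v\n", addr, tlsOnly(portActionAdd, fakePort(addr)))
	}
}
```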

type Protocol

type Protocol interface {

	// Init is called by the core to allow the protocol to perform
	// any initialization steps it needs.  It should save the handle
	// for future use, as well.
	Init(ProtocolSocket)

	// Shutdown is used to drain the send side.  It is only ever called
	// when the socket is being shutdown cleanly. Protocols should use
	// the linger time, and wait up to that time for sockets to drain.
	Shutdown(time.Time)

	// AddEndpoint is called when a new Endpoint is added to the socket.
	// Typically this is as a result of connect or accept completing.
	AddEndpoint(Endpoint)

	// RemoveEndpoint is called when an Endpoint is removed from the socket.
	// Typically this indicates a disconnected or closed connection.
	RemoveEndpoint(Endpoint)

	// Number returns a 16-bit value for the protocol number,
	// as assigned by the SP governing body. (IANA?)
	Number() uint16

	// Name returns our name.
	Name() string

	// PeerNumber() returns a 16-bit number for our peer protocol.
	PeerNumber() uint16

	// PeerName() returns the name of our peer protocol.
	PeerName() string

	// GetOption is used to retrieve the current value of an option.
	// If the protocol doesn't recognize the option, ErrBadOption should
	// be returned.
	GetOption(string) (interface{}, error)

	// SetOption is used to set an option.  ErrBadOption is returned if
	// the option name is not recognized, ErrBadValue if the value is
	// invalid.
	SetOption(string, interface{}) error
}

Protocol implementations handle the "meat" of protocol processing. Each protocol type will implement one of these. For protocol pairs (REP/REQ), there will be one for each half of the protocol.

type ProtocolRecvHook

type ProtocolRecvHook interface {
	// RecvHook is called just before the message is handed to the
	// application.  The message may be modified.  If false is returned,
	// then the message is dropped.
	RecvHook(*Message) bool
}

ProtocolRecvHook is intended to be an additional extension to the Protocol interface.

type ProtocolSendHook

type ProtocolSendHook interface {
	// SendHook is called when the application calls Send.
	// If false is returned, the message will be silently dropped.
	// Note that the message may be dropped for other reasons,
	// such as if backpressure is applied.
	SendHook(*Message) bool
}

ProtocolSendHook is intended to be an additional extension to the Protocol interface.

type ProtocolSocket

type ProtocolSocket interface {
	// SendChannel represents the channel used to send messages.  The
	// application injects messages to it, and the protocol consumes
	// messages from it.  The channel may be closed when the core needs to
	// create a new channel, typically after an option is set that requires
	// the channel to be reconfigured.  (OptionWriteQLen) When the protocol
	// implementation notices this, it should call this function again to obtain
	// the value of the new channel.
	SendChannel() <-chan *Message

	// RecvChannel is the channel used to receive messages.  The protocol
	// should inject messages to it, and the application will consume them
	// later.
	RecvChannel() chan<- *Message

	// The protocol can wait on this channel to close.  When it is closed,
	// it indicates that the application has closed the upper read socket,
	// and the protocol should stop any further read operations on this
	// instance.
	CloseChannel() <-chan struct{}

	// GetOption may be used by the protocol to retrieve an option from
	// the socket.  This can ultimately wind up calling into the socket's
	// own GetOption handler, so care should be used!
	GetOption(string) (interface{}, error)

	// SetOption is used by the Protocol to set an option on the socket.
	// Note that this may set transport options, or even call back down
	// into the protocol's own SetOption interface!
	SetOption(string, interface{}) error

	// SetRecvError is used to cause socket RX callers to report an
	// error.  This can be used to force an error return rather than
	// waiting for a message that will never arrive (e.g. due to state).
	// If set to nil, then RX works normally.
	SetRecvError(error)

	// SetSendError is used to cause socket TX callers to report an
	// error.  This can be used to force an error return rather than
	// waiting to send a message that will never be delivered (e.g. due
	// to incorrect state.)  If set to nil, then TX works normally.
	SetSendError(error)
}

ProtocolSocket is the "handle" given to protocols to interface with the socket. The Protocol implementation should not access any sockets or pipes except by using functions made available on the ProtocolSocket. Note that all functions listed here are non-blocking.
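
The channel handling described for SendChannel and CloseChannel can be sketched as a sender loop. The sketch below is self-contained and does not import mangos: `message`, `protoSocket`, `fakeSocket`, `sender`, and `run` are hypothetical names, and `fakeSocket.resize` only imitates what the core might do when a queue-length option changes.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for mangos.Message.
type message struct{ Body []byte }

// protoSocket models only the two ProtocolSocket methods used here.
type protoSocket interface {
	SendChannel() <-chan *message
	CloseChannel() <-chan struct{}
}

// fakeSocket is a toy socket core for the sketch.
type fakeSocket struct {
	mu     sync.Mutex
	sendq  chan *message
	closeq chan struct{}
}

func newFakeSocket() *fakeSocket {
	return &fakeSocket{sendq: make(chan *message, 4), closeq: make(chan struct{})}
}

func (s *fakeSocket) SendChannel() <-chan *message {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.sendq
}

func (s *fakeSocket) CloseChannel() <-chan struct{} { return s.closeq }

// push queues a message on the current send channel.
func (s *fakeSocket) push(body string) {
	s.mu.Lock()
	q := s.sendq
	s.mu.Unlock()
	q <- &message{Body: []byte(body)}
}

// resize swaps in a new send channel and closes the old one.
func (s *fakeSocket) resize(n int) {
	s.mu.Lock()
	old := s.sendq
	s.sendq = make(chan *message, n)
	s.mu.Unlock()
	close(old)
}

// sender consumes messages until the close channel fires. When the send
// channel is closed, it asks the socket for the replacement channel.
func sender(s protoSocket, deliver func(*message)) {
	sq := s.SendChannel()
	cq := s.CloseChannel()
	for {
		select {
		case m, ok := <-sq:
			if !ok {
				sq = s.SendChannel() // channel was reconfigured
				continue
			}
			deliver(m)
		case <-cq:
			return
		}
	}
}

// run sends one message before and one after a resize.
func run() []string {
	s := newFakeSocket()
	out := make(chan string)
	done := make(chan struct{})
	go func() {
		defer close(done)
		sender(s, func(m *message) { out <- string(m.Body) })
	}()
	s.push("a")
	first := <-out
	s.resize(8)
	s.push("b")
	second := <-out
	close(s.closeq)
	<-done
	return []string{first, second}
}

func main() { fmt.Println(run()) }
```

The important detail is the `ok` check on the receive: a closed send channel is a signal to re-fetch the channel, not a signal to exit. Only the close channel ends the loop.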

type Socket

type Socket interface {
	// Close closes the open Socket.  Further operations on the socket
	// will return ErrClosed.
	Close() error

	// Send puts the message on the outbound send queue.  It blocks
	// until the message can be queued, or the send deadline expires.
	// If a queued message is later dropped for any reason,
	// there will be no notification back to the application.
	Send([]byte) error

	// Recv receives a complete message and returns its body.
	Recv() ([]byte, error)

	// SendMsg puts the message on the outbound send queue.  It works like
	// Send, but allows the caller to supply message headers.  AGAIN, the
	// Socket ASSUMES OWNERSHIP OF THE MESSAGE.
	SendMsg(*Message) error

	// RecvMsg receives a complete message, including the message header,
	// which is useful for protocols in raw mode.
	RecvMsg() (*Message, error)

	// Dial connects a remote endpoint to the Socket.  The function
	// returns immediately, and an asynchronous goroutine is started to
	// establish and maintain the connection, reconnecting as needed.
	// If the address is invalid, then an error is returned.
	Dial(addr string) error

	DialOptions(addr string, options map[string]interface{}) error

	// NewDialer returns a Dialer object which can be used to get
	// access to the underlying configuration for dialing.
	NewDialer(addr string, options map[string]interface{}) (Dialer, error)

	// Listen connects a local endpoint to the Socket.  Remote peers
	// may connect (e.g. with Dial) and will each be "connected" to
	// the Socket.  The accepter logic is run in a separate goroutine.
	// The only error possible is if the address is invalid.
	Listen(addr string) error

	ListenOptions(addr string, options map[string]interface{}) error

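	// NewListener returns a Listener object which can be used to get
	// access to the underlying configuration for listening.
	NewListener(addr string, options map[string]interface{}) (Listener, error)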

	// GetOption is used to retrieve an option for a socket.
	GetOption(name string) (interface{}, error)

	// SetOption is used to set an option for a socket.
	SetOption(name string, value interface{}) error

	// GetProtocol is used to get the underlying Protocol.
	GetProtocol() Protocol

	// AddTransport adds a new Transport to the socket.  Transport specific
	// options may have been configured on the Transport prior to this.
	AddTransport(Transport)

	// SetPortHook sets a PortHook function to be called when a Port is
	// added or removed from this socket (connect/disconnect).  The previous
	// hook is returned (nil if none.)
	SetPortHook(PortHook) PortHook
}

Socket is the main access handle applications use to access the SP system. It is an abstraction of an application's "connection" to a messaging topology. Applications can have more than one Socket open at a time.

func MakeSocket

func MakeSocket(proto Protocol) Socket

MakeSocket is intended for use by Protocol implementations. The intention is that they can wrap this to provide a "proto.NewSocket()" implementation.

type Transport

type Transport interface {
	// Scheme returns a string used as the prefix for SP "addresses".
	// This is similar to a URI scheme.  For example, schemes can be
	// "tcp" (for "tcp://xxx..."), "ipc", "inproc", etc.
	Scheme() string

	// NewDialer creates a new Dialer for this Transport.
	NewDialer(url string, sock Socket) (PipeDialer, error)

	// NewListener creates a new PipeListener for this Transport.
	// This generally also arranges for an OS-level file descriptor to be
	// opened, and bound to the given address, as well as establishing
	// any "listen" backlog.
	NewListener(url string, sock Socket) (PipeListener, error)
}

Transport is the interface for transport suppliers to implement.

type Waiter

type Waiter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Waiter is a way to wait for completion, but it includes a timeout. It is similar in some respects to sync.WaitGroup.

func (*Waiter) Add

func (w *Waiter) Add()

Add adds a new goroutine/item to wait for. This should be called, for example, before starting the goroutines you want to wait for.

func (*Waiter) Done

func (w *Waiter) Done()

Done is called when the item to wait for is done. There should be a one-to-one correspondence between Add and Done. When the count drops to zero, any callers blocked in Wait() are woken. If the count drops below zero, it panics.

func (*Waiter) Init

func (w *Waiter) Init()

Init must be called to initialize the Waiter.

func (*Waiter) Wait

func (w *Waiter) Wait()

Wait waits without a timeout. It only completes when the count drops to zero.

func (*Waiter) WaitAbsTimeout

func (w *Waiter) WaitAbsTimeout(t time.Time) bool

WaitAbsTimeout is like WaitRelTimeout, but waits until an absolute time.

func (*Waiter) WaitRelTimeout

func (w *Waiter) WaitRelTimeout(d time.Duration) bool

WaitRelTimeout waits until either the count drops to zero, or the timeout expires. It returns true if the count is zero, false otherwise.

Directories

Path Synopsis
Package nanomsg is a compatibility wrapper.
examples
bus
bus implements a bus example.
pair
pair implements a pair example.
pipeline
pipeline implements a one-way pipe example.
pubsub
pubsub implements a publish/subscribe example.
raw
raw implements an example concurrent request/reply server, using the raw server socket.
reqrep
reqrep implements a request/reply example.
survey
survey implements a survey example.
websocket
websocket implements a simple websocket server for mangos, demonstrating how to multiplex multiple sockets on a single HTTP server instance.
macat implements a nanocat(1) workalike command.
protocol
bus
Package bus implements the BUS protocol.
pair
Package pair implements the PAIR protocol.
pub
Package pub implements the PUB protocol.
pull
Package pull implements the PULL protocol, which is the read side of the pipeline pattern.
push
Package push implements the PUSH protocol, which is the write side of the pipeline pattern.
rep
Package rep implements the REP protocol, which is the response side of the request/response pattern.
req
Package req implements the REQ protocol, which is the request side of the request/response pattern.
respondent
Package respondent implements the RESPONDENT protocol.
star
Package star implements a new, experimental protocol called "STAR".
sub
Package sub implements the SUB protocol.
surveyor
Package surveyor implements the SURVEYOR protocol.
Package test contains support code for various mangos tests.
transport
all
Package all is used to register all transports.
inproc
Package inproc implements a simple inproc transport for mangos.
ipc
Package ipc implements the IPC transport on top of UNIX domain sockets.
tcp
Package tcp implements the TCP transport for mangos.
tlstcp
Package tlstcp implements the TLS over TCP transport for mangos.
ws
Package ws implements a simple WebSocket transport for mangos.
wss
Package wss implements a secure WebSocket transport for mangos.
