zmq4

package module
v4.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2025 License: BSD-3-Clause Imports: 24 Imported by: 1

README

zmq4 - LuxFi Fork

GitHub release go.dev reference CI Pure Go Tests CZMQ Tests Coverage GoDoc Go Report Card License

zmq4 is a pure-Go implementation of ØMQ (ZeroMQ), version 4.

This is the LuxFi fork of the original go-zeromq/zmq4 project, maintained for use in the Lux Network ecosystem.

See zeromq.org for more information about ZeroMQ.

Installation

go get github.com/luxfi/zmq/v4

Usage

import "github.com/luxfi/zmq/v4"

Version

This fork maintains compatibility with ZeroMQ v4.2.0 and follows Go module versioning with the /v4 suffix.

Development

This fork is maintained by the LuxFi team for use in the Lux Network consensus and networking layers.

Build Modes

This package automatically selects the implementation based on CGO:

  • CGO_ENABLED=0 (or unset): Pure Go implementation (default, no C dependencies)
  • CGO_ENABLED=1: CZMQ compatibility layer (requires libczmq)
Testing
# Run tests with pure Go implementation
CGO_ENABLED=0 go test ./...
# or simply:
make test

# Run tests with CZMQ compatibility layer
CGO_ENABLED=1 go test ./...
# or:
make test-czmq

# Run tests with verbose output
make test-verbose

# Run tests with race detector
make test-race

# Run tests with coverage
make test-cover

License

zmq4 is released under the BSD-3 license.

Documentation

Documentation for zmq4 is served by GoDoc.

Dependencies

Pure Go (Default)

By default, this package is a pure-Go implementation with no external dependencies.

Optional CZMQ Compatibility

For compatibility testing with the C implementation, this package optionally depends on:

Contributing

Contributions are welcome! Please submit issues and pull requests to the LuxFi ZMQ repository.

Original Project

This is a fork of go-zeromq/zmq4. The original project is licensed under BSD-3.

CI Status

Documentation

Overview

Package zmq4 implements the ØMQ sockets and protocol for ZeroMQ-4.

For more informations, see http://zeromq.org.

Index

Constants

View Source
const (
	CmdCancel      = "CANCEL"
	CmdError       = "ERROR"
	CmdHello       = "HELLO"
	CmdInitiate    = "INITIATE"
	CmdPing        = "PING"
	CmdPong        = "PONG"
	CmdReady       = "READY"
	CmdSubscribe   = "SUBSCRIBE"
	CmdUnsubscribe = "UNSUBSCRIBE"
	CmdWelcome     = "WELCOME"
)

ZMTP commands as per:

https://rfc.zeromq.org/spec:23/ZMTP/#commands
View Source
const (
	OptionSubscribe   = "SUBSCRIBE"
	OptionUnsubscribe = "UNSUBSCRIBE"
	OptionHWM         = "HWM"
	OptionIdentity    = "IDENTITY"
)

Socket option constants - only essential ones

View Source
const (
	DefaultSendHwm = 1000
)

Variables

View Source
var (
	ErrBadCmd   = errors.New("zmq4: invalid command name")
	ErrBadFrame = errors.New("zmq4: invalid frame")
)
View Source
var (
	ErrBadProperty = errors.New("zmq4: bad property")
)
View Source
var ErrClosedConn = errors.New("zmq4: read/write on closed connection")

Functions

func AuthAllow added in v4.2.1

func AuthAllow(domain string, addresses ...string)

AuthAllow adds allowed addresses (no-op for simplicity)

func AuthCurveAdd added in v4.2.1

func AuthCurveAdd(domain, publicKey string)

AuthCurveAdd adds a CURVE public key (no-op for simplicity)

func AuthCurvePublic added in v4.2.1

func AuthCurvePublic(secretKey string) (string, error)

AuthCurvePublic derives the public key from a secret key

func AuthCurveRemove added in v4.2.1

func AuthCurveRemove(domain, publicKey string)

AuthCurveRemove removes a CURVE public key (no-op for simplicity)

func AuthDeny added in v4.2.1

func AuthDeny(domain string, addresses ...string)

AuthDeny adds denied addresses (no-op for simplicity)

func AuthSetMetadataHandler added in v4.2.1

func AuthSetMetadataHandler(handler MetadataHandler)

AuthSetMetadataHandler sets the metadata handler (no-op for simplicity)

func AuthSetVerbose added in v4.2.1

func AuthSetVerbose(verbose bool)

AuthSetVerbose sets verbose mode (no-op)

func AuthStart added in v4.2.1

func AuthStart() error

AuthStart starts authentication (simplified)

func AuthStop added in v4.2.1

func AuthStop()

AuthStop stops authentication

func BackendName added in v4.2.1

func BackendName() string

BackendName returns the name of the current backend

func CWithID added in v4.2.2

func CWithID(id SocketIdentity) czmq4.SockOption

CWithID configures a ZeroMQ socket identity.

func IsCZMQAvailable added in v4.2.1

func IsCZMQAvailable() bool

IsCZMQAvailable returns false when using pure Go backend

func NewCurveKeypair added in v4.2.1

func NewCurveKeypair() (publicKey, secretKey string, err error)

NewCurveKeypair generates a new CURVE keypair

func Proxy

func Proxy(frontend, backend Socket) error

Proxy starts a proxy that forwards messages between frontend and backend. This is the ONLY way to create a proxy - no complex variations.

func RegisterTransport

func RegisterTransport(name string, trans transport.Transport) error

RegisterTransport registers a new transport with the zmq4 package.

func Transports

func Transports() []string

Transports returns the sorted list of currently registered transports.

func Z85decode added in v4.2.1

func Z85decode(text string) ([]byte, error)

Z85decode decodes Z85 text to binary data

func Z85encode added in v4.2.1

func Z85encode(data []byte) string

Z85encode encodes binary data to Z85 text format

Types

type Cmd

type Cmd struct {
	Name string
	Body []byte
}

Cmd is a ZMTP Cmd as per:

https://rfc.zeromq.org/spec:23/ZMTP/#formal-grammar

type Conn

type Conn struct {
	Server bool
	Meta   Metadata
	Peer   struct {
		Server bool
		Meta   Metadata
	}
	// contains filtered or unexported fields
}

Conn implements the ZeroMQ Message Transport Protocol as defined in https://rfc.zeromq.org/spec:23/ZMTP/.

func Open

func Open(rw net.Conn, sec Security, sockType SocketType, sockID SocketIdentity, server bool, onCloseErrorCB func(c *Conn)) (*Conn, error)

Open opens a ZMTP connection over rw with the given security, socket type and identity. An optional onCloseErrorCB can be provided to inform the caller when this Conn is closed. Open performs a complete ZMTP handshake.

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Closed

func (conn *Conn) Closed() bool

func (*Conn) Read

func (c *Conn) Read(p []byte) (int, error)

func (*Conn) RecvCmd

func (c *Conn) RecvCmd() (Cmd, error)

func (*Conn) RecvMsg

func (c *Conn) RecvMsg() (Msg, error)

RecvMsg receives a ZMTP message from the wire.

func (*Conn) SendCmd

func (c *Conn) SendCmd(name string, body []byte) error

SendCmd sends a ZMTP command over the wire.

func (*Conn) SendMsg

func (c *Conn) SendMsg(msg Msg) error

SendMsg sends a ZMTP message over the wire.

func (*Conn) SetClosed

func (conn *Conn) SetClosed()

func (*Conn) Write

func (c *Conn) Write(p []byte) (int, error)

type Metadata

type Metadata map[string]string

Metadata is describing a Conn's metadata information.

func (Metadata) MarshalZMTP

func (md Metadata) MarshalZMTP() ([]byte, error)

MarshalZMTP marshals MetaData to ZMTP encoded data.

func (*Metadata) UnmarshalZMTP

func (md *Metadata) UnmarshalZMTP(p []byte) error

UnmarshalZMTP unmarshals MetaData from a ZMTP encoded data.

type MetadataHandler added in v4.2.1

type MetadataHandler func(domain, address string) map[string]string

MetadataHandler simplified

type Msg

type Msg struct {
	Frames [][]byte
	Type   MsgType
	// contains filtered or unexported fields
}

Msg is a ZMTP message, possibly composed of multiple frames.

func NewMsg

func NewMsg(frame []byte) Msg

func NewMsgFrom

func NewMsgFrom(frames ...[]byte) Msg

func NewMsgFromString

func NewMsgFromString(frames []string) Msg

func NewMsgString

func NewMsgString(frame string) Msg

func (Msg) Bytes

func (msg Msg) Bytes() []byte

Bytes returns the concatenated content of all its frames.

func (Msg) Clone

func (msg Msg) Clone() Msg

func (Msg) Err

func (msg Msg) Err() error

func (Msg) String

func (msg Msg) String() string

type MsgType

type MsgType byte
const (
	UsrMsg MsgType = 0
	CmdMsg MsgType = 1
)

type Option

type Option func(s *socket)

Option configures a ZeroMQ socket - ONE way to configure

func WithAutomaticReconnect

func WithAutomaticReconnect(auto bool) Option

WithAutomaticReconnect is a no-op for compatibility

func WithDialerMaxRetries

func WithDialerMaxRetries(maxRetries int) Option

WithDialerMaxRetries is a no-op for compatibility

func WithDialerRetry

func WithDialerRetry(retry time.Duration) Option

WithDialerRetry is a no-op for compatibility

func WithID

func WithID(id SocketIdentity) Option

WithID sets socket identity

func WithLogger

func WithLogger(logger interface{}) Option

WithLogger is a no-op for compatibility

func WithSecurity

func WithSecurity(sec Security) Option

WithSecurity sets security mechanism (default: NULL)

func WithTimeout

func WithTimeout(timeout time.Duration) Option

WithTimeout sets socket timeout

type Property

type Property struct {
	K string
	V string
}

Property describes a Conn metadata's entry. The on-wire respresentation of Property is specified by:

https://rfc.zeromq.org/spec:23/ZMTP/

func (Property) Read

func (prop Property) Read(data []byte) (n int, err error)

func (*Property) Write

func (prop *Property) Write(data []byte) (n int, err error)

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func NewQueue

func NewQueue() *Queue

func (*Queue) Init

func (q *Queue) Init()

func (*Queue) Len

func (q *Queue) Len() int

func (*Queue) Peek

func (q *Queue) Peek() (Msg, bool)

func (*Queue) Pop

func (q *Queue) Pop()

func (*Queue) Push

func (q *Queue) Push(val Msg)

type Security

type Security interface {
	// Type returns the security mechanism type.
	Type() SecurityType

	// Handshake implements the ZMTP security handshake according to
	// this security mechanism.
	// see:
	//  https://rfc.zeromq.org/spec:23/ZMTP/
	//  https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
	//  https://rfc.zeromq.org/spec:25/ZMTP-CURVE/
	Handshake(conn *Conn, server bool) error

	// Encrypt writes the encrypted form of data to w.
	Encrypt(w io.Writer, data []byte) (int, error)

	// Decrypt writes the decrypted form of data to w.
	Decrypt(w io.Writer, data []byte) (int, error)
}

Security is an interface for ZMTP security mechanisms

type SecurityType

type SecurityType string

SecurityType denotes types of ZMTP security mechanisms

const (
	// NullSecurityType is an empty security mechanism
	// that does no authentication nor encryption.
	NullSecurity SecurityType = "NULL"

	// PlainSecurity is a security mechanism that uses
	// plaintext passwords. It is a reference implementation and
	// should not be used to anything important.
	PlainSecurity SecurityType = "PLAIN"

	// CurveSecurity uses ZMQ_CURVE for authentication
	// and encryption.
	CurveSecurity SecurityType = "CURVE"
)

type Socket

type Socket interface {
	// Close closes the open Socket.
	Close() error

	// Send puts the message on the outbound send queue.
	//
	// Send blocks until the message can be queued or the send deadline expires.
	Send(msg Msg) error

	// SendMulti puts the message on the outbound send queue.
	//
	// SendMulti blocks until the message can be queued or the send deadline
	// expires. The message will be sent as a multipart message.
	SendMulti(msg Msg) error

	// Recv receives a complete message.
	Recv() (Msg, error)

	// Listen connects a local endpoint to the Socket.
	//
	// In ZeroMQ's terminology, it binds.
	Listen(ep string) error

	// Dial connects a remote endpoint to the Socket.
	//
	// In ZeroMQ's terminology, it connects.
	Dial(ep string) error

	// Type returns the type of this Socket (for example PUB, SUB, etc.)
	Type() SocketType

	// Addr returns the listener's address. It returns nil if the socket isn't a
	// listener.
	Addr() net.Addr

	// GetOption retrieves an option for a socket.
	GetOption(name string) (interface{}, error)

	// SetOption sets an option for a socket.
	SetOption(name string, value interface{}) error
}

Socket represents a ZeroMQ socket.

func NewCDealer added in v4.2.2

func NewCDealer(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCPair added in v4.2.2

func NewCPair(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCPub added in v4.2.2

func NewCPub(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCPull added in v4.2.2

func NewCPull(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCPush added in v4.2.2

func NewCPush(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCRep added in v4.2.2

func NewCRep(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCReq added in v4.2.2

func NewCReq(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCRouter added in v4.2.2

func NewCRouter(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCStream added in v4.2.2

func NewCStream(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCSub added in v4.2.2

func NewCSub(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCXPub added in v4.2.2

func NewCXPub(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewCXSub added in v4.2.2

func NewCXSub(ctx context.Context, opts ...czmq4.SockOption) Socket

func NewDealer

func NewDealer(ctx context.Context, opts ...Option) Socket

NewDealer returns a new DEALER ZeroMQ socket. The returned socket value is initially unbound.

func NewPair

func NewPair(ctx context.Context, opts ...Option) Socket

NewPair returns a new PAIR ZeroMQ socket. The returned socket value is initially unbound.

func NewPub

func NewPub(ctx context.Context, opts ...Option) Socket

NewPub returns a new PUB ZeroMQ socket. The returned socket value is initially unbound.

func NewPull

func NewPull(ctx context.Context, opts ...Option) Socket

NewPull returns a new PULL ZeroMQ socket. The returned socket value is initially unbound.

func NewPush

func NewPush(ctx context.Context, opts ...Option) Socket

NewPush returns a new PUSH ZeroMQ socket. The returned socket value is initially unbound.

func NewRep

func NewRep(ctx context.Context, opts ...Option) Socket

NewRep returns a new REP ZeroMQ socket. The returned socket value is initially unbound.

func NewReq

func NewReq(ctx context.Context, opts ...Option) Socket

NewReq returns a new REQ ZeroMQ socket. The returned socket value is initially unbound.

func NewRouter

func NewRouter(ctx context.Context, opts ...Option) Socket

NewRouter returns a new ROUTER ZeroMQ socket. The returned socket value is initially unbound.

func NewStream added in v4.2.1

func NewStream(ctx context.Context, opts ...Option) Socket

NewStream returns a new STREAM ZeroMQ socket. The returned socket value is initially unbound. STREAM sockets are used to send and receive TCP data from a non-ZeroMQ peer when using the tcp:// transport.

func NewSub

func NewSub(ctx context.Context, opts ...Option) Socket

NewSub returns a new SUB ZeroMQ socket. The returned socket value is initially unbound.

func NewXPub

func NewXPub(ctx context.Context, opts ...Option) Socket

NewXPub returns a new XPUB ZeroMQ socket. The returned socket value is initially unbound.

func NewXSub

func NewXSub(ctx context.Context, opts ...Option) Socket

NewXSub returns a new XSUB ZeroMQ socket. The returned socket value is initially unbound.

type SocketIdentity

type SocketIdentity []byte

SocketIdentity is the ZMTP metadata socket identity. See:

https://rfc.zeromq.org/spec:23/ZMTP/.

func (SocketIdentity) String

func (id SocketIdentity) String() string

type SocketType

type SocketType string

SocketType is a ZeroMQ socket type.

const (
	Pair   SocketType = "PAIR"   // a ZMQ_PAIR socket
	Pub    SocketType = "PUB"    // a ZMQ_PUB socket
	Sub    SocketType = "SUB"    // a ZMQ_SUB socket
	Req    SocketType = "REQ"    // a ZMQ_REQ socket
	Rep    SocketType = "REP"    // a ZMQ_REP socket
	Dealer SocketType = "DEALER" // a ZMQ_DEALER socket
	Router SocketType = "ROUTER" // a ZMQ_ROUTER socket
	Pull   SocketType = "PULL"   // a ZMQ_PULL socket
	Push   SocketType = "PUSH"   // a ZMQ_PUSH socket
	XPub   SocketType = "XPUB"   // a ZMQ_XPUB socket
	XSub   SocketType = "XSUB"   // a ZMQ_XSUB socket
	Stream SocketType = "STREAM" // a ZMQ_STREAM socket
)

func (SocketType) IsCompatible

func (sck SocketType) IsCompatible(peer SocketType) bool

IsCompatible checks whether two sockets are compatible and thus can be connected together. See https://rfc.zeromq.org/spec:23/ZMTP/ for more informations.

type Topics

type Topics interface {
	// Topics returns the sorted list of topics a socket is subscribed to.
	Topics() []string
}

Topics is an interface that wraps the basic Topics method.

type UnknownTransportError

type UnknownTransportError struct {
	Name string
}

UnknownTransportError records an error when trying to use an unknown transport.

func (UnknownTransportError) Error

func (ute UnknownTransportError) Error() string

Directories

Path Synopsis
internal
errgroup
Package errgroup is bit more advanced than golang.org/x/sync/errgroup.
Package errgroup is bit more advanced than golang.org/x/sync/errgroup.
inproc
Package inproc provides tools to implement an in-process asynchronous pipe of net.Conns.
Package inproc provides tools to implement an in-process asynchronous pipe of net.Conns.
Package networking provides high-level ZMQ4 networking primitives for distributed systems communication.
Package networking provides high-level ZMQ4 networking primitives for distributed systems communication.
security
null
Package null provides the ZeroMQ NULL security mechanism
Package null provides the ZeroMQ NULL security mechanism
plain
Package plain provides the ZeroMQ PLAIN security mechanism as specified by: https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
Package plain provides the ZeroMQ PLAIN security mechanism as specified by: https://rfc.zeromq.org/spec:24/ZMTP-PLAIN/
Package transport defines the Transport interface and provides a net-based implementation that can be used by zmq4 sockets to exchange messages.
Package transport defines the Transport interface and provides a net-based implementation that can be used by zmq4 sockets to exchange messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL