zmq4

package
v0.0.0-...-90932cc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2015 License: BSD-2-Clause, MIT Imports: 9 Imported by: 0

README

A Go interface to ZeroMQ version 4.

Warning: API for ZeroMQ 4.1.0 specific features may change until 4.1.0 is officially released

This requires ZeroMQ version 4.0.1 or above. To use CURVE security, ZeroMQ must be installed with libsodium enabled.

For ZeroMQ version 3, see: http://github.com/pebbe/zmq3

For ZeroMQ version 2, see: http://github.com/pebbe/zmq2

Including all examples of ØMQ - The Guide.

Keywords: zmq, zeromq, 0mq, networks, distributed computing, message passing, fanout, pubsub, pipeline, request-reply

See also

  • goczmq - A Go interface to CZMQ

Install

go get github.com/pebbe/zmq4

Docs

API change

There has been an API change in commit 0bc5ab465849847b0556295d9a2023295c4d169e of 2014-06-27, 10:17:55 UTC in the functions AuthAllow and AuthDeny.

Old:

func AuthAllow(addresses ...string)
func AuthDeny(addresses ...string)

New:

func AuthAllow(domain string, addresses ...string)
func AuthDeny(domain string, addresses ...string)

If domain can be parsed as an IP address, it will be interpreted as such, and it and all remaining addresses are added to all domains.

So this should still work as before:

zmq.AuthAllow("127.0.0.1", "123.123.123.123")

But this won't compile:

a := []string{"127.0.0.1", "123.123.123.123"}
zmq.AuthAllow(a...)

And needs to be rewritten as:

a := []string{"127.0.0.1", "123.123.123.123"}
zmq.AuthAllow("*", a...)

Furthermore, an address can now be a single IP address, as well as an IP address and mask in CIDR notation, e.g. "123.123.123.0/24".

Documentation

Overview

A Go interface to ZeroMQ (zmq, 0mq) version 4.

WARNING: API for ZeroMQ 4.1.0 specific features may change until 4.1.0 is officially released

For ZeroMQ version 3, see: http://github.com/pebbe/zmq3

For ZeroMQ version 2, see: http://github.com/pebbe/zmq2

http://www.zeromq.org/

See also the wiki: https://github.com/pebbe/zmq4/wiki

A note on the use of a context:

This package provides a default context. This is what will be used by the functions without a context receiver, that create a socket or manipulate the context. Package developers that import this package should probably not use the default context with its associated functions, but create their own context(s). See: type Context.

Example (Multiple_contexts)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

func main() {
	chQuit := make(chan interface{})
	chReactor := make(chan bool)

	addr1 := "tcp://127.0.0.1:9997"
	addr2 := "tcp://127.0.0.1:9998"

	serv_ctx1, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	serv1, err := serv_ctx1.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = serv1.Bind(addr1)
	if checkErr(err) {
		return
	}
	defer func() {
		serv1.Close()
		serv_ctx1.Term()
	}()

	serv_ctx2, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	serv2, err := serv_ctx2.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = serv2.Bind(addr2)
	if checkErr(err) {
		return
	}
	defer func() {
		serv2.Close()
		serv_ctx2.Term()
	}()

	new_service := func(sock *zmq.Socket, addr string) {
		socket_handler := func(state zmq.State) error {
			msg, err := sock.RecvMessage(0)
			if checkErr(err) {
				return err
			}
			_, err = sock.SendMessage(addr, msg)
			if checkErr(err) {
				return err
			}
			return nil
		}
		quit_handler := func(interface{}) error {
			return errors.New("quit")
		}

		defer func() {
			chReactor <- true
		}()

		reactor := zmq.NewReactor()
		reactor.AddSocket(sock, zmq.POLLIN, socket_handler)
		reactor.AddChannel(chQuit, 1, quit_handler)
		err = reactor.Run(100 * time.Millisecond)
		fmt.Println(err)
	}

	go new_service(serv1, addr1)
	go new_service(serv2, addr2)

	time.Sleep(time.Second)

	// default context

	sock1, err := zmq.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	sock2, err := zmq.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	err = sock1.Connect(addr1)
	if checkErr(err) {
		return
	}
	err = sock2.Connect(addr2)
	if checkErr(err) {
		return
	}
	_, err = sock1.SendMessage(addr1)
	if checkErr(err) {
		return
	}
	_, err = sock2.SendMessage(addr2)
	if checkErr(err) {
		return
	}
	msg, err := sock1.RecvMessage(0)
	fmt.Println(err, msg)
	msg, err = sock2.RecvMessage(0)
	fmt.Println(err, msg)
	err = sock1.Close()
	if checkErr(err) {
		return
	}
	err = sock2.Close()
	if checkErr(err) {
		return
	}

	// non-default contexts

	ctx1, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	ctx2, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	sock1, err = ctx1.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	sock2, err = ctx2.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	err = sock1.Connect(addr1)
	if checkErr(err) {
		return
	}
	err = sock2.Connect(addr2)
	if checkErr(err) {
		return
	}
	_, err = sock1.SendMessage(addr1)
	if checkErr(err) {
		return
	}
	_, err = sock2.SendMessage(addr2)
	if checkErr(err) {
		return
	}
	msg, err = sock1.RecvMessage(0)
	fmt.Println(err, msg)
	msg, err = sock2.RecvMessage(0)
	fmt.Println(err, msg)
	err = sock1.Close()
	if checkErr(err) {
		return
	}
	err = sock2.Close()
	if checkErr(err) {
		return
	}

	err = ctx1.Term()
	if checkErr(err) {
		return
	}
	err = ctx2.Term()
	if checkErr(err) {
		return
	}

	// close(chQuit) doesn't work because the reactor removes closed channels, instead of acting on them
	chQuit <- true
	<-chReactor
	chQuit <- true
	<-chReactor

	fmt.Println("Done")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

<nil> [tcp://127.0.0.1:9997 tcp://127.0.0.1:9997]
<nil> [tcp://127.0.0.1:9998 tcp://127.0.0.1:9998]
<nil> [tcp://127.0.0.1:9997 tcp://127.0.0.1:9997]
<nil> [tcp://127.0.0.1:9998 tcp://127.0.0.1:9998]
quit
quit
Done
Example (Socket_event)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"time"
)

func rep_socket_monitor(addr string) {
	s, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}
	defer s.Close()
	err = s.Connect(addr)
	if checkErr(err) {
		return
	}
	for {
		a, b, _, err := s.RecvEvent(0)
		if checkErr(err) {
			break
		}
		fmt.Println(a, b)
		if a == zmq.EVENT_CLOSED {
			break
		}
	}
}

func main() {

	// REP socket
	rep, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}

	// REP socket monitor, all events
	err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
	if checkErr(err) {
		rep.Close()
		return
	}
	go rep_socket_monitor("inproc://monitor.rep")
	time.Sleep(time.Second)

	// Generate an event
	rep.Bind("tcp://*:9689")
	if checkErr(err) {
		rep.Close()
		return
	}

	rep.Close()

	// Allow some time for event detection
	time.Sleep(time.Second)

	fmt.Println("Done")
}
Output:

EVENT_LISTENING tcp://0.0.0.0:9689
EVENT_CLOSED tcp://0.0.0.0:9689
Done
Example (Test_abstract_ipc)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

func main() {

	addr := "ipc://@/tmp/tester"

	// This is Linux only
	if runtime.GOOS != "linux" {
		fmt.Printf("%q\n", addr)
		fmt.Println("Done")
		return
	}

	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind(addr)
	if checkErr(err) {
		return
	}

	endpoint, err := sb.GetLastEndpoint()
	if checkErr(err) {
		return
	}
	fmt.Printf("%q\n", endpoint)

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}
	err = sc.Connect(addr)
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

"ipc://@/tmp/tester"
Done
Example (Test_conflate)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"strconv"
	"time"
)

func main() {

	bind_to := "tcp://127.0.0.1:5555"

	err := zmq.SetIoThreads(1)
	if checkErr(err) {
		return
	}

	s_in, err := zmq.NewSocket(zmq.PULL)
	if checkErr(err) {
		return
	}

	err = s_in.SetConflate(true)
	if checkErr(err) {
		return
	}

	err = s_in.Bind(bind_to)
	if checkErr(err) {
		return
	}

	s_out, err := zmq.NewSocket(zmq.PUSH)
	if checkErr(err) {
		return
	}

	err = s_out.Connect(bind_to)
	if checkErr(err) {
		return
	}

	message_count := 20

	for j := 0; j < message_count; j++ {
		_, err = s_out.Send(fmt.Sprint(j), 0)
		if checkErr(err) {
			return
		}
	}

	time.Sleep(time.Second)

	payload_recved, err := s_in.Recv(0)
	if checkErr(err) {
		return
	}
	i, err := strconv.Atoi(payload_recved)
	if checkErr(err) {
		return
	}
	if i != message_count-1 {
		checkErr(errors.New("payload_recved != message_count - 1"))
		return
	}

	err = s_in.Close()
	if checkErr(err) {
		return
	}

	err = s_out.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

Done
Example (Test_connect_resolve)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
)

func main() {

	sock, err := zmq.NewSocket(zmq.PUB)
	if checkErr(err) {
		return
	}

	err = sock.Connect("tcp://localhost:1234")
	checkErr(err)

	err = sock.Connect("tcp://localhost:invalid")
	fmt.Println(err)

	err = sock.Connect("tcp://in val id:1234")
	fmt.Println(err)

	err = sock.Connect("invalid://localhost:1234")
	fmt.Println(err)

	err = sock.Close()
	checkErr(err)

	fmt.Println("Done")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

invalid argument
invalid argument
protocol not supported
Done
Example (Test_ctx_options)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
)

func main() {

	i, err := zmq.GetMaxSockets()
	fmt.Println(i == zmq.MaxSocketsDflt, err)
	i, err = zmq.GetIoThreads()
	fmt.Println(i == zmq.IoThreadsDflt, err)
	b, err := zmq.GetIpv6()
	fmt.Println(b, err)

	zmq.SetIpv6(true)
	b, err = zmq.GetIpv6()
	fmt.Println(b, err)

	router, _ := zmq.NewSocket(zmq.ROUTER)
	b, err = router.GetIpv6()
	fmt.Println(b, err)

	fmt.Println(router.Close())

	zmq.SetIpv6(false)

	fmt.Println("Done")
}
Output:

true <nil>
true <nil>
false <nil>
true <nil>
true <nil>
<nil>
Done
Example (Test_disconnect_inproc)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
	"time"
)

func main() {

	publicationsReceived := 0
	isSubscribed := false

	pubSocket, err := zmq.NewSocket(zmq.XPUB)
	if checkErr(err) {
		return
	}
	subSocket, err := zmq.NewSocket(zmq.SUB)
	if checkErr(err) {
		return
	}
	err = subSocket.SetSubscribe("foo")
	if checkErr(err) {
		return
	}

	err = pubSocket.Bind("inproc://someInProcDescriptor")
	if checkErr(err) {
		return
	}

	iteration := 0

	poller := zmq.NewPoller()
	poller.Add(subSocket, zmq.POLLIN) // read publications
	poller.Add(pubSocket, zmq.POLLIN) // read subscriptions
	for {
		sockets, err := poller.Poll(100 * time.Millisecond)
		if checkErr(err) {
			break //  Interrupted
		}

		for _, socket := range sockets {
			if socket.Socket == pubSocket {
				for {
					buffer, err := pubSocket.Recv(0)
					if checkErr(err) {
						return
					}
					fmt.Printf("pubSocket: %q\n", buffer)

					if buffer[0] == 0 {
						fmt.Println("pubSocket, isSubscribed == true:", isSubscribed == true)
						isSubscribed = false
					} else {
						fmt.Println("pubSocket, isSubscribed == false:", isSubscribed == false)
						isSubscribed = true
					}

					more, err := pubSocket.GetRcvmore()
					if checkErr(err) {
						return
					}
					if !more {
						break //  Last message part
					}
				}
				break
			}
		}

		for _, socket := range sockets {
			if socket.Socket == subSocket {
				for {
					msg, err := subSocket.Recv(0)
					if checkErr(err) {
						return
					}
					fmt.Printf("subSocket: %q\n", msg)
					more, err := subSocket.GetRcvmore()
					if checkErr(err) {
						return
					}
					if !more {
						publicationsReceived++
						break //  Last message part
					}

				}
				break
			}
		}

		if iteration == 1 {
			err := subSocket.Connect("inproc://someInProcDescriptor")
			checkErr(err)
		}
		if iteration == 4 {
			err := subSocket.Disconnect("inproc://someInProcDescriptor")
			checkErr(err)
		}
		if iteration > 4 && len(sockets) == 0 {
			break
		}

		_, err = pubSocket.Send("foo", zmq.SNDMORE)
		checkErr(err)
		_, err = pubSocket.Send("this is foo!", 0)
		checkErr(err)

		iteration++

	}

	fmt.Println("publicationsReceived == 3:", publicationsReceived == 3)
	fmt.Println("!isSubscribed:", !isSubscribed)

	err = pubSocket.Close()
	checkErr(err)
	err = subSocket.Close()
	checkErr(err)

	fmt.Println("Done")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

pubSocket: "\x01foo"
pubSocket, isSubscribed == false: true
subSocket: "foo"
subSocket: "this is foo!"
subSocket: "foo"
subSocket: "this is foo!"
subSocket: "foo"
subSocket: "this is foo!"
pubSocket: "\x00foo"
pubSocket, isSubscribed == true: true
publicationsReceived == 3: true
!isSubscribed: true
Done
Example (Test_fork)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
)

func main() {

	address := "tcp://127.0.0.1:6571"
	NUM_MESSAGES := 5

	//  Create and bind pull socket to receive messages
	pull, err := zmq.NewSocket(zmq.PULL)
	if checkErr(err) {
		return
	}
	err = pull.Bind(address)
	if checkErr(err) {
		return
	}

	done := make(chan bool)

	go func() {
		defer func() { done <- true }()

		//  Create new socket, connect and send some messages

		push, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return
		}
		defer func() {
			err := push.Close()
			checkErr(err)
		}()

		err = push.Connect(address)
		if checkErr(err) {
			return
		}

		for count := 0; count < NUM_MESSAGES; count++ {
			_, err = push.Send("Hello", 0)
			if checkErr(err) {
				return
			}
		}

	}()

	for count := 0; count < NUM_MESSAGES; count++ {
		msg, err := pull.Recv(0)
		fmt.Printf("%q %v\n", msg, err)
	}

	err = pull.Close()
	checkErr(err)

	<-done

	fmt.Println("Done")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

"Hello" <nil>
"Hello" <nil>
"Hello" <nil>
"Hello" <nil>
"Hello" <nil>
Done
Example (Test_hwm)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
	"time"
)

func main() {

	MAX_SENDS := 10000
	BIND_FIRST := 1
	CONNECT_FIRST := 2

	test_defaults := func() int {

		// Set up bind socket
		bind_socket, err := zmq.NewSocket(zmq.PULL)
		if checkErr(err) {
			return 0
		}
		defer func() {
			err := bind_socket.Close()
			checkErr(err)
		}()

		err = bind_socket.Bind("inproc://a")
		if checkErr(err) {
			return 0
		}

		// Set up connect socket
		connect_socket, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return 0
		}
		defer func() {
			err := connect_socket.Close()
			checkErr(err)
		}()

		err = connect_socket.Connect("inproc://a")
		if checkErr(err) {
			return 0
		}

		// Send until we block
		send_count := 0
		for send_count < MAX_SENDS {
			_, err := connect_socket.Send("", zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Send:", err)
				break
			}
			send_count++
		}

		// Now receive all sent messages
		recv_count := 0
		for {
			_, err := bind_socket.Recv(zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Recv:", err)
				break
			}
			recv_count++
		}
		fmt.Println("send_count == recv_count:", send_count == recv_count)

		return send_count
	}

	count_msg := func(send_hwm, recv_hwm, testType int) int {

		var bind_socket, connect_socket *zmq.Socket
		var err error

		if testType == BIND_FIRST {
			// Set up bind socket
			bind_socket, err = zmq.NewSocket(zmq.PULL)
			if checkErr(err) {
				return 0
			}
			defer func() {
				err := bind_socket.Close()
				checkErr(err)
			}()

			err = bind_socket.SetRcvhwm(recv_hwm)
			if checkErr(err) {
				return 0
			}

			err = bind_socket.Bind("inproc://a")
			if checkErr(err) {
				return 0
			}

			// Set up connect socket
			connect_socket, err = zmq.NewSocket(zmq.PUSH)
			if checkErr(err) {
				return 0
			}
			defer func() {
				err := connect_socket.Close()
				checkErr(err)
			}()

			err = connect_socket.SetSndhwm(send_hwm)
			if checkErr(err) {
				return 0
			}

			err = connect_socket.Connect("inproc://a")
			if checkErr(err) {
				return 0
			}
		} else {
			// Set up connect socket
			connect_socket, err = zmq.NewSocket(zmq.PUSH)
			if checkErr(err) {
				return 0
			}
			defer func() {
				err := connect_socket.Close()
				checkErr(err)
			}()

			err = connect_socket.SetSndhwm(send_hwm)
			if checkErr(err) {
				return 0
			}

			err = connect_socket.Connect("inproc://a")
			if checkErr(err) {
				return 0
			}

			// Set up bind socket
			bind_socket, err = zmq.NewSocket(zmq.PULL)
			if checkErr(err) {
				return 0
			}
			defer func() {
				err := bind_socket.Close()
				checkErr(err)
			}()

			err = bind_socket.SetRcvhwm(recv_hwm)
			if checkErr(err) {
				return 0
			}

			err = bind_socket.Bind("inproc://a")
			if checkErr(err) {
				return 0
			}
		}

		// Send until we block
		send_count := 0
		for send_count < MAX_SENDS {
			_, err := connect_socket.Send("", zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Send:", err)
				break
			}
			send_count++
		}

		// Now receive all sent messages
		recv_count := 0
		for {
			_, err := bind_socket.Recv(zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Recv:", err)
				break
			}
			recv_count++
		}
		fmt.Println("send_count == recv_count:", send_count == recv_count)

		// Now it should be possible to send one more.
		_, err = connect_socket.Send("", 0)
		if checkErr(err) {
			return 0
		}

		//  Consume the remaining message.
		_, err = bind_socket.Recv(0)
		checkErr(err)

		return send_count
	}

	test_inproc_bind_first := func(send_hwm, recv_hwm int) int {
		return count_msg(send_hwm, recv_hwm, BIND_FIRST)
	}

	test_inproc_connect_first := func(send_hwm, recv_hwm int) int {
		return count_msg(send_hwm, recv_hwm, CONNECT_FIRST)
	}

	test_inproc_connect_and_close_first := func(send_hwm, recv_hwm int) int {

		// Set up connect socket
		connect_socket, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return 0
		}

		err = connect_socket.SetSndhwm(send_hwm)
		if checkErr(err) {
			connect_socket.Close()
			return 0
		}

		err = connect_socket.Connect("inproc://a")
		if checkErr(err) {
			connect_socket.Close()
			return 0
		}

		// Send until we block
		send_count := 0
		for send_count < MAX_SENDS {
			_, err := connect_socket.Send("", zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Send:", err)
				break
			}
			send_count++
		}

		// Close connect
		err = connect_socket.Close()
		if checkErr(err) {
			return 0
		}

		// Set up bind socket
		bind_socket, err := zmq.NewSocket(zmq.PULL)
		if checkErr(err) {
			return 0
		}
		defer func() {
			err := bind_socket.Close()
			checkErr(err)
		}()

		err = bind_socket.SetRcvhwm(recv_hwm)
		if checkErr(err) {
			return 0
		}

		err = bind_socket.Bind("inproc://a")
		if checkErr(err) {
			return 0
		}

		// Now receive all sent messages
		recv_count := 0
		for {
			_, err := bind_socket.Recv(zmq.DONTWAIT)
			if err != nil {
				fmt.Println("Recv:", err)
				break
			}
			recv_count++
		}
		fmt.Println("send_count == recv_count:", send_count == recv_count)

		return send_count
	}

	// Default values are 1000 on send and 1000 one receive, so 2000 total
	fmt.Println("Default values")
	count := test_defaults()
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	// Infinite send and receive buffer
	fmt.Println("\nInfinite send and receive")
	count = test_inproc_bind_first(0, 0)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)
	count = test_inproc_connect_first(0, 0)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	// Infinite send buffer
	fmt.Println("\nInfinite send buffer")
	count = test_inproc_bind_first(1, 0)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)
	count = test_inproc_connect_first(1, 0)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	// Infinite receive buffer
	fmt.Println("\nInfinite receive buffer")
	count = test_inproc_bind_first(0, 1)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)
	count = test_inproc_connect_first(0, 1)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	// Send and recv buffers hwm 1, so total that can be queued is 2
	fmt.Println("\nSend and recv buffers hwm 1")
	count = test_inproc_bind_first(1, 1)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)
	count = test_inproc_connect_first(1, 1)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	// Send hwm of 1, send before bind so total that can be queued is 1
	fmt.Println("\nSend hwm of 1, send before bind")
	count = test_inproc_connect_and_close_first(1, 0)
	fmt.Println("count:", count)
	time.Sleep(100 * time.Millisecond)

	fmt.Println("\nDone")
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

Default values
Send: resource temporarily unavailable
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 2000

Infinite send and receive
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000

Infinite send buffer
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000

Infinite receive buffer
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 10000

Send and recv buffers hwm 1
Send: resource temporarily unavailable
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 2
Send: resource temporarily unavailable
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 2

Send hwm of 1, send before bind
Send: resource temporarily unavailable
Recv: resource temporarily unavailable
send_count == recv_count: true
count: 1

Done
Example (Test_pair_ipc)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

func main() {

	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind("ipc:///tmp/tester")
	if checkErr(err) {
		return
	}

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sc.Connect("ipc:///tmp/tester")
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

Done
Example (Test_pair_tcp)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

func main() {

	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind("tcp://127.0.0.1:9736")
	if checkErr(err) {
		return
	}

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sc.Connect("tcp://127.0.0.1:9736")
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

Done
Example (Test_remote_endpoint)
if _, minor, _ := zmq.Version(); minor < 1 {
	fmt.Println("127.0.0.1")
	fmt.Println("Done")
	return
}

addr := "tcp://127.0.0.1:9560"

rep, err := zmq.NewSocket(zmq.REP)
if checkErr(err) {
	return
}
req, err := zmq.NewSocket(zmq.REQ)
if checkErr(err) {
	rep.Close()
	return
}

err = rep.Bind(addr)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}
err = req.Connect(addr)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}

tmp := "test"
_, err = req.Send(tmp, 0)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}

// get message with remote endpoint
msg, props, err := rep.RecvWithMetadata(0, "Remote-Endpoint")
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}
if msg != tmp {
	fmt.Println(tmp, "!=", msg)
}

fmt.Println(props["Peer-Address"])

err = rep.Close()
if checkErr(err) {
	req.Close()
	return
}

err = req.Close()
if checkErr(err) {
	return
}

fmt.Println("Done")
Output:

127.0.0.1
Done
Example (Test_security_curve)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

func main() {

	time.Sleep(100 * time.Millisecond)

	//  Generate new keypairs for this test
	client_public, client_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}
	server_public, server_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = handler.Bind("inproc://zeromq.zap.01")
	if checkErr(err) {
		return
	}

	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err //  Terminating
		}
		version := msg[0]
		sequence := msg[1]
		// domain := msg[2]
		// address := msg[3]
		identity := msg[4]
		mechanism := msg[5]
		client_key := msg[6]
		client_key_text := zmq.Z85encode(client_key)

		if version != "1.0" {
			return errors.New("version != 1.0")
		}
		if mechanism != "CURVE" {
			return errors.New("mechanism != CURVE")
		}
		if identity != "IDENT" {
			return errors.New("identity != IDENT")
		}

		if client_key_text == client_public {
			handler.SendMessage(version, sequence, "200", "OK", "anonymous", "")
		} else {
			handler.SendMessage(version, sequence, "400", "Invalid client public key", "", "")
		}
		return nil
	}

	doQuit := func(i interface{}) error {
		err := handler.Close()
		checkErr(err)
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	//  Server socket will accept connections
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = server.SetCurveServer(1)
	if checkErr(err) {
		return
	}
	err = server.SetCurveSecretkey(server_secret)
	if checkErr(err) {
		return
	}
	err = server.SetIdentity("IDENT")
	if checkErr(err) {
		return
	}
	server.Bind("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}

	err = server.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}

	//  Check CURVE security with valid credentials
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetCurveServerkey(server_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurvePublickey(client_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurveSecretkey(client_secret)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	bounce(server, client)
	err = client.Close()
	if checkErr(err) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	//  Check CURVE security with a garbage server key
	//  This will be caught by the curve_server class, not passed to ZAP
	garbage_key := "0000111122223333444455556666777788889999"
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetCurveServerkey(garbage_key)
	if checkErr(err) {
		return
	}
	err = client.SetCurvePublickey(client_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurveSecretkey(client_secret)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	//  Check CURVE security with a garbage client secret key
	//  This will be caught by the curve_server class, not passed to ZAP
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetCurveServerkey(server_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurvePublickey(garbage_key)
	if checkErr(err) {
		return
	}
	err = client.SetCurveSecretkey(client_secret)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	//  Check CURVE security with a garbage client secret key
	//  This will be caught by the curve_server class, not passed to ZAP
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetCurveServerkey(server_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurvePublickey(client_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurveSecretkey(garbage_key)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	//  Check CURVE security with bogus client credentials
	//  This must be caught by the ZAP handler

	bogus_public, bogus_secret, _ := zmq.NewCurveKeypair()
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetCurveServerkey(server_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurvePublickey(bogus_public)
	if checkErr(err) {
		return
	}
	err = client.SetCurveSecretkey(bogus_secret)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	//  Shutdown
	err = server.Close()
	checkErr(err)

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

resource temporarily unavailable
resource temporarily unavailable
resource temporarily unavailable
resource temporarily unavailable
Done
Handler closed
Reactor finished
Example (Test_security_null)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

func main() {

	time.Sleep(100 * time.Millisecond)

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = handler.Bind("inproc://zeromq.zap.01")
	if checkErr(err) {
		return
	}

	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err //  Terminating
		}
		version := msg[0]
		sequence := msg[1]
		domain := msg[2]
		// address := msg[3]
		// identity := msg[4]
		mechanism := msg[5]

		if version != "1.0" {
			return errors.New("version != 1.0")
		}
		if mechanism != "NULL" {
			return errors.New("mechanism != NULL")
		}

		if domain == "TEST" {
			handler.SendMessage(version, sequence, "200", "OK", "anonymous", "")
		} else {
			handler.SendMessage(version, sequence, "400", "BAD DOMAIN", "", "")
		}
		return nil
	}

	doQuit := func(i interface{}) error {
		err := handler.Close()
		checkErr(err)
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	//  We bounce between a binding server and a connecting client
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}

	//  We first test client/server with no ZAP domain
	//  Libzmq does not call our ZAP handler, the connect must succeed
	err = server.Bind("tcp://127.0.0.1:9683")
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9683")
	if checkErr(err) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9683")
	client.Disconnect("tcp://127.0.0.1:9683")

	//  Now define a ZAP domain for the server; this enables
	//  authentication. We're using the wrong domain so this test
	//  must fail.
	err = server.SetZapDomain("WRONG")
	if checkErr(err) {
		return
	}
	err = server.Bind("tcp://127.0.0.1:9687")
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9687")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	err = server.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9687")
	client.Disconnect("tcp://127.0.0.1:9687")

	//  Now use the right domain, the test must pass
	err = server.SetZapDomain("TEST")
	if checkErr(err) {
		return
	}
	err = server.Bind("tcp://127.0.0.1:9688")
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9688")
	if checkErr(err) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9688")
	client.Disconnect("tcp://127.0.0.1:9688")

	err = client.Close()
	checkErr(err)
	err = server.Close()
	checkErr(err)

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

resource temporarily unavailable
Done
Handler closed
Reactor finished
Example (Test_security_plain)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

func main() {

	time.Sleep(100 * time.Millisecond)

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = handler.Bind("inproc://zeromq.zap.01")
	if checkErr(err) {
		return
	}

	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err //  Terminating
		}
		version := msg[0]
		sequence := msg[1]
		// domain := msg[2]
		// address := msg[3]
		identity := msg[4]
		mechanism := msg[5]
		username := msg[6]
		password := msg[7]

		if version != "1.0" {
			return errors.New("version != 1.0")
		}
		if mechanism != "PLAIN" {
			return errors.New("mechanism != PLAIN")
		}
		if identity != "IDENT" {
			return errors.New("identity != IDENT")
		}

		if username == "admin" && password == "password" {
			handler.SendMessage(version, sequence, "200", "OK", "anonymous", "")
		} else {
			handler.SendMessage(version, sequence, "400", "Invalid username or password", "", "")
		}
		return nil
	}

	doQuit := func(i interface{}) error {
		err := handler.Close()
		checkErr(err)
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	//  Server socket will accept connections
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = server.SetIdentity("IDENT")
	if checkErr(err) {
		return
	}
	err = server.SetPlainServer(1)
	if checkErr(err) {
		return
	}
	err = server.Bind("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}

	//  Check PLAIN security with correct username/password
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetPlainUsername("admin")
	if checkErr(err) {
		return
	}
	err = client.SetPlainPassword("password")
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	bounce(server, client)
	err = client.Close()
	if checkErr(err) {
		return
	}

	//  Check PLAIN security with badly configured client (as_server)
	//  This will be caught by the plain_server class, not passed to ZAP
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	client.SetPlainServer(1)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	err = server.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	err = server.Close()
	checkErr(err)

	fmt.Println("Done")
}

func bounce(server, client *zmq.Socket) {

	content := "12345678ABCDEFGH12345678abcdefgh"

	rc, err := client.Send(content, zmq.SNDMORE|zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = client.Send(content, zmq.DONTWAIT)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err := server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err := server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = server.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = server.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

	rc, err = server.Send(content, zmq.SNDMORE)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	rc, err = server.Send(content, 0)
	if checkErr0(err) {
		return
	}
	if rc != 32 {
		checkErr0(errors.New("rc != 32"))
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if !rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
		return
	}

	msg, err = client.Recv(0)
	if checkErr0(err) {
		return
	}

	if msg != content {
		checkErr0(errors.New(fmt.Sprintf("%q != %q", msg, content)))
	}

	rcvmore, err = client.GetRcvmore()
	if checkErr0(err) {
		return
	}
	if rcvmore {
		checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
		return
	}

}

func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output:

resource temporarily unavailable
Done
Handler closed
Reactor finished
Example (Version)
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
)

func main() {
	major, _, _ := zmq.Version()
	fmt.Println("Version:", major)
}
Output:

Version: 4

Index

Examples

Constants

View Source
const (

	// On Windows platform some of the standard POSIX errnos are not defined.
	EADDRINUSE      = Errno(C.EADDRINUSE)
	EADDRNOTAVAIL   = Errno(C.EADDRNOTAVAIL)
	EAFNOSUPPORT    = Errno(C.EAFNOSUPPORT)
	ECONNABORTED    = Errno(C.ECONNABORTED)
	ECONNREFUSED    = Errno(C.ECONNREFUSED)
	ECONNRESET      = Errno(C.ECONNRESET)
	EHOSTUNREACH    = Errno(C.EHOSTUNREACH)
	EINPROGRESS     = Errno(C.EINPROGRESS)
	EMSGSIZE        = Errno(C.EMSGSIZE)
	ENETDOWN        = Errno(C.ENETDOWN)
	ENETRESET       = Errno(C.ENETRESET)
	ENETUNREACH     = Errno(C.ENETUNREACH)
	ENOBUFS         = Errno(C.ENOBUFS)
	ENOTCONN        = Errno(C.ENOTCONN)
	ENOTSOCK        = Errno(C.ENOTSOCK)
	ENOTSUP         = Errno(C.ENOTSUP)
	EPROTONOSUPPORT = Errno(C.EPROTONOSUPPORT)
	ETIMEDOUT       = Errno(C.ETIMEDOUT)

	// Native 0MQ error codes.
	EFSM           = Errno(C.EFSM)
	EMTHREAD       = Errno(C.EMTHREAD)
	ENOCOMPATPROTO = Errno(C.ENOCOMPATPROTO)
	ETERM          = Errno(C.ETERM)
)
View Source
const (
	MaxSocketsDflt = int(C.ZMQ_MAX_SOCKETS_DFLT)
	IoThreadsDflt  = int(C.ZMQ_IO_THREADS_DFLT)
)
View Source
const (
	// Constants for NewSocket()
	// See: http://api.zeromq.org/4-0:zmq-socket#toc3
	REQ    = Type(C.ZMQ_REQ)
	REP    = Type(C.ZMQ_REP)
	DEALER = Type(C.ZMQ_DEALER)
	ROUTER = Type(C.ZMQ_ROUTER)
	PUB    = Type(C.ZMQ_PUB)
	SUB    = Type(C.ZMQ_SUB)
	XPUB   = Type(C.ZMQ_XPUB)
	XSUB   = Type(C.ZMQ_XSUB)
	PUSH   = Type(C.ZMQ_PUSH)
	PULL   = Type(C.ZMQ_PULL)
	PAIR   = Type(C.ZMQ_PAIR)
	STREAM = Type(C.ZMQ_STREAM)
)
View Source
const (
	// Flags for (*Socket)Send(), (*Socket)Recv()
	// For Send, see: http://api.zeromq.org/4-0:zmq-send#toc2
	// For Recv, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
	DONTWAIT = Flag(C.ZMQ_DONTWAIT)
	SNDMORE  = Flag(C.ZMQ_SNDMORE)
)
View Source
const (
	// Flags for (*Socket)Monitor() and (*Socket)RecvEvent()
	// See: http://api.zeromq.org/4-0:zmq-socket-monitor#toc3
	EVENT_ALL             = Event(C.ZMQ_EVENT_ALL)
	EVENT_CONNECTED       = Event(C.ZMQ_EVENT_CONNECTED)
	EVENT_CONNECT_DELAYED = Event(C.ZMQ_EVENT_CONNECT_DELAYED)
	EVENT_CONNECT_RETRIED = Event(C.ZMQ_EVENT_CONNECT_RETRIED)
	EVENT_LISTENING       = Event(C.ZMQ_EVENT_LISTENING)
	EVENT_BIND_FAILED     = Event(C.ZMQ_EVENT_BIND_FAILED)
	EVENT_ACCEPTED        = Event(C.ZMQ_EVENT_ACCEPTED)
	EVENT_ACCEPT_FAILED   = Event(C.ZMQ_EVENT_ACCEPT_FAILED)
	EVENT_CLOSED          = Event(C.ZMQ_EVENT_CLOSED)
	EVENT_CLOSE_FAILED    = Event(C.ZMQ_EVENT_CLOSE_FAILED)
	EVENT_DISCONNECTED    = Event(C.ZMQ_EVENT_DISCONNECTED)
	EVENT_MONITOR_STOPPED = Event(C.ZMQ_EVENT_MONITOR_STOPPED)
)
View Source
const (
	// Flags for (*Socket)GetEvents()
	// See: http://api.zeromq.org/4-0:zmq-getsockopt#toc25
	POLLIN  = State(C.ZMQ_POLLIN)
	POLLOUT = State(C.ZMQ_POLLOUT)
)
View Source
const (
	// Constants for (*Socket)GetMechanism()
	// See: http://api.zeromq.org/4-0:zmq-getsockopt#toc31
	NULL   = Mechanism(C.ZMQ_NULL)
	PLAIN  = Mechanism(C.ZMQ_PLAIN)
	CURVE  = Mechanism(C.ZMQ_CURVE)
	GSSAPI = Mechanism(C.ZMQ_GSSAPI)
)
View Source
const CURVE_ALLOW_ANY = "*"

Variables

View Source
var (
	ErrorContextClosed     = errors.New("Context is closed")
	ErrorSocketClosed      = errors.New("Socket is closed")
	ErrorMoreExpected      = errors.New("More expected")
	ErrorNotImplemented405 = errors.New("Not implemented, requires 0MQ version 4.0.5")
	ErrorNotImplemented41  = errors.New("Not implemented, requires 0MQ version 4.1")
)

Functions

func AuthAllow

func AuthAllow(domain string, addresses ...string)

Allow (whitelist) some addresses for a domain.

An address can be a single IP address, or an IP address and mask in CIDR notation.

For NULL, all clients from these addresses will be accepted.

For PLAIN and CURVE, they will be allowed to continue with authentication.

You can call this method multiple times to whitelist multiple IP addresses.

If you whitelist a single address for a domain, any non-whitelisted addresses for that domain are treated as blacklisted.

Use domain "*" for all domains.

For backward compatibility: if domain can be parsed as an IP address, it will be interpreted as another address, and it and all remaining addresses will be added to all domains.

func AuthCurveAdd

func AuthCurveAdd(domain string, pubkeys ...string)

Add public user keys for CURVE authentication for a given domain.

To cover all domains, use "*".

Public keys are in Z85 printable text format.

To allow all client keys without checking, specify CURVE_ALLOW_ANY for the key.

func AuthCurveRemove

func AuthCurveRemove(domain string, pubkeys ...string)

Remove user keys from CURVE authentication for a given domain.

func AuthCurveRemoveAll

func AuthCurveRemoveAll(domain string)

Remove all user keys from CURVE authentication for a given domain.

func AuthDeny

func AuthDeny(domain string, addresses ...string)

Deny (blacklist) some addresses for a domain.

An address can be a single IP address, or an IP address and mask in CIDR notation.

For all security mechanisms, this rejects the connection without any further authentication.

Use either a whitelist for a domain, or a blacklist for a domain, not both. If you define both a whitelist and a blacklist for a domain, only the whitelist takes effect.

Use domain "*" for all domains.

For backward compatibility: if domain can be parsed as an IP address, it will be interpreted as another address, and it and all remaining addresses will be added to all domains.

func AuthMetaBlob

func AuthMetaBlob(key, value string) (blob []byte, err error)

This encodes a key/value pair into the format used by a ZAP handler.

Returns an error if key is more then 255 characters long.

func AuthPlainAdd

func AuthPlainAdd(domain, username, password string)

Add a user for PLAIN authentication for a given domain.

Set `domain` to "*" to apply to all domains.

func AuthPlainRemove

func AuthPlainRemove(domain string, usernames ...string)

Remove users from PLAIN authentication for a given domain.

func AuthPlainRemoveAll

func AuthPlainRemoveAll(domain string)

Remove all users from PLAIN authentication for a given domain.

func AuthSetMetadataHandler

func AuthSetMetadataHandler(
	handler func(
		version, request_id, domain, address, identity, mechanism string, credentials ...string) (metadata map[string]string))

This function sets the metadata handler that is called by the ZAP handler to retrieve key/value properties that should be set on reply messages in case of a status code "200" (succes).

Default properties are `Socket-Type`, which is already set, and `Identity` and `User-Id` that are empty by default. The last two can be set, and more properties can be added.

The `User-Id` property is used for the `user id` frame of the reply message. All other properties are stored in the `metadata` frame of the reply message.

The default handler returns an empty map.

For the meaning of the handler arguments, and other details, see: http://rfc.zeromq.org/spec:27#toc10

func AuthSetVerbose

func AuthSetVerbose(verbose bool)

Enable verbose tracing of commands and activity.

func AuthStart

func AuthStart() (err error)

Start authentication.

Note that until you add policies, all incoming NULL connections are allowed (classic ZeroMQ behaviour), and all PLAIN and CURVE connections are denied.

Example
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"log"
)

func main() {

	checkErr := func(err error) bool {
		if err == nil {
			return false
		}
		log.Println(err)
		return true
	}

	zmq.AuthSetVerbose(false)

	//  Start authentication engine
	err := zmq.AuthStart()
	if checkErr(err) {
		return
	}
	defer zmq.AuthStop()

	zmq.AuthSetMetadataHandler(
		func(version, request_id, domain, address, identity, mechanism string, credentials ...string) (metadata map[string]string) {
			return map[string]string{
				"Identity": identity,
				"User-Id":  "anonymous",
				"Hello":    "World!",
				"Foo":      "Bar",
			}
		})

	zmq.AuthAllow("domain1", "127.0.0.1")

	//  We need two certificates, one for the client and one for
	//  the server. The client must know the server's public key
	//  to make a CURVE connection.
	client_public, client_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}
	server_public, server_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}

	//  Tell authenticator to use this public client key
	zmq.AuthCurveAdd("domain1", client_public)

	//  Create and bind server socket
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	defer server.Close()
	server.SetIdentity("Server1")
	server.ServerAuthCurve("domain1", server_secret)
	err = server.Bind("tcp://*:9000")
	if checkErr(err) {
		return
	}

	//  Create and connect client socket
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	defer client.Close()
	server.SetIdentity("Client1")
	client.ClientAuthCurve(server_public, client_public, client_secret)
	err = client.Connect("tcp://127.0.0.1:9000")
	if checkErr(err) {
		return
	}

	//  Send a message from client to server
	_, err = client.SendMessage("Greatings", "Earthlings!")
	if checkErr(err) {
		return
	}

	// Receive message and metadata on the server
	keys := []string{"Identity", "User-Id", "Socket-Type", "Hello", "Foo", "Fuz"}
	message, metadata, err := server.RecvMessageWithMetadata(0, keys...)
	if checkErr(err) {
		return
	}
	fmt.Println(message)
	if _, minor, _ := zmq.Version(); minor < 1 {
		// Metadata requires at least ZeroMQ version 4.1
		fmt.Println(`Identity: "Server1" true`)
		fmt.Println(`User-Id: "anonymous" true`)
		fmt.Println(`Socket-Type: "DEALER" true`)
		fmt.Println(`Hello: "World!" true`)
		fmt.Println(`Foo: "Bar" true`)
		fmt.Println(`Fuz: "" false`)
	} else {
		for _, key := range keys {
			value, ok := metadata[key]
			fmt.Printf("%v: %q %v\n", key, value, ok)
		}
	}
}
Output:

[Greatings Earthlings!]
Identity: "Server1" true
User-Id: "anonymous" true
Socket-Type: "DEALER" true
Hello: "World!" true
Foo: "Bar" true
Fuz: "" false

func AuthStop

func AuthStop()

Stop authentication.

func Error

func Error(e int) string

Get 0MQ error message string.

func GetIoThreads

func GetIoThreads() (int, error)

Returns the size of the 0MQ thread pool in the default context.

func GetIpv6

func GetIpv6() (bool, error)

Returns the IPv6 option in the default context.

func GetMaxSockets

func GetMaxSockets() (int, error)

Returns the maximum number of sockets allowed in the default context.

func HasCurve

func HasCurve() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the CURVE security mechanism

func HasGssapi

func HasGssapi() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the GSSAPI security mechanism

func HasIpc

func HasIpc() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the ipc:// protocol

func HasNorm

func HasNorm() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the norm:// protocol

func HasPgm

func HasPgm() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the pgm:// protocol

func HasTipc

func HasTipc() bool

Returns false for ZeroMQ version < 4.1.0

Else: returns true if the library supports the tipc:// protocol

func NewCurveKeypair

func NewCurveKeypair() (z85_public_key, z85_secret_key string, err error)

Generate a new CURVE keypair

See: http://api.zeromq.org/4-0:zmq-curve-keypair

func Proxy

func Proxy(frontend, backend, capture *Socket) error

Start built-in ØMQ proxy

See: http://api.zeromq.org/4-0:zmq-proxy#toc2

func ProxySteerable

func ProxySteerable(frontend, backend, capture, control *Socket) error

Start built-in ØMQ proxy with PAUSE/RESUME/TERMINATE control flow

Returns ErrorNotImplemented405 with ZeroMQ version < 4.0.5

See: http://api.zeromq.org/4-0:zmq-proxy-steerable#toc2

func SetIoThreads

func SetIoThreads(n int) error

Specifies the size of the 0MQ thread pool to handle I/O operations in the default context. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one. This option only applies before creating any sockets.

Default value 1

func SetIpv6

func SetIpv6(i bool) error

Sets the IPv6 value for all sockets created in the default context from this point onwards. A value of true means IPv6 is enabled, while false means the socket will use only IPv4. When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.

Default value false

func SetMaxSockets

func SetMaxSockets(n int) error

Sets the maximum number of sockets allowed in the default context.

Default value 1024

func Term

func Term() error

Terminates the default context.

For linger behavior, see: http://api.zeromq.org/4-0:zmq-ctx-term

func Version

func Version() (major, minor, patch int)

Report 0MQ library version.

func Z85decode

func Z85decode(s string) string

Decode a binary key from Z85 printable text

See: http://api.zeromq.org/4-0:zmq-z85-decode

func Z85encode

func Z85encode(data string) string

Encode a binary key as Z85 printable text

See: http://api.zeromq.org/4-0:zmq-z85-encode

Types

type Context

type Context struct {
	// contains filtered or unexported fields
}

A context that is not the default context.

func NewContext

func NewContext() (ctx *Context, err error)

Create a new context.

func (*Context) GetIoThreads

func (ctx *Context) GetIoThreads() (int, error)

Returns the size of the 0MQ thread pool.

func (*Context) GetIpv6

func (ctx *Context) GetIpv6() (bool, error)

Returns the IPv6 option.

func (*Context) GetMaxSockets

func (ctx *Context) GetMaxSockets() (int, error)

Returns the maximum number of sockets allowed.

func (*Context) NewSocket

func (ctx *Context) NewSocket(t Type) (soc *Socket, err error)

Create 0MQ socket in the given context.

WARNING: The Socket is not thread safe. This means that you cannot access the same Socket from different goroutines without using something like a mutex.

For a description of socket types, see: http://api.zeromq.org/4-0:zmq-socket#toc3

func (*Context) SetIoThreads

func (ctx *Context) SetIoThreads(n int) error

Specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one. This option only applies before creating any sockets.

Default value 1

func (*Context) SetIpv6

func (ctx *Context) SetIpv6(i bool) error

Sets the IPv6 value for all sockets created in the context from this point onwards. A value of true means IPv6 is enabled, while false means the socket will use only IPv4. When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.

Default value false

func (*Context) SetMaxSockets

func (ctx *Context) SetMaxSockets(n int) error

Sets the maximum number of sockets allowed.

Default value 1024

func (*Context) Term

func (ctx *Context) Term() error

Terminates the context.

For linger behavior, see: http://api.zeromq.org/4-0:zmq-ctx-term

type Errno

type Errno uintptr

An Errno is an unsigned number describing an error condition as returned by a call to ZeroMQ. It implements the error interface. The number is either a standard system error, or an error defined by the C library of ZeroMQ.

func AsErrno

func AsErrno(err error) Errno

Convert error to Errno.

Example usage:

switch AsErrno(err) {

case zmq.Errno(syscall.EINTR):
    // standard system error

    // call was interrupted

case zmq.ETERM:
    // error defined by ZeroMQ

    // context was terminated

}

See also: examples/interrupt.go

func (Errno) Error

func (errno Errno) Error() string

Return Errno as string.

type Event

type Event int

Used by (*Socket)Monitor() and (*Socket)RecvEvent()

func (Event) String

func (e Event) String() string

Socket event as string.

type Flag

type Flag int

Used by (*Socket)Send() and (*Socket)Recv()

func (Flag) String

func (f Flag) String() string

Socket flag as string.

type Mechanism

type Mechanism int

Specifies the security mechanism, used by (*Socket)GetMechanism()

func (Mechanism) String

func (m Mechanism) String() string

Security mechanism as string.

type Polled

type Polled struct {
	Socket *Socket // socket with matched event(s)
	Events State   // actual matched event(s)
}

Return type for (*Poller)Poll

type Poller

type Poller struct {
	// contains filtered or unexported fields
}

func NewPoller

func NewPoller() *Poller

Create a new Poller

func (*Poller) Add

func (p *Poller) Add(soc *Socket, events State)

Add items to the poller

Events is a bitwise OR of zmq.POLLIN and zmq.POLLOUT

func (*Poller) Poll

func (p *Poller) Poll(timeout time.Duration) ([]Polled, error)

Input/output multiplexing

If timeout < 0, wait forever until a matching event is detected

Only sockets with matching socket events are returned in the list.

Example:

poller := zmq.NewPoller()
poller.Add(socket0, zmq.POLLIN)
poller.Add(socket1, zmq.POLLIN)
//  Process messages from both sockets
for {
    sockets, _ := poller.Poll(-1)
    for _, socket := range sockets {
        switch s := socket.Socket; s {
        case socket0:
            msg, _ := s.Recv(0)
            //  Process msg
        case socket1:
            msg, _ := s.Recv(0)
            //  Process msg
        }
    }
}

func (*Poller) PollAll

func (p *Poller) PollAll(timeout time.Duration) ([]Polled, error)

This is like (*Poller)Poll, but it returns a list of all sockets, in the same order as they were added to the poller, not just those sockets that had an event.

For each socket in the list, you have to check the Events field to see if there was actually an event.

When error is not nil, the return list contains no sockets.

func (*Poller) String

func (p *Poller) String() string

Poller as string.

type Reactor

type Reactor struct {
	// contains filtered or unexported fields
}

func NewReactor

func NewReactor() *Reactor

Create a reactor to mix the handling of sockets and channels (timers or other channels).

Example:

reactor := zmq.NewReactor()
reactor.AddSocket(socket1, zmq.POLLIN, socket1_handler)
reactor.AddSocket(socket2, zmq.POLLIN, socket2_handler)
reactor.AddChannelTime(time.Tick(time.Second), 1, ticker_handler)
reactor.Run(time.Second)

func (*Reactor) AddChannel

func (r *Reactor) AddChannel(ch <-chan interface{}, limit int, handler func(interface{}) error) (id uint64)

Add channel handler to the reactor.

Returns id of added handler, that can be used later to remove it.

If limit is positive, at most this many items will be handled in each run through the main loop, otherwise it will process as many items as possible.

The handler function receives the value received from the channel.

func (*Reactor) AddChannelTime

func (r *Reactor) AddChannelTime(ch <-chan time.Time, limit int, handler func(interface{}) error) (id uint64)

This function wraps AddChannel, using a channel of type time.Time instead of type interface{}.

func (*Reactor) AddSocket

func (r *Reactor) AddSocket(soc *Socket, events State, handler func(State) error)

Add socket handler to the reactor.

You can have only one handler per socket. Adding a second one will remove the first.

The handler receives the socket state as an argument: POLLIN, POLLOUT, or both.

func (*Reactor) RemoveChannel

func (r *Reactor) RemoveChannel(id uint64)

Remove a channel from the reactor.

Closed channels are removed automaticly.

func (*Reactor) RemoveSocket

func (r *Reactor) RemoveSocket(soc *Socket)

Remove a socket handler from the reactor.

func (*Reactor) Run

func (r *Reactor) Run(interval time.Duration) (err error)

Run the reactor.

The interval determines the time-out on the polling of sockets. Interval must be positive if there are channels. If there are no channels, you can set interval to -1.

The run alternates between polling/handling sockets (using the interval as timeout), and reading/handling channels. The reading of channels is without time-out: if there is no activity on any channel, the run continues to poll sockets immediately.

The run exits when any handler returns an error, returning that same error.

func (*Reactor) SetVerbose

func (r *Reactor) SetVerbose(verbose bool)

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

Socket functions starting with `Set` or `Get` are used for setting and getting socket options.

func NewSocket

func NewSocket(t Type) (soc *Socket, err error)

Create 0MQ socket in the default context.

WARNING: The Socket is not thread safe. This means that you cannot access the same Socket from different goroutines without using something like a mutex.

For a description of socket types, see: http://api.zeromq.org/4-0:zmq-socket#toc3

func (*Socket) Bind

func (soc *Socket) Bind(endpoint string) error

Accept incoming connections on a socket.

For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-bind#toc2

func (*Socket) ClientAuthCurve

func (client *Socket) ClientAuthCurve(server_public_key, client_public_key, client_secret_key string) error

Set CURVE client role.

func (*Socket) ClientAuthPlain

func (client *Socket) ClientAuthPlain(username, password string) error

Set PLAIN client role.

func (*Socket) Close

func (soc *Socket) Close() error

If not called explicitly, the socket will be closed on garbage collection

func (*Socket) Connect

func (soc *Socket) Connect(endpoint string) error

Create outgoing connection from socket.

For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-connect#toc2

func (*Socket) Disconnect

func (soc *Socket) Disconnect(endpoint string) error

Disconnect a socket.

For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-connect#toc2

func (*Socket) GetAffinity

func (soc *Socket) GetAffinity() (uint64, error)

ZMQ_AFFINITY: Retrieve I/O thread affinity

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc7

func (*Socket) GetBacklog

func (soc *Socket) GetBacklog() (int, error)

ZMQ_BACKLOG: Retrieve maximum length of the queue of outstanding connections

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc16

func (*Socket) GetCurvePublickeyRaw

func (soc *Socket) GetCurvePublickeyRaw() (string, error)

ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc35

func (*Socket) GetCurvePublickeykeyZ85

func (soc *Socket) GetCurvePublickeykeyZ85() (string, error)

ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc35

func (*Socket) GetCurveSecretkeyRaw

func (soc *Socket) GetCurveSecretkeyRaw() (string, error)

ZMQ_CURVE_SECRETKEY: Retrieve current CURVE secret key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc36

func (*Socket) GetCurveSecretkeyZ85

func (soc *Socket) GetCurveSecretkeyZ85() (string, error)

ZMQ_CURVE_SECRETKEY: Retrieve current CURVE secret key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc36

func (*Socket) GetCurveServerkeyRaw

func (soc *Socket) GetCurveServerkeyRaw() (string, error)

ZMQ_CURVE_SERVERKEY: Retrieve current CURVE server key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc37

func (*Socket) GetCurveServerkeyZ85

func (soc *Socket) GetCurveServerkeyZ85() (string, error)

ZMQ_CURVE_SERVERKEY: Retrieve current CURVE server key

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc37

func (*Socket) GetEvents

func (soc *Socket) GetEvents() (State, error)

ZMQ_EVENTS: Retrieve socket event state

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc25

func (*Socket) GetFd

func (soc *Socket) GetFd() (int, error)

ZMQ_FD: Retrieve file descriptor associated with the socket

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc24

func (*Socket) GetGssapiPlaintext

func (soc *Socket) GetGssapiPlaintext() (bool, error)

ZMQ_GSSAPI_PLAINTEXT: Retrieve GSSAPI plaintext or encrypted status

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc10

func (*Socket) GetGssapiPrincipal

func (soc *Socket) GetGssapiPrincipal() (string, error)

ZMQ_GSSAPI_PRINCIPAL: Retrieve the name of the GSSAPI principal

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc11

func (*Socket) GetGssapiServer

func (soc *Socket) GetGssapiServer() (bool, error)

ZMQ_GSSAPI_SERVER: Retrieve current GSSAPI server role

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc12

func (*Socket) GetGssapiServicePrincipal

func (soc *Socket) GetGssapiServicePrincipal() (string, error)

ZMQ_GSSAPI_SERVICE_PRINCIPAL: Retrieve the name of the GSSAPI service principal

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc13

func (*Socket) GetHandshakeIvlt

func (soc *Socket) GetHandshakeIvlt() (time.Duration, error)

ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc14

func (*Socket) GetIdentity

func (soc *Socket) GetIdentity() (string, error)

ZMQ_IDENTITY: Retrieve socket identity

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc8

func (*Socket) GetImmediate

func (soc *Socket) GetImmediate() (bool, error)

ZMQ_IMMEDIATE: Retrieve attach-on-connect value

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc23

func (*Socket) GetIpv6

func (soc *Socket) GetIpv6() (bool, error)

ZMQ_IPV6: Retrieve IPv6 socket status

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc21

func (*Socket) GetLastEndpoint

func (soc *Socket) GetLastEndpoint() (string, error)

ZMQ_LAST_ENDPOINT: Retrieve the last endpoint set

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc26

func (*Socket) GetLinger

func (soc *Socket) GetLinger() (time.Duration, error)

ZMQ_LINGER: Retrieve linger period for socket shutdown

Returns time.Duration(-1) for infinite

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc13

func (*Socket) GetMaxmsgsize

func (soc *Socket) GetMaxmsgsize() (int64, error)

ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc17

func (*Socket) GetMechanism

func (soc *Socket) GetMechanism() (Mechanism, error)

ZMQ_MECHANISM: Retrieve current security mechanism

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc31

func (*Socket) GetMulticastHops

func (soc *Socket) GetMulticastHops() (int, error)

ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc18

func (*Socket) GetPlainPassword

func (soc *Socket) GetPlainPassword() (string, error)

ZMQ_PLAIN_PASSWORD: Retrieve current password

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc34

func (*Socket) GetPlainServer

func (soc *Socket) GetPlainServer() (int, error)

ZMQ_PLAIN_SERVER: Retrieve current PLAIN server role

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc32

func (*Socket) GetPlainUsername

func (soc *Socket) GetPlainUsername() (string, error)

ZMQ_PLAIN_USERNAME: Retrieve current PLAIN username

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc33

func (*Socket) GetRate

func (soc *Socket) GetRate() (int, error)

ZMQ_RATE: Retrieve multicast data rate

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc9

func (*Socket) GetRcvbuf

func (soc *Socket) GetRcvbuf() (int, error)

ZMQ_RCVBUF: Retrieve kernel receive buffer size

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc12

func (*Socket) GetRcvhwm

func (soc *Socket) GetRcvhwm() (int, error)

ZMQ_RCVHWM: Retrieve high water mark for inbound messages

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc6

func (*Socket) GetRcvmore

func (soc *Socket) GetRcvmore() (bool, error)

ZMQ_RCVMORE: More message data parts to follow

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc4

func (*Socket) GetRcvtimeo

func (soc *Socket) GetRcvtimeo() (time.Duration, error)

ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN

Returns time.Duration(-1) for infinite

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc19

func (*Socket) GetReconnectIvl

func (soc *Socket) GetReconnectIvl() (time.Duration, error)

ZMQ_RECONNECT_IVL: Retrieve reconnection interval

Returns time.Duration(-1) for no reconnection

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc14

func (*Socket) GetReconnectIvlMax

func (soc *Socket) GetReconnectIvlMax() (time.Duration, error)

ZMQ_RECONNECT_IVL_MAX: Retrieve maximum reconnection interval

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc15

func (*Socket) GetRecoveryIvl

func (soc *Socket) GetRecoveryIvl() (time.Duration, error)

ZMQ_RECOVERY_IVL: Get multicast recovery interval

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc10

func (*Socket) GetSndbuf

func (soc *Socket) GetSndbuf() (int, error)

ZMQ_SNDBUF: Retrieve kernel transmit buffer size

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc11

func (*Socket) GetSndhwm

func (soc *Socket) GetSndhwm() (int, error)

ZMQ_SNDHWM: Retrieves high water mark for outbound messages

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc5

func (*Socket) GetSndtimeo

func (soc *Socket) GetSndtimeo() (time.Duration, error)

ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN

Returns time.Duration(-1) for infinite

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc20

func (*Socket) GetSocksProxy

func (soc *Socket) GetSocksProxy() (string, error)

ZMQ_SOCKS_PROXY: NOT DOCUMENTED

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

func (*Socket) GetTcpKeepalive

func (soc *Socket) GetTcpKeepalive() (int, error)

ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc27

func (*Socket) GetTcpKeepaliveCnt

func (soc *Socket) GetTcpKeepaliveCnt() (int, error)

ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc29

func (*Socket) GetTcpKeepaliveIdle

func (soc *Socket) GetTcpKeepaliveIdle() (int, error)

ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPCNT(or TCP_KEEPALIVE on some OS)

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc28

func (*Socket) GetTcpKeepaliveIntvl

func (soc *Socket) GetTcpKeepaliveIntvl() (int, error)

ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc30

func (*Socket) GetTos

func (soc *Socket) GetTos() (int, error)

ZMQ_TOS: Retrieve the Type-of-Service socket override status

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-getsockopt#toc42

func (*Socket) GetType

func (soc *Socket) GetType() (Type, error)

ZMQ_TYPE: Retrieve socket type

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc3

func (*Socket) GetZapDomain

func (soc *Socket) GetZapDomain() (string, error)

ZMQ_ZAP_DOMAIN: Retrieve RFC 27 authentication domain

See: http://api.zeromq.org/4-0:zmq-getsockopt#toc38

func (*Socket) Monitor

func (soc *Socket) Monitor(addr string, events Event) error

Register a monitoring callback.

See: http://api.zeromq.org/4-0:zmq-socket-monitor#toc2

WARNING: Closing a context with a monitoring callback will lead to random crashes. This is a bug in the ZeroMQ library. The monitoring callback has the same context as the socket it was created for.

Example:

package main

import (
    zmq "github.com/pebbe/zmq4"
    "log"
    "time"
)

func rep_socket_monitor(addr string) {
    s, err := zmq.NewSocket(zmq.PAIR)
    if err != nil {
        log.Fatalln(err)
    }
    err = s.Connect(addr)
    if err != nil {
        log.Fatalln(err)
    }
    for {
        a, b, c, err := s.RecvEvent(0)
        if err != nil {
            log.Println(err)
            break
        }
        log.Println(a, b, c)
    }
    s.Close()
}

func main() {

    // REP socket
    rep, err := zmq.NewSocket(zmq.REP)
    if err != nil {
        log.Fatalln(err)
    }

    // REP socket monitor, all events
    err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
    if err != nil {
        log.Fatalln(err)
    }
    go rep_socket_monitor("inproc://monitor.rep")

    // Generate an event
    rep.Bind("tcp://*:5555")
    if err != nil {
        log.Fatalln(err)
    }

    // Allow some time for event detection
    time.Sleep(time.Second)

    rep.Close()
    zmq.Term()
}

func (*Socket) Recv

func (soc *Socket) Recv(flags Flag) (string, error)

Receive a message part from a socket.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2

func (*Socket) RecvBytes

func (soc *Socket) RecvBytes(flags Flag) ([]byte, error)

Receive a message part from a socket.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2

func (*Socket) RecvBytesWithMetadata

func (soc *Socket) RecvBytesWithMetadata(flags Flag, properties ...string) (msg []byte, metadata map[string]string, err error)

Receive a message part with metadata.

This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.

The returned metadata map contains only those properties that exist on the message.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2

For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3

func (*Socket) RecvEvent

func (soc *Socket) RecvEvent(flags Flag) (event_type Event, addr string, value int, err error)

Receive a message part from a socket interpreted as an event.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2

For a description of event_type, see: http://api.zeromq.org/4-0:zmq-socket-monitor#toc3

For an example, see: func (*Socket) Monitor

func (*Socket) RecvMessage

func (soc *Socket) RecvMessage(flags Flag) (msg []string, err error)

Receive parts as message from socket.

Returns last non-nil error code.

func (*Socket) RecvMessageBytes

func (soc *Socket) RecvMessageBytes(flags Flag) (msg [][]byte, err error)

Receive parts as message from socket.

Returns last non-nil error code.

func (*Socket) RecvMessageBytesWithMetadata

func (soc *Socket) RecvMessageBytesWithMetadata(flags Flag, properties ...string) (msg [][]byte, metadata map[string]string, err error)

Receive parts as message from socket, including metadata.

Metadata is picked from the first message part.

For details about metadata, see RecvBytesWithMetadata().

Returns last non-nil error code.

func (*Socket) RecvMessageWithMetadata

func (soc *Socket) RecvMessageWithMetadata(flags Flag, properties ...string) (msg []string, metadata map[string]string, err error)

Receive parts as message from socket, including metadata.

Metadata is picked from the first message part.

For details about metadata, see RecvWithMetadata().

Returns last non-nil error code.

func (*Socket) RecvWithMetadata

func (soc *Socket) RecvWithMetadata(flags Flag, properties ...string) (msg string, metadata map[string]string, err error)

Receive a message part with metadata.

This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.

The returned metadata map contains only those properties that exist on the message.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2

For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3

func (*Socket) Send

func (soc *Socket) Send(data string, flags Flag) (int, error)

Send a message part on a socket.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-send#toc2

func (*Socket) SendBytes

func (soc *Socket) SendBytes(data []byte, flags Flag) (int, error)

Send a message part on a socket.

For a description of flags, see: http://api.zeromq.org/4-0:zmq-send#toc2

func (*Socket) SendMessage

func (soc *Socket) SendMessage(parts ...interface{}) (total int, err error)

Send multi-part message on socket.

Any `[]string' or `[][]byte' is split into separate `string's or `[]byte's

Any other part that isn't a `string' or `[]byte' is converted to `string' with `fmt.Sprintf("%v", part)'.

Returns total bytes sent.

func (*Socket) SendMessageDontwait

func (soc *Socket) SendMessageDontwait(parts ...interface{}) (total int, err error)

Like SendMessage(), but adding the DONTWAIT flag.

func (*Socket) ServerAuthCurve

func (server *Socket) ServerAuthCurve(domain, secret_key string) error

Set CURVE server role.

func (*Socket) ServerAuthNull

func (server *Socket) ServerAuthNull(domain string) error

Set NULL server role.

func (*Socket) ServerAuthPlain

func (server *Socket) ServerAuthPlain(domain string) error

Set PLAIN server role.

func (*Socket) SetAffinity

func (soc *Socket) SetAffinity(value uint64) error

ZMQ_AFFINITY: Set I/O thread affinity

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc5

func (*Socket) SetBacklog

func (soc *Socket) SetBacklog(value int) error

ZMQ_BACKLOG: Set maximum length of the queue of outstanding connections

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc16

func (*Socket) SetConflate

func (soc *Socket) SetConflate(value bool) error

ZMQ_CONFLATE: Keep only last message

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc43

func (*Socket) SetConnectRid

func (soc *Socket) SetConnectRid(value string) error

ZMQ_CONNECT_RID: Assign the next outbound connection id

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc5

func (*Socket) SetCurvePublickey

func (soc *Socket) SetCurvePublickey(key string) error

ZMQ_CURVE_PUBLICKEY: Set CURVE public key

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc39

func (*Socket) SetCurveSecretkey

func (soc *Socket) SetCurveSecretkey(key string) error

ZMQ_CURVE_SECRETKEY: Set CURVE secret key

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc40

func (*Socket) SetCurveServer

func (soc *Socket) SetCurveServer(value int) error

ZMQ_CURVE_SERVER: Set CURVE server role

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc38

func (*Socket) SetCurveServerkey

func (soc *Socket) SetCurveServerkey(key string) error

ZMQ_CURVE_SERVERKEY: Set CURVE server key

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc41

func (*Socket) SetGssapiPlaintext

func (soc *Socket) SetGssapiPlaintext(value bool) error

ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc11

func (*Socket) SetGssapiPrincipal

func (soc *Socket) SetGssapiPrincipal(value string) error

ZMQ_GSSAPI_PRINCIPAL: Set name of GSSAPI principal

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc12

func (*Socket) SetGssapiServer

func (soc *Socket) SetGssapiServer(value bool) error

ZMQ_GSSAPI_SERVER: Set GSSAPI server role

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc13

func (*Socket) SetGssapiServicePrincipal

func (soc *Socket) SetGssapiServicePrincipal(value string) error

ZMQ_GSSAPI_SERVICE_PRINCIPAL: Set name of GSSAPI service principal

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc14

func (*Socket) SetHandshakeIvl

func (soc *Socket) SetHandshakeIvl(value time.Duration) error

ZMQ_HANDSHAKE_IVL: Set maximum handshake interval

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc15

func (*Socket) SetIdentity

func (soc *Socket) SetIdentity(value string) error

ZMQ_IDENTITY: Set socket identity

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc8

func (*Socket) SetImmediate

func (soc *Socket) SetImmediate(value bool) error

ZMQ_IMMEDIATE: Queue messages only to completed connections

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc23

func (*Socket) SetIpv6

func (soc *Socket) SetIpv6(value bool) error

ZMQ_IPV6: Enable IPv6 on socket

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc21

func (*Socket) SetLinger

func (soc *Socket) SetLinger(value time.Duration) error

ZMQ_LINGER: Set linger period for socket shutdown

Use -1 for infinite

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc13

func (*Socket) SetMaxmsgsize

func (soc *Socket) SetMaxmsgsize(value int64) error

ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc17

func (*Socket) SetMulticastHops

func (soc *Socket) SetMulticastHops(value int) error

ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc18

func (*Socket) SetPlainPassword

func (soc *Socket) SetPlainPassword(password string) error

ZMQ_PLAIN_PASSWORD: Set PLAIN security password

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc37

func (*Socket) SetPlainServer

func (soc *Socket) SetPlainServer(value int) error

ZMQ_PLAIN_SERVER: Set PLAIN server role

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc35

func (*Socket) SetPlainUsername

func (soc *Socket) SetPlainUsername(username string) error

ZMQ_PLAIN_USERNAME: Set PLAIN security username

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc36

func (*Socket) SetProbeRouter

func (soc *Socket) SetProbeRouter(value int) error

ZMQ_PROBE_ROUTER: bootstrap connections to ROUTER sockets

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc26

func (*Socket) SetRate

func (soc *Socket) SetRate(value int) error

ZMQ_RATE: Set multicast data rate

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc9

func (*Socket) SetRcvbuf

func (soc *Socket) SetRcvbuf(value int) error

ZMQ_RCVBUF: Set kernel receive buffer size

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc12

func (*Socket) SetRcvhwm

func (soc *Socket) SetRcvhwm(value int) error

ZMQ_RCVHWM: Set high water mark for inbound messages

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc4

func (*Socket) SetRcvtimeo

func (soc *Socket) SetRcvtimeo(value time.Duration) error

ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN

Use -1 for infinite

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc19

func (*Socket) SetReconnectIvl

func (soc *Socket) SetReconnectIvl(value time.Duration) error

ZMQ_RECONNECT_IVL: Set reconnection interval

Use -1 for no reconnection

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc14

func (*Socket) SetReconnectIvlMax

func (soc *Socket) SetReconnectIvlMax(value time.Duration) error

ZMQ_RECONNECT_IVL_MAX: Set maximum reconnection interval

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc15

func (*Socket) SetRecoveryIvl

func (soc *Socket) SetRecoveryIvl(value time.Duration) error

ZMQ_RECOVERY_IVL: Set multicast recovery interval

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc10

func (*Socket) SetReqCorrelate

func (soc *Socket) SetReqCorrelate(value int) error

ZMQ_REQ_CORRELATE: match replies with requests

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc28

func (*Socket) SetReqRelaxed

func (soc *Socket) SetReqRelaxed(value int) error

ZMQ_REQ_RELAXED: relax strict alternation between request and reply

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc29

func (*Socket) SetRouterHandover

func (soc *Socket) SetRouterHandover(value bool) error

ZMQ_ROUTER_HANDOVER: handle duplicate client identities on ROUTER sockets

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc35

func (*Socket) SetRouterMandatory

func (soc *Socket) SetRouterMandatory(value int) error

ZMQ_ROUTER_MANDATORY: accept only routable messages on ROUTER sockets

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc24

func (*Socket) SetRouterRaw

func (soc *Socket) SetRouterRaw(value int) error

ZMQ_ROUTER_RAW: switch ROUTER socket to raw mode

This option is deprecated, please use ZMQ_STREAM sockets instead.

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc25

func (*Socket) SetSndbuf

func (soc *Socket) SetSndbuf(value int) error

ZMQ_SNDBUF: Set kernel transmit buffer size

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc11

func (*Socket) SetSndhwm

func (soc *Socket) SetSndhwm(value int) error

ZMQ_SNDHWM: Set high water mark for outbound messages

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc3

func (*Socket) SetSndtimeo

func (soc *Socket) SetSndtimeo(value time.Duration) error

ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN

Use -1 for infinite

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc20

func (*Socket) SetSocksProxy

func (soc *Socket) SetSocksProxy(value string) error

ZMQ_SOCKS_PROXY: NOT DOCUMENTED

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

func (*Socket) SetSubscribe

func (soc *Socket) SetSubscribe(filter string) error

ZMQ_SUBSCRIBE: Establish message filter

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc6

func (*Socket) SetTcpAcceptFilter

func (soc *Socket) SetTcpAcceptFilter(filter string) error

ZMQ_TCP_ACCEPT_FILTER: Assign filters to allow new TCP connections

This option is deprecated, please use authentication via the ZAP API and IP address whitelisting / blacklisting.

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc34

func (*Socket) SetTcpKeepalive

func (soc *Socket) SetTcpKeepalive(value int) error

ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc30

func (*Socket) SetTcpKeepaliveCnt

func (soc *Socket) SetTcpKeepaliveCnt(value int) error

ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc32

func (*Socket) SetTcpKeepaliveIdle

func (soc *Socket) SetTcpKeepaliveIdle(value int) error

ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPCNT(or TCP_KEEPALIVE on some OS)

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc31

func (*Socket) SetTcpKeepaliveIntvl

func (soc *Socket) SetTcpKeepaliveIntvl(value int) error

ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc33

func (*Socket) SetTos

func (soc *Socket) SetTos(value int) error

ZMQ_TOS: Set the Type-of-Service on socket

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

See: http://api.zeromq.org/4-1:zmq-setsockopt#toc46

func (*Socket) SetUnsubscribe

func (soc *Socket) SetUnsubscribe(filter string) error

ZMQ_UNSUBSCRIBE: Remove message filter

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc7

func (*Socket) SetXpubNodrop

func (soc *Socket) SetXpubNodrop(value bool) error

ZMQ_XPUB_NODROP: NOT DOCUMENTED

Returns ErrorNotImplemented41 with ZeroMQ version < 4.1

func (*Socket) SetXpubVerbose

func (soc *Socket) SetXpubVerbose(value int) error

ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc27

func (*Socket) SetZapDomain

func (soc *Socket) SetZapDomain(domain string) error

ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain

See: http://api.zeromq.org/4-0:zmq-setsockopt#toc42

func (Socket) String

func (soc Socket) String() string

Socket as string.

func (*Socket) Unbind

func (soc *Socket) Unbind(endpoint string) error

Stop accepting connections on a socket.

For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-bind#toc2

type State

type State int

Used by (soc *Socket)GetEvents()

func (State) String

func (s State) String() string

Socket state as string.

type Type

type Type int

Specifies the type of a socket, used by NewSocket()

func (Type) String

func (t Type) String() string

Socket type as string.

Directories

Path Synopsis
bstar
bstar - Binary Star reactor.
bstar - Binary Star reactor.
clone
Clone client API stack (multithreaded).
Clone client API stack (multithreaded).
flcliapi
flcliapi - Freelance Pattern agent class.
flcliapi - Freelance Pattern agent class.
intface
Interface class for Chapter 8.
Interface class for Chapter 8.
kvmsg
kvmsg class - key-value message class for example applications
kvmsg class - key-value message class for example applications
kvsimple
kvsimple - simple key-value message class for example applications.
kvsimple - simple key-value message class for example applications.
mdapi
Majordomo Protocol Client and Worker API.
Majordomo Protocol Client and Worker API.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL