Documentation ¶
Overview ¶
A Go interface to ZeroMQ (zmq, 0mq) version 4.
WARNING: API for ZeroMQ 4.1.0 specific features may change until 4.1.0 is officially released
For ZeroMQ version 3, see: http://github.com/pebbe/zmq3
For ZeroMQ version 2, see: http://github.com/pebbe/zmq2
See also the wiki: https://github.com/pebbe/zmq4/wiki
A note on the use of a context:
This package provides a default context. It is used by the package-level functions (those without a Context receiver) that create a socket or manipulate the context. Developers of packages that import this package should probably not use the default context and its associated functions, but create their own context(s) instead. See: type Context.
Example (Multiple_contexts) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

// main runs two REP services, each bound through its own context, and
// queries both of them twice: first with REQ sockets created from the
// default context, then with REQ sockets created from two further contexts.
func main() {
	chQuit := make(chan interface{})
	chReactor := make(chan bool)

	addr1 := "tcp://127.0.0.1:9997"
	addr2 := "tcp://127.0.0.1:9998"

	serv_ctx1, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	serv1, err := serv_ctx1.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = serv1.Bind(addr1)
	if checkErr(err) {
		return
	}
	defer func() {
		serv1.Close()
		serv_ctx1.Term()
	}()

	serv_ctx2, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	serv2, err := serv_ctx2.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = serv2.Bind(addr2)
	if checkErr(err) {
		return
	}
	defer func() {
		serv2.Close()
		serv_ctx2.Term()
	}()

	// new_service runs a reactor that answers every request received on sock
	// with addr followed by the request, until a value arrives on chQuit.
	// It signals chReactor when the reactor has stopped.
	new_service := func(sock *zmq.Socket, addr string) {
		socket_handler := func(state zmq.State) error {
			msg, err := sock.RecvMessage(0)
			if checkErr(err) {
				return err
			}
			_, err = sock.SendMessage(addr, msg)
			if checkErr(err) {
				return err
			}
			return nil
		}
		quit_handler := func(interface{}) error {
			return errors.New("quit")
		}

		defer func() {
			chReactor <- true
		}()

		reactor := zmq.NewReactor()
		reactor.AddSocket(sock, zmq.POLLIN, socket_handler)
		reactor.AddChannel(chQuit, 1, quit_handler)
		// Keep the result in a variable local to this goroutine: assigning
		// to main's err here would race with main and with the other service.
		err := reactor.Run(100 * time.Millisecond)
		fmt.Println(err)
	}

	go new_service(serv1, addr1)
	go new_service(serv2, addr2)

	// Give both reactors time to start.
	time.Sleep(time.Second)

	// default context

	sock1, err := zmq.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	sock2, err := zmq.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	err = sock1.Connect(addr1)
	if checkErr(err) {
		return
	}
	err = sock2.Connect(addr2)
	if checkErr(err) {
		return
	}
	_, err = sock1.SendMessage(addr1)
	if checkErr(err) {
		return
	}
	_, err = sock2.SendMessage(addr2)
	if checkErr(err) {
		return
	}
	msg, err := sock1.RecvMessage(0)
	fmt.Println(err, msg)
	msg, err = sock2.RecvMessage(0)
	fmt.Println(err, msg)
	err = sock1.Close()
	if checkErr(err) {
		return
	}
	err = sock2.Close()
	if checkErr(err) {
		return
	}

	// non-default contexts

	ctx1, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	ctx2, err := zmq.NewContext()
	if checkErr(err) {
		return
	}
	sock1, err = ctx1.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	sock2, err = ctx2.NewSocket(zmq.REQ)
	if checkErr(err) {
		return
	}
	err = sock1.Connect(addr1)
	if checkErr(err) {
		return
	}
	err = sock2.Connect(addr2)
	if checkErr(err) {
		return
	}
	_, err = sock1.SendMessage(addr1)
	if checkErr(err) {
		return
	}
	_, err = sock2.SendMessage(addr2)
	if checkErr(err) {
		return
	}
	msg, err = sock1.RecvMessage(0)
	fmt.Println(err, msg)
	msg, err = sock2.RecvMessage(0)
	fmt.Println(err, msg)
	err = sock1.Close()
	if checkErr(err) {
		return
	}
	err = sock2.Close()
	if checkErr(err) {
		return
	}
	err = ctx1.Term()
	if checkErr(err) {
		return
	}
	err = ctx2.Term()
	if checkErr(err) {
		return
	}

	// close(chQuit) doesn't work because the reactor removes closed channels, instead of acting on them
	chQuit <- true
	<-chReactor
	chQuit <- true
	<-chReactor

	fmt.Println("Done")
}

// checkErr reports whether err is non-nil. A non-nil error is printed,
// prefixed with the file and line of the caller when those are available.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: <nil> [tcp://127.0.0.1:9997 tcp://127.0.0.1:9997] <nil> [tcp://127.0.0.1:9998 tcp://127.0.0.1:9998] <nil> [tcp://127.0.0.1:9997 tcp://127.0.0.1:9997] <nil> [tcp://127.0.0.1:9998 tcp://127.0.0.1:9998] quit quit Done
Example (Socket_event) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"time"
)

// NOTE(review): checkErr is not defined in this listing; it is presumably a
// helper shared with the other examples — confirm.

// rep_socket_monitor connects a PAIR socket to the monitor endpoint addr and
// prints every event together with its address, until EVENT_CLOSED is seen
// or receiving an event fails.
func rep_socket_monitor(addr string) {
	s, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}
	defer s.Close()
	err = s.Connect(addr)
	if checkErr(err) {
		return
	}
	for {
		a, b, _, err := s.RecvEvent(0)
		if checkErr(err) {
			break
		}
		fmt.Println(a, b)
		if a == zmq.EVENT_CLOSED {
			break
		}
	}
}

// main monitors a REP socket for all events, then binds and closes the
// socket so that the monitor has events to print.
func main() {

	// REP socket
	rep, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}

	// REP socket monitor, all events
	err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
	if checkErr(err) {
		rep.Close()
		return
	}
	go rep_socket_monitor("inproc://monitor.rep")
	time.Sleep(time.Second)

	// Generate an event. The result of Bind must be stored, otherwise the
	// check below would test the stale error returned by Monitor.
	err = rep.Bind("tcp://*:9689")
	if checkErr(err) {
		rep.Close()
		return
	}

	rep.Close()

	// Allow some time for event detection
	time.Sleep(time.Second)

	fmt.Println("Done")
}
Output: EVENT_LISTENING tcp://0.0.0.0:9689 EVENT_CLOSED tcp://0.0.0.0:9689 Done
Example (Test_abstract_ipc) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

// main binds and connects a pair of PAIR sockets over an ipc address that
// starts with "@" and bounces a message between them. On anything but Linux
// it only prints the address.
func main() {
	addr := "ipc://@/tmp/tester"

	// This is Linux only
	if runtime.GOOS != "linux" {
		fmt.Printf("%q\n", addr)
		fmt.Println("Done")
		return
	}

	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind(addr)
	if checkErr(err) {
		return
	}

	endpoint, err := sb.GetLastEndpoint()
	if checkErr(err) {
		return
	}
	fmt.Printf("%q\n", endpoint)

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}
	err = sc.Connect(addr)
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

// bounce sends a two-part message from client to server and then from server
// back to client, printing every failure or mismatch through checkErr0. It
// stops at the first send/receive error or unexpected "more" flag.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// client -> server; the client sends without blocking.
	if !sendTwoParts(client, content, zmq.DONTWAIT) || !recvTwoParts(server, content) {
		return
	}

	// server -> client
	if !sendTwoParts(server, content, 0) {
		return
	}
	recvTwoParts(client, content)
}

// sendTwoParts sends content twice on sock as a single two-part message,
// adding flags to both sends. It reports false when a send returned an error.
func sendTwoParts(sock *zmq.Socket, content string, flags zmq.Flag) bool {
	for _, f := range []zmq.Flag{zmq.SNDMORE | flags, flags} {
		rc, err := sock.Send(content, f)
		if checkErr0(err) {
			return false
		}
		if rc != len(content) {
			// A short count is reported, but the exchange carries on.
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvTwoParts receives two message parts on sock. Each part is compared
// with content (a mismatch is reported but not fatal), and the "more" flag
// must be set after the first part and clear after the second. It reports
// false when a call returned an error or the flag was not as expected.
func recvTwoParts(sock *zmq.Socket, content string) bool {
	for _, wantMore := range []bool{true, false} {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		if rcvmore != wantMore {
			checkErr0(fmt.Errorf("rcvmore == %v", rcvmore))
			return false
		}
	}
	return true
}

// checkErr0 reports whether err is non-nil, printing it when it is.
func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

// checkErr reports whether err is non-nil. A non-nil error is printed,
// prefixed with the file and line of the caller when those are available.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: "ipc://@/tmp/tester" Done
Example (Test_conflate) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"strconv"
	"time"
)

// main pushes a numbered series of messages to a PULL socket that has the
// conflate option set and checks that the one message it then receives is
// the last of the series.
func main() {
	const endpoint = "tcp://127.0.0.1:5555"
	const messageCount = 20

	if checkErr(zmq.SetIoThreads(1)) {
		return
	}

	receiver, err := zmq.NewSocket(zmq.PULL)
	if checkErr(err) {
		return
	}
	if checkErr(receiver.SetConflate(true)) {
		return
	}
	if checkErr(receiver.Bind(endpoint)) {
		return
	}

	sender, err := zmq.NewSocket(zmq.PUSH)
	if checkErr(err) {
		return
	}
	if checkErr(sender.Connect(endpoint)) {
		return
	}

	// Send the sequence numbers 0 .. messageCount-1 as decimal strings.
	for seq := 0; seq < messageCount; seq++ {
		if _, err := sender.Send(strconv.Itoa(seq), 0); checkErr(err) {
			return
		}
	}

	// Give the messages time to arrive before reading.
	time.Sleep(time.Second)

	payload, err := receiver.Recv(0)
	if checkErr(err) {
		return
	}
	last, err := strconv.Atoi(payload)
	if checkErr(err) {
		return
	}
	if last != messageCount-1 {
		checkErr(errors.New("payload_recved != message_count - 1"))
		return
	}

	if checkErr(receiver.Close()) {
		return
	}
	if checkErr(sender.Close()) {
		return
	}

	fmt.Println("Done")
}

// checkErr returns true when err is set, after printing it with the
// caller's file and line if they can be determined.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, file, line, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", file, line, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: Done
Example (Test_connect_resolve) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
)

// main connects a PUB socket to one well-formed endpoint and then to three
// malformed ones, printing the error returned for each malformed endpoint.
func main() {
	sock, err := zmq.NewSocket(zmq.PUB)
	if checkErr(err) {
		return
	}

	checkErr(sock.Connect("tcp://localhost:1234"))

	// Each of these is printed as-is, error or not.
	malformed := []string{
		"tcp://localhost:invalid",
		"tcp://in val id:1234",
		"invalid://localhost:1234",
	}
	for _, endpoint := range malformed {
		fmt.Println(sock.Connect(endpoint))
	}

	checkErr(sock.Close())

	fmt.Println("Done")
}

// checkErr returns true when err is set, after printing it with the
// caller's file and line if they can be determined.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, file, line, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", file, line, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: invalid argument invalid argument protocol not supported Done
Example (Test_ctx_options) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
)

// main prints whether the default context reports the default values for
// its socket limit and I/O thread count, then switches IPv6 on for the
// default context and shows that a newly created socket reports it too.
func main() {
	i, err := zmq.GetMaxSockets()
	fmt.Println(i == zmq.MaxSocketsDflt, err)
	i, err = zmq.GetIoThreads()
	fmt.Println(i == zmq.IoThreadsDflt, err)
	b, err := zmq.GetIpv6()
	fmt.Println(b, err)

	if err = zmq.SetIpv6(true); err != nil {
		fmt.Println(err)
		return
	}
	b, err = zmq.GetIpv6()
	fmt.Println(b, err)

	router, err := zmq.NewSocket(zmq.ROUTER)
	if err != nil {
		// Do not use a socket that could not be created; switch IPv6 back
		// off (best effort) before giving up.
		fmt.Println(err)
		zmq.SetIpv6(false)
		return
	}
	b, err = router.GetIpv6()
	fmt.Println(b, err)

	fmt.Println(router.Close())

	// Switch IPv6 off again on the default context.
	if err = zmq.SetIpv6(false); err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println("Done")
}
Output: true <nil> true <nil> false <nil> true <nil> true <nil> <nil> Done
Example (Test_disconnect_inproc) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
	"time"
)

// main publishes a two-part message on an XPUB socket once per poll round,
// connects a SUB socket to it in round 1 and disconnects it in round 4, and
// prints what each socket receives along the way.
func main() {
	const endpoint = "inproc://someInProcDescriptor"

	received := 0
	subscribed := false

	pub, err := zmq.NewSocket(zmq.XPUB)
	if checkErr(err) {
		return
	}
	sub, err := zmq.NewSocket(zmq.SUB)
	if checkErr(err) {
		return
	}
	if checkErr(sub.SetSubscribe("foo")) {
		return
	}
	if checkErr(pub.Bind(endpoint)) {
		return
	}

	// readSubscriptions reads one complete message from the XPUB socket and
	// flips the subscription state for every part, depending on the part's
	// first byte. It returns false when a socket call failed.
	readSubscriptions := func() bool {
		for {
			frame, err := pub.Recv(0)
			if checkErr(err) {
				return false
			}
			fmt.Printf("pubSocket: %q\n", frame)

			if frame[0] == 0 {
				fmt.Println("pubSocket, isSubscribed == true:", subscribed)
				subscribed = false
			} else {
				fmt.Println("pubSocket, isSubscribed == false:", !subscribed)
				subscribed = true
			}

			more, err := pub.GetRcvmore()
			if checkErr(err) {
				return false
			}
			if !more {
				return true // Last message part
			}
		}
	}

	// readPublication reads one complete message from the SUB socket and
	// counts it. It returns false when a socket call failed.
	readPublication := func() bool {
		for {
			part, err := sub.Recv(0)
			if checkErr(err) {
				return false
			}
			fmt.Printf("subSocket: %q\n", part)

			more, err := sub.GetRcvmore()
			if checkErr(err) {
				return false
			}
			if !more {
				received++
				return true // Last message part
			}
		}
	}

	poller := zmq.NewPoller()
	poller.Add(sub, zmq.POLLIN) // read publications
	poller.Add(pub, zmq.POLLIN) // read subscriptions

	for iteration := 0; ; iteration++ {
		polled, err := poller.Poll(100 * time.Millisecond)
		if checkErr(err) {
			break // Interrupted
		}

		// Work out which of the two sockets has input waiting.
		pubReady, subReady := false, false
		for _, p := range polled {
			switch p.Socket {
			case pub:
				pubReady = true
			case sub:
				subReady = true
			}
		}

		// Subscriptions are handled before publications.
		if pubReady && !readSubscriptions() {
			return
		}
		if subReady && !readPublication() {
			return
		}

		switch iteration {
		case 1:
			checkErr(sub.Connect(endpoint))
		case 4:
			checkErr(sub.Disconnect(endpoint))
		}
		if iteration > 4 && len(polled) == 0 {
			break
		}

		_, err = pub.Send("foo", zmq.SNDMORE)
		checkErr(err)
		_, err = pub.Send("this is foo!", 0)
		checkErr(err)
	}

	fmt.Println("publicationsReceived == 3:", received == 3)
	fmt.Println("!isSubscribed:", !subscribed)

	checkErr(pub.Close())
	checkErr(sub.Close())

	fmt.Println("Done")
}

// checkErr returns true when err is set, after printing it with the
// caller's file and line if they can be determined.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, file, line, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", file, line, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: pubSocket: "\x01foo" pubSocket, isSubscribed == false: true subSocket: "foo" subSocket: "this is foo!" subSocket: "foo" subSocket: "this is foo!" subSocket: "foo" subSocket: "this is foo!" pubSocket: "\x00foo" pubSocket, isSubscribed == true: true publicationsReceived == 3: true !isSubscribed: true Done
Example (Test_fork) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
)

// main binds a PULL socket, lets a goroutine push a fixed number of
// messages to it through its own PUSH socket, and prints each message
// received together with the receive error.
func main() {
	const address = "tcp://127.0.0.1:6571"
	const numMessages = 5

	// Create and bind pull socket to receive messages
	pull, err := zmq.NewSocket(zmq.PULL)
	if checkErr(err) {
		return
	}
	if checkErr(pull.Bind(address)) {
		return
	}

	done := make(chan bool)

	go func() {
		// Always tell main that this goroutine has finished.
		defer func() {
			done <- true
		}()

		// Create new socket, connect and send some messages
		push, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return
		}
		defer func() {
			checkErr(push.Close())
		}()

		if checkErr(push.Connect(address)) {
			return
		}
		for sent := 0; sent < numMessages; sent++ {
			if _, err := push.Send("Hello", 0); checkErr(err) {
				return
			}
		}
	}()

	for got := 0; got < numMessages; got++ {
		msg, err := pull.Recv(0)
		fmt.Printf("%q %v\n", msg, err)
	}

	checkErr(pull.Close())

	<-done

	fmt.Println("Done")
}

// checkErr returns true when err is set, after printing it with the
// caller's file and line if they can be determined.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, file, line, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", file, line, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: "Hello" <nil> "Hello" <nil> "Hello" <nil> "Hello" <nil> "Hello" <nil> Done
Example (Test_hwm) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"runtime"
	"time"
)

// main counts how many empty messages a PUSH socket accepts over inproc
// before a non-blocking send fails, for several combinations of send and
// receive high-water marks and of bind/connect order.
func main() {
	const (
		maxSends     = 10000
		bindFirst    = 1
		connectFirst = 2
		endpoint     = "inproc://a"
	)

	// closeChecked closes s and reports a failure through checkErr.
	closeChecked := func(s *zmq.Socket) {
		checkErr(s.Close())
	}

	// sendUntilBlocked sends empty messages without blocking until a send
	// fails or maxSends messages went out, and returns the number sent.
	sendUntilBlocked := func(s *zmq.Socket) int {
		sent := 0
		for sent < maxSends {
			if _, err := s.Send("", zmq.DONTWAIT); err != nil {
				fmt.Println("Send:", err)
				break
			}
			sent++
		}
		return sent
	}

	// recvAll receives without blocking until a receive fails, and returns
	// the number of messages received.
	recvAll := func(s *zmq.Socket) int {
		received := 0
		for {
			if _, err := s.Recv(zmq.DONTWAIT); err != nil {
				fmt.Println("Recv:", err)
				break
			}
			received++
		}
		return received
	}

	// testDefaults runs the count with both sockets left at their default
	// high-water marks.
	testDefaults := func() int {
		// Set up bind socket
		puller, err := zmq.NewSocket(zmq.PULL)
		if checkErr(err) {
			return 0
		}
		defer closeChecked(puller)
		if checkErr(puller.Bind(endpoint)) {
			return 0
		}

		// Set up connect socket
		pusher, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return 0
		}
		defer closeChecked(pusher)
		if checkErr(pusher.Connect(endpoint)) {
			return 0
		}

		sent := sendUntilBlocked(pusher)
		received := recvAll(puller)
		fmt.Println("send_count == recv_count:", sent == received)

		return sent
	}

	// countMsg runs the count with the given high-water marks. order decides
	// whether the PULL socket is bound before the PUSH socket connects
	// (bindFirst) or after it.
	countMsg := func(sendHWM, recvHWM, order int) int {
		var puller, pusher *zmq.Socket
		var err error

		if order == bindFirst {
			// Set up bind socket
			puller, err = zmq.NewSocket(zmq.PULL)
			if checkErr(err) {
				return 0
			}
			defer closeChecked(puller)
			if checkErr(puller.SetRcvhwm(recvHWM)) {
				return 0
			}
			if checkErr(puller.Bind(endpoint)) {
				return 0
			}

			// Set up connect socket
			pusher, err = zmq.NewSocket(zmq.PUSH)
			if checkErr(err) {
				return 0
			}
			defer closeChecked(pusher)
			if checkErr(pusher.SetSndhwm(sendHWM)) {
				return 0
			}
			if checkErr(pusher.Connect(endpoint)) {
				return 0
			}
		} else {
			// Set up connect socket
			pusher, err = zmq.NewSocket(zmq.PUSH)
			if checkErr(err) {
				return 0
			}
			defer closeChecked(pusher)
			if checkErr(pusher.SetSndhwm(sendHWM)) {
				return 0
			}
			if checkErr(pusher.Connect(endpoint)) {
				return 0
			}

			// Set up bind socket
			puller, err = zmq.NewSocket(zmq.PULL)
			if checkErr(err) {
				return 0
			}
			defer closeChecked(puller)
			if checkErr(puller.SetRcvhwm(recvHWM)) {
				return 0
			}
			if checkErr(puller.Bind(endpoint)) {
				return 0
			}
		}

		sent := sendUntilBlocked(pusher)
		received := recvAll(puller)
		fmt.Println("send_count == recv_count:", sent == received)

		// Now it should be possible to send one more.
		_, err = pusher.Send("", 0)
		if checkErr(err) {
			return 0
		}

		// Consume the remaining message.
		_, err = puller.Recv(0)
		checkErr(err)

		return sent
	}

	// connectAndCloseFirst sends from a PUSH socket that connects before any
	// socket is bound, closes it, and only then binds the PULL socket and
	// receives.
	connectAndCloseFirst := func(sendHWM, recvHWM int) int {
		// Set up connect socket
		pusher, err := zmq.NewSocket(zmq.PUSH)
		if checkErr(err) {
			return 0
		}
		if checkErr(pusher.SetSndhwm(sendHWM)) {
			pusher.Close()
			return 0
		}
		if checkErr(pusher.Connect(endpoint)) {
			pusher.Close()
			return 0
		}

		sent := sendUntilBlocked(pusher)

		// Close connect
		if checkErr(pusher.Close()) {
			return 0
		}

		// Set up bind socket
		puller, err := zmq.NewSocket(zmq.PULL)
		if checkErr(err) {
			return 0
		}
		defer closeChecked(puller)
		if checkErr(puller.SetRcvhwm(recvHWM)) {
			return 0
		}
		if checkErr(puller.Bind(endpoint)) {
			return 0
		}

		received := recvAll(puller)
		fmt.Println("send_count == recv_count:", sent == received)

		return sent
	}

	// report prints a count and pauses briefly before the next test.
	report := func(count int) {
		fmt.Println("count:", count)
		time.Sleep(100 * time.Millisecond)
	}

	// Default values are 1000 on send and 1000 on receive, so 2000 total
	fmt.Println("Default values")
	report(testDefaults())

	// Infinite send and receive buffer
	fmt.Println("\nInfinite send and receive")
	report(countMsg(0, 0, bindFirst))
	report(countMsg(0, 0, connectFirst))

	// Infinite send buffer
	fmt.Println("\nInfinite send buffer")
	report(countMsg(1, 0, bindFirst))
	report(countMsg(1, 0, connectFirst))

	// Infinite receive buffer
	fmt.Println("\nInfinite receive buffer")
	report(countMsg(0, 1, bindFirst))
	report(countMsg(0, 1, connectFirst))

	// Send and recv buffers hwm 1, so total that can be queued is 2
	fmt.Println("\nSend and recv buffers hwm 1")
	report(countMsg(1, 1, bindFirst))
	report(countMsg(1, 1, connectFirst))

	// Send hwm of 1, send before bind so total that can be queued is 1
	fmt.Println("\nSend hwm of 1, send before bind")
	report(connectAndCloseFirst(1, 0))

	fmt.Println("\nDone")
}

// checkErr returns true when err is set, after printing it with the
// caller's file and line if they can be determined.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, file, line, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", file, line, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: Default values Send: resource temporarily unavailable Recv: resource temporarily unavailable send_count == recv_count: true count: 2000 Infinite send and receive Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Infinite send buffer Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Infinite receive buffer Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Recv: resource temporarily unavailable send_count == recv_count: true count: 10000 Send and recv buffers hwm 1 Send: resource temporarily unavailable Recv: resource temporarily unavailable send_count == recv_count: true count: 2 Send: resource temporarily unavailable Recv: resource temporarily unavailable send_count == recv_count: true count: 2 Send hwm of 1, send before bind Send: resource temporarily unavailable Recv: resource temporarily unavailable send_count == recv_count: true count: 1 Done
Example (Test_pair_ipc) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

// main binds and connects a pair of PAIR sockets over ipc and bounces a
// message between them.
func main() {
	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind("ipc:///tmp/tester")
	if checkErr(err) {
		return
	}

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sc.Connect("ipc:///tmp/tester")
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

// bounce sends a two-part message from client to server and then from server
// back to client, printing every failure or mismatch through checkErr0. It
// stops at the first send/receive error or unexpected "more" flag.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// client -> server; the client sends without blocking.
	if !sendTwoParts(client, content, zmq.DONTWAIT) || !recvTwoParts(server, content) {
		return
	}

	// server -> client
	if !sendTwoParts(server, content, 0) {
		return
	}
	recvTwoParts(client, content)
}

// sendTwoParts sends content twice on sock as a single two-part message,
// adding flags to both sends. It reports false when a send returned an error.
func sendTwoParts(sock *zmq.Socket, content string, flags zmq.Flag) bool {
	for _, f := range []zmq.Flag{zmq.SNDMORE | flags, flags} {
		rc, err := sock.Send(content, f)
		if checkErr0(err) {
			return false
		}
		if rc != len(content) {
			// A short count is reported, but the exchange carries on.
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvTwoParts receives two message parts on sock. Each part is compared
// with content (a mismatch is reported but not fatal), and the "more" flag
// must be set after the first part and clear after the second. It reports
// false when a call returned an error or the flag was not as expected.
func recvTwoParts(sock *zmq.Socket, content string) bool {
	for _, wantMore := range []bool{true, false} {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		if rcvmore != wantMore {
			checkErr0(fmt.Errorf("rcvmore == %v", rcvmore))
			return false
		}
	}
	return true
}

// checkErr0 reports whether err is non-nil, printing it when it is.
func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

// checkErr reports whether err is non-nil. A non-nil error is printed,
// prefixed with the file and line of the caller when those are available.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: Done
Example (Test_pair_tcp) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
)

// main binds and connects a pair of PAIR sockets over tcp and bounces a
// message between them.
func main() {
	sb, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sb.Bind("tcp://127.0.0.1:9736")
	if checkErr(err) {
		return
	}

	sc, err := zmq.NewSocket(zmq.PAIR)
	if checkErr(err) {
		return
	}

	err = sc.Connect("tcp://127.0.0.1:9736")
	if checkErr(err) {
		return
	}

	bounce(sb, sc)

	err = sc.Close()
	if checkErr(err) {
		return
	}

	err = sb.Close()
	if checkErr(err) {
		return
	}

	fmt.Println("Done")
}

// bounce sends a two-part message from client to server and then from server
// back to client, printing every failure or mismatch through checkErr0. It
// stops at the first send/receive error or unexpected "more" flag.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// client -> server; the client sends without blocking.
	if !sendTwoParts(client, content, zmq.DONTWAIT) || !recvTwoParts(server, content) {
		return
	}

	// server -> client
	if !sendTwoParts(server, content, 0) {
		return
	}
	recvTwoParts(client, content)
}

// sendTwoParts sends content twice on sock as a single two-part message,
// adding flags to both sends. It reports false when a send returned an error.
func sendTwoParts(sock *zmq.Socket, content string, flags zmq.Flag) bool {
	for _, f := range []zmq.Flag{zmq.SNDMORE | flags, flags} {
		rc, err := sock.Send(content, f)
		if checkErr0(err) {
			return false
		}
		if rc != len(content) {
			// A short count is reported, but the exchange carries on.
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvTwoParts receives two message parts on sock. Each part is compared
// with content (a mismatch is reported but not fatal), and the "more" flag
// must be set after the first part and clear after the second. It reports
// false when a call returned an error or the flag was not as expected.
func recvTwoParts(sock *zmq.Socket, content string) bool {
	for _, wantMore := range []bool{true, false} {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		if rcvmore != wantMore {
			checkErr0(fmt.Errorf("rcvmore == %v", rcvmore))
			return false
		}
	}
	return true
}

// checkErr0 reports whether err is non-nil, printing it when it is.
func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

// checkErr reports whether err is non-nil. A non-nil error is printed,
// prefixed with the file and line of the caller when those are available.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: Done
Example (Test_remote_endpoint) ¶
// Sends one request from a REQ to a REP socket over tcp and prints the
// "Peer-Address" metadata property of the message the REP socket receives.

// When the minor version of the ZeroMQ library is below 1, only the
// expected output is printed.
if _, minor, _ := zmq.Version(); minor < 1 {
	fmt.Println("127.0.0.1")
	fmt.Println("Done")
	return
}

addr := "tcp://127.0.0.1:9560"

rep, err := zmq.NewSocket(zmq.REP)
if checkErr(err) {
	return
}
req, err := zmq.NewSocket(zmq.REQ)
if checkErr(err) {
	rep.Close()
	return
}

err = rep.Bind(addr)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}
err = req.Connect(addr)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}

tmp := "test"
_, err = req.Send(tmp, 0)
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}

// get message with remote endpoint
// The property requested here must be the one looked up in props below.
// NOTE(review): the original requested "Remote-Endpoint" — confirm that
// "Peer-Address" is the right name for the library version in use.
msg, props, err := rep.RecvWithMetadata(0, "Peer-Address")
if checkErr(err) {
	rep.Close()
	req.Close()
	return
}
if msg != tmp {
	fmt.Println(tmp, "!=", msg)
}

fmt.Println(props["Peer-Address"])

err = rep.Close()
if checkErr(err) {
	req.Close()
	return
}

err = req.Close()
if checkErr(err) {
	return
}

fmt.Println("Done")
Output: 127.0.0.1 Done
Example (Test_security_curve) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

// endpoint is the TCP address the CURVE server binds to and every client
// connects to.
const endpoint = "tcp://127.0.0.1:9998"

// main runs a ZAP handler in a reactor, starts a CURVE server and then
// bounces messages with one correctly configured client and four clients
// with wrong keys.
func main() {
	time.Sleep(100 * time.Millisecond)

	// Generate new keypairs for this test
	client_public, client_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}
	server_public, server_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = handler.Bind("inproc://zeromq.zap.01")
	if checkErr(err) {
		return
	}

	// doHandler answers one ZAP request: "200" when the client key in the
	// request equals client_public, "400" otherwise.
	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err // Terminating
		}
		version := msg[0]
		sequence := msg[1]
		// domain := msg[2]
		// address := msg[3]
		identity := msg[4]
		mechanism := msg[5]
		client_key := msg[6]
		client_key_text := zmq.Z85encode(client_key)

		if version != "1.0" {
			return errors.New("version != 1.0")
		}
		if mechanism != "CURVE" {
			return errors.New("mechanism != CURVE")
		}
		if identity != "IDENT" {
			return errors.New("identity != IDENT")
		}

		if client_key_text == client_public {
			handler.SendMessage(version, sequence, "200", "OK", "anonymous", "")
		} else {
			handler.SendMessage(version, sequence, "400", "Invalid client public key", "", "")
		}
		return nil
	}

	// doQuit closes the handler socket and returns an error, which ends
	// reactor.Run.
	doQuit := func(i interface{}) error {
		err := handler.Close()
		checkErr(err)
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	// On return: ask the reactor to quit, then wait until its goroutine
	// has reported that it finished.
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	// Server socket will accept connections
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = server.SetCurveServer(1)
	if checkErr(err) {
		return
	}
	err = server.SetCurveSecretkey(server_secret)
	if checkErr(err) {
		return
	}
	err = server.SetIdentity("IDENT")
	if checkErr(err) {
		return
	}
	// The result of Bind must be assigned, otherwise the check below would
	// look at the stale error of the previous call.
	err = server.Bind(endpoint)
	if checkErr(err) {
		return
	}
	err = server.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}

	// Check CURVE security with valid credentials
	client, ok := curveClient(server_public, client_public, client_secret)
	if !ok {
		return
	}
	bounce(server, client)
	err = client.Close()
	if checkErr(err) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	// Check CURVE security with a garbage server key
	// This will be caught by the curve_server class, not passed to ZAP
	garbage_key := "0000111122223333444455556666777788889999"
	if !bounceRejected(server, garbage_key, client_public, client_secret) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	// Check CURVE security with a garbage client public key
	// This will be caught by the curve_server class, not passed to ZAP
	if !bounceRejected(server, server_public, garbage_key, client_secret) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	// Check CURVE security with a garbage client secret key
	// This will be caught by the curve_server class, not passed to ZAP
	if !bounceRejected(server, server_public, client_public, garbage_key) {
		return
	}

	time.Sleep(100 * time.Millisecond)

	// Check CURVE security with bogus client credentials
	// This must be caught by the ZAP handler
	bogus_public, bogus_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}
	if !bounceRejected(server, server_public, bogus_public, bogus_secret) {
		return
	}

	// Shutdown
	err = server.Close()
	checkErr(err)

	fmt.Println("Done")
}

// curveClient creates a DEALER socket, sets the given CURVE server key and
// client key pair on it and connects it to endpoint. ok is false when any
// step failed; the error has then already been reported through checkErr.
func curveClient(server_key, public_key, secret_key string) (client *zmq.Socket, ok bool) {
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return nil, false
	}
	err = client.SetCurveServerkey(server_key)
	if checkErr(err) {
		return nil, false
	}
	err = client.SetCurvePublickey(public_key)
	if checkErr(err) {
		return nil, false
	}
	err = client.SetCurveSecretkey(secret_key)
	if checkErr(err) {
		return nil, false
	}
	err = client.Connect(endpoint)
	if checkErr(err) {
		return nil, false
	}
	return client, true
}

// bounceRejected runs bounce between server and a fresh client that uses the
// given keys and a one second receive timeout, then closes the client with
// linger 0. It returns false when setting up or closing the client failed;
// whatever bounce itself reports is printed by bounce.
func bounceRejected(server *zmq.Socket, server_key, public_key, secret_key string) bool {
	client, ok := curveClient(server_key, public_key, secret_key)
	if !ok {
		return false
	}
	err := client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return false
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	return !checkErr(err)
}

// bounce sends a two-part message from client to server and then from server
// back to client, checking every part. Problems are printed through
// checkErr0; bounce stops at the first failed socket call or unexpected
// "more parts follow" state.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// The client sends with DONTWAIT; the server replies with default flags.
	if !sendPair(client, content, zmq.DONTWAIT) {
		return
	}
	if !recvPair(server, content) {
		return
	}
	if !sendPair(server, content, 0) {
		return
	}
	recvPair(client, content)
}

// sendPair sends content twice on sock as one two-part message; extra is
// OR-ed into the flags of both parts. It returns false when a send fails.
// An unexpected return count is reported but does not stop the exchange.
func sendPair(sock *zmq.Socket, content string, extra zmq.Flag) bool {
	for _, flags := range []zmq.Flag{zmq.SNDMORE | extra, extra} {
		rc, err := sock.Send(content, flags)
		if checkErr0(err) {
			return false
		}
		if rc != 32 {
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvPair receives the two parts of a message from sock and compares each
// with content. It returns false when a socket call fails, when the first
// part is not followed by another one, or when the second part is.
func recvPair(sock *zmq.Socket, content string) bool {
	for part := 0; part < 2; part++ {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			// A content mismatch is reported only; the exchange goes on.
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		switch {
		case part == 0 && !rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
			return false
		case part == 1 && rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
			return false
		}
	}
	return true
}

// checkErr0 prints err and returns true when err is not nil.
func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

// checkErr prints err, prefixed with the file and line of the caller when
// that information is available, and returns true when err is not nil.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: resource temporarily unavailable resource temporarily unavailable resource temporarily unavailable resource temporarily unavailable Done Handler closed Reactor finished
Example (Test_security_null) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

// main runs a ZAP handler in a reactor and bounces messages between a
// DEALER server and a DEALER client three times: without a ZAP domain, with
// the domain "WRONG" and with the domain "TEST".
func main() {
	time.Sleep(100 * time.Millisecond)

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	if checkErr(handler.Bind("inproc://zeromq.zap.01")) {
		return
	}

	// doHandler answers one ZAP request: "200" for the domain "TEST",
	// "400" for any other domain.
	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err // Terminating
		}
		// Frames 3 (address) and 4 (identity) are not used here.
		version, sequence, domain, mechanism := msg[0], msg[1], msg[2], msg[5]

		switch {
		case version != "1.0":
			return errors.New("version != 1.0")
		case mechanism != "NULL":
			return errors.New("mechanism != NULL")
		}

		status, text, user := "400", "BAD DOMAIN", ""
		if domain == "TEST" {
			status, text, user = "200", "OK", "anonymous"
		}
		handler.SendMessage(version, sequence, status, text, user, "")
		return nil
	}

	// doQuit closes the handler socket and returns an error, which ends
	// reactor.Run.
	doQuit := func(i interface{}) error {
		checkErr(handler.Close())
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	// On return: ask the reactor to quit, then wait until its goroutine
	// has reported that it finished.
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	// We bounce between a binding server and a connecting client
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}

	// We first test client/server with no ZAP domain
	// Libzmq does not call our ZAP handler, the connect must succeed
	if checkErr(server.Bind("tcp://127.0.0.1:9683")) {
		return
	}
	if checkErr(client.Connect("tcp://127.0.0.1:9683")) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9683")
	client.Disconnect("tcp://127.0.0.1:9683")

	// Now define a ZAP domain for the server; this enables
	// authentication. We're using the wrong domain so this test
	// must fail.
	if checkErr(server.SetZapDomain("WRONG")) {
		return
	}
	if checkErr(server.Bind("tcp://127.0.0.1:9687")) {
		return
	}
	if checkErr(client.Connect("tcp://127.0.0.1:9687")) {
		return
	}
	if checkErr(client.SetRcvtimeo(time.Second)) {
		return
	}
	if checkErr(server.SetRcvtimeo(time.Second)) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9687")
	client.Disconnect("tcp://127.0.0.1:9687")

	// Now use the right domain, the test must pass
	if checkErr(server.SetZapDomain("TEST")) {
		return
	}
	if checkErr(server.Bind("tcp://127.0.0.1:9688")) {
		return
	}
	if checkErr(client.Connect("tcp://127.0.0.1:9688")) {
		return
	}
	bounce(server, client)
	server.Unbind("tcp://127.0.0.1:9688")
	client.Disconnect("tcp://127.0.0.1:9688")

	checkErr(client.Close())
	checkErr(server.Close())

	fmt.Println("Done")
}

// bounce sends a two-part message from client to server and then from server
// back to client, checking every part. Problems are printed through
// checkErr0; bounce stops at the first failed socket call or unexpected
// "more parts follow" state.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// The client sends with DONTWAIT; the server replies with default flags.
	if !sendPair(client, content, zmq.DONTWAIT) {
		return
	}
	if !recvPair(server, content) {
		return
	}
	if !sendPair(server, content, 0) {
		return
	}
	recvPair(client, content)
}

// sendPair sends content twice on sock as one two-part message; extra is
// OR-ed into the flags of both parts. It returns false when a send fails.
// An unexpected return count is reported but does not stop the exchange.
func sendPair(sock *zmq.Socket, content string, extra zmq.Flag) bool {
	for _, flags := range []zmq.Flag{zmq.SNDMORE | extra, extra} {
		rc, err := sock.Send(content, flags)
		if checkErr0(err) {
			return false
		}
		if rc != 32 {
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvPair receives the two parts of a message from sock and compares each
// with content. It returns false when a socket call fails, when the first
// part is not followed by another one, or when the second part is.
func recvPair(sock *zmq.Socket, content string) bool {
	for part := 0; part < 2; part++ {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			// A content mismatch is reported only; the exchange goes on.
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		switch {
		case part == 0 && !rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
			return false
		case part == 1 && rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
			return false
		}
	}
	return true
}

// checkErr0 prints err and returns true when err is not nil.
func checkErr0(err error) bool {
	if err == nil {
		return false
	}
	fmt.Println(err)
	return true
}

// checkErr prints err, prefixed with the file and line of the caller when
// that information is available, and returns true when err is not nil.
func checkErr(err error) bool {
	if err == nil {
		return false
	}
	if _, filename, lineno, ok := runtime.Caller(1); ok {
		fmt.Printf("%v:%v: %v\n", filename, lineno, err)
	} else {
		fmt.Println(err)
	}
	return true
}
Output: resource temporarily unavailable Done Handler closed Reactor finished
Example (Test_security_plain) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"errors"
	"fmt"
	"runtime"
	"time"
)

// main runs a ZAP handler in a reactor, starts a PLAIN server and bounces
// messages with a correctly configured client and with a client that is
// wrongly configured as a PLAIN server.
func main() {
	time.Sleep(100 * time.Millisecond)

	handler, err := zmq.NewSocket(zmq.REP)
	if checkErr(err) {
		return
	}
	err = handler.Bind("inproc://zeromq.zap.01")
	if checkErr(err) {
		return
	}

	// doHandler answers one ZAP request: "200" for the user "admin" with
	// the password "password", "400" otherwise.
	doHandler := func(state zmq.State) error {
		msg, err := handler.RecvMessage(0)
		if err != nil {
			return err // Terminating
		}
		version := msg[0]
		sequence := msg[1]
		// domain := msg[2]
		// address := msg[3]
		identity := msg[4]
		mechanism := msg[5]
		username := msg[6]
		password := msg[7]

		if version != "1.0" {
			return errors.New("version != 1.0")
		}
		if mechanism != "PLAIN" {
			return errors.New("mechanism != PLAIN")
		}
		if identity != "IDENT" {
			return errors.New("identity != IDENT")
		}

		if username == "admin" && password == "password" {
			handler.SendMessage(version, sequence, "200", "OK", "anonymous", "")
		} else {
			handler.SendMessage(version, sequence, "400", "Invalid username or password", "", "")
		}
		return nil
	}

	// doQuit closes the handler socket and returns an error, which ends
	// reactor.Run.
	doQuit := func(i interface{}) error {
		err := handler.Close()
		checkErr(err)
		fmt.Println("Handler closed")
		return errors.New("Quit")
	}
	quit := make(chan interface{})

	reactor := zmq.NewReactor()
	reactor.AddSocket(handler, zmq.POLLIN, doHandler)
	reactor.AddChannel(quit, 0, doQuit)
	go func() {
		reactor.Run(100 * time.Millisecond)
		fmt.Println("Reactor finished")
		quit <- true
	}()
	// On return: ask the reactor to quit, then wait until its goroutine
	// has reported that it finished.
	defer func() {
		quit <- true
		<-quit
		close(quit)
	}()

	// Server socket will accept connections
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = server.SetIdentity("IDENT")
	if checkErr(err) {
		return
	}
	err = server.SetPlainServer(1)
	if checkErr(err) {
		return
	}
	err = server.Bind("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}

	// Check PLAIN security with correct username/password
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	err = client.SetPlainUsername("admin")
	if checkErr(err) {
		return
	}
	err = client.SetPlainPassword("password")
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	bounce(server, client)
	err = client.Close()
	if checkErr(err) {
		return
	}

	// Check PLAIN security with badly configured client (as_server)
	// This will be caught by the plain_server class, not passed to ZAP
	client, err = zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	// The result of SetPlainServer must be assigned, otherwise the check
	// below would look at the stale error of the previous call.
	err = client.SetPlainServer(1)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9998")
	if checkErr(err) {
		return
	}
	err = client.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	err = server.SetRcvtimeo(time.Second)
	if checkErr(err) {
		return
	}
	bounce(server, client)
	client.SetLinger(0)
	err = client.Close()
	if checkErr(err) {
		return
	}

	err = server.Close()
	checkErr(err)

	fmt.Println("Done")
}

// bounce sends a two-part message from client to server and then from server
// back to client, checking every part. Problems are printed through
// checkErr0; bounce stops at the first failed socket call or unexpected
// "more parts follow" state.
func bounce(server, client *zmq.Socket) {
	content := "12345678ABCDEFGH12345678abcdefgh"

	// The client sends with DONTWAIT; the server replies with default flags.
	if !sendPair(client, content, zmq.DONTWAIT) {
		return
	}
	if !recvPair(server, content) {
		return
	}
	if !sendPair(server, content, 0) {
		return
	}
	recvPair(client, content)
}

// sendPair sends content twice on sock as one two-part message; extra is
// OR-ed into the flags of both parts. It returns false when a send fails.
// An unexpected return count is reported but does not stop the exchange.
func sendPair(sock *zmq.Socket, content string, extra zmq.Flag) bool {
	for _, flags := range []zmq.Flag{zmq.SNDMORE | extra, extra} {
		rc, err := sock.Send(content, flags)
		if checkErr0(err) {
			return false
		}
		if rc != 32 {
			checkErr0(errors.New("rc != 32"))
		}
	}
	return true
}

// recvPair receives the two parts of a message from sock and compares each
// with content. It returns false when a socket call fails, when the first
// part is not followed by another one, or when the second part is.
func recvPair(sock *zmq.Socket, content string) bool {
	for part := 0; part < 2; part++ {
		msg, err := sock.Recv(0)
		if checkErr0(err) {
			return false
		}
		if msg != content {
			// A content mismatch is reported only; the exchange goes on.
			checkErr0(fmt.Errorf("%q != %q", msg, content))
		}

		rcvmore, err := sock.GetRcvmore()
		if checkErr0(err) {
			return false
		}
		switch {
		case part == 0 && !rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore ==", rcvmore)))
			return false
		case part == 1 && rcvmore:
			checkErr0(errors.New(fmt.Sprint("rcvmore == ", rcvmore)))
			return false
		}
	}
	return true
}

// checkErr0 prints err and returns true when err is not nil.
func checkErr0(err error) bool {
	if err != nil {
		fmt.Println(err)
		return true
	}
	return false
}

// checkErr prints err, prefixed with the file and line of the caller when
// that information is available, and returns true when err is not nil.
func checkErr(err error) bool {
	if err != nil {
		_, filename, lineno, ok := runtime.Caller(1)
		if ok {
			fmt.Printf("%v:%v: %v\n", filename, lineno, err)
		} else {
			fmt.Println(err)
		}
		return true
	}
	return false
}
Output: resource temporarily unavailable Done Handler closed Reactor finished
Example (Version) ¶
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
)

// main prints the major version number reported by zmq.Version.
func main() {
	// Only the major number is shown; minor and patch are discarded.
	major, _, _ := zmq.Version()
	fmt.Printf("Version: %v\n", major)
}
Output: Version: 4
Index ¶
- Constants
- Variables
- func AuthAllow(domain string, addresses ...string)
- func AuthCurveAdd(domain string, pubkeys ...string)
- func AuthCurveRemove(domain string, pubkeys ...string)
- func AuthCurveRemoveAll(domain string)
- func AuthDeny(domain string, addresses ...string)
- func AuthMetaBlob(key, value string) (blob []byte, err error)
- func AuthPlainAdd(domain, username, password string)
- func AuthPlainRemove(domain string, usernames ...string)
- func AuthPlainRemoveAll(domain string)
- func AuthSetMetadataHandler(...)
- func AuthSetVerbose(verbose bool)
- func AuthStart() (err error)
- func AuthStop()
- func Error(e int) string
- func GetIoThreads() (int, error)
- func GetIpv6() (bool, error)
- func GetMaxSockets() (int, error)
- func HasCurve() bool
- func HasGssapi() bool
- func HasIpc() bool
- func HasNorm() bool
- func HasPgm() bool
- func HasTipc() bool
- func NewCurveKeypair() (z85_public_key, z85_secret_key string, err error)
- func Proxy(frontend, backend, capture *Socket) error
- func ProxySteerable(frontend, backend, capture, control *Socket) error
- func SetIoThreads(n int) error
- func SetIpv6(i bool) error
- func SetMaxSockets(n int) error
- func Term() error
- func Version() (major, minor, patch int)
- func Z85decode(s string) string
- func Z85encode(data string) string
- type Context
- func (ctx *Context) GetIoThreads() (int, error)
- func (ctx *Context) GetIpv6() (bool, error)
- func (ctx *Context) GetMaxSockets() (int, error)
- func (ctx *Context) NewSocket(t Type) (soc *Socket, err error)
- func (ctx *Context) SetIoThreads(n int) error
- func (ctx *Context) SetIpv6(i bool) error
- func (ctx *Context) SetMaxSockets(n int) error
- func (ctx *Context) Term() error
- type Errno
- type Event
- type Flag
- type Mechanism
- type Polled
- type Poller
- type Reactor
- func (r *Reactor) AddChannel(ch <-chan interface{}, limit int, handler func(interface{}) error) (id uint64)
- func (r *Reactor) AddChannelTime(ch <-chan time.Time, limit int, handler func(interface{}) error) (id uint64)
- func (r *Reactor) AddSocket(soc *Socket, events State, handler func(State) error)
- func (r *Reactor) RemoveChannel(id uint64)
- func (r *Reactor) RemoveSocket(soc *Socket)
- func (r *Reactor) Run(interval time.Duration) (err error)
- func (r *Reactor) SetVerbose(verbose bool)
- type Socket
- func (soc *Socket) Bind(endpoint string) error
- func (client *Socket) ClientAuthCurve(server_public_key, client_public_key, client_secret_key string) error
- func (client *Socket) ClientAuthPlain(username, password string) error
- func (soc *Socket) Close() error
- func (soc *Socket) Connect(endpoint string) error
- func (soc *Socket) Disconnect(endpoint string) error
- func (soc *Socket) GetAffinity() (uint64, error)
- func (soc *Socket) GetBacklog() (int, error)
- func (soc *Socket) GetCurvePublickeyRaw() (string, error)
- func (soc *Socket) GetCurvePublickeykeyZ85() (string, error)
- func (soc *Socket) GetCurveSecretkeyRaw() (string, error)
- func (soc *Socket) GetCurveSecretkeyZ85() (string, error)
- func (soc *Socket) GetCurveServerkeyRaw() (string, error)
- func (soc *Socket) GetCurveServerkeyZ85() (string, error)
- func (soc *Socket) GetEvents() (State, error)
- func (soc *Socket) GetFd() (int, error)
- func (soc *Socket) GetGssapiPlaintext() (bool, error)
- func (soc *Socket) GetGssapiPrincipal() (string, error)
- func (soc *Socket) GetGssapiServer() (bool, error)
- func (soc *Socket) GetGssapiServicePrincipal() (string, error)
- func (soc *Socket) GetHandshakeIvlt() (time.Duration, error)
- func (soc *Socket) GetIdentity() (string, error)
- func (soc *Socket) GetImmediate() (bool, error)
- func (soc *Socket) GetIpv6() (bool, error)
- func (soc *Socket) GetLastEndpoint() (string, error)
- func (soc *Socket) GetLinger() (time.Duration, error)
- func (soc *Socket) GetMaxmsgsize() (int64, error)
- func (soc *Socket) GetMechanism() (Mechanism, error)
- func (soc *Socket) GetMulticastHops() (int, error)
- func (soc *Socket) GetPlainPassword() (string, error)
- func (soc *Socket) GetPlainServer() (int, error)
- func (soc *Socket) GetPlainUsername() (string, error)
- func (soc *Socket) GetRate() (int, error)
- func (soc *Socket) GetRcvbuf() (int, error)
- func (soc *Socket) GetRcvhwm() (int, error)
- func (soc *Socket) GetRcvmore() (bool, error)
- func (soc *Socket) GetRcvtimeo() (time.Duration, error)
- func (soc *Socket) GetReconnectIvl() (time.Duration, error)
- func (soc *Socket) GetReconnectIvlMax() (time.Duration, error)
- func (soc *Socket) GetRecoveryIvl() (time.Duration, error)
- func (soc *Socket) GetSndbuf() (int, error)
- func (soc *Socket) GetSndhwm() (int, error)
- func (soc *Socket) GetSndtimeo() (time.Duration, error)
- func (soc *Socket) GetSocksProxy() (string, error)
- func (soc *Socket) GetTcpKeepalive() (int, error)
- func (soc *Socket) GetTcpKeepaliveCnt() (int, error)
- func (soc *Socket) GetTcpKeepaliveIdle() (int, error)
- func (soc *Socket) GetTcpKeepaliveIntvl() (int, error)
- func (soc *Socket) GetTos() (int, error)
- func (soc *Socket) GetType() (Type, error)
- func (soc *Socket) GetZapDomain() (string, error)
- func (soc *Socket) Monitor(addr string, events Event) error
- func (soc *Socket) Recv(flags Flag) (string, error)
- func (soc *Socket) RecvBytes(flags Flag) ([]byte, error)
- func (soc *Socket) RecvBytesWithMetadata(flags Flag, properties ...string) (msg []byte, metadata map[string]string, err error)
- func (soc *Socket) RecvEvent(flags Flag) (event_type Event, addr string, value int, err error)
- func (soc *Socket) RecvMessage(flags Flag) (msg []string, err error)
- func (soc *Socket) RecvMessageBytes(flags Flag) (msg [][]byte, err error)
- func (soc *Socket) RecvMessageBytesWithMetadata(flags Flag, properties ...string) (msg [][]byte, metadata map[string]string, err error)
- func (soc *Socket) RecvMessageWithMetadata(flags Flag, properties ...string) (msg []string, metadata map[string]string, err error)
- func (soc *Socket) RecvWithMetadata(flags Flag, properties ...string) (msg string, metadata map[string]string, err error)
- func (soc *Socket) Send(data string, flags Flag) (int, error)
- func (soc *Socket) SendBytes(data []byte, flags Flag) (int, error)
- func (soc *Socket) SendMessage(parts ...interface{}) (total int, err error)
- func (soc *Socket) SendMessageDontwait(parts ...interface{}) (total int, err error)
- func (server *Socket) ServerAuthCurve(domain, secret_key string) error
- func (server *Socket) ServerAuthNull(domain string) error
- func (server *Socket) ServerAuthPlain(domain string) error
- func (soc *Socket) SetAffinity(value uint64) error
- func (soc *Socket) SetBacklog(value int) error
- func (soc *Socket) SetConflate(value bool) error
- func (soc *Socket) SetConnectRid(value string) error
- func (soc *Socket) SetCurvePublickey(key string) error
- func (soc *Socket) SetCurveSecretkey(key string) error
- func (soc *Socket) SetCurveServer(value int) error
- func (soc *Socket) SetCurveServerkey(key string) error
- func (soc *Socket) SetGssapiPlaintext(value bool) error
- func (soc *Socket) SetGssapiPrincipal(value string) error
- func (soc *Socket) SetGssapiServer(value bool) error
- func (soc *Socket) SetGssapiServicePrincipal(value string) error
- func (soc *Socket) SetHandshakeIvl(value time.Duration) error
- func (soc *Socket) SetIdentity(value string) error
- func (soc *Socket) SetImmediate(value bool) error
- func (soc *Socket) SetIpv6(value bool) error
- func (soc *Socket) SetLinger(value time.Duration) error
- func (soc *Socket) SetMaxmsgsize(value int64) error
- func (soc *Socket) SetMulticastHops(value int) error
- func (soc *Socket) SetPlainPassword(password string) error
- func (soc *Socket) SetPlainServer(value int) error
- func (soc *Socket) SetPlainUsername(username string) error
- func (soc *Socket) SetProbeRouter(value int) error
- func (soc *Socket) SetRate(value int) error
- func (soc *Socket) SetRcvbuf(value int) error
- func (soc *Socket) SetRcvhwm(value int) error
- func (soc *Socket) SetRcvtimeo(value time.Duration) error
- func (soc *Socket) SetReconnectIvl(value time.Duration) error
- func (soc *Socket) SetReconnectIvlMax(value time.Duration) error
- func (soc *Socket) SetRecoveryIvl(value time.Duration) error
- func (soc *Socket) SetReqCorrelate(value int) error
- func (soc *Socket) SetReqRelaxed(value int) error
- func (soc *Socket) SetRouterHandover(value bool) error
- func (soc *Socket) SetRouterMandatory(value int) error
- func (soc *Socket) SetRouterRaw(value int) error
- func (soc *Socket) SetSndbuf(value int) error
- func (soc *Socket) SetSndhwm(value int) error
- func (soc *Socket) SetSndtimeo(value time.Duration) error
- func (soc *Socket) SetSocksProxy(value string) error
- func (soc *Socket) SetSubscribe(filter string) error
- func (soc *Socket) SetTcpAcceptFilter(filter string) error
- func (soc *Socket) SetTcpKeepalive(value int) error
- func (soc *Socket) SetTcpKeepaliveCnt(value int) error
- func (soc *Socket) SetTcpKeepaliveIdle(value int) error
- func (soc *Socket) SetTcpKeepaliveIntvl(value int) error
- func (soc *Socket) SetTos(value int) error
- func (soc *Socket) SetUnsubscribe(filter string) error
- func (soc *Socket) SetXpubNodrop(value bool) error
- func (soc *Socket) SetXpubVerbose(value int) error
- func (soc *Socket) SetZapDomain(domain string) error
- func (soc Socket) String() string
- func (soc *Socket) Unbind(endpoint string) error
- type State
- type Type
Examples ¶
- Package (Multiple_contexts)
- Package (Socket_event)
- Package (Test_abstract_ipc)
- Package (Test_conflate)
- Package (Test_connect_resolve)
- Package (Test_ctx_options)
- Package (Test_disconnect_inproc)
- Package (Test_fork)
- Package (Test_hwm)
- Package (Test_pair_ipc)
- Package (Test_pair_tcp)
- Package (Test_remote_endpoint)
- Package (Test_security_curve)
- Package (Test_security_null)
- Package (Test_security_plain)
- Package (Version)
- AuthStart
Constants ¶
// Error numbers as Errno values, each taken from the C constant of the
// same name.
const (
	// On Windows platform some of the standard POSIX errnos are not defined.
	EADDRINUSE      = Errno(C.EADDRINUSE)
	EADDRNOTAVAIL   = Errno(C.EADDRNOTAVAIL)
	EAFNOSUPPORT    = Errno(C.EAFNOSUPPORT)
	ECONNABORTED    = Errno(C.ECONNABORTED)
	ECONNREFUSED    = Errno(C.ECONNREFUSED)
	ECONNRESET      = Errno(C.ECONNRESET)
	EHOSTUNREACH    = Errno(C.EHOSTUNREACH)
	EINPROGRESS     = Errno(C.EINPROGRESS)
	EMSGSIZE        = Errno(C.EMSGSIZE)
	ENETDOWN        = Errno(C.ENETDOWN)
	ENETRESET       = Errno(C.ENETRESET)
	ENETUNREACH     = Errno(C.ENETUNREACH)
	ENOBUFS         = Errno(C.ENOBUFS)
	ENOTCONN        = Errno(C.ENOTCONN)
	ENOTSOCK        = Errno(C.ENOTSOCK)
	ENOTSUP         = Errno(C.ENOTSUP)
	EPROTONOSUPPORT = Errno(C.EPROTONOSUPPORT)
	ETIMEDOUT       = Errno(C.ETIMEDOUT)

	// Native 0MQ error codes.
	EFSM           = Errno(C.EFSM)
	EMTHREAD       = Errno(C.EMTHREAD)
	ENOCOMPATPROTO = Errno(C.ENOCOMPATPROTO)
	ETERM          = Errno(C.ETERM)
)
// Values of the C constants ZMQ_MAX_SOCKETS_DFLT and ZMQ_IO_THREADS_DFLT as
// Go ints. Presumably these are the defaults behind GetMaxSockets and
// GetIoThreads -- confirm.
const (
	MaxSocketsDflt = int(C.ZMQ_MAX_SOCKETS_DFLT)
	IoThreadsDflt  = int(C.ZMQ_IO_THREADS_DFLT)
)
// Socket types, each taken from the corresponding ZMQ_* C constant.
const (
	// Constants for NewSocket()
	// See: http://api.zeromq.org/4-0:zmq-socket#toc3
	REQ    = Type(C.ZMQ_REQ)
	REP    = Type(C.ZMQ_REP)
	DEALER = Type(C.ZMQ_DEALER)
	ROUTER = Type(C.ZMQ_ROUTER)
	PUB    = Type(C.ZMQ_PUB)
	SUB    = Type(C.ZMQ_SUB)
	XPUB   = Type(C.ZMQ_XPUB)
	XSUB   = Type(C.ZMQ_XSUB)
	PUSH   = Type(C.ZMQ_PUSH)
	PULL   = Type(C.ZMQ_PULL)
	PAIR   = Type(C.ZMQ_PAIR)
	STREAM = Type(C.ZMQ_STREAM)
)
// Send and receive flags, each taken from the corresponding ZMQ_* C constant.
const (
	// Flags for (*Socket)Send(), (*Socket)Recv()
	// For Send, see: http://api.zeromq.org/4-0:zmq-send#toc2
	// For Recv, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
	DONTWAIT = Flag(C.ZMQ_DONTWAIT)
	SNDMORE  = Flag(C.ZMQ_SNDMORE)
)
// Socket monitor events, each taken from the corresponding ZMQ_EVENT_* C
// constant.
const (
	// Flags for (*Socket)Monitor() and (*Socket)RecvEvent()
	// See: http://api.zeromq.org/4-0:zmq-socket-monitor#toc3
	EVENT_ALL             = Event(C.ZMQ_EVENT_ALL)
	EVENT_CONNECTED       = Event(C.ZMQ_EVENT_CONNECTED)
	EVENT_CONNECT_DELAYED = Event(C.ZMQ_EVENT_CONNECT_DELAYED)
	EVENT_CONNECT_RETRIED = Event(C.ZMQ_EVENT_CONNECT_RETRIED)
	EVENT_LISTENING       = Event(C.ZMQ_EVENT_LISTENING)
	EVENT_BIND_FAILED     = Event(C.ZMQ_EVENT_BIND_FAILED)
	EVENT_ACCEPTED        = Event(C.ZMQ_EVENT_ACCEPTED)
	EVENT_ACCEPT_FAILED   = Event(C.ZMQ_EVENT_ACCEPT_FAILED)
	EVENT_CLOSED          = Event(C.ZMQ_EVENT_CLOSED)
	EVENT_CLOSE_FAILED    = Event(C.ZMQ_EVENT_CLOSE_FAILED)
	EVENT_DISCONNECTED    = Event(C.ZMQ_EVENT_DISCONNECTED)
	EVENT_MONITOR_STOPPED = Event(C.ZMQ_EVENT_MONITOR_STOPPED)
)
// Socket event states, each taken from the corresponding ZMQ_POLL* C
// constant.
const (
	// Flags for (*Socket)GetEvents()
	// See: http://api.zeromq.org/4-0:zmq-getsockopt#toc25
	POLLIN  = State(C.ZMQ_POLLIN)
	POLLOUT = State(C.ZMQ_POLLOUT)
)
// Security mechanisms, each taken from the corresponding ZMQ_* C constant.
const (
	// Constants for (*Socket)GetMechanism()
	// See: http://api.zeromq.org/4-0:zmq-getsockopt#toc31
	NULL   = Mechanism(C.ZMQ_NULL)
	PLAIN  = Mechanism(C.ZMQ_PLAIN)
	CURVE  = Mechanism(C.ZMQ_CURVE)
	GSSAPI = Mechanism(C.ZMQ_GSSAPI)
)
// CURVE_ALLOW_ANY is the key to pass to AuthCurveAdd to allow all client
// keys without checking.
const CURVE_ALLOW_ANY = "*"
Variables ¶
// Error values defined by this package.
var (
	// Presumably returned when a closed context or socket is used -- confirm.
	ErrorContextClosed = errors.New("Context is closed")
	ErrorSocketClosed  = errors.New("Socket is closed")

	ErrorMoreExpected = errors.New("More expected")

	// Features that need a newer 0MQ version than the one in use.
	ErrorNotImplemented405 = errors.New("Not implemented, requires 0MQ version 4.0.5")
	ErrorNotImplemented41  = errors.New("Not implemented, requires 0MQ version 4.1")
)
Functions ¶
func AuthAllow ¶
Allow (whitelist) some addresses for a domain.
An address can be a single IP address, or an IP address and mask in CIDR notation.
For NULL, all clients from these addresses will be accepted.
For PLAIN and CURVE, they will be allowed to continue with authentication.
You can call this method multiple times to whitelist multiple IP addresses.
If you whitelist a single address for a domain, any non-whitelisted addresses for that domain are treated as blacklisted.
Use domain "*" for all domains.
For backward compatibility: if domain can be parsed as an IP address, it will be interpreted as another address, and it and all remaining addresses will be added to all domains.
func AuthCurveAdd ¶
Add public user keys for CURVE authentication for a given domain.
To cover all domains, use "*".
Public keys are in Z85 printable text format.
To allow all client keys without checking, specify CURVE_ALLOW_ANY for the key.
func AuthCurveRemove ¶
Remove user keys from CURVE authentication for a given domain.
func AuthCurveRemoveAll ¶
func AuthCurveRemoveAll(domain string)
Remove all user keys from CURVE authentication for a given domain.
func AuthDeny ¶
Deny (blacklist) some addresses for a domain.
An address can be a single IP address, or an IP address and mask in CIDR notation.
For all security mechanisms, this rejects the connection without any further authentication.
Use either a whitelist for a domain, or a blacklist for a domain, not both. If you define both a whitelist and a blacklist for a domain, only the whitelist takes effect.
Use domain "*" for all domains.
For backward compatibility: if domain can be parsed as an IP address, it will be interpreted as another address, and it and all remaining addresses will be added to all domains.
func AuthMetaBlob ¶
This encodes a key/value pair into the format used by a ZAP handler.
Returns an error if key is more than 255 characters long.
func AuthPlainAdd ¶
func AuthPlainAdd(domain, username, password string)
Add a user for PLAIN authentication for a given domain.
Set `domain` to "*" to apply to all domains.
func AuthPlainRemove ¶
Remove users from PLAIN authentication for a given domain.
func AuthPlainRemoveAll ¶
func AuthPlainRemoveAll(domain string)
Remove all users from PLAIN authentication for a given domain.
func AuthSetMetadataHandler ¶
func AuthSetMetadataHandler( handler func( version, request_id, domain, address, identity, mechanism string, credentials ...string) (metadata map[string]string))
This function sets the metadata handler that is called by the ZAP handler to retrieve key/value properties that should be set on reply messages in case of a status code "200" (success).
Default properties are `Socket-Type`, which is already set, and `Identity` and `User-Id` that are empty by default. The last two can be set, and more properties can be added.
The `User-Id` property is used for the `user id` frame of the reply message. All other properties are stored in the `metadata` frame of the reply message.
The default handler returns an empty map.
For the meaning of the handler arguments, and other details, see: http://rfc.zeromq.org/spec:27#toc10
func AuthSetVerbose ¶
func AuthSetVerbose(verbose bool)
Enable verbose tracing of commands and activity.
func AuthStart ¶
func AuthStart() (err error)
Start authentication.
Note that until you add policies, all incoming NULL connections are allowed (classic ZeroMQ behaviour), and all PLAIN and CURVE connections are denied.
Example ¶
// Example for AuthStart: a CURVE-secured DEALER/DEALER exchange in which the
// server reads back the metadata attached by a custom ZAP metadata handler.
package main

import (
	zmq "github.com/pebbe/zmq4"

	"fmt"
	"log"
)

func main() {
	// checkErr logs a non-nil error and reports whether one occurred, so
	// each step below can bail out with a single `if`.
	checkErr := func(err error) bool {
		if err == nil {
			return false
		}
		log.Println(err)
		return true
	}

	zmq.AuthSetVerbose(false)

	// Start authentication engine
	err := zmq.AuthStart()
	if checkErr(err) {
		return
	}
	defer zmq.AuthStop()

	// Properties returned here are attached to messages of successfully
	// authenticated connections.
	zmq.AuthSetMetadataHandler(
		func(version, request_id, domain, address, identity, mechanism string, credentials ...string) (metadata map[string]string) {
			return map[string]string{
				"Identity": identity,
				"User-Id":  "anonymous",
				"Hello":    "World!",
				"Foo":      "Bar",
			}
		})

	zmq.AuthAllow("domain1", "127.0.0.1")

	// We need two certificates, one for the client and one for
	// the server. The client must know the server's public key
	// to make a CURVE connection.
	client_public, client_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}
	server_public, server_secret, err := zmq.NewCurveKeypair()
	if checkErr(err) {
		return
	}

	// Tell authenticator to use this public client key
	zmq.AuthCurveAdd("domain1", client_public)

	// Create and bind server socket
	server, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	defer server.Close()
	server.SetIdentity("Server1")
	server.ServerAuthCurve("domain1", server_secret)
	err = server.Bind("tcp://*:9000")
	if checkErr(err) {
		return
	}

	// Create and connect client socket
	client, err := zmq.NewSocket(zmq.DEALER)
	if checkErr(err) {
		return
	}
	defer client.Close()
	// The identity belongs on the client socket; setting it on the server
	// here would overwrite "Server1" instead of naming the client.
	client.SetIdentity("Client1")
	// Without working CURVE credentials the connection cannot authenticate,
	// so a failure here is reported instead of being dropped.
	err = client.ClientAuthCurve(server_public, client_public, client_secret)
	if checkErr(err) {
		return
	}
	err = client.Connect("tcp://127.0.0.1:9000")
	if checkErr(err) {
		return
	}

	// Send a message from client to server
	_, err = client.SendMessage("Greatings", "Earthlings!")
	if checkErr(err) {
		return
	}

	// Receive message and metadata on the server
	keys := []string{"Identity", "User-Id", "Socket-Type", "Hello", "Foo", "Fuz"}
	message, metadata, err := server.RecvMessageWithMetadata(0, keys...)
	if checkErr(err) {
		return
	}
	fmt.Println(message)

	if _, minor, _ := zmq.Version(); minor < 1 {
		// Metadata requires at least ZeroMQ version 4.1
		// Print the expected result verbatim so the example output is
		// the same on older libraries.
		fmt.Println(`Identity: "Server1" true`)
		fmt.Println(`User-Id: "anonymous" true`)
		fmt.Println(`Socket-Type: "DEALER" true`)
		fmt.Println(`Hello: "World!" true`)
		fmt.Println(`Foo: "Bar" true`)
		fmt.Println(`Fuz: "" false`)
	} else {
		for _, key := range keys {
			value, ok := metadata[key]
			fmt.Printf("%v: %q %v\n", key, value, ok)
		}
	}
}
Output: [Greatings Earthlings!] Identity: "Server1" true User-Id: "anonymous" true Socket-Type: "DEALER" true Hello: "World!" true Foo: "Bar" true Fuz: "" false
func GetIoThreads ¶
Returns the size of the 0MQ thread pool in the default context.
func GetMaxSockets ¶
Returns the maximum number of sockets allowed in the default context.
func HasCurve ¶
func HasCurve() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the CURVE security mechanism
func HasGssapi ¶
func HasGssapi() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the GSSAPI security mechanism
func HasIpc ¶
func HasIpc() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the ipc:// protocol
func HasNorm ¶
func HasNorm() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the norm:// protocol
func HasPgm ¶
func HasPgm() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the pgm:// protocol
func HasTipc ¶
func HasTipc() bool
Returns false for ZeroMQ version < 4.1.0
Else: returns true if the library supports the tipc:// protocol
func NewCurveKeypair ¶
Generate a new CURVE keypair
func ProxySteerable ¶
Start built-in ØMQ proxy with PAUSE/RESUME/TERMINATE control flow
Returns ErrorNotImplemented405 with ZeroMQ version < 4.0.5
func SetIoThreads ¶
Specifies the size of the 0MQ thread pool to handle I/O operations in the default context. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one. This option only applies before creating any sockets.
Default value 1
func SetIpv6 ¶
Sets the IPv6 value for all sockets created in the default context from this point onwards. A value of true means IPv6 is enabled, while false means the socket will use only IPv4. When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
Default value false
func SetMaxSockets ¶
Sets the maximum number of sockets allowed in the default context.
Default value 1024
func Term ¶
func Term() error
Terminates the default context.
For linger behavior, see: http://api.zeromq.org/4-0:zmq-ctx-term
Types ¶
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
A context that is not the default context.
func (*Context) GetIoThreads ¶
Returns the size of the 0MQ thread pool.
func (*Context) GetMaxSockets ¶
Returns the maximum number of sockets allowed.
func (*Context) NewSocket ¶
Create 0MQ socket in the given context.
WARNING: The Socket is not thread safe. This means that you cannot access the same Socket from different goroutines without using something like a mutex.
For a description of socket types, see: http://api.zeromq.org/4-0:zmq-socket#toc3
func (*Context) SetIoThreads ¶
Specifies the size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one. This option only applies before creating any sockets.
Default value 1
func (*Context) SetIpv6 ¶
Sets the IPv6 value for all sockets created in the context from this point onwards. A value of true means IPv6 is enabled, while false means the socket will use only IPv4. When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
Default value false
func (*Context) Term ¶
Terminates the context.
For linger behavior, see: http://api.zeromq.org/4-0:zmq-ctx-term
type Errno ¶
type Errno uintptr
An Errno is an unsigned number describing an error condition as returned by a call to ZeroMQ. It implements the error interface. The number is either a standard system error, or an error defined by the C library of ZeroMQ.
type Mechanism ¶
type Mechanism int
Specifies the security mechanism, used by (*Socket)GetMechanism()
type Polled ¶
// Polled describes one socket reported by a poll, together with the
// events that were matched on it.
type Polled struct {
	Socket *Socket // socket with matched event(s)
	Events State   // actual matched event(s)
}
Return type for (*Poller)Poll
type Poller ¶
type Poller struct {
// contains filtered or unexported fields
}
func (*Poller) Poll ¶
Input/output multiplexing
If timeout < 0, wait forever until a matching event is detected
Only sockets with matching socket events are returned in the list.
Example:
// Watch both sockets for incoming messages.
poller := zmq.NewPoller()
poller.Add(socket0, zmq.POLLIN)
poller.Add(socket1, zmq.POLLIN)

// Process messages from both sockets. The negative timeout makes Poll
// wait until at least one registered socket has a matching event.
for {
	ready, _ := poller.Poll(-1)
	for _, item := range ready {
		// Only sockets with a matched event are listed, so dispatch on
		// which of the registered sockets this entry refers to.
		switch item.Socket {
		case socket0:
			msg, _ := socket0.Recv(0)
			// Process msg
		case socket1:
			msg, _ := socket1.Recv(0)
			// Process msg
		}
	}
}
func (*Poller) PollAll ¶
This is like (*Poller)Poll, but it returns a list of all sockets, in the same order as they were added to the poller, not just those sockets that had an event.
For each socket in the list, you have to check the Events field to see if there was actually an event.
When error is not nil, the return list contains no sockets.
type Reactor ¶
type Reactor struct {
// contains filtered or unexported fields
}
func NewReactor ¶
func NewReactor() *Reactor
Create a reactor to mix the handling of sockets and channels (timers or other channels).
Example:
// Build a reactor that serves two sockets and a one-second ticker.
reactor := zmq.NewReactor()
// One handler per socket; each is invoked with the socket state when the
// socket becomes readable.
reactor.AddSocket(socket1, zmq.POLLIN, socket1_handler)
reactor.AddSocket(socket2, zmq.POLLIN, socket2_handler)
// Limit 1: handle at most one tick in each run through the main loop.
reactor.AddChannelTime(time.Tick(time.Second), 1, ticker_handler)
// The interval is the socket polling time-out; it must be positive because
// a channel is registered. Run returns when a handler returns an error.
reactor.Run(time.Second)
func (*Reactor) AddChannel ¶
func (r *Reactor) AddChannel(ch <-chan interface{}, limit int, handler func(interface{}) error) (id uint64)
Add channel handler to the reactor.
Returns id of added handler, that can be used later to remove it.
If limit is positive, at most this many items will be handled in each run through the main loop, otherwise it will process as many items as possible.
The handler function receives the value received from the channel.
func (*Reactor) AddChannelTime ¶
func (r *Reactor) AddChannelTime(ch <-chan time.Time, limit int, handler func(interface{}) error) (id uint64)
This function wraps AddChannel, using a channel of type time.Time instead of type interface{}.
func (*Reactor) AddSocket ¶
Add socket handler to the reactor.
You can have only one handler per socket. Adding a second one will remove the first.
The handler receives the socket state as an argument: POLLIN, POLLOUT, or both.
func (*Reactor) RemoveChannel ¶
Remove a channel from the reactor.
Closed channels are removed automatically.
func (*Reactor) RemoveSocket ¶
Remove a socket handler from the reactor.
func (*Reactor) Run ¶
Run the reactor.
The interval determines the time-out on the polling of sockets. Interval must be positive if there are channels. If there are no channels, you can set interval to -1.
The run alternates between polling/handling sockets (using the interval as timeout), and reading/handling channels. The reading of channels is without time-out: if there is no activity on any channel, the run continues to poll sockets immediately.
The run exits when any handler returns an error, returning that same error.
func (*Reactor) SetVerbose ¶
type Socket ¶
type Socket struct {
// contains filtered or unexported fields
}
Socket functions starting with `Set` or `Get` are used for setting and getting socket options.
func NewSocket ¶
Create 0MQ socket in the default context.
WARNING: The Socket is not thread safe. This means that you cannot access the same Socket from different goroutines without using something like a mutex.
For a description of socket types, see: http://api.zeromq.org/4-0:zmq-socket#toc3
func (*Socket) Bind ¶
Accept incoming connections on a socket.
For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-bind#toc2
func (*Socket) ClientAuthCurve ¶
func (client *Socket) ClientAuthCurve(server_public_key, client_public_key, client_secret_key string) error
Set CURVE client role.
func (*Socket) ClientAuthPlain ¶
Set PLAIN client role.
func (*Socket) Connect ¶
Create outgoing connection from socket.
For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-connect#toc2
func (*Socket) Disconnect ¶
Disconnect a socket.
For a description of endpoint, see: http://api.zeromq.org/4-0:zmq-connect#toc2
func (*Socket) GetAffinity ¶
ZMQ_AFFINITY: Retrieve I/O thread affinity
func (*Socket) GetBacklog ¶
ZMQ_BACKLOG: Retrieve maximum length of the queue of outstanding connections
func (*Socket) GetCurvePublickeyRaw ¶
ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key
func (*Socket) GetCurvePublickeykeyZ85 ¶
ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key
func (*Socket) GetCurveSecretkeyRaw ¶
ZMQ_CURVE_SECRETKEY: Retrieve current CURVE secret key
func (*Socket) GetCurveSecretkeyZ85 ¶
ZMQ_CURVE_SECRETKEY: Retrieve current CURVE secret key
func (*Socket) GetCurveServerkeyRaw ¶
ZMQ_CURVE_SERVERKEY: Retrieve current CURVE server key
func (*Socket) GetCurveServerkeyZ85 ¶
ZMQ_CURVE_SERVERKEY: Retrieve current CURVE server key
func (*Socket) GetGssapiPlaintext ¶
ZMQ_GSSAPI_PLAINTEXT: Retrieve GSSAPI plaintext or encrypted status
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetGssapiPrincipal ¶
ZMQ_GSSAPI_PRINCIPAL: Retrieve the name of the GSSAPI principal
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetGssapiServer ¶
ZMQ_GSSAPI_SERVER: Retrieve current GSSAPI server role
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetGssapiServicePrincipal ¶
ZMQ_GSSAPI_SERVICE_PRINCIPAL: Retrieve the name of the GSSAPI service principal
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetHandshakeIvlt ¶
ZMQ_HANDSHAKE_IVL: Retrieve maximum handshake interval
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetIdentity ¶
ZMQ_IDENTITY: Retrieve socket identity
func (*Socket) GetImmediate ¶
ZMQ_IMMEDIATE: Retrieve attach-on-connect value
func (*Socket) GetLastEndpoint ¶
ZMQ_LAST_ENDPOINT: Retrieve the last endpoint set
func (*Socket) GetLinger ¶
ZMQ_LINGER: Retrieve linger period for socket shutdown
Returns time.Duration(-1) for infinite ¶
func (*Socket) GetMaxmsgsize ¶
ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size
func (*Socket) GetMechanism ¶
ZMQ_MECHANISM: Retrieve current security mechanism
func (*Socket) GetMulticastHops ¶
ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets
func (*Socket) GetPlainPassword ¶
ZMQ_PLAIN_PASSWORD: Retrieve current password
func (*Socket) GetPlainServer ¶
ZMQ_PLAIN_SERVER: Retrieve current PLAIN server role
func (*Socket) GetPlainUsername ¶
ZMQ_PLAIN_USERNAME: Retrieve current PLAIN username
func (*Socket) GetRcvmore ¶
ZMQ_RCVMORE: More message data parts to follow
func (*Socket) GetRcvtimeo ¶
ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
Returns time.Duration(-1) for infinite ¶
func (*Socket) GetReconnectIvl ¶
ZMQ_RECONNECT_IVL: Retrieve reconnection interval
Returns time.Duration(-1) for no reconnection ¶
func (*Socket) GetReconnectIvlMax ¶
ZMQ_RECONNECT_IVL_MAX: Retrieve maximum reconnection interval
func (*Socket) GetRecoveryIvl ¶
ZMQ_RECOVERY_IVL: Get multicast recovery interval
func (*Socket) GetSndtimeo ¶
ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
Returns time.Duration(-1) for infinite ¶
func (*Socket) GetSocksProxy ¶
ZMQ_SOCKS_PROXY: NOT DOCUMENTED
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetTcpKeepalive ¶
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
func (*Socket) GetTcpKeepaliveCnt ¶
ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option
func (*Socket) GetTcpKeepaliveIdle ¶
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPIDLE (or TCP_KEEPALIVE on some OS)
func (*Socket) GetTcpKeepaliveIntvl ¶
ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option
func (*Socket) GetTos ¶
ZMQ_TOS: Retrieve the Type-of-Service socket override status
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) GetZapDomain ¶
ZMQ_ZAP_DOMAIN: Retrieve RFC 27 authentication domain
func (*Socket) Monitor ¶
Register a monitoring callback.
See: http://api.zeromq.org/4-0:zmq-socket-monitor#toc2
WARNING: Closing a context with a monitoring callback will lead to random crashes. This is a bug in the ZeroMQ library. The monitoring callback has the same context as the socket it was created for.
Example:
// Example for (*Socket) Monitor: watch the events of a REP socket through
// an inproc monitor endpoint and log each one.
package main

import (
	zmq "github.com/pebbe/zmq4"

	"log"
	"time"
)

// rep_socket_monitor connects a PAIR socket to the monitor endpoint addr and
// logs every event it receives until RecvEvent fails, then closes the socket.
func rep_socket_monitor(addr string) {
	s, err := zmq.NewSocket(zmq.PAIR)
	if err != nil {
		log.Fatalln(err)
	}
	err = s.Connect(addr)
	if err != nil {
		log.Fatalln(err)
	}
	for {
		a, b, c, err := s.RecvEvent(0)
		if err != nil {
			log.Println(err)
			break
		}
		log.Println(a, b, c)
	}
	s.Close()
}

func main() {

	// REP socket
	rep, err := zmq.NewSocket(zmq.REP)
	if err != nil {
		log.Fatalln(err)
	}

	// REP socket monitor, all events
	err = rep.Monitor("inproc://monitor.rep", zmq.EVENT_ALL)
	if err != nil {
		log.Fatalln(err)
	}
	go rep_socket_monitor("inproc://monitor.rep")

	// Generate an event
	// Capture Bind's result: without the assignment the check below would
	// re-test the error from Monitor and a failed bind would go unnoticed.
	err = rep.Bind("tcp://*:5555")
	if err != nil {
		log.Fatalln(err)
	}

	// Allow some time for event detection
	time.Sleep(time.Second)

	rep.Close()
	zmq.Term()
}
func (*Socket) Recv ¶
Receive a message part from a socket.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
func (*Socket) RecvBytes ¶
Receive a message part from a socket.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
func (*Socket) RecvBytesWithMetadata ¶
func (soc *Socket) RecvBytesWithMetadata(flags Flag, properties ...string) (msg []byte, metadata map[string]string, err error)
Receive a message part with metadata.
This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.
The returned metadata map contains only those properties that exist on the message.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3
func (*Socket) RecvEvent ¶
Receive a message part from a socket interpreted as an event.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
For a description of event_type, see: http://api.zeromq.org/4-0:zmq-socket-monitor#toc3
For an example, see: func (*Socket) Monitor
func (*Socket) RecvMessageBytes ¶
Receive parts as message from socket.
Returns last non-nil error code.
func (*Socket) RecvMessageBytesWithMetadata ¶
func (soc *Socket) RecvMessageBytesWithMetadata(flags Flag, properties ...string) (msg [][]byte, metadata map[string]string, err error)
Receive parts as message from socket, including metadata.
Metadata is picked from the first message part.
For details about metadata, see RecvBytesWithMetadata().
Returns last non-nil error code.
func (*Socket) RecvMessageWithMetadata ¶
func (soc *Socket) RecvMessageWithMetadata(flags Flag, properties ...string) (msg []string, metadata map[string]string, err error)
Receive parts as message from socket, including metadata.
Metadata is picked from the first message part.
For details about metadata, see RecvWithMetadata().
Returns last non-nil error code.
func (*Socket) RecvWithMetadata ¶
func (soc *Socket) RecvWithMetadata(flags Flag, properties ...string) (msg string, metadata map[string]string, err error)
Receive a message part with metadata.
This requires ZeroMQ version 4.1.0. Lower versions will return the message part without metadata.
The returned metadata map contains only those properties that exist on the message.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-msg-recv#toc2
For a description of metadata, see: http://api.zeromq.org/4-1:zmq-msg-gets#toc3
func (*Socket) Send ¶
Send a message part on a socket.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-send#toc2
func (*Socket) SendBytes ¶
Send a message part on a socket.
For a description of flags, see: http://api.zeromq.org/4-0:zmq-send#toc2
func (*Socket) SendMessage ¶
Send multi-part message on socket.
Any `[]string` or `[][]byte` is split into separate `string`s or `[]byte`s.
Any other part that isn't a `string` or `[]byte` is converted to `string` with `fmt.Sprintf("%v", part)`.
Returns total bytes sent.
func (*Socket) SendMessageDontwait ¶
Like SendMessage(), but adding the DONTWAIT flag.
func (*Socket) ServerAuthCurve ¶
Set CURVE server role.
func (*Socket) ServerAuthNull ¶
Set NULL server role.
func (*Socket) ServerAuthPlain ¶
Set PLAIN server role.
func (*Socket) SetAffinity ¶
ZMQ_AFFINITY: Set I/O thread affinity
func (*Socket) SetBacklog ¶
ZMQ_BACKLOG: Set maximum length of the queue of outstanding connections
func (*Socket) SetConflate ¶
ZMQ_CONFLATE: Keep only last message
func (*Socket) SetConnectRid ¶
ZMQ_CONNECT_RID: Assign the next outbound connection id
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetCurvePublickey ¶
ZMQ_CURVE_PUBLICKEY: Set CURVE public key
func (*Socket) SetCurveSecretkey ¶
ZMQ_CURVE_SECRETKEY: Set CURVE secret key
func (*Socket) SetCurveServer ¶
ZMQ_CURVE_SERVER: Set CURVE server role
func (*Socket) SetCurveServerkey ¶
ZMQ_CURVE_SERVERKEY: Set CURVE server key
func (*Socket) SetGssapiPlaintext ¶
ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetGssapiPrincipal ¶
ZMQ_GSSAPI_PRINCIPAL: Set name of GSSAPI principal
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetGssapiServer ¶
ZMQ_GSSAPI_SERVER: Set GSSAPI server role
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetGssapiServicePrincipal ¶
ZMQ_GSSAPI_SERVICE_PRINCIPAL: Set name of GSSAPI service principal
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetHandshakeIvl ¶
ZMQ_HANDSHAKE_IVL: Set maximum handshake interval
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetIdentity ¶
ZMQ_IDENTITY: Set socket identity
func (*Socket) SetImmediate ¶
ZMQ_IMMEDIATE: Queue messages only to completed connections
func (*Socket) SetMaxmsgsize ¶
ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size
func (*Socket) SetMulticastHops ¶
ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets
func (*Socket) SetPlainPassword ¶
ZMQ_PLAIN_PASSWORD: Set PLAIN security password
func (*Socket) SetPlainServer ¶
ZMQ_PLAIN_SERVER: Set PLAIN server role
func (*Socket) SetPlainUsername ¶
ZMQ_PLAIN_USERNAME: Set PLAIN security username
func (*Socket) SetProbeRouter ¶
ZMQ_PROBE_ROUTER: bootstrap connections to ROUTER sockets
func (*Socket) SetRcvtimeo ¶
ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
Use -1 for infinite ¶
func (*Socket) SetReconnectIvl ¶
ZMQ_RECONNECT_IVL: Set reconnection interval
Use -1 for no reconnection ¶
func (*Socket) SetReconnectIvlMax ¶
ZMQ_RECONNECT_IVL_MAX: Set maximum reconnection interval
func (*Socket) SetRecoveryIvl ¶
ZMQ_RECOVERY_IVL: Set multicast recovery interval
func (*Socket) SetReqCorrelate ¶
ZMQ_REQ_CORRELATE: match replies with requests
func (*Socket) SetReqRelaxed ¶
ZMQ_REQ_RELAXED: relax strict alternation between request and reply
func (*Socket) SetRouterHandover ¶
ZMQ_ROUTER_HANDOVER: handle duplicate client identities on ROUTER sockets
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetRouterMandatory ¶
ZMQ_ROUTER_MANDATORY: accept only routable messages on ROUTER sockets
func (*Socket) SetRouterRaw ¶
ZMQ_ROUTER_RAW: switch ROUTER socket to raw mode
This option is deprecated, please use ZMQ_STREAM sockets instead.
func (*Socket) SetSndtimeo ¶
ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
Use -1 for infinite ¶
func (*Socket) SetSocksProxy ¶
ZMQ_SOCKS_PROXY: NOT DOCUMENTED
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetSubscribe ¶
ZMQ_SUBSCRIBE: Establish message filter
func (*Socket) SetTcpAcceptFilter ¶
ZMQ_TCP_ACCEPT_FILTER: Assign filters to allow new TCP connections
This option is deprecated, please use authentication via the ZAP API and IP address whitelisting / blacklisting.
func (*Socket) SetTcpKeepalive ¶
ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option
func (*Socket) SetTcpKeepaliveCnt ¶
ZMQ_TCP_KEEPALIVE_CNT: Override TCP_KEEPCNT socket option
func (*Socket) SetTcpKeepaliveIdle ¶
ZMQ_TCP_KEEPALIVE_IDLE: Override TCP_KEEPIDLE (or TCP_KEEPALIVE on some OS)
func (*Socket) SetTcpKeepaliveIntvl ¶
ZMQ_TCP_KEEPALIVE_INTVL: Override TCP_KEEPINTVL socket option
func (*Socket) SetTos ¶
ZMQ_TOS: Set the Type-of-Service on socket
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetUnsubscribe ¶
ZMQ_UNSUBSCRIBE: Remove message filter
func (*Socket) SetXpubNodrop ¶
ZMQ_XPUB_NODROP: NOT DOCUMENTED
Returns ErrorNotImplemented41 with ZeroMQ version < 4.1
func (*Socket) SetXpubVerbose ¶
ZMQ_XPUB_VERBOSE: provide all subscription messages on XPUB sockets
func (*Socket) SetZapDomain ¶
ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
bstar | bstar - Binary Star reactor. |
clone | Clone client API stack (multithreaded). |
flcliapi | flcliapi - Freelance Pattern agent class. |
intface | Interface class for Chapter 8. |
kvmsg | kvmsg class - key-value message class for example applications |
kvsimple | kvsimple - simple key-value message class for example applications. |
mdapi | Majordomo Protocol Client and Worker API. |