Documentation ¶
Overview ¶
Package mq implements interprocess queues logic. It provides access to system mq mechanisms, such as sysv mq and linux mq. Also, it provides access to multi-platform priority queue, FastMq.
Index ¶
- Constants
- func Destroy(name string) error
- func DestroyFastMq(name string) error
- func DestroyLinuxMessageQueue(name string) error
- func DestroySystemVMessageQueue(name string) error
- func FastMqAttrs(name string) (int, int, error)
- func IsTemporary(err error) bool
- func SetLinuxMqBlocking(name string, block bool) error
- type Blocker
- type Buffered
- type FastMq
- func (mq *FastMq) Cap() int
- func (mq *FastMq) Close() error
- func (mq *FastMq) Destroy() error
- func (mq *FastMq) Empty() bool
- func (mq *FastMq) Full() bool
- func (mq *FastMq) Receive(data []byte) (int, error)
- func (mq *FastMq) ReceivePriority(data []byte) (int, int, error)
- func (mq *FastMq) ReceivePriorityTimeout(data []byte, timeout time.Duration) (int, int, error)
- func (mq *FastMq) ReceiveTimeout(data []byte, timeout time.Duration) (int, error)
- func (mq *FastMq) Send(data []byte) error
- func (mq *FastMq) SendPriority(data []byte, prio int) error
- func (mq *FastMq) SendPriorityTimeout(data []byte, prio int, timeout time.Duration) error
- func (mq *FastMq) SendTimeout(data []byte, timeout time.Duration) error
- func (mq *FastMq) SetBlocking(block bool) error
- type LinuxMessageQueue
- func (mq *LinuxMessageQueue) Cap() int
- func (mq *LinuxMessageQueue) Close() error
- func (mq *LinuxMessageQueue) Destroy() error
- func (mq *LinuxMessageQueue) ID() int
- func (mq *LinuxMessageQueue) Notify(ch chan<- int) error
- func (mq *LinuxMessageQueue) NotifyCancel() error
- func (mq *LinuxMessageQueue) Receive(data []byte) (int, error)
- func (mq *LinuxMessageQueue) ReceivePriority(data []byte) (int, int, error)
- func (mq *LinuxMessageQueue) ReceiveTimeout(data []byte, timeout time.Duration) (int, error)
- func (mq *LinuxMessageQueue) ReceiveTimeoutPriority(input []byte, timeout time.Duration) (int, int, error)
- func (mq *LinuxMessageQueue) Send(data []byte) error
- func (mq *LinuxMessageQueue) SendPriority(data []byte, prio int) error
- func (mq *LinuxMessageQueue) SendTimeout(data []byte, timeout time.Duration) error
- func (mq *LinuxMessageQueue) SendTimeoutPriority(data []byte, prio int, timeout time.Duration) error
- func (mq *LinuxMessageQueue) SetBlocking(block bool) error
- type Messenger
- type PriorityMessenger
- type SystemVMessageQueue
- type TimedMessenger
Examples ¶
Constants ¶
const ( // DefaultFastMqMaxSize is the default fast mq queue size. DefaultFastMqMaxSize = 8 // DefaultFastMqMessageSize is the fast mq message size. DefaultFastMqMessageSize = 8192 )
const ( // DefaultLinuxMqMaxSize is the default linux mq queue size. DefaultLinuxMqMaxSize = 8 // DefaultLinuxMqMessageSize is the linux mq message size. // Its max value can be set via procfs. DefaultLinuxMqMessageSize = 8192 )
const ( // O_NONBLOCK flag makes mq send/receive operations non-blocking. O_NONBLOCK = common.O_NONBLOCK )
Variables ¶
This section is empty.
Functions ¶
func DestroyFastMq ¶ added in v0.2.0
DestroyFastMq permanently removes a FastMq.
func DestroyLinuxMessageQueue ¶
DestroyLinuxMessageQueue removes the queue permanently.
func DestroySystemVMessageQueue ¶
DestroySystemVMessageQueue permanently removes queue with a given name.
func FastMqAttrs ¶ added in v0.3.0
FastMqAttrs returns capacity and max message size of the existing mq.
func IsTemporary ¶ added in v0.2.0
IsTemporary returns true, if an error is a timeout error.
func SetLinuxMqBlocking ¶
SetLinuxMqBlocking sets whether the operations on a linux mq block. This will apply for all send/receive operations on any instance of the linux mq with the given name.
Types ¶
type Blocker ¶ added in v0.2.0
Blocker is an object, which can work in blocking and non-blocking modes.
type Buffered ¶ added in v0.2.0
type Buffered interface {
Cap() int
}
Buffered is an object with internal buffer of the given capacity.
type FastMq ¶ added in v0.2.0
type FastMq struct {
// contains filtered or unexported fields
}
FastMq is a priority message queue based on shared memory. Currently it is the only implementation for windows.
func CreateFastMq ¶ added in v0.2.0
func CreateFastMq(name string, flag int, perm os.FileMode, maxQueueSize, maxMsgSize int) (*FastMq, error)
CreateFastMq creates new FastMq.
name - mq name. implementation will create a shm object with this name. flag - flag is a combination of os.O_EXCL, and O_NONBLOCK. perm - object's permission bits. maxQueueSize - queue capacity. maxMsgSize - maximum message size.
func OpenFastMq ¶ added in v0.2.0
OpenFastMq opens an existing message queue. It returns an error, if it does not exist.
name - unique mq name. flag - 0 or O_NONBLOCK.
func (*FastMq) Receive ¶ added in v0.2.0
Receive receives a message. It blocks if the queue is empty.
func (*FastMq) ReceivePriority ¶ added in v0.2.0
ReceivePriority receives a message and returns its priority. It blocks if the queue is empty.
func (*FastMq) ReceivePriorityTimeout ¶ added in v0.2.0
ReceivePriorityTimeout receives a message and returns its priority. It blocks if the queue is empty, waiting for not longer, then the timeout.
func (*FastMq) ReceiveTimeout ¶ added in v0.3.0
ReceiveTimeout receives a message. It blocks if the queue is empty. It blocks if the queue is empty, waiting for not longer, then the timeout.
func (*FastMq) SendPriority ¶ added in v0.2.0
SendPriority sends a message with the given priority. It blocks if the queue is full.
func (*FastMq) SendPriorityTimeout ¶ added in v0.2.0
SendPriorityTimeout sends a message with the given priority. It blocks if the queue is full, waiting for not longer, then the timeout.
func (*FastMq) SendTimeout ¶ added in v0.3.0
SendTimeout sends a message with the default priority 0. It blocks if the queue is full, waiting for not longer, then the timeout.
func (*FastMq) SetBlocking ¶ added in v0.2.0
SetBlocking sets whether the send/receive operations on the queue block. This applies to the current instance only.
type LinuxMessageQueue ¶
type LinuxMessageQueue struct {
// contains filtered or unexported fields
}
LinuxMessageQueue is a linux-specific ipc mechanism based on message passing.
func CreateLinuxMessageQueue ¶
func CreateLinuxMessageQueue(name string, flag int, perm os.FileMode, maxQueueSize, maxMsgSize int) (*LinuxMessageQueue, error)
CreateLinuxMessageQueue creates new queue with the given name and permissions.
name - unique mq name. flag - flag is a combination of os.O_EXCL and O_NONBLOCK. perm - object's permission bits. maxQueueSize - queue capacity. maxMsgSize - maximum message size.
func OpenLinuxMessageQueue ¶
func OpenLinuxMessageQueue(name string, flag int) (*LinuxMessageQueue, error)
OpenLinuxMessageQueue opens an existing message queue. It returns an error, if it does not exist.
name - unique mq name. flag - flag is a combination of (os.O_RDONLY or os.O_WRONLY or os.O_RDWR) and O_NONBLOCK. O_RDONLY Open the queue to receive messages only. O_WRONLY Open the queue to send messages only. O_RDWR Open the queue to both send and receive messages.
func (*LinuxMessageQueue) Cap ¶
func (mq *LinuxMessageQueue) Cap() int
Cap returns the size of the mq buffer.
func (*LinuxMessageQueue) Close ¶
func (mq *LinuxMessageQueue) Close() error
Close closes the queue.
func (*LinuxMessageQueue) Destroy ¶
func (mq *LinuxMessageQueue) Destroy() error
Destroy closes the queue and removes it permanently.
func (*LinuxMessageQueue) ID ¶
func (mq *LinuxMessageQueue) ID() int
ID returns unique id of the queue.
func (*LinuxMessageQueue) Notify ¶
func (mq *LinuxMessageQueue) Notify(ch chan<- int) error
Notify notifies about new messages in the queue by sending id of the queue to the channel. If there are messages in the queue, no notification will be sent unless all of them are read.
func (*LinuxMessageQueue) NotifyCancel ¶
func (mq *LinuxMessageQueue) NotifyCancel() error
NotifyCancel cancels notification subscription.
func (*LinuxMessageQueue) Receive ¶
func (mq *LinuxMessageQueue) Receive(data []byte) (int, error)
Receive receives a message. It blocks if the queue is empty. Returns message len.
func (*LinuxMessageQueue) ReceivePriority ¶
func (mq *LinuxMessageQueue) ReceivePriority(data []byte) (int, int, error)
ReceivePriority receives a message, returning its priority. It blocks if the queue is empty. Returns message len and priority.
func (*LinuxMessageQueue) ReceiveTimeout ¶
ReceiveTimeout receives a message. It blocks if the queue is empty, waiting for a message unless timeout is passed. Returns message len.
func (*LinuxMessageQueue) ReceiveTimeoutPriority ¶
func (mq *LinuxMessageQueue) ReceiveTimeoutPriority(input []byte, timeout time.Duration) (int, int, error)
ReceiveTimeoutPriority receives a message, returning its priority. It blocks if the queue is empty, waiting for a message unless timeout is passed. Returns message len and priority.
func (*LinuxMessageQueue) Send ¶
func (mq *LinuxMessageQueue) Send(data []byte) error
Send sends a message with a default (0) priority. It blocks if the queue is full.
func (*LinuxMessageQueue) SendPriority ¶
func (mq *LinuxMessageQueue) SendPriority(data []byte, prio int) error
SendPriority sends a message with a given priority. It blocks if the queue is full.
func (*LinuxMessageQueue) SendTimeout ¶
func (mq *LinuxMessageQueue) SendTimeout(data []byte, timeout time.Duration) error
SendTimeout sends a message with a default (0) priority. It blocks if the queue is full, waiting for a message unless timeout is passed.
func (*LinuxMessageQueue) SendTimeoutPriority ¶
func (mq *LinuxMessageQueue) SendTimeoutPriority(data []byte, prio int, timeout time.Duration) error
SendTimeoutPriority sends a message with a given priority. It blocks if the queue is full, waiting for a message unless timeout is passed.
func (*LinuxMessageQueue) SetBlocking ¶
func (mq *LinuxMessageQueue) SetBlocking(block bool) error
SetBlocking sets whether the send/receive operations on the queue block. This applies to the current instance only.
type Messenger ¶
type Messenger interface { // Send sends the data. It blocks if there are no readers and the queue is full Send(data []byte) error // Receive reads data from the queue. It blocks if the queue is empty. // Returns message len. Receive(data []byte) (int, error) io.Closer }
Messenger is an interface which must be satisfied by any message queue implementation on any platform.
Example ¶
Destroy("mq") mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666) if err != nil { panic("new queue") } defer mq.Close() data := []byte{1, 2, 3, 4, 5, 6, 7, 8} go func() { if err := mq.Send(data); err != nil { panic("send") } }() mq2, err := Open("mq", 0) if err != nil { panic("open") } defer mq2.Close() received := make([]byte, len(data)) l, err := mq2.Receive(received) if err != nil { panic("receive") } if l != len(data) { panic("wrong len") } for i, b := range received { if b != data[i] { panic("wrong data") } }
Output:
func New ¶
New creates a mq with a given name and permissions. It uses the default implementation. If there are several implementations on a platform, you can use explicit create functions.
name - unique queue name. flag - create flags. You can specify: os.O_EXCL if you don't want to open a queue if it exists. O_NONBLOCK if you don't want to block on send/receive. This flag may not be supported by a particular implementation. To be sure, you can convert Messenger to Blocker and call SetBlocking to set/unset non-blocking mode. perm - permissions for the new queue.
type PriorityMessenger ¶ added in v0.2.0
type PriorityMessenger interface { Messenger Buffered // SendPriority sends the data. The message will be inserted in the mq according to its priority. SendPriority(data []byte, prio int) error // ReceivePriority reads a message and returns its len and priority. ReceivePriority(data []byte) (int, int, error) }
PriorityMessenger is a Messenger, which orders messages according to their priority. Semantic is similar to linux native mq: Messages are placed on the queue in decreasing order of priority, with newer messages of the same priority being placed after older messages with the same priority.
Example ¶
Destroy("mq") mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666) if err != nil { panic("new queue") } defer mq.Close() // not all implementations support prioritized send/receive. tmq, ok := mq.(PriorityMessenger) if !ok { panic("not a prio messenger") } data := []byte{1, 2, 3, 4, 5, 6, 7, 8} go func() { if err := tmq.SendPriority(data, 0); err != nil { panic("send") } if err := tmq.SendPriority(data, 1); err != nil { panic("send") } }() mq2, err := Open("mq", 0) if err != nil { panic("open") } defer mq2.Close() tmq2, ok := mq2.(PriorityMessenger) if !ok { panic("not a prio messenger") } received := make([]byte, len(data)) _, prio, err := tmq2.ReceivePriority(received) if err != nil || prio != 1 { panic("receive") } _, prio, err = tmq2.ReceivePriority(received) if err != nil || prio != 0 { panic("receive") }
Output:
type SystemVMessageQueue ¶
type SystemVMessageQueue struct {
// contains filtered or unexported fields
}
SystemVMessageQueue is a System V ipc mechanism based on message passing.
func CreateSystemVMessageQueue ¶
func CreateSystemVMessageQueue(name string, flag int, perm os.FileMode) (*SystemVMessageQueue, error)
CreateSystemVMessageQueue creates new queue with the given name and permissions.
name - unique mq name. flag - flag is a combination of os.O_EXCL and O_NONBLOCK. perm - object's permission bits.
func OpenSystemVMessageQueue ¶
func OpenSystemVMessageQueue(name string, flags int) (*SystemVMessageQueue, error)
OpenSystemVMessageQueue opens existing message queue.
name - unique mq name. flag - 0 and O_NONBLOCK.
func (*SystemVMessageQueue) Close ¶
func (mq *SystemVMessageQueue) Close() error
Close closes the queue. As there is no need to close SystemV mq, this function returns nil. It was added to satisfy io.Closer
func (*SystemVMessageQueue) Destroy ¶
func (mq *SystemVMessageQueue) Destroy() error
Destroy closes the queue and removes it permanently.
func (*SystemVMessageQueue) Receive ¶
func (mq *SystemVMessageQueue) Receive(data []byte) (int, error)
Receive receives a message. It blocks if the queue is empty.
func (*SystemVMessageQueue) Send ¶
func (mq *SystemVMessageQueue) Send(data []byte) error
Send sends a message. It blocks if the queue is full.
func (*SystemVMessageQueue) SetBlocking ¶
func (mq *SystemVMessageQueue) SetBlocking(block bool) error
SetBlocking sets whether the send/receive operations on the queue block.
type TimedMessenger ¶
type TimedMessenger interface { Messenger // SendTimeout sends the data. It blocks if there are no readers and the queue if full. // It waits for not more, than timeout. SendTimeout(data []byte, timeout time.Duration) error // ReceiveTimeout reads data from the queue. It blocks if the queue is empty. // It waits for not more, than timeout. Returns message len. ReceiveTimeout(data []byte, timeout time.Duration) (int, error) }
TimedMessenger is a Messenger, which supports send/receive timeouts. Passing 0 as a timeout makes a call non-blocking. Passing negative value as a timeout makes the timeout infinite.
Example ¶
Destroy("mq") mq, err := New("mq", os.O_CREATE|os.O_EXCL, 0666) if err != nil { panic("new queue") } defer mq.Close() data := []byte{1, 2, 3, 4, 5, 6, 7, 8} go func() { // send after [0..500] ms delay. time.Sleep(time.Duration((rand.Int() % 6)) * time.Millisecond * 100) if err := mq.Send(data); err != nil { panic("send") } }() mq2, err := Open("mq", 0) if err != nil { panic("open") } defer mq2.Close() // not all implementations support timed send/receive. tmq, ok := mq2.(TimedMessenger) if !ok { panic("not a timed messenger") } received := make([]byte, len(data)) // depending on send delay we either get a timeout error, or receive the data. l, err := tmq.ReceiveTimeout(received, 500*time.Millisecond) if err != nil { if !IsTemporary(err) { panic(err) } else { // handle timeout. return } } if l != len(data) { panic("wrong len") } for i, b := range received { if b != data[i] { panic("wrong data") } }
Output: