lasr

package module
v0.0.0-...-98b01a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 14, 2022 License: MIT Imports: 13 Imported by: 0

README

Build Status GoDoc

lasr

A persistent work queue backed by BoltDB. This queue is useful when the producers and consumers can live in the same process.

Project goals

  • Data integrity over performance.
  • Simplicity over complexity.
  • Ease of use.
  • Minimal feature set.

Safety

lasr is designed to never lose information. When the Send method completes, messages have been safely written to disk. On Receive, messages are not deleted until Ack is called. Users should make sure they always respond to messages with Ack or Nack.

Misc

Dead-lettering is supported, but disabled by default.

Benchmarks

On 5th Gen Lenovo X1 Carbon with 512 GB SSD:

$ hey -m POST -D main.go -h2 -cpus 2 -n 20000 -c 10 http://localhost:8080

Summary:
  Total:        1.8671 secs
  Slowest:      0.0112 secs
  Fastest:      0.0001 secs
  Average:      0.0009 secs
  Requests/sec: 10711.7919

Response time histogram:
  0.000 [1]     |
  0.001 [14044] |∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  0.002 [5030]  |∎∎∎∎∎∎∎∎∎∎∎∎∎∎
  0.003 [709]   |∎∎
  0.005 [139]   |
  0.006 [36]    |
  0.007 [19]    |
  0.008 [8]     |
  0.009 [7]     |
  0.010 [3]     |
  0.011 [4]     |

Latency distribution:
  10% in 0.0001 secs
  25% in 0.0003 secs
  50% in 0.0008 secs
  75% in 0.0013 secs
  90% in 0.0018 secs
  95% in 0.0022 secs
  99% in 0.0034 secs

Details (average, fastest, slowest):
  DNS+dialup: 0.0000 secs, 0.0000 secs, 0.0056 secs
  DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0020 secs
  req write:  0.0000 secs, 0.0000 secs, 0.0042 secs
  resp wait:  0.0009 secs, 0.0000 secs, 0.0098 secs
  resp read:  0.0000 secs, 0.0000 secs, 0.0038 secs

Status code distribution:
  [200]	20000 responses

Documentation

Overview

Package lasr implements a persistent message queue backed by BoltDB. This queue is useful when the producers and consumers can live in the same process.

lasr is designed to never lose information. When the Send method completes, messages have been safely written to disk. On Receive, messages are not deleted until Ack is called. Users should make sure they always respond to messages with Ack or Nack.

Dead-lettering is supported, but disabled by default.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrAckNack is returned by Ack and Nack when of them has been called already.
	ErrAckNack = errors.New("lasr: Ack or Nack already called")

	// ErrQClosed is returned by Send, Receive and Close when the Q has already
	// been closed.
	ErrQClosed = errors.New("lasr: Q is closed")

	// ErrOptionsApplied is called when an Option is applied to a Q after NewQ
	// has already returned.
	ErrOptionsApplied = errors.New("lasr: options cannot be applied after New")
)
View Source
var (
	// MaxDelayTime is the maximum time that can be passed to Q.Delay().
	MaxDelayTime = time.Unix(0, 1<<63-1)
)

Functions

This section is empty.

Types

type ID

type ID interface {
	encoding.BinaryMarshaler
}

ID is used for uniquely identifying messages in a Q.

type Message

type Message struct {
	Body []byte
	ID   []byte
	// contains filtered or unexported fields
}

Message is a messaged returned from Q on Receive.

Message contains a Body and an ID. The ID will be equal to the ID that was returned on Send, Delay or Wait for this message.

func (*Message) Ack

func (m *Message) Ack() (err error)

Ack acknowledges successful receipt and processing of the Message.

func (*Message) Nack

func (m *Message) Nack(retry bool) (err error)

Nack negatively acknowledges successful receipt and processing of the Message. If Nack is called with retry True, then the Message will be placed back in the queue in its original position.

type Option

type Option func(q *Q) error

Options can be passed to NewQ.

func WithDeadLetters

func WithDeadLetters() Option

WithDeadLetters will cause nacked messages that are not retried to be added to a dead letters queue.

func WithMessageBufferSize

func WithMessageBufferSize(size int) Option

WithMessageBufferSize sets the message buffer size. By default, the message buffer size is 0. Values less than 0 are not allowed.

The buffer is used by Receive to efficiently ready messages for consumption. If the buffer is greater than 0, then multiple messages can retrieved in a single transaction.

Buffered messages come with a caveat: messages will move into the "unacked" state before Receive is called.

Buffered messages come at the cost of increased memory use. If messages are large in size, use this cautiously.

func WithSequencer

func WithSequencer(seq Sequencer) Option

WithSequencer will cause a Q to use a user-provided Sequencer.

type Q

type Q struct {
	// contains filtered or unexported fields
}

Q is a persistent message queue. Its methods are goroutine-safe. Q retains the data that is sent to it until messages are acked (or nacked without retry)

func DeadLetters

func DeadLetters(q *Q) (*Q, error)

If dead-lettering is enabled on q, DeadLetters will return a dead-letter queue that is named the same as q, but will emit dead-letters on Receive. The dead-letter queue itself does not support dead-lettering; nacked messages that are not retried will be deleted.

If dead-lettering is not enabled on q, an error will be returned.

func NewQ

func NewQ(db *bolt.DB, name string, options ...Option) (*Q, error)

NewQ creates a new Q. Only one queue should be created in a given bolt db, unless compaction is disabled.

func (*Q) Close

func (q *Q) Close() error

Close closes q. When q is closed, Send, Receive, and Close will return ErrQClosed. Close blocks until all messages in the "unacked" state are Acked or Nacked.

func (*Q) Compact

func (q *Q) Compact() (rerr error)

Compact performs compaction on the queue. All messages will be copied from the underlying database into another.

The queue is unavailable while compaction is occurring.

Compaction causes the underlying bolt database to be replaced, so callers should be aware that any other queues relying on the database may be invalidated.

func (*Q) Delay

func (q *Q) Delay(message []byte, when time.Time) (ID, error)

Delay is like Send, but the message will not enter the Ready state until after "when" has occurred.

If "when" has already occurred, then it will be set to time.Now().

func (*Q) Receive

func (q *Q) Receive(ctx context.Context) (*Message, error)

Receive receives a message from the queue. If no messages are available by the time the context is done, then the function will return a nil Message and the result of ctx.Err().

func (*Q) Send

func (q *Q) Send(message []byte) (ID, error)

Send sends a message to Q. When send completes with nil error, the message sent to Q will be in the Ready state.

func (*Q) String

func (q *Q) String() string

func (*Q) Wait

func (q *Q) Wait(msg []byte, on ...ID) (ID, error)

Wait causes a message to wait for other messages to Ack, before entering the Ready state.

When all of the messages Wait is waiting on have been Acked, then the message will enter the Ready state.

When there are no messages to wait on, Wait behaves the same as Send.

type Sequencer

type Sequencer interface {
	NextSequence() (ID, error)
}

Sequencer returns an ID with each call to NextSequence and any error that occurred.

A Sequencer should obey the following invariants:

* NextSequence is goroutine-safe.

* NextSequence will never generate the same ID.

* NextSequence will return IDs whose big-endian binary representation is incrementing.

Q is not guaranteed to use all of the IDs generated by its Sequencer.

type Uint64ID

type Uint64ID uint64

Uint64ID is the default ID used by lasr.

func (Uint64ID) MarshalBinary

func (id Uint64ID) MarshalBinary() ([]byte, error)

func (*Uint64ID) UnmarshalBinary

func (id *Uint64ID) UnmarshalBinary(b []byte) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL