quasar

v0.9.1
Published: Apr 4, 2020 License: MIT Imports: 12 Imported by: 0

README

quasar

A library that implements a family of low-level tools to build persistent messaging systems.

Documentation

Constants

const EncodedSequenceLength = 20

EncodedSequenceLength defines the expected length of an encoded sequence.

Variables

var ErrConsumerClosed = errors.New("consumer closed")

ErrConsumerClosed is yielded to callbacks if the consumer has been closed.

var ErrConsumerDeadlock = errors.New("consumer deadlock")

ErrConsumerDeadlock is returned by the consumer if the specified deadline has been reached.

var ErrInvalidSequence = errors.New("invalid sequence")

ErrInvalidSequence is yielded to callbacks if the provided sequence has not yet been processed by the consumer.

var ErrLimitReached = errors.New("limit reached")

ErrLimitReached is returned for write attempts that go beyond the allowed ledger length.

var ErrNotMonotonic = errors.New("not monotonic")

ErrNotMonotonic is returned for write attempts that are not monotonic.

var ErrProducerClosed = errors.New("producer closed")

ErrProducerClosed is yielded to callbacks if the producer has been closed.

Functions

func CompileSequences added in v0.5.0

func CompileSequences(table map[uint64]bool) []uint64

CompileSequences will compile a list of positive sequences from the provided mark table. It will compress positive adjacent tail sequences and inject a fake positive sequence at the beginning if the first entry in the table is negative.

func DecodeSequence

func DecodeSequence(key []byte) (uint64, error)

DecodeSequence will decode a sequence.

func DecodeSequences added in v0.2.0

func DecodeSequences(value []byte) ([]uint64, error)

DecodeSequences will decode a list of compacted sequences.

func EncodeSequence

func EncodeSequence(s uint64, compact bool) []byte

EncodeSequence will encode a sequence.
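
EncodedSequenceLength is 20, which is also the number of decimal digits in the largest uint64 (18446744073709551615). One plausible reading, and it is only an assumption here, is that a sequence is encoded as a zero-padded decimal string so that keys sort lexicographically in numeric order. The sketch below illustrates that idea with hypothetical helpers `encodeSequence` and `decodeSequence`; it ignores the `compact` flag and is not the package's implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"strconv"
)

// encodedLength mirrors EncodedSequenceLength: the number of decimal
// digits needed for the largest uint64.
const encodedLength = 20

// encodeSequence renders s as a fixed-width, zero-padded decimal key.
// The format is an assumption made for this sketch.
func encodeSequence(s uint64) []byte {
	return []byte(fmt.Sprintf("%020d", s))
}

// decodeSequence parses a key produced by encodeSequence.
func decodeSequence(key []byte) (uint64, error) {
	return strconv.ParseUint(string(key), 10, 64)
}

func main() {
	key := encodeSequence(42)
	fmt.Println(string(key), len(key) == encodedLength)

	// Fixed-width keys compare bytewise in the same order as the numbers.
	fmt.Println(bytes.Compare(encodeSequence(9), encodeSequence(10)) < 0)

	s, err := decodeSequence(key)
	fmt.Println(s, err)
}
```

Without the padding, the key "10" would sort before "9", which would break ordered iteration in a key-value store.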

func EncodeSequences added in v0.2.0

func EncodeSequences(list []uint64) []byte

EncodeSequences will encode a list of compacted sequences.

func GenerateSequence

func GenerateSequence(n uint32) uint64

GenerateSequence will generate a locally monotonic sequence that consists of the current time and an ordinal number. The returned sequence is the first of n consecutive numbers. Sequences will overflow in the year 2106, or earlier if more than ca. 4 billion are generated within one second.

func JoinSequence

func JoinSequence(ts time.Time, n uint32) uint64

JoinSequence constructs a sequence from a 32 bit timestamp and 32 bit ordinal number.

func SplitSequence

func SplitSequence(s uint64) (time.Time, uint32)

SplitSequence splits the sequence into its timestamp and ordinal number.
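
The layout of a sequence can be sketched as a 64 bit integer with the timestamp in the upper 32 bits and the ordinal in the lower 32 bits. The exact bit layout and the use of Unix seconds are assumptions made for this sketch (they are consistent with the overflow in 2106 mentioned for GenerateSequence); `joinSequence` and `splitSequence` are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"fmt"
	"time"
)

// joinSequence packs the Unix timestamp (seconds, truncated to 32 bits)
// into the upper half and the ordinal number into the lower half.
// This bit layout is an assumption of the sketch.
func joinSequence(ts time.Time, n uint32) uint64 {
	return uint64(uint32(ts.Unix()))<<32 | uint64(n)
}

// splitSequence reverses joinSequence.
func splitSequence(s uint64) (time.Time, uint32) {
	return time.Unix(int64(s>>32), 0), uint32(s)
}

func main() {
	ts := time.Unix(1586000000, 0)
	s := joinSequence(ts, 7)

	back, n := splitSequence(s)
	fmt.Println(back.Equal(ts), n)

	// Sequences order by timestamp first and by ordinal second.
	fmt.Println(joinSequence(ts, 9) < joinSequence(ts.Add(time.Second), 0))
}
```

With this layout, plain integer comparison of two sequences gives chronological order, which is what makes monotonic writes cheap to verify.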

Types

type Buffer added in v0.5.0

type Buffer struct {
	// contains filtered or unexported fields
}

Buffer is a circular buffer to store entries.

func NewBuffer added in v0.5.0

func NewBuffer(size int) *Buffer

NewBuffer creates and returns a new buffer.

func (*Buffer) Index added in v0.8.0

func (b *Buffer) Index(index int) (Entry, bool)

Index will return the entry at the specified position in the buffer. Negative indexes are counted backwards.

func (*Buffer) Length added in v0.5.0

func (b *Buffer) Length() int

Length will return the length of the buffer.

func (*Buffer) Push added in v0.5.0

func (b *Buffer) Push(entries ...Entry)

Push will add entries to the buffer.

func (*Buffer) Reset added in v0.5.0

func (b *Buffer) Reset()

Reset will reset the buffer.

func (*Buffer) Scan added in v0.5.0

func (b *Buffer) Scan(fn func(Entry) bool)

Scan will iterate over the buffered entries until false is returned.

func (*Buffer) Trim added in v0.5.0

func (b *Buffer) Trim(fn func(Entry) bool)

Trim will remove entries from the buffer until false is returned.
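
To make the Buffer methods concrete, here is a minimal circular buffer with the same method names. It is a sketch under assumptions, not the package's implementation: the `ring` type is hypothetical, the capacity must be greater than zero, a full buffer overwrites its oldest entry, and Trim removes entries from the front while the callback returns true.

```go
package main

import "fmt"

// entry is a reduced stand-in for the package's Entry type.
type entry struct {
	Sequence uint64
	Payload  []byte
}

// ring is a fixed-capacity circular buffer (capacity must be > 0).
type ring struct {
	items []entry
	head  int // position of the oldest entry
	size  int // number of stored entries
}

func newRing(capacity int) *ring {
	return &ring{items: make([]entry, capacity)}
}

// Push appends entries; when full, the oldest entry is overwritten
// (an assumption of this sketch).
func (r *ring) Push(entries ...entry) {
	for _, e := range entries {
		if r.size == len(r.items) {
			r.items[r.head] = e
			r.head = (r.head + 1) % len(r.items)
			continue
		}
		r.items[(r.head+r.size)%len(r.items)] = e
		r.size++
	}
}

// Length returns the number of stored entries.
func (r *ring) Length() int { return r.size }

// Index returns the entry at position i; negative positions count
// backwards from the newest entry.
func (r *ring) Index(i int) (entry, bool) {
	if i < 0 {
		i += r.size
	}
	if i < 0 || i >= r.size {
		return entry{}, false
	}
	return r.items[(r.head+i)%len(r.items)], true
}

// Scan visits entries from oldest to newest until fn returns false.
func (r *ring) Scan(fn func(entry) bool) {
	for i := 0; i < r.size; i++ {
		if !fn(r.items[(r.head+i)%len(r.items)]) {
			return
		}
	}
}

// Trim drops entries from the front until fn returns false.
func (r *ring) Trim(fn func(entry) bool) {
	for r.size > 0 && fn(r.items[r.head]) {
		r.items[r.head] = entry{} // release the payload
		r.head = (r.head + 1) % len(r.items)
		r.size--
	}
}

func main() {
	r := newRing(3)
	r.Push(entry{Sequence: 1}, entry{Sequence: 2}, entry{Sequence: 3}, entry{Sequence: 4})
	r.Trim(func(e entry) bool { return e.Sequence <= 2 })
	r.Scan(func(e entry) bool {
		fmt.Println(e.Sequence)
		return true
	})
}
```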

type Cleaner

type Cleaner struct {
	// contains filtered or unexported fields
}

Cleaner will periodically delete entries from a ledger honoring the configured retention and threshold as well as positions from the specified tables. Failed cleanings are retried and the errors yielded to the configured callback.

func NewCleaner

func NewCleaner(ledger *Ledger, config CleanerConfig) *Cleaner

NewCleaner will create and return a new cleaner.

func (*Cleaner) Close

func (c *Cleaner) Close()

Close will close the cleaner.

type CleanerConfig added in v0.3.0

type CleanerConfig struct {
	// The amount of entries to keep available in the ledger.
	Retention int

	// The maximum amount of entries to keep in the ledger.
	Threshold int

	// The interval of cleanings.
	Interval time.Duration

	// The tables to check for positions.
	Tables []*Table

	// The callback used to yield errors.
	Errors func(error)
}

CleanerConfig is used to configure a cleaner.
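
The interplay of Retention, Threshold and table positions can be sketched as a pure function over counts. The rule below is an interpretation of the field comments, not the package's algorithm: keep at least `retention` entries, do not delete entries that consumers have not yet passed, and delete regardless once the ledger is longer than `threshold`. The function `deletable` and its `consumed` parameter (the number of oldest entries that every consumer has passed) are hypothetical.

```go
package main

import "fmt"

// deletable returns how many of the oldest entries may be removed from a
// ledger of the given length. All rules here are assumptions.
func deletable(length, retention, threshold, consumed int) int {
	// Never cut into the retained tail of the ledger.
	n := length - retention

	// Do not delete entries some consumer still needs.
	if consumed < n {
		n = consumed
	}

	// Enforce the hard cap even if consumers lag behind.
	if over := length - threshold; over > n {
		n = over
	}

	if n < 0 {
		n = 0
	}
	return n
}

func main() {
	// Consumers are slow, but the threshold forces a deletion.
	fmt.Println(deletable(100, 10, 50, 20))

	// Below the threshold only consumed entries are removed.
	fmt.Println(deletable(30, 10, 50, 5))
}
```

The sketch assumes the threshold is at least as large as the retention; with the values reversed the two rules would contradict each other.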

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer manages consuming messages of a ledger.

func NewConsumer

func NewConsumer(ledger *Ledger, table *Table, config ConsumerConfig) *Consumer

NewConsumer will create and return a new consumer.

func (*Consumer) Close

func (c *Consumer) Close()

Close will close the consumer.

func (*Consumer) Mark added in v0.5.0

func (c *Consumer) Mark(sequence uint64, cumulative bool, ack func(error)) bool

Mark will acknowledge and mark the consumption of the specified sequence. The specified callback is called with the result of the processed mark. If Skip is configured, the callback might be called later, once the mark has been persisted. The method returns whether the mark has been successfully queued; if so, its callback will be called with the result, or with an error if the consumer is closed.

type ConsumerConfig added in v0.3.0

type ConsumerConfig struct {
	// The name of the persistent consumer. If empty, the consumer will not
	// persist its positions.
	Name string

	// The start position of the consumer if not recovered from the table.
	Start uint64

	// The channel on which available entries are sent.
	Entries chan<- Entry

	// The callback that is called with errors before the consumer dies.
	Errors func(error)

	// The amount of entries to fetch from the ledger at once.
	Batch int

	// The maximal size of the unmarked sequence range.
	Window int

	// The number of acks to skip before sequences are written to the table.
	Skip int

	// The time after which skipped marks are persisted to the table.
	Timeout time.Duration

	// The time after which the consumer crashes if it cannot make progress.
	Deadline time.Duration
}

ConsumerConfig is used to configure a consumer.

type DB

type DB struct {
	*pebble.DB
	// contains filtered or unexported fields
}

DB is a generic database.

func OpenDB

func OpenDB(directory string, config DBConfig) (*DB, error)

OpenDB will open or create the specified db. The returned db should be closed with Close once it is no longer needed.

func (*DB) Close added in v0.9.1

func (db *DB) Close() error

Close will close the db.

type DBConfig added in v0.3.0

type DBConfig struct {
	// Whether all writes should be synced.
	SyncWrites bool

	// The sink used for logging.
	Logger func(format string, args ...interface{})
}

DBConfig is used to configure a DB.

type Entry

type Entry struct {
	// The entry's sequence (must be greater than zero).
	Sequence uint64

	// The entry's payload that is written to disk.
	Payload []byte

	// The reference to a shared object. This can be used with cached ledgers
	// to retain a reference to a decoded object of the entry.
	Object interface{}
}

Entry is a single entry in the ledger.

type Ledger

type Ledger struct {
	// contains filtered or unexported fields
}

Ledger manages the storage of sequential entries.

func CreateLedger

func CreateLedger(db *DB, config LedgerConfig) (*Ledger, error)

CreateLedger will create a ledger that stores entries in the provided db. Read, write and delete requests can be issued concurrently to maximize performance. However, only one goroutine may write entries at the same time.

func (*Ledger) Delete

func (l *Ledger) Delete(sequence uint64) (int, error)

Delete will remove all entries up to and including the specified sequence from the ledger.

func (*Ledger) Head

func (l *Ledger) Head() uint64

Head will return the last committed sequence. This value can be checked periodically to assess whether new entries have been added.

func (*Ledger) Index

func (l *Ledger) Index(index int) (uint64, bool, error)

Index will return the sequence of the specified index in the ledger. Negative indexes are counted backwards from the head. If the index exceeds the current length, the sequence of the last entry and false is returned. If the ledger is empty the current head (zero if unused) and false will be returned.

func (*Ledger) Length

func (l *Ledger) Length() int

Length will return the number of stored entries.

func (*Ledger) Read

func (l *Ledger) Read(sequence uint64, amount int) ([]Entry, error)

Read will read entries from and including the specified sequence up to the requested amount of entries.

func (*Ledger) Subscribe

func (l *Ledger) Subscribe(receiver chan<- uint64)

Subscribe will subscribe the specified channel to changes to the last sequence stored in the ledger. Notifications will be skipped if the specified channel is not writable for some reason.
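
Skipping notifications for channels that are not writable is commonly done with a non-blocking send. The following sketch shows that pattern in isolation; the `notify` helper is hypothetical and not taken from the package.

```go
package main

import "fmt"

// notify offers seq to every receiver without blocking. Receivers that
// cannot accept the value right now simply miss this notification.
func notify(receivers []chan<- uint64, seq uint64) (delivered int) {
	for _, r := range receivers {
		select {
		case r <- seq:
			delivered++
		default:
			// Receiver is not ready; skip instead of stalling the writer.
		}
	}
	return delivered
}

func main() {
	ready := make(chan uint64, 1) // has buffer space
	busy := make(chan uint64)     // unbuffered, nobody is receiving

	n := notify([]chan<- uint64{ready, busy}, 42)
	fmt.Println(n, <-ready)
}
```

Because notifications can be dropped, a subscriber would typically use a buffered channel as a wake-up signal and then consult Head for the authoritative value.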

func (*Ledger) Tail added in v0.9.0

func (l *Ledger) Tail() uint64

Tail will return the last deleted sequence. This value can be checked periodically to assess whether entries have been deleted.

func (*Ledger) Unsubscribe

func (l *Ledger) Unsubscribe(receiver chan<- uint64)

Unsubscribe will remove a previously subscribed receiver.

func (*Ledger) Write

func (l *Ledger) Write(entries ...Entry) error

Write will write the specified entries to the ledger. No entries have been written if an error has been returned. Monotonicity of the written entries is checked against the current head.

type LedgerConfig added in v0.3.0

type LedgerConfig struct {
	// The prefix for all ledger keys.
	Prefix string

	// The amount of entries to cache in memory.
	Cache int

	// The maximum length of the ledger. Write() will return ErrLimitReached if
	// the ledger is longer than this value.
	Limit int
}

LedgerConfig is used to configure a ledger.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides an interface to efficiently batch entries and write them to a ledger.

func NewProducer

func NewProducer(ledger *Ledger, config ProducerConfig) *Producer

NewProducer will create and return a producer.

func (*Producer) Close

func (p *Producer) Close()

Close will close the producer. Unprocessed entries will be canceled and the callbacks receive ErrProducerClosed if available.

func (*Producer) Write

func (p *Producer) Write(entry Entry, ack func(error)) bool

Write will asynchronously write the specified entry and call the provided callback with the result. The method returns whether the entry has been accepted. If it has been accepted, the ack, if present, will be called with the result of the operation.
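
Batching by size and timeout is a common pattern behind asynchronous writers like this one. The sketch below shows the general technique with plain integers; the `batch` function is hypothetical and says nothing about how the producer is actually implemented.

```go
package main

import (
	"fmt"
	"time"
)

// batch collects values from in and calls flush when size values are
// buffered, when timeout has passed since the first buffered value, or
// when in is closed.
func batch(in <-chan int, size int, timeout time.Duration, flush func([]int)) {
	var buf []int
	var timer <-chan time.Time // nil while the buffer is empty

	for {
		select {
		case v, ok := <-in:
			if !ok {
				// Input closed: flush the remainder and stop.
				if len(buf) > 0 {
					flush(buf)
				}
				return
			}
			buf = append(buf, v)
			if len(buf) == 1 {
				// Start the clock with the first value of a batch.
				timer = time.After(timeout)
			}
			if len(buf) >= size {
				flush(buf)
				buf, timer = nil, nil
			}
		case <-timer:
			// The batch is unfinished but has waited long enough.
			flush(buf)
			buf, timer = nil, nil
		}
	}
}

func main() {
	in := make(chan int, 5)
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	batch(in, 2, time.Hour, func(b []int) { fmt.Println(b) })
}
```

A nil timer channel never fires, which keeps the timeout case inactive while the buffer is empty.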

type ProducerConfig added in v0.3.0

type ProducerConfig struct {
	// The maximum size of the written entry batches.
	Batch int

	// The timeout after which an unfinished batch is written in any case.
	Timeout time.Duration

	// The number of times a failed write due to ErrLimitReached is retried.
	Retry int

	// The time after which a failed write due to ErrLimitReached is retried.
	Delay time.Duration

	// If enabled, the producer will filter out entries that have a lower
	// sequence than the current ledger head.
	Filter bool
}

ProducerConfig is used to configure a producer.
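
The Retry and Delay settings describe a bounded retry that applies only to ErrLimitReached. A minimal sketch of such a loop, with a hypothetical `writeWithRetry` helper and a local stand-in error value, could look like this:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errLimitReached stands in for the package's ErrLimitReached.
var errLimitReached = errors.New("limit reached")

// writeWithRetry calls write and repeats it up to retry more times, but
// only while the failure is errLimitReached. Other errors return at once.
func writeWithRetry(write func() error, retry int, delay time.Duration) error {
	err := write()
	for i := 0; i < retry && errors.Is(err, errLimitReached); i++ {
		time.Sleep(delay)
		err = write()
	}
	return err
}

func main() {
	calls := 0
	err := writeWithRetry(func() error {
		calls++
		if calls < 3 {
			return errLimitReached // ledger still full
		}
		return nil // space was freed, e.g. by a cleaner
	}, 5, time.Millisecond)

	fmt.Println(calls, err)
}
```

Retrying only on the limit error makes sense because that condition can resolve itself once entries are deleted, while other failures are unlikely to go away by waiting.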

type Queue added in v0.6.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a managed ledger and table with a cleaner.

func CreateQueue added in v0.6.0

func CreateQueue(db *DB, config QueueConfig) (*Queue, error)

CreateQueue will create and return a new queue based on the provided settings.

func (*Queue) Close added in v0.6.0

func (q *Queue) Close()

Close will close the queue.

func (*Queue) Consumer added in v0.6.0

func (q *Queue) Consumer(config ConsumerConfig) *Consumer

Consumer will create a new consumer with the provided config.

func (*Queue) Ledger added in v0.6.0

func (q *Queue) Ledger() *Ledger

Ledger will return the queue's ledger.

func (*Queue) Producer added in v0.6.0

func (q *Queue) Producer(config ProducerConfig) *Producer

Producer will create a new producer with the provided config.

func (*Queue) Table added in v0.6.0

func (q *Queue) Table() *Table

Table will return the queue's table.

type QueueConfig added in v0.6.0

type QueueConfig struct {
	// The prefix for all keys.
	Prefix string

	// The amount of ledger entries to cache in memory.
	LedgerCache int

	// The maximum length of the ledger.
	LedgerLimit int

	// Whether the table should be fully cached in memory.
	TableCache bool

	// The amount of entries to keep around.
	CleanRetention int

	// The point after which entries are dropped no matter what.
	CleanThreshold int

	// The interval of periodic cleanings.
	CleanInterval time.Duration

	// The callback used to yield cleaner errors.
	CleanErrors func(error)
}

QueueConfig is used to configure a queue.

type Table

type Table struct {
	// contains filtered or unexported fields
}

Table manages the storage of position markers.

func CreateTable

func CreateTable(db *DB, config TableConfig) (*Table, error)

CreateTable will create a table that stores position markers.

func (*Table) All added in v0.7.1

func (t *Table) All() (map[string][]uint64, error)

All will return a map with all stored positions.

func (*Table) Delete

func (t *Table) Delete(name string) error

Delete will remove the specified positions from the table.

func (*Table) Get

func (t *Table) Get(name string) ([]uint64, error)

Get will read the specified positions from the table.

func (*Table) Range

func (t *Table) Range() (uint64, uint64, bool, error)

Range will return the range of stored positions and whether there are any stored positions at all.

func (*Table) Set

func (t *Table) Set(name string, positions []uint64) error

Set will write the specified positions to the table.

type TableConfig added in v0.3.0

type TableConfig struct {
	// The prefix for all table keys.
	Prefix string

	// Enable to keep all positions in memory.
	Cache bool
}

TableConfig is used to configure a table.
