message

package
v0.83.1 Latest
Published: Oct 4, 2019 License: MIT Imports: 20 Imported by: 11

Documentation

Overview

Package message defines a Message interface and Envelope type, and provides means of mapping Messages to Journals, framing them to and from []byte, publishing them, reading a stream of messages from a journal, and sequencing a journal stream of uncommitted messages into committed ones.

Constants

const FixedFrameHeaderLength = 8

FixedFrameHeaderLength is the number of leading header bytes of each frame: a 4-byte magic word followed by a 4-byte little-endian length.

Variables

var (
	// ErrDesyncDetected is returned by Unmarshal upon detection of an invalid frame.
	ErrDesyncDetected = errors.New("detected de-synchronization")
)
var ErrEmptyListResponse = fmt.Errorf("empty ListResponse")

ErrEmptyListResponse is returned by a MappingFunc which received an empty ListResponse from a PartitionsFunc.

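var ErrMustStartReplay = errors.New("must start reader")

ErrMustStartReplay is returned by Sequencer.DequeCommitted when messages are no longer within the Sequencer's buffer, and the caller must StartReplay before dequeuing again.
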
var FixedFraming = new(fixedFraming)

FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. Messages must support the ProtoSize and MarshalTo functions of the FixedFrameable interface for marshal support (eg, generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.
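
The header layout described above can be sketched with the standard library alone. This is an illustration, not the package's implementation: encodeFrame, decodeFrame, errLength, and the magic bytes are hypothetical names and values chosen for the example (the documentation does not state the real magic word).

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
)

// frameHeaderLen mirrors FixedFrameHeaderLength: 4 magic bytes + 4 length bytes.
const frameHeaderLen = 8

// magic is a placeholder chosen for this sketch. The package's actual magic
// word is not stated in this documentation.
var magic = [4]byte{0xDE, 0xAD, 0xBE, 0xEF}

var (
	errDesync = errors.New("detected de-synchronization")
	errLength = errors.New("frame length does not match header") // hypothetical
)

// encodeFrame prepends the fixed header (magic word, then little-endian
// uint32 payload length) to the payload bytes.
func encodeFrame(payload []byte) []byte {
	var out = make([]byte, frameHeaderLen+len(payload))
	copy(out[0:4], magic[:])
	binary.LittleEndian.PutUint32(out[4:8], uint32(len(payload)))
	copy(out[frameHeaderLen:], payload)
	return out
}

// decodeFrame checks the magic word and declared length of a single complete
// frame, and returns its payload bytes.
func decodeFrame(frame []byte) ([]byte, error) {
	if len(frame) < frameHeaderLen || !bytes.Equal(frame[0:4], magic[:]) {
		return nil, errDesync
	}
	var n = binary.LittleEndian.Uint32(frame[4:8])
	if uint64(len(frame)-frameHeaderLen) != uint64(n) {
		return nil, errLength
	}
	return frame[frameHeaderLen:], nil
}
```

A reader which finds anything other than the magic word at a frame boundary knows it is no longer aligned with the writer's frames, which is the condition ErrDesyncDetected reports.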

var JSONFraming = new(jsonFraming)

JSONFraming is a Framing implementation which encodes messages as line-delimited JSON. Messages must be encode-able by the encoding/json package.
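
As a rough picture of what line-delimited JSON looks like on the wire, the sketch below writes each message as one JSON document followed by a newline. The event type and the marshalLine, unmarshalLine, and encodeAll helpers are hypothetical and use only encoding/json; they are not the package's jsonFraming.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
)

// event is a hypothetical message type used only for this sketch.
type event struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

// marshalLine writes msg as one JSON document followed by "\n".
// json.Encoder.Encode appends the newline itself.
func marshalLine(msg interface{}, bw *bufio.Writer) error {
	return json.NewEncoder(bw).Encode(msg)
}

// unmarshalLine decodes one previously unpacked line into msg.
// Trailing whitespace, including the newline, is accepted by json.Unmarshal.
func unmarshalLine(line []byte, msg interface{}) error {
	return json.Unmarshal(line, msg)
}

// encodeAll frames each message in turn and returns the resulting bytes.
func encodeAll(msgs ...interface{}) ([]byte, error) {
	var buf bytes.Buffer
	var bw = bufio.NewWriter(&buf)
	for _, m := range msgs {
		if err := marshalLine(m, bw); err != nil {
			return nil, err
		}
	}
	if err := bw.Flush(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```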

Functions

func UnpackLine

func UnpackLine(r *bufio.Reader) ([]byte, error)

UnpackLine returns bytes through to the first encountered newline "\n". If the complete line is in the Reader buffer, no alloc or copy is needed.
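
One way to get the "no alloc or copy" behavior is bufio.Reader.ReadSlice, which returns a view into the Reader's buffer and reports bufio.ErrBufferFull when the line does not fit. The sketch below is an assumption about how such a function could be written, not the package's source; unpackLine and linesOf are hypothetical names.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// unpackLine returns bytes through the first "\n". When the whole line is
// already buffered, the returned slice aliases the Reader's buffer and is
// only valid until the next read.
func unpackLine(r *bufio.Reader) ([]byte, error) {
	// Fast path: the line fits in the buffer, so no copy is made.
	line, err := r.ReadSlice('\n')
	if err != bufio.ErrBufferFull {
		return line, err
	}
	// Slow path: the line is larger than the buffer. Accumulate a copy,
	// one buffer-full at a time, until the delimiter (or an error) is seen.
	var full = append([]byte(nil), line...)
	for err == bufio.ErrBufferFull {
		line, err = r.ReadSlice('\n')
		full = append(full, line...)
	}
	return full, err
}

// linesOf reads every newline-terminated line of input using a Reader with
// the given buffer size. A final partial line without "\n" is dropped.
func linesOf(input string, bufSize int) ([]string, error) {
	var r = bufio.NewReaderSize(strings.NewReader(input), bufSize)
	var out []string
	for {
		line, err := unpackLine(r)
		if err == io.EOF {
			return out, nil
		} else if err != nil {
			return out, err
		}
		out = append(out, string(line))
	}
}
```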

Types

type AckIntent added in v0.83.1

type AckIntent struct {
	Journal pb.Journal // Journal to be acknowledged.
	Intent  []byte     // Framed Message payload.
	// contains filtered or unexported fields
}

AckIntent is a framed Intent and Journal which, when written, will acknowledge a set of pending Messages previously written via PublishUncommitted.

type Clock added in v0.83.1

type Clock uint64

Clock is a v1 UUID 60-bit timestamp (60 MSBs), followed by 4 bits of sequence counter. Both the timestamp and counter are monotonic (will never decrease), and each Tick increments the Clock. For UUID generation, Clock provides a total ordering over UUIDs of a given ProducerID.

func GetClock added in v0.83.1

func GetClock(uuid UUID) Clock

GetClock returns the clock timestamp and sequence as a Clock.

func NewClock added in v0.83.1

func NewClock(t time.Time) Clock

NewClock returns a Clock initialized to the given Time.

func (*Clock) Tick added in v0.83.1

func (c *Clock) Tick() Clock

Tick increments the Clock by one and returns the result. It is safe for concurrent use.

func (*Clock) Update added in v0.83.1

func (c *Clock) Update(t time.Time)

Update the Clock given a recent time observation. If |t| has a wall time which is less than the current Clock, no update occurs (in order to maintain monotonicity). Update is safe for concurrent use.

type Envelope

type Envelope struct {
	Journal    *pb.JournalSpec // JournalSpec of the Message.
	Begin, End pb.Offset       // [Begin, End) byte offset of the Message within the Journal.
	Message                    // Wrapped message.
}

Envelope wraps a Message with associated metadata.

type FixedFrameable added in v0.83.1

type FixedFrameable interface {
	ProtoSize() int
	MarshalTo([]byte) (int, error)
}

FixedFrameable is the Frameable interface required by FixedFraming.

type Flags added in v0.83.1

type Flags uint16

Flags are the 10 least-significant bits of the v1 UUID clock sequence, which are reserved for flags.

const (
	// Flag_OUTSIDE_TXN indicates the message is not a participant in a
	// transaction and should be processed immediately.
	Flag_OUTSIDE_TXN Flags = 0x0
	// Flag_CONTINUE_TXN indicates the message implicitly begins or continues a
	// transaction. The accompanying message should be processed only after
	// reading a Flag_ACK_TXN having a larger clock.
	Flag_CONTINUE_TXN Flags = 0x1
	// Flag_ACK_TXN indicates the message acknowledges the commit of all
	// Flag_CONTINUE_TXN messages before it and having smaller clocks, allowing
	// those messages to be processed.
	//
	// A Flag_ACK_TXN may have a clock *earlier* than prior Flag_CONTINUE_TXNs,
	// in which case those Messages are to be considered "rolled back" and should
	// be discarded without processing.
	//
	// A read Flag_ACK_TXN clock should generally not be less than a prior read
	// Flag_ACK_TXN, as each such message is confirmed to have committed before
	// the next is written. Should the clock be less, it indicates that an
	// upstream store checkpoint was rolled-back to a prior version (eg, due to
	// disaster or missing WAL). When this happens, the upstream producer will
	// re-process some number of messages, and may publish Messages under new
	// UUIDs which partially or wholly duplicate messages published before.
	// In other words, the processing guarantee in this case is weakened from
	// exactly-once to at-least-once until the upstream producer catches up to
	// the progress of the furthest checkpoint ever achieved.
	Flag_ACK_TXN Flags = 0x2
)
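
To make the flag semantics concrete, here is a deliberately simplified model of how a reader might treat one producer's messages. It is a toy and not the package's Sequencer: the types and the offer method are hypothetical, it tracks a single producer, and its duplicate handling is limited to dropping messages at or below the last acknowledged clock.

```go
package main

type flags uint16

const (
	outsideTxn  flags = 0x0
	continueTxn flags = 0x1
	ackTxn      flags = 0x2
)

// message is a hypothetical stand-in carrying only what the flags logic needs.
type message struct {
	clock uint64
	flags flags
	body  string
}

// producer tracks one producer's last acknowledged clock and the messages
// which are read but not yet acknowledged.
type producer struct {
	lastAck uint64
	pending []message
}

// offer consumes the next read message and returns any messages which may
// now be processed.
func (p *producer) offer(m message) []message {
	switch m.flags {
	case outsideTxn:
		if m.clock <= p.lastAck {
			return nil // Already-seen duplicate.
		}
		p.lastAck = m.clock
		return []message{m} // Process immediately.
	case continueTxn:
		if m.clock <= p.lastAck {
			return nil // Duplicate of an already-acknowledged message.
		}
		p.pending = append(p.pending, m) // Hold until acknowledged.
		return nil
	case ackTxn:
		var out []message
		for _, q := range p.pending {
			if q.clock < m.clock {
				out = append(out, q) // Committed by this ACK.
			}
			// Otherwise the message has a later clock than the ACK:
			// it was rolled back and is discarded.
		}
		p.pending, p.lastAck = nil, m.clock
		return out
	}
	return nil
}
```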

func GetFlags added in v0.83.1

func GetFlags(uuid UUID) Flags

GetFlags returns the 10 least-significant bits of the clock sequence.

type Frameable added in v0.83.1

type Frameable interface{}

Frameable is an interface suitable for serialization by a Framing. The interface requirements of a Frameable are specific to the Framing used, and asserted at run-time. Generally a Frameable is a Message but the Framing interface doesn't require this.

type Framing

type Framing interface {
	// ContentType of the Framing.
	ContentType() string
	// Marshal a Message to a bufio.Writer. Marshal may assume the Message has
	// passed validation, if implemented for the message type. It may ignore
	// any error returned by the provided Writer.
	Marshal(Frameable, *bufio.Writer) error
	// Unpack reads and returns a complete framed message from the Reader,
	// including any applicable message header or suffix. It returns an error of
	// the underlying Reader, or of a framing corruption. The returned []byte may
	// be invalidated by a subsequent use of the Reader or another Unpack call.
	Unpack(*bufio.Reader) ([]byte, error)
	// Unmarshal a Frameable message from the supplied frame previously produced
	// by Unpack. It returns only message-level decoding errors, which do not
	// invalidate the Framing or the Reader (eg, further frames may be unpacked).
	Unmarshal([]byte, Frameable) error
}

Framing specifies the serialization used to encode Messages within a journal.

func FramingByContentType

func FramingByContentType(contentType string) (Framing, error)

FramingByContentType returns the Framing having the corresponding |contentType|, or returns an error if none match.

type Iterator added in v0.83.1

type Iterator interface {
	// Next returns the next message Envelope in the sequence. It returns EOF
	// if none remain, or any other encountered error.
	Next() (Envelope, error)
}

Iterator iterates over message Envelopes.

type JournalProducer added in v0.83.1

type JournalProducer struct {
	Journal  pb.Journal
	Producer ProducerID
}

JournalProducer composes a Journal and ProducerID.

type Mappable added in v0.83.1

type Mappable interface{}

Mappable is an interface suitable for mapping by a MappingFunc. Typically a MappingKeyFunc will cast and assert Mappable's exact type at run-time. Generally a Mappable is a Message but the MappingFunc interface doesn't require this.

type MappingFunc

type MappingFunc func(Mappable) (pb.Journal, Framing, error)

MappingFunc maps a Mappable message to a responsible journal. Gazette imposes no formal requirement on exactly how that mapping is performed, or the nature of the mapped journal.

It's often desired to spread a collection of like messages across a number of journals, with each journal playing the role of a topic partition. Such partitions can be distinguished through a JournalSpec Label such as "app.gazette.dev/message-type: MyMessage". Note that "partition" and "topic" are useful terminology, but play no formal role and have no explicit implementation within Gazette (aside from their expression via Labels and LabelSelectors). See `labels` package documentation for naming conventions.

A Mapper implementation would typically:

  1. Apply domain knowledge to introspect the Mappable and determine a "topic", expressed as a LabelSelector.
  2. Query the broker List RPC to determine current partitions of the topic, caching and refreshing List results as needed (see client.PolledList).
  3. Use a ModuloMapping or RendezvousMapping to select among partitions.

func ModuloMapping

func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

ModuloMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and modulo arithmetic.
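
The selection step can be sketched as follows, assuming the partitions are already available as a slice of journal names. The keyFunc type mirrors the shape of MappingKeyFunc, while moduloPick, stringKey, and errNoPartitions are hypothetical names for this example, which omits the PartitionsFunc and Framing lookup that a real MappingFunc performs.

```go
package main

import (
	"errors"
	"hash/fnv"
	"io"
)

// keyFunc mirrors the shape of MappingKeyFunc: it writes the message's
// mapping key into the provided Writer.
type keyFunc func(msg interface{}, w io.Writer)

var errNoPartitions = errors.New("no partitions to map to")

// stringKey is a key function for messages which are plain strings.
func stringKey(msg interface{}, w io.Writer) {
	io.WriteString(w, msg.(string))
}

// moduloPick hashes the message's key with 32-bit FNV-1a and selects a
// journal by modulo arithmetic over the partition count.
func moduloPick(key keyFunc, msg interface{}, journals []string) (string, error) {
	if len(journals) == 0 {
		return "", errNoPartitions
	}
	var h = fnv.New32a()
	key(msg, h) // A hash.Hash32 is an io.Writer whose Write never errors.
	return journals[h.Sum32()%uint32(len(journals))], nil
}
```

The mapping is stable only while the partition count is unchanged: adding or removing a journal changes the modulus and so remaps most keys.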

func RandomMapping

func RandomMapping(partitions PartitionsFunc) MappingFunc

RandomMapping returns a MappingFunc which maps a Mappable to a randomly selected Journal of the PartitionsFunc.

func RendezvousMapping

func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

RendezvousMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and Highest Random Weight (aka "rendezvous") hashing. HRW is more expensive to compute than using modulo arithmetic, but is still efficient and minimizes reassignments which occur when journals are added or removed.
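
A generic Highest Random Weight selection looks like the sketch below: every journal gets a weight derived from both its name and the key, and the journal with the largest weight wins. This shows the general technique only. The way the package actually combines the key hash with each journal is not described here, so rendezvousPick and its weighting are assumptions.

```go
package main

import "hash/fnv"

// rendezvousPick returns the journal having the highest weight for the key,
// where the weight is a 32-bit FNV-1a hash over the journal name and key.
// It returns "" if there are no journals.
func rendezvousPick(key string, journals []string) string {
	var best string
	var bestWeight uint32
	for i, j := range journals {
		var h = fnv.New32a()
		h.Write([]byte(j))
		h.Write([]byte{0}) // Separator between journal name and key.
		h.Write([]byte(key))
		if w := h.Sum32(); i == 0 || w > bestWeight {
			best, bestWeight = j, w
		}
	}
	return best
}
```

Because each journal's weight is independent of the others, removing a journal only reassigns the keys which had selected it. Every other key keeps its journal, which is the property that modulo arithmetic lacks.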

type MappingKeyFunc

type MappingKeyFunc func(Mappable, io.Writer)

MappingKeyFunc extracts an appropriate mapping key from the Mappable by writing its value into the provided io.Writer, whose Write() is guaranteed to never return an error.

type Message

type Message interface {
	// GetUUID() returns the UUID previously set on the Message. If the Message
	// is not capable of tracking UUIDs, GetUUID() returns a zero-valued UUID
	// to opt the Message out of exactly-once processing semantics. In this
	// case, SetUUID is also a no-op.
	GetUUID() UUID
	// SetUUID sets the UUID of the Message.
	SetUUID(UUID)
	// NewAcknowledgement returns a new Message instance of this same type which
	// will represent an acknowledgement of this (and future) Messages published
	// to the Journal within the context of a transaction.
	NewAcknowledgement(pb.Journal) Message
}

Message is a user-defined, serializable payload type which may carry a UUID.

type NewMessageFunc added in v0.83.1

type NewMessageFunc func(*pb.JournalSpec) (Message, error)

NewMessageFunc returns a Message instance of an appropriate type for reading the given JournalSpec. Implementations may want to introspect the JournalSpec, for example by examining application-specific labels therein. An error is returned if an appropriate Message type cannot be determined.

type PartitionsFunc

type PartitionsFunc func() *pb.ListResponse

PartitionsFunc returns a ListResponse of journal partitions from which a MappingFunc may select. The returned instance pointer may change across invocations, but a returned ListResponse may not be modified. PartitionsFunc should seek to preserve pointer equality of result instances when no substantive change has occurred. See also: client.PolledList.

type ProducerID added in v0.83.1

type ProducerID [6]byte

ProducerID is the unique node identifier portion of a v1 UUID.

func GetProducerID added in v0.83.1

func GetProducerID(uuid UUID) ProducerID

GetProducerID returns the node identifier of a UUID as a ProducerID.

func NewProducerID added in v0.83.1

func NewProducerID() ProducerID

NewProducerID returns a cryptographically random ProducerID which is very, very likely to be unique (47 bits of entropy, a space of ~141 trillion) provided that each ProducerID has a reasonably long lifetime (eg on the order of a process, not of a request).
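
A plausible reading of "47 bits of entropy" is that one of the 48 node bits is fixed: RFC 4122 asks that randomly generated node identifiers set the multicast bit, so that they cannot collide with real MAC addresses. The sketch below follows that reading. It is an assumption about the approach rather than the package's source, and newProducerID is a hypothetical name.

```go
package main

import "crypto/rand"

// producerID mirrors the 6-byte node identifier of a v1 UUID.
type producerID [6]byte

// newProducerID fills the identifier from a cryptographic random source,
// then sets the multicast bit (the least-significant bit of the first
// octet), leaving 47 random bits.
func newProducerID() (producerID, error) {
	var id producerID
	if _, err := rand.Read(id[:]); err != nil {
		return id, err
	}
	id[0] |= 0x01
	return id, nil
}
```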

type ProducerState added in v0.83.1

type ProducerState struct {
	JournalProducer
	// LastAck is the Clock of the Producer's last ACK_TXN or OUTSIDE_TXN.
	LastAck Clock
	// Begin is the offset of the first message byte having CONTINUE_TXN that's
	// larger than LastAck. Eg, it's the offset which opens the next transaction.
	// If there is no such message, Begin is -1.
	Begin pb.Offset
}

ProducerState is a snapshot of a Producer's state within a Journal. It's marshalled into consumer checkpoints to allow a Sequencer to recover producer sequence states after a consumer process fault.

type Publisher added in v0.83.1

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher maps, sequences, and asynchronously appends messages to Journals. It supports two modes of publishing: PublishCommitted and PublishUncommitted. Committed messages are immediately read-able by a read-committed reader. Uncommitted messages are immediately read-able by a read-uncommitted reader, but not by a read-committed reader until a future "acknowledgement" (ACK) message marks them as committed -- an ACK which may not ever come.

To publish as a transaction, the client first issues a number of PublishUncommitted calls. Once all pending messages have been published, BuildAckIntents returns []AckIntents which will inform readers that published messages have committed and should be processed. To ensure atomicity of the published transaction, []AckIntents must be written to stable storage *before* being applied, and must be re-tried on fault.

As a rule of thumb, API servers or other pure "producers" of events in Gazette should use PublishCommitted. Gazette consumers should use PublishUncommitted to achieve end-to-end exactly-once semantics: upon commit, each consumer transaction will automatically acknowledge all such messages published over the course of the transaction.

Consumers *may* instead use PublishCommitted, which may improve latency slightly (as read-committed readers need not wait for the consumer transaction to commit), but must be aware that its use weakens the effective processing guarantee to at-least-once.

func NewPublisher added in v0.83.1

func NewPublisher(ajc client.AsyncJournalClient, clock *Clock) *Publisher

NewPublisher returns a new Publisher using the given AsyncJournalClient and optional *Clock. If *Clock is nil, then an internal Clock is allocated and is updated with time.Now() on each message published. If a non-nil *Clock is provided, it should be updated by the caller at a convenient time resolution, which can greatly reduce the frequency of time() system calls.

func (*Publisher) BuildAckIntents added in v0.83.1

func (p *Publisher) BuildAckIntents() ([]AckIntent, error)

BuildAckIntents returns the []AckIntents which acknowledge all pending Messages published since its last invocation. It's the caller's job to actually append the intents to their respective journals, and only *after* checkpoint-ing the intents to a stable store so that they may be re-played in their entirety should a fault occur. Without doing this, in the presence of faults it's impossible to ensure that ACKs are written to _all_ journals, and not just some of them (or none).

Applications running as Gazette consumers *must not* call BuildAckIntents themselves. This is done on the application's behalf, as part of building the checkpoints which are committed with consumer transactions.

Uses of PublishUncommitted outside of consumer applications, however, *are* responsible for building, committing, and writing []AckIntents themselves.

func (*Publisher) PublishCommitted added in v0.83.1

func (p *Publisher) PublishCommitted(mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

PublishCommitted sequences the Message for immediate consumption, maps it to a Journal, and begins an AsyncAppend of its marshaled content. It is safe for concurrent use.

func (*Publisher) PublishUncommitted added in v0.83.1

func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) error

PublishUncommitted sequences the Message as uncommitted, maps it to a Journal, and begins an AsyncAppend of its marshaled content. The Journal is tracked and included in the results of a future []AckIntents. PublishUncommitted is *not* safe for concurrent use.

type ReadCommittedIter added in v0.83.1

type ReadCommittedIter struct {
	// contains filtered or unexported fields
}

ReadCommittedIter is an Iterator over read-committed messages.

func NewReadCommittedIter added in v0.83.1

func NewReadCommittedIter(rr *client.RetryReader, newMsg NewMessageFunc, seq *Sequencer) *ReadCommittedIter

NewReadCommittedIter returns a ReadCommittedIter over message Envelopes read from the RetryReader. The provided Sequencer is used to sequence committed messages. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

func (*ReadCommittedIter) Next added in v0.83.1

func (it *ReadCommittedIter) Next() (Envelope, error)

Next returns the next read-committed message Envelope in the sequence. It returns EOF if none remain, or any other encountered error.

type ReadUncommittedIter added in v0.83.1

type ReadUncommittedIter struct {
	// contains filtered or unexported fields
}

ReadUncommittedIter is an Iterator over read-uncommitted messages.

func NewReadUncommittedIter added in v0.83.1

func NewReadUncommittedIter(rr *client.RetryReader, newMsg NewMessageFunc) *ReadUncommittedIter

NewReadUncommittedIter returns a ReadUncommittedIter over message Envelopes read from the RetryReader. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

func (*ReadUncommittedIter) Next added in v0.83.1

func (it *ReadUncommittedIter) Next() (Envelope, error)

Next reads and returns the next Envelope, or an unrecoverable error. Several recoverable errors are handled and logged, but not returned:

  • A framing Unmarshal() error (as opposed to Unpack). This is an error with respect to a specific framed message in the journal, and not an error of the underlying Journal stream. It can happen if improperly framed or simply malformed data is written to a journal. Typically we want to keep reading in this case as the error already happened (when the frame was written) and we still care about Envelopes occurring afterwards.
  • ErrOffsetJump indicates the next byte of available content is at an offset larger than the one requested. This can happen if a range of content was deleted from the journal. Next will log a warning but continue reading at the jumped offset, which is reflected in the returned Envelope.

type Sequencer added in v0.83.1

type Sequencer struct {
	// contains filtered or unexported fields
}

Sequencer sequences ordered, uncommitted and potentially duplicated messages into acknowledged, exactly-once messages. Read uncommitted messages are fed to QueueUncommitted(), after which the client must repeatedly call DequeCommitted() to drain all acknowledged messages until io.EOF is returned.

Sequencer maintains an internal ring buffer of messages, which is usually sufficient to directly read committed messages. When recovering from a checkpoint, or if a very long sequence or old producer is acknowledged, it may be necessary to start a replay of already-read messages. In this case:

  • DequeCommitted() will return ErrMustStartReplay.
  • ReplayRange() will return the exact offset range required.
  • The client must then supply an appropriate Iterator to StartReplay().

Having done this, calls to DequeCommitted() may resume to drain messages.
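
The calling protocol above can be written as a small drain loop. To keep the example self-contained it is expressed against a local sequencer interface with simplified types, and exercised by fakeSequencer, a scripted stand-in which says nothing about how the real Sequencer works. All names here other than the method names are hypothetical.

```go
package main

import (
	"errors"
	"io"
)

// envelope and iterator are simplified stand-ins for Envelope and Iterator.
type envelope struct{ body string }

type iterator interface{ Next() (envelope, error) }

// sequencer lists only the Sequencer methods which the loop relies on.
type sequencer interface {
	QueueUncommitted(envelope)
	DequeCommitted() (envelope, error)
	ReplayRange() (begin, end int64)
	StartReplay(iterator)
}

var errMustStartReplay = errors.New("must start reader")

// drain queues one read-uncommitted envelope, then dequeues until io.EOF,
// starting a replay over the requested range whenever one is required.
func drain(seq sequencer, env envelope,
	openReplay func(begin, end int64) iterator, process func(envelope)) error {

	seq.QueueUncommitted(env)
	for {
		out, err := seq.DequeCommitted()
		switch err {
		case nil:
			process(out)
		case io.EOF:
			return nil // Fully drained: safe to queue the next envelope.
		case errMustStartReplay:
			begin, end := seq.ReplayRange()
			seq.StartReplay(openReplay(begin, end))
		default:
			return err
		}
	}
}

// fakeSequencer is a scripted stand-in used only to exercise drain.
type fakeSequencer struct {
	committed  []envelope // Returned in order by DequeCommitted.
	needReplay bool       // If set, a replay is demanded first.
	started    int        // Number of StartReplay calls observed.
}

func (f *fakeSequencer) QueueUncommitted(envelope) {}

func (f *fakeSequencer) DequeCommitted() (envelope, error) {
	if f.needReplay {
		return envelope{}, errMustStartReplay
	}
	if len(f.committed) == 0 {
		return envelope{}, io.EOF
	}
	var e = f.committed[0]
	f.committed = f.committed[1:]
	return e, nil
}

func (f *fakeSequencer) ReplayRange() (int64, int64) { return 100, 200 }

func (f *fakeSequencer) StartReplay(iterator) { f.needReplay, f.started = false, f.started+1 }
```

In a real application, openReplay would construct a read-uncommitted Iterator over the returned offset range of the same journal.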

func NewSequencer added in v0.83.1

func NewSequencer(states []ProducerState, buffer int) *Sequencer

NewSequencer returns a new Sequencer initialized from the given producer |states|, and with an internal ring buffer of size |buffer|.

func (*Sequencer) DequeCommitted added in v0.83.1

func (w *Sequencer) DequeCommitted() (env Envelope, err error)

DequeCommitted returns the next acknowledged message, or io.EOF if no acknowledged messages remain. It must be called repeatedly after each QueueUncommitted() until it returns io.EOF. If messages are no longer within the Sequencer's buffer, it returns ErrMustStartReplay and the caller must first StartReplay() before trying again.

func (*Sequencer) HasPending added in v0.83.1

func (w *Sequencer) HasPending(since pb.Offsets) bool

HasPending returns true if any partial sequence has a first offset larger than those of |since| (eg, the sequence started since |since| was read). Assuming liveness of producers, it hints that further messages are forthcoming.

func (*Sequencer) ProducerStates added in v0.83.1

func (w *Sequencer) ProducerStates(pruneHorizon time.Duration) []ProducerState

ProducerStates returns a snapshot of producers and their states, after pruning any producers having surpassed |pruneHorizon| in age relative to the most recent producer within their journal. If |pruneHorizon| is zero, no pruning is done. ProducerStates panics if messages still remain to deque.

func (*Sequencer) QueueUncommitted added in v0.83.1

func (w *Sequencer) QueueUncommitted(env Envelope)

QueueUncommitted adds a read, uncommitted Envelope to the Sequencer. It panics if called while messages remain to deque.

func (*Sequencer) ReplayRange added in v0.83.1

func (w *Sequencer) ReplayRange() (begin, end pb.Offset)

ReplayRange returns the [begin, end) exclusive byte offsets to be replayed. Panics if ErrMustStartReplay was not returned by DequeCommitted().

func (*Sequencer) StartReplay added in v0.83.1

func (w *Sequencer) StartReplay(it Iterator)

StartReplay is called with a read-uncommitted Iterator over ReplayRange(). Panics if ErrMustStartReplay was not returned by DequeCommitted().

type UUID added in v0.83.1

type UUID = uuid.UUID

UUID is an RFC 4122 v1 variant Universally Unique Identifier which uniquely identifies each message. As a v1 UUID, it incorporates a clock timestamp and sequence, as well as a node identifier (which, within the context of Gazette, is also known as a ProducerID).

Each sequence of UUIDs produced by Gazette uses a strongly random ProducerID, and as such the RFC 4122 purpose of the clock sequence isn't required. Instead, Gazette uses clock sequence bits of UUIDs it generates in the following way:

  • The first 2 bits are reserved to represent the variant, as per RFC 4122.
  • The next 4 bits extend the 60-bit timestamp with a counter, which allows for a per-producer UUID generation rate of 160M UUIDs / second before running ahead of wall-clock time. The timestamp and counter are monotonic, and together provide a total ordering of UUIDs from each ProducerID.
  • The remaining 10 bits are flags, eg for representing transaction semantics.

func BuildUUID added in v0.83.1

func BuildUUID(id ProducerID, clock Clock, flags Flags) UUID

BuildUUID builds v1 UUIDs per RFC 4122.
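
The bit layout described for UUID, Clock, and Flags can be illustrated by packing and unpacking a 16-byte array directly. The sketch follows the RFC 4122 v1 field order (time_low, time_mid, time_hi_and_version, clock sequence, node) and the 2/4/10 split of the clock sequence given above. It uses plain arrays and hypothetical lowercase names rather than the package's types, and is not its source.

```go
package main

import "encoding/binary"

type uuid [16]byte
type producerID [6]byte

// buildUUID packs a clock (60-bit timestamp over a 4-bit counter), 10 bits
// of flags, and a node identifier into the v1 UUID layout.
func buildUUID(id producerID, clock uint64, flags uint16) uuid {
	var out uuid
	var t = clock >> 4 // The 60-bit timestamp.
	binary.BigEndian.PutUint32(out[0:4], uint32(t))                   // time_low
	binary.BigEndian.PutUint16(out[4:6], uint16(t>>32))               // time_mid
	binary.BigEndian.PutUint16(out[6:8], uint16(t>>48)&0x0fff|0x1000) // version 1, time_hi
	// Clock sequence: variant bits "10", then 4 counter bits, then 10 flag bits.
	binary.BigEndian.PutUint16(out[8:10], 0x8000|uint16(clock&0xf)<<10|flags&0x3ff)
	copy(out[10:], id[:])
	return out
}

// getClock reassembles the timestamp and counter bits of the UUID.
func getClock(u uuid) uint64 {
	var t = uint64(binary.BigEndian.Uint32(u[0:4])) |
		uint64(binary.BigEndian.Uint16(u[4:6]))<<32 |
		uint64(binary.BigEndian.Uint16(u[6:8])&0x0fff)<<48
	var counter = uint64(binary.BigEndian.Uint16(u[8:10])>>10) & 0xf
	return t<<4 | counter
}

// getFlags returns the 10 least-significant bits of the clock sequence.
func getFlags(u uuid) uint16 {
	return binary.BigEndian.Uint16(u[8:10]) & 0x3ff
}

// getProducerID returns the node identifier of the UUID.
func getProducerID(u uuid) producerID {
	var id producerID
	copy(id[:], u[10:])
	return id
}
```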

type Validator added in v0.83.1

type Validator = pb.Validator

Validator is an optional interface of a Message able to Validate itself. An attempt to publish a Message which does not Validate() will error.
