Version: v2.0.212+incompatible Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: May 15, 2019 License: MIT Imports: 12 Imported by: 10



Package message defines a Message interface and Envelope type, and provides a Framing interface and implementations. It additionally defines function types and related routines for mapping a Message to a suitable journal.



View Source
const FixedFrameHeaderLength = 8

FixedFrameHeaderLength is the number of leading header bytes of each frame: A 4-byte magic word followed by a little-endian length.


View Source
var (
	// ErrDesyncDetected is returned by Unmarshal upon detection of an invalid frame.
	ErrDesyncDetected = errors.New("detected de-synchronization")
View Source
var ErrEmptyListResponse = fmt.Errorf("empty ListResponse")

ErrEmptyListResponse is returned by a MappingFunc which received an empty ListResponse from a PartitionsFunc.

View Source
var FixedFraming = new(fixedFraming)

FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. Messages must support Size and MarshalTo functions for marshal support (eg, generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.

View Source
var JSONFraming = new(jsonFraming)

JSONFraming is a Framing implementation which encodes messages as line- delimited JSON. Messages must be encode-able by the encoding/json package.


func Publish

func Publish(broker client.AsyncJournalClient, mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

Publish maps the Message to its target journal and begins an Append of the Message's marshaled content under the mapped journal framing. If Message implements Validate, the message is first validated and any error returned.

func UnpackLine

func UnpackLine(r *bufio.Reader) ([]byte, error)

UnpackLine returns bytes through to the first encountered newline "\n". If the complete line is in the Reader buffer, no alloc or copy is needed.


type Envelope

type Envelope struct {
	Fragment    *protocol.Fragment
	JournalSpec *protocol.JournalSpec
	NextOffset  int64 // Offset of the next Message within the Journal.

Envelope combines a Message with its Journal, Fragment and byte offset.

type Fixupable

type Fixupable interface {
	Fixup() error

Fixupable is an optional Message type capable of being "fixed up" after decoding. This provides an opportunity to apply custom migrations or initialization after a generic or code-generated unmarshal has completed.

type Framing

type Framing interface {
	// ContentType of the Framing.
	ContentType() string
	// Marshal a Message to a bufio.Writer. Marshal may assume the Message has
	// passed validation, if implemented for the message type. It may ignore
	// any error returned by the provided Writer.
	Marshal(Message, *bufio.Writer) error
	// Unpack reads and returns a complete framed message from the Reader,
	// including any applicable message header or suffix. It returns an error of
	// the underlying Reader, or of a framing corruption. The returned []byte may
	// be invalidated by a subsequent use of the Reader or another Unpack call.
	Unpack(*bufio.Reader) ([]byte, error)
	// Unmarshals Message from the supplied frame, previously produced by Unpack.
	// It returns a Message-level decoding error, which does not invalidate the
	// framing or Reader (eg, further frames may be unpacked).
	Unmarshal([]byte, Message) error

Framing specifies the serialization used to encode Messages within a topic.

func FramingByContentType

func FramingByContentType(contentType string) (Framing, error)

FramingByContentType returns the Framing having the corresponding |contentType|, or returns an error if none match.

type MappingFunc

type MappingFunc func(msg Message) (protocol.Journal, Framing, error)

MappingFunc maps a Message to a responsible journal. Gazette imposes no formal requirement on exactly how that mapping is performed, or the nature of the mapped journal.

By convention, users will group a number of like journals together into a topic, with each journal playing the role of a partition of the topic. Such partitions can be easily distinguished through a JournalSpec Label such as "topic=my/topic/name". Note that "partition" and "topic" are useful terminology, but play no formal role and have no explicit implementation within Gazette (aside from their expression via Labels and LabelSelectors).

A Mapper implementation would typically:

1) Apply domain knowledge to introspect the Message and determine a topic.
2) Query the broker List RPC to determine current partitions of the topic,
   caching and periodically refreshing List results as needed.
3) Use a modulo or rendezvous hash mapping to select among partitions.

func ModuloMapping

func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

ModuloMapping returns a MappingFunc which maps a Message into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and modulo arithmetic.

func RandomMapping

func RandomMapping(partitions PartitionsFunc) MappingFunc

RandomMapping returns a MappingFunc which maps a Message to a randomly selected Journal of the PartitionsFunc.

func RendezvousMapping

func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

RendezvousMapping returns a MappingFunc which maps a Message into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and Highest Random Weight (aka "rendezvous") hashing. HRW is more expensive to compute than using modulo arithmetic, but is still efficient and minimizes reassignments which occur when journals are added or removed.

type MappingKeyFunc

type MappingKeyFunc func(Message, []byte) []byte

MappingKeyFunc extracts an appropriate mapping key from the Message, optionally using the provided temporary buffer, and returns it.

type Message

type Message interface{}

Message is a user-defined, serializable type.

type PartitionsFunc

type PartitionsFunc func() *protocol.ListResponse

PartitionsFunc returns a ListResponse of journal partitions from which a MappingFunc may select. The returned instance pointer may change across invocations, but a returned ListResponse may not be modified. PartitionsFunc should seek to preserve pointer equality of result instances when no substantive change has occurred. See also: client.PolledList.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto