Documentation

Overview

    Package message is a client-only library which implements exactly-once message semantics atop raw, at-least-once Journal byte-streams.

    It specifies a common Message interface type which must be implemented by consumer Applications, and an RFC 4122 v1 UUID type which enables de-duplication and atomic commits of multiple messages.

    MappingKeyFunc extracts a stable mapping identifier from a custom message type. To map messages on a session ID:

    var mapOnSessionFn MappingKeyFunc = func(m Mappable, w io.Writer) {
        w.Write([]byte(m.(*MyMsgType).SessionID))
    }
    

    MappingFunc then defines the means of mapping messages to a journal. Several routines, like ModuloMapping, help in the construction of MappingFuncs and can be used to implement "data shuffles" which stably map messages having a shared mapping key to a common journal.

    Combine with client.PolledList to build MappingFuncs that publish to a dynamic, automatically updating "topic" of selected journal partitions:

    var myClient pb.AsyncJournalClient = ...
    
    var partitions, _ = pb.ParseLabelSelector("logs=pageviews, source=mobile")
    var pl, _ = client.NewPolledList(ctx, myClient, time.Minute, pb.ListRequest{
        Selector: partitions,
    })
    // Use RendezvousMapping to minimally shuffle the mapping of
    // SessionID <=> journal when the topic partitioning is updated.
    var mapFn = RendezvousMapping(mapOnSessionFn, pl.List)
    

    Then, use a Publisher to publish messages:

    var pub = NewPublisher(myClient, nil)
    for _, msg := range messages {
        // Each message is mapped on its SessionID to a current topic
        // partition (ie, journal), sequenced with a UUID, marshalled,
        // and queued for append to the mapped journal.
        pub.PublishCommitted(mapFn, msg)
    }
    for op := range myClient.PendingExcept("") {
        <-op.Done() // Wait for all async appends to complete.
    }
    

    When reading, NewMessageFunc provides the package with a means of constructing new messages of the user's type.

    var newMsgFn NewMessageFunc = func(*pb.JournalSpec) (Message, error) {
        return new(MyMsgType), nil
    }
    

    ReadUncommittedIter reads "uncommitted" messages from a journal. Uncommitted messages may include duplicates, or messages which are never acknowledged or are later explicitly rolled back.

    var rr = client.NewRetryReader(ctx, rjc, pb.ReadRequest{
        Journal:    "my/journal",
        Block:      true,
    })
    var it = NewReadUncommittedIter(rr, newMsgFn)
    for {
        var env, err = it.Next()
    
        // Handle |env| and |err|.
    }
    

    Use a Sequencer to sequence read-uncommitted messages into read-committed ones, and a ReadCommittedIter to read only committed messages from the journal. ReadCommittedIter is nothing more than the composition of a ReadUncommittedIter with a Sequencer.

    var seq = NewSequencer(nil, 4096)
    var it = NewReadCommittedIter(rr, newMsgFn, seq)
    for {
        var env, err = it.Next()
    
        // Handle |env| and |err|. We're assured the message has been
        // acknowledged and is not a duplicate.
    }
    

    Journals must declare their associated message Framing via the "content-type" label. The journal Framing is used to encode and decode Message instances written to the journal. Use RegisterFraming, typically from a package init() function, to register new Framing instances and make them available for use in applications. This package registers a Framing for the following content-types on its import:

    * text/csv:                     Uses "encoding/csv". See CSVFrameable.
    * application/x-ndjson:         Uses "encoding/json".
    * application/x-protobuf-fixed: Encodes ProtoFrameable messages with a preamble
       of [4]byte{0x66, 0x33, 0x93, 0x36}, followed by a 4-byte little endian unsigned
       length, followed by a marshalled protobuf message.
    

    See the "labels" package for definitions of well-known label names and values such as content-types.

    Constants

    const FixedFrameHeaderLength = 8

      FixedFrameHeaderLength is the number of leading header bytes of a fixed frame, consisting of the word [4]byte{0x66, 0x33, 0x93, 0x36} followed by a 4-byte little-endian unsigned length.
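The 8-byte header described above can be sketched with encoding/binary. The helper names encodeFrame and decodeFrame are hypothetical and are not part of this package. The sketch only illustrates the header layout, and makes no attempt at the resynchronization which UnpackFixedFrame performs.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
)

// frameWord and headerLen mirror FixedFrameWord and FixedFrameHeaderLength.
var frameWord = [4]byte{0x66, 0x33, 0x93, 0x36}

const headerLen = 8

var errDesync = errors.New("detected de-synchronization")

// encodeFrame is a hypothetical helper which appends a header and the
// payload to b, returning the grown buffer.
func encodeFrame(b, payload []byte) []byte {
	var hdr [headerLen]byte
	copy(hdr[:4], frameWord[:])
	binary.LittleEndian.PutUint32(hdr[4:], uint32(len(payload)))
	b = append(b, hdr[:]...)
	return append(b, payload...)
}

// decodeFrame is a hypothetical helper which validates the header at the
// start of b, and returns the frame payload and any remaining bytes.
func decodeFrame(b []byte) (payload, rest []byte, err error) {
	if len(b) < headerLen {
		return nil, b, errors.New("short frame header")
	}
	if !bytes.Equal(b[:4], frameWord[:]) {
		return nil, b, errDesync
	}
	var n = int(binary.LittleEndian.Uint32(b[4:headerLen]))
	if len(b)-headerLen < n {
		return nil, b, errors.New("short frame payload")
	}
	return b[headerLen : headerLen+n], b[headerLen+n:], nil
}

func main() {
	var frame = encodeFrame(nil, []byte("hello"))
	var payload, rest, err = decodeFrame(frame)
	fmt.Println(len(frame), string(payload), len(rest), err)
}
```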

      Variables

      var (
      	// FixedFrameWord is a fixed 4-byte word value which precedes all fixed frame encodings.
      	FixedFrameWord = [4]byte{0x66, 0x33, 0x93, 0x36}
      	// ErrDesyncDetected is returned by UnpackFixedFrame upon detection of an invalid frame header.
      	ErrDesyncDetected = errors.New("detected de-synchronization")
      )
      var ErrEmptyListResponse = fmt.Errorf("empty ListResponse")

        ErrEmptyListResponse is returned by a MappingFunc which received an empty ListResponse from a PartitionsFunc.

        var ErrMustStartReplay = errors.New("must start reader")

          ErrMustStartReplay is returned by Sequencer to indicate that a journal replay must be started before the dequeue may continue.

          Functions

          func EncodeFixedProtoFrame

          func EncodeFixedProtoFrame(p ProtoFrameable, b []byte) ([]byte, error)

            Encode a ProtoFrameable by appending a fixed frame into the []byte buffer, which will be grown if needed and returned.

            func RegisterFraming

            func RegisterFraming(f Framing)

              RegisterFraming registers the Framing by its ContentType. A previously registered instance will be replaced. RegisterFraming is not safe for concurrent use, including a concurrent call to FramingByContentType. Typically it should be called from package init functions.

              func UnpackFixedFrame

              func UnpackFixedFrame(r *bufio.Reader) ([]byte, error)

                UnpackFixedFrame returns the next fixed frame of content from the Reader, including the frame header. If the magic word is not detected (indicating a de-sync), UnpackFixedFrame attempts to discard through to the next magic word, returning the interleaved but de-synchronized content along with ErrDesyncDetected.

                func UnpackLine

                func UnpackLine(r *bufio.Reader) ([]byte, error)

                  UnpackLine returns bytes through to the first encountered newline "\n". If the complete line is available in the Reader buffer, it is returned directly without a copy or allocation, and the next call to the Reader's Read will invalidate it.

                  Types

                  type AckIntent

                  type AckIntent struct {
                  	Journal pb.Journal // Journal to be acknowledged.
                  	Intent  []byte     // Framed Message payload.
                  	// contains filtered or unexported fields
                  }

                    AckIntent is a framed "intent" message and its journal which, when appended to the journal, will acknowledge a set of pending messages previously written to that journal via PublishUncommitted.

                    type CSVFrameable

                    type CSVFrameable interface {
                    	// MarshalCSV returns CSV records describing the message.
                    	MarshalCSV() ([]string, error)
                    	// UnmarshalCSV applies the records to unmarshal the message
                    	// from its CSV description. It must copy the []string records if it
                    	// wishes to retain them after returning.
                    	UnmarshalCSV([]string) error
                    }

                      CSVFrameable is the interface of a Frameable required by a CSV Framing.
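As an illustration, a user type might satisfy the two methods of this interface as follows. The pageView type and its columns are hypothetical. A complete message type would also implement Message, which this sketch omits.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// pageView is a hypothetical type which is encoded as two CSV columns.
type pageView struct {
	URL   string
	Count int
}

// MarshalCSV returns the CSV columns of the pageView.
func (p *pageView) MarshalCSV() ([]string, error) {
	return []string{p.URL, strconv.Itoa(p.Count)}, nil
}

// UnmarshalCSV parses the columns. It retains the string values but not
// the []string slice itself, so no copy of the slice is required.
func (p *pageView) UnmarshalCSV(records []string) error {
	if len(records) != 2 {
		return errors.New("expected exactly 2 fields")
	}
	var n, err = strconv.Atoi(records[1])
	if err != nil {
		return err
	}
	p.URL, p.Count = records[0], n
	return nil
}

func main() {
	var in = &pageView{URL: "/home", Count: 3}
	var records, _ = in.MarshalCSV()

	var out pageView
	var err = out.UnmarshalCSV(records)
	fmt.Println(records, out, err)
}
```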

                      type CSVRecord

                      type CSVRecord []string

                        CSVRecord is a minimal implementation of CSVFrameable and Message. It requires that the first field is a string-encoded UUID.

                        func (CSVRecord) GetUUID

                        func (r CSVRecord) GetUUID() UUID

                        func (CSVRecord) MarshalCSV

                        func (r CSVRecord) MarshalCSV() ([]string, error)

                          MarshalCSV returns the CSVRecord directly.

                          func (CSVRecord) NewAcknowledgement

                          func (r CSVRecord) NewAcknowledgement(pb.Journal) Message

                          func (CSVRecord) SetUUID

                          func (r CSVRecord) SetUUID(uuid UUID)

                          func (*CSVRecord) UnmarshalCSV

                          func (r *CSVRecord) UnmarshalCSV(fields []string) error

                            UnmarshalCSV copies the []string to this CSVRecord, and verifies the first column parses as a UUID.

                            type Clock

                            type Clock uint64

                              Clock is a v1 UUID 60-bit timestamp (60 MSBs), followed by 4 bits of sequence counter. Both the timestamp and counter are monotonic (will never decrease), and each Tick increments the Clock. For UUID generation, Clock provides a total ordering over UUIDs of a given ProducerID.

                              func GetClock

                              func GetClock(uuid UUID) Clock

                                GetClock returns the clock timestamp and sequence as a Clock.

                                func NewClock

                                func NewClock(t time.Time) Clock

                                  NewClock returns a Clock initialized to the given Time.

                                  func (*Clock) Tick

                                  func (c *Clock) Tick() Clock

                                    Tick increments the Clock by one and returns the result. It is safe for concurrent use.

                                    func (*Clock) Update

                                    func (c *Clock) Update(t time.Time)

                                      Update the Clock given a recent Time observation. If the Time has a wall time which is less than the current Clock, no update occurs (in order to maintain monotonicity). Update is safe for concurrent use.

                                      type Envelope

                                      type Envelope struct {
                                      	Journal    *pb.JournalSpec // JournalSpec of the Message.
                                      	Begin, End pb.Offset       // [Begin, End) byte offset of the Message within the Journal.
                                      	Message                    // Wrapped message.
                                      }

                                        Envelope wraps a Message with associated metadata.

                                        type Flags

                                        type Flags uint16

                                          Flags are the 10 least-significant bits of the v1 UUID clock sequence, which Gazette employs for representing message transaction semantics.

                                          const (
                                          	// Flag_OUTSIDE_TXN indicates the message is not a participant in a
                                          	// transaction and should be processed immediately.
                                          	Flag_OUTSIDE_TXN Flags = 0x0
                                          	// Flag_CONTINUE_TXN indicates the message implicitly begins or continues a
                                          	// transaction. The accompanying message should be processed only after
                                          	// reading a Flag_ACK_TXN having a larger clock.
                                          	Flag_CONTINUE_TXN Flags = 0x1
                                          	// Flag_ACK_TXN indicates the message acknowledges the commit of all
                                          	// Flag_CONTINUE_TXN messages before it and having smaller clocks, allowing
                                          	// those messages to be processed.
                                          	//
                                          	// A Flag_ACK_TXN may have a clock *earlier* than prior Flag_CONTINUE_TXNs,
                                          	// in which case those Messages are to be considered "rolled back" and should
                                          	// be discarded without processing.
                                          	//
                                          	// A read Flag_ACK_TXN clock should generally not be less than a prior read
                                          	// Flag_ACK_TXN, as each such message is confirmed to have committed before
                                          	// the next is written. Should the clock be less, it indicates that an
                                          	// upstream store checkpoint was rolled-back to a prior version (eg, due to
                                          	// N>R faults or misuse of the WAL). When this happens, the upstream producer
                                          	// will re-process some number of messages, and may publish Messages under new
                                          	// UUIDs which partially or wholly duplicate messages published before.
                                          	// In other words, the processing guarantee in this case is weakened from
                                          	// exactly-once to at-least-once until the upstream producer catches up to
                                          	// the progress of the furthest checkpoint ever achieved.
                                          	Flag_ACK_TXN Flags = 0x2
                                          )
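The flag semantics above can be illustrated with a much-simplified tracker of a single producer within a single journal. All names here are hypothetical. The sketch is not the Sequencer's algorithm: in particular it treats any message at or below the last acknowledged clock as a duplicate, and so does not model an ACK_TXN clock which regresses behind a prior ACK_TXN.

```go
package main

import "fmt"

// flag mirrors the transaction flag values documented above.
type flag uint16

const (
	outsideTxn  flag = 0x0
	continueTxn flag = 0x1
	ackTxn      flag = 0x2
)

// msg is a hypothetical message reduced to its clock, flag, and payload.
type msg struct {
	clock uint64
	flag  flag
	body  string
}

// producerSeq is a hypothetical tracker of one producer in one journal.
type producerSeq struct {
	lastAck uint64 // Clock of the last ACK_TXN or OUTSIDE_TXN.
	pending []msg  // CONTINUE_TXN messages awaiting an ACK_TXN.
}

// next consumes a read-uncommitted message and returns those messages
// which have become safe to process, if any.
func (s *producerSeq) next(m msg) []msg {
	switch {
	case m.clock <= s.lastAck:
		return nil // Simplification: treat as an already-seen duplicate.
	case m.flag == outsideTxn:
		s.lastAck = m.clock
		return []msg{m}
	case m.flag == continueTxn:
		if n := len(s.pending); n != 0 && m.clock <= s.pending[n-1].clock {
			return nil // Duplicate of a message which is already pending.
		}
		s.pending = append(s.pending, m)
		return nil
	case m.flag == ackTxn:
		var ready []msg
		for _, p := range s.pending {
			if p.clock < m.clock {
				ready = append(ready, p) // Committed by this ACK.
			}
			// Pending messages with larger clocks are rolled back.
		}
		s.pending, s.lastAck = nil, m.clock
		return ready
	}
	return nil
}

func main() {
	var s producerSeq
	for _, m := range []msg{
		{1, continueTxn, "a"}, {2, continueTxn, "b"}, {3, ackTxn, ""},
		{5, continueTxn, "c"}, {4, ackTxn, ""}, // "c" is rolled back.
		{6, outsideTxn, "d"},
	} {
		for _, r := range s.next(m) {
			fmt.Println("process:", r.body)
		}
	}
}
```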

                                          func GetFlags

                                          func GetFlags(uuid UUID) Flags

                                            GetFlags returns the 10 least-significant bits of the clock sequence.
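For reference, a sketch of the bit extraction, assuming the standard RFC 4122 byte layout in which octets 8 and 9 hold two variant bits followed by the 14-bit clock sequence. The uuid type and flagsOf helper are hypothetical stand-ins. Under this layout, the remaining 4 high bits of the clock sequence are those which Clock uses as its sequence counter.

```go
package main

import "fmt"

// uuid is a hypothetical stand-in for the package's UUID type.
type uuid [16]byte

// flagsOf returns the 10 least-significant bits of the clock sequence:
// the low 2 bits of octet 8, followed by all 8 bits of octet 9.
func flagsOf(u uuid) uint16 {
	return uint16(u[8]&0x03)<<8 | uint16(u[9])
}

func main() {
	var u uuid
	u[8] = 0x80 // RFC 4122 variant bits, with a zero clock sequence.
	u[9] = 0x02 // The value documented above for Flag_ACK_TXN.
	fmt.Printf("%#x\n", flagsOf(u))
}
```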

                                            type Frameable

                                            type Frameable interface{}

                                              Frameable is an interface suitable for serialization by a Framing. The interface requirements of a Frameable are specific to the Framing used, and asserted at run-time. Generally an instance of Frameable is also an instance of Message, but the Framing interface doesn't require this.

                                              type Framing

                                              type Framing interface {
                                              	// ContentType of the Framing.
                                              	ContentType() string
                                              	// Marshal a Message to a bufio.Writer. Marshal may assume the Message has
                                              	// passed validation, if implemented for the message type. It may ignore
                                              	// any error returned by the provided Writer.
                                              	Marshal(Frameable, *bufio.Writer) error
                                              	// NewUnmarshalFunc returns an UnmarshalFunc which will unmarshal Frameable
                                              	// instances from the provided Reader.
                                              	NewUnmarshalFunc(*bufio.Reader) UnmarshalFunc
                                              }

                                                Framing specifies the means by which Messages are marshalled to and from a Journal.

                                                func FramingByContentType

                                                func FramingByContentType(contentType string) (Framing, error)

                                                  FramingByContentType returns the message Framing having the corresponding content-type, or returns an error if none match. It is safe for concurrent use.

                                                  type Iterator

                                                  type Iterator interface {
                                                  	// Next returns the next message Envelope in the sequence. It returns EOF
                                                  	// if none remain, or any other encountered error.
                                                  	Next() (Envelope, error)
                                                  }

                                                    Iterator iterates over message Envelopes. It's implemented by ReadUncommittedIter and ReadCommittedIter.

                                                    type JSONMarshalerTo

                                                    type JSONMarshalerTo interface {
                                                    	MarshalJSONTo(*bufio.Writer) (int, error)
                                                    }

                                                      JSONMarshalerTo should be implemented (along with json.Unmarshaler) by the message being marshalled if it needs to specify its own JSON encoding method. If this interface is not implemented, jsonFraming defaults to encoding/json. MarshalJSONTo returns the number of bytes written and any error that occurs.

                                                      type JournalProducer

                                                      type JournalProducer struct {
                                                      	Journal  pb.Journal
                                                      	Producer ProducerID
                                                      }

                                                        JournalProducer composes a Journal and ProducerID.

                                                        type Mappable

                                                        type Mappable interface{}

                                                          Mappable is an interface suitable for mapping by a MappingFunc. Typically a MappingKeyFunc will cast and assert Mappable's exact type at run-time. Generally a Mappable is a Message but the MappingFunc interface doesn't require this.

                                                          type MappingFunc

                                                          type MappingFunc func(Mappable) (_ pb.Journal, contentType string, _ error)

                                                            MappingFunc maps a Mappable message to a responsible journal. Gazette imposes no formal requirement on exactly how that mapping is performed, or the nature of the mapped journal.

                                                            It's often desired to spread a collection of like messages across a number of journals, with each journal playing the role of a topic partition. Such partitions can be distinguished through a JournalSpec Label such as "app.gazette.dev/message-type: MyMessage". Note that "partition" and "topic" are useful terminology, but play no formal role and have no explicit implementation within Gazette (aside from their expression via Labels and LabelSelectors). See `labels` package documentation for naming conventions.

                                                            A MappingFunc implementation would typically:

                                                            1) Apply domain knowledge to introspect the Mappable and determine a "topic",
                                                               expressed as a LabelSelector.
                                                            2) Query the broker List RPC to determine current partitions of the topic,
                                                               caching and refreshing List results as needed (see client.PolledList).
                                                            3) Use a ModuloMapping or RendezvousMapping to select among partitions.
                                                            

                                                            The MappingFunc returns the contentType of journal messages, which must have a registered Framing.

                                                            func ModuloMapping

                                                            func ModuloMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

                                                              ModuloMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and modulo arithmetic.
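The selection can be sketched using hash/fnv from the standard library. The names keyFunc, session, and moduloPick are hypothetical, and a plain []string of journal names stands in for the ListResponse of a PartitionsFunc.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
	"io"
)

// keyFunc has the shape of MappingKeyFunc, with interface{} standing in
// for Mappable.
type keyFunc func(msg interface{}, w io.Writer)

// session is a hypothetical message type which is keyed on its SessionID.
type session struct{ SessionID string }

func onSessionID(msg interface{}, w io.Writer) {
	_, _ = w.Write([]byte(msg.(*session).SessionID))
}

// moduloPick hashes the mapping key with 32-bit FNV-1a and selects a
// journal by modulo arithmetic over the number of journals.
func moduloPick(key keyFunc, msg interface{}, journals []string) (string, error) {
	if len(journals) == 0 {
		return "", errors.New("no journals to select from")
	}
	var h = fnv.New32a() // A hash.Hash Write never returns an error.
	key(msg, h)
	return journals[h.Sum32()%uint32(len(journals))], nil
}

func main() {
	var journals = []string{"views/part-000", "views/part-001", "views/part-002"}
	var j, err = moduloPick(onSessionID, &session{SessionID: "abc123"}, journals)
	fmt.Println(j, err)
}
```

Note that with modulo arithmetic, a change to the number of journals re-maps most keys.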

                                                              func RandomMapping

                                                              func RandomMapping(partitions PartitionsFunc) MappingFunc

                                                                RandomMapping returns a MappingFunc which maps a Mappable to a randomly selected Journal of the PartitionsFunc.

                                                                func RendezvousMapping

                                                                func RendezvousMapping(key MappingKeyFunc, partitions PartitionsFunc) MappingFunc

                                                                  RendezvousMapping returns a MappingFunc which maps a Mappable into a stable Journal of the PartitionsFunc, selected via 32-bit FNV-1a of the MappingKeyFunc and Highest Random Weight (aka "rendezvous") hashing. HRW is more expensive to compute than using modulo arithmetic, but is still efficient and minimizes reassignments which occur when journals are added or removed.
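The following is a generic sketch of Highest Random Weight selection, not necessarily the weighting which this package computes. The hrwPick helper is hypothetical: it hashes the key together with each journal name using 32-bit FNV-1a, and selects the journal having the largest weight.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hrwPick returns the journal having the highest weight for the key, or
// an empty string if there are no journals.
func hrwPick(key []byte, journals []string) string {
	var best string
	var bestWeight uint32

	for i, j := range journals {
		var h = fnv.New32a()
		_, _ = h.Write(key)
		_, _ = h.Write([]byte(j))

		if w := h.Sum32(); i == 0 || w > bestWeight {
			best, bestWeight = j, w
		}
	}
	return best
}

func main() {
	var journals = []string{"views/part-000", "views/part-001", "views/part-002"}
	fmt.Println(hrwPick([]byte("session-a"), journals))

	// Adding a journal re-maps only those keys for which the new journal
	// has the highest weight. All other keys keep their prior mapping.
	journals = append(journals, "views/part-003")
	fmt.Println(hrwPick([]byte("session-a"), journals))
}
```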

                                                                  type MappingKeyFunc

                                                                  type MappingKeyFunc func(Mappable, io.Writer)

                                                                    MappingKeyFunc extracts an appropriate mapping key from the Mappable by writing its value into the provided io.Writer, whose Write() is guaranteed to never return an error.

                                                                    type Message

                                                                    type Message interface {
                                                                    	// GetUUID returns the UUID previously set on the Message. If the Message
                                                                    	// is not capable of tracking UUIDs, GetUUID returns a zero-valued UUID
                                                                    	// to opt the Message out of exactly-once processing semantics. In this
                                                                    	// case, SetUUID is also a no-op.
                                                                    	GetUUID() UUID
                                                                    	// SetUUID sets the UUID of the Message.
                                                                    	SetUUID(UUID)
                                                                    	// NewAcknowledgement returns a new Message instance of this same type which
                                                                    	// will represent an acknowledgement of this (and future) Messages published
                                                                    	// to the Journal within the context of a transaction.
                                                                    	NewAcknowledgement(pb.Journal) Message
                                                                    }

                                                                      Message is an arbitrary user-defined type which may be serialized to and de-serialized from a journal. Examples include plain Go structs which are marshalled using reflection, or types generated by the gogo/protobuf compiler.

                                                                      A Message's implementation is largely independent of the particular _way_ in which serialization to a journal is done, known as a Framing. The same Message instance could be serialized using either JSON or Protobuf, for example. The choice of Framing is controlled by a journal's "content-type" label. Note that some Framings may impose additional run-time interface requirements on Messages, such as ProtoFrameable or CSVFrameable.

                                                                      A journal holds only raw Message serializations. Gazette therefore asks that Messages help with representation by taking, persisting, and when asked, returning UUIDs generated by Gazette. UUIDs may also be directly useful to users, as they're universally unique and they encode a precise publishing timestamp.

                                                                      In some cases, user types may be unable to represent a UUID. The interface can be implemented with no-ops to opt the type out of exactly-once processing, falling back to at-least-once semantics.
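Both cases can be sketched as follows. The UUID, Journal, and Message declarations are local stand-ins for this package's types so that the example is self-contained, and the pageView and opaque types are hypothetical.

```go
package main

import "fmt"

// UUID, Journal, and Message are local stand-ins for the package's types.
type UUID [16]byte
type Journal string

type Message interface {
	GetUUID() UUID
	SetUUID(UUID)
	NewAcknowledgement(Journal) Message
}

// pageView is a hypothetical type which persists its UUID, and so
// participates in exactly-once processing.
type pageView struct {
	UUID UUID
	URL  string
}

func (p *pageView) GetUUID() UUID                      { return p.UUID }
func (p *pageView) SetUUID(u UUID)                     { p.UUID = u }
func (p *pageView) NewAcknowledgement(Journal) Message { return new(pageView) }

// opaque is a hypothetical type which cannot represent a UUID. Its no-op
// methods opt it out, falling back to at-least-once semantics.
type opaque struct{ Body []byte }

func (*opaque) GetUUID() UUID                      { return UUID{} }
func (*opaque) SetUUID(UUID)                       {}
func (*opaque) NewAcknowledgement(Journal) Message { return new(opaque) }

func main() {
	var msgs = []Message{&pageView{URL: "/home"}, &opaque{}}
	for _, m := range msgs {
		m.SetUUID(UUID{0x01})
		fmt.Println(m.GetUUID() != UUID{})
	}
}
```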

                                                                      type NewMessageFunc

                                                                      type NewMessageFunc func(*pb.JournalSpec) (Message, error)

                                                                        NewMessageFunc returns a Message instance of an appropriate type for reading the given JournalSpec. Implementations may want to introspect the JournalSpec, for example by examining application-specific labels therein. An error is returned if an appropriate Message type cannot be determined.

                                                                        type PartitionsFunc

                                                                        type PartitionsFunc func() *pb.ListResponse

                                                                          PartitionsFunc returns a ListResponse of journal partitions from which a MappingFunc may select. The returned instance pointer may change across invocations, but a returned ListResponse may not be modified. PartitionsFunc should seek to preserve pointer equality of result instances when no substantive change has occurred. See also: client.PolledList.

                                                                          type ProducerID

                                                                          type ProducerID [6]byte

                                                                            ProducerID is the unique node identifier portion of a v1 UUID.

                                                                            func GetProducerID

                                                                            func GetProducerID(uuid UUID) ProducerID

                                                                              GetProducerID returns the node identifier of a UUID as a ProducerID.

                                                                              func NewProducerID

                                                                              func NewProducerID() ProducerID

                                                                                NewProducerID returns a cryptographically random ProducerID which is very, very likely to be unique (47 bits of entropy, a space of ~141 trillion) provided that each ProducerID has a reasonably long lifetime (eg on the order of a process, not of a request).
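One way to arrive at 47 bits of entropy in a 48-bit node identifier is to draw 6 random bytes and then fix the multicast bit, as RFC 4122 section 4.5 asks of randomly generated node IDs. The sketch below assumes that this is where the missing bit goes. The producerID type and newProducerID helper are hypothetical.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// producerID is a hypothetical stand-in for ProducerID.
type producerID [6]byte

// newProducerID draws 48 random bits and then sets the multicast bit (the
// least-significant bit of the first octet), leaving 47 bits of entropy.
func newProducerID() (producerID, error) {
	var p producerID
	if _, err := rand.Read(p[:]); err != nil {
		return p, err
	}
	p[0] |= 0x01
	return p, nil
}

func main() {
	var p, err = newProducerID()
	fmt.Printf("%x %v\n", p[:], err)
}
```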

                                                                                type ProducerState

                                                                                type ProducerState struct {
                                                                                	JournalProducer
                                                                                	// LastAck is the Clock of the Producer's last ACK_TXN or OUTSIDE_TXN.
                                                                                	LastAck Clock
                                                                                	// Begin is the offset of the first message byte having CONTINUE_TXN that's
                                                                                	// larger than LastAck. Eg, it's the offset which opens the next transaction.
                                                                                	// If there is no such message, Begin is -1.
                                                                                	Begin pb.Offset
                                                                                }

                                                                                  ProducerState is a snapshot of a Producer's state within a Journal. It's marshalled into consumer checkpoints to allow a Sequencer to recover producer sequence states after a consumer process fault.

                                                                                  type ProtoFrameable

                                                                                  type ProtoFrameable interface {
                                                                                  	ProtoSize() int
                                                                                  	MarshalTo([]byte) (int, error)
                                                                                  	Unmarshal([]byte) error
                                                                                  }

                                                                                    ProtoFrameable is the Frameable interface required by a Framing of protobuf messages.

                                                                                    type Publisher

                                                                                    type Publisher struct {
                                                                                    	// contains filtered or unexported fields
                                                                                    }

                                                                                      Publisher maps, sequences, and asynchronously appends messages to Journals. It supports two modes of publishing: PublishCommitted and PublishUncommitted. Committed messages are immediately read-able by a read-committed reader. Uncommitted messages are immediately read-able by a read-uncommitted reader, but not by a read-committed reader until a future "acknowledgement" (ACK) message marks them as committed -- an ACK which may not ever come.

                                                                                      To publish as a transaction, the client first issues a number of PublishUncommitted calls. Once all pending messages have been published, BuildAckIntents returns []AckIntents which will inform readers that published messages have committed and should be processed. To ensure atomicity of the published transaction, []AckIntents must be written to stable storage *before* being applied, and must be re-tried on fault.

                                                                                      As a rule of thumb, API servers or other pure "producers" of events in Gazette should use PublishCommitted. Gazette consumers should use PublishUncommitted to achieve end-to-end exactly once semantics: upon commit, each consumer transaction will automatically acknowledge all such messages published over the course of the transaction.

                                                                                      Consumers *may* instead use PublishCommitted, which may improve latency slightly (as read-committed readers need not wait for the consumer transaction to commit), but must be aware that its use weakens the effective processing guarantee to at-least-once.

                                                                                      func NewPublisher

                                                                                      func NewPublisher(ajc client.AsyncJournalClient, clock *Clock) *Publisher

                                                                                        NewPublisher returns a new Publisher using the given AsyncJournalClient and optional *Clock. If *Clock is nil, then an internal Clock is allocated and is updated with time.Now on each message published. If a non-nil *Clock is provided, it should be updated by the caller at a convenient time resolution, which can greatly reduce the frequency of time system calls.
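The idea of a caller-updated clock can be sketched with the standard library alone. The coarseClock type, its update and now methods, and runUpdates below are hypothetical stand-ins written for illustration; they are not this package's Clock type or its API. The sketch only shows how a periodic update lets many readers share one stored timestamp rather than each asking the system for the time.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// coarseClock is a hypothetical stand-in for a caller-maintained clock.
// It is NOT the package's Clock type.
type coarseClock struct {
	nanos int64 // Unix nanoseconds, accessed atomically.
}

// update stores the given time. Callers invoke it at a coarse resolution.
func (c *coarseClock) update(t time.Time) {
	atomic.StoreInt64(&c.nanos, t.UnixNano())
}

// now returns the most recently stored time without querying the system.
func (c *coarseClock) now() time.Time {
	return time.Unix(0, atomic.LoadInt64(&c.nanos))
}

// runUpdates refreshes the clock once per tick until stop is closed.
func runUpdates(c *coarseClock, every time.Duration, stop <-chan struct{}) {
	var ticker = time.NewTicker(every)
	defer ticker.Stop()
	for {
		select {
		case t := <-ticker.C:
			c.update(t)
		case <-stop:
			return
		}
	}
}

func main() {
	var c = new(coarseClock)
	c.update(time.Now())

	var stop = make(chan struct{})
	go runUpdates(c, 10*time.Millisecond, stop)

	// Reads between ticks reuse the stored timestamp.
	for i := 0; i < 3; i++ {
		fmt.Println(c.now().UnixNano())
	}
	close(stop)
}
```

The trade-off is resolution: timestamps read between updates are only as fresh as the last tick.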

                                                                                        func (*Publisher) BuildAckIntents

                                                                                        func (p *Publisher) BuildAckIntents() ([]AckIntent, error)

                                                                                          BuildAckIntents returns the []AckIntent which acknowledges all pending Messages published since its last invocation. It's the caller's job to actually append the intents to their respective journals, and only *after* checkpointing the intents to a stable store so that they may be replayed in their entirety should a fault occur. Without this, in the presence of faults it's impossible to ensure that ACKs are written to _all_ journals, and not just some of them (or none).

                                                                                          Applications running as Gazette consumers *must not* call BuildAckIntents themselves. This is done on the application's behalf, as part of building the checkpoints which are committed with consumer transactions.

                                                                                          Callers of PublishUncommitted outside of consumer applications, however, *are* responsible for building, committing, and writing the []AckIntent themselves.
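The checkpoint-then-apply ordering can be illustrated with a small simulation. Everything here is hypothetical: ackIntent, stableStore, journals, commit and replayCheckpoint are toy stand-ins, not this package's types, and the sketch assumes that re-appending an ACK is harmless because readers de-duplicate repeats. It shows why persisting the intents first lets a fault part-way through be repaired by replaying all of them.

```go
package main

import "fmt"

// ackIntent is a hypothetical stand-in for an ACK destined for one journal.
type ackIntent struct {
	journal string
	payload string
}

// stableStore stands in for durable storage of the checkpoint.
type stableStore struct{ checkpoint []ackIntent }

// journals records ACKs appended per journal. Repeated ACKs are assumed
// to be de-duplicated by readers, so re-appending is harmless.
type journals map[string][]string

// applyIntents appends each intent in order. It fails before the append
// at index failAfter to simulate a fault (a negative value never fails).
func applyIntents(j journals, intents []ackIntent, failAfter int) error {
	for i, in := range intents {
		if i == failAfter {
			return fmt.Errorf("simulated fault before ACK to %s", in.journal)
		}
		j[in.journal] = append(j[in.journal], in.payload)
	}
	return nil
}

// commit persists the intents first, and only then applies them.
func commit(s *stableStore, j journals, intents []ackIntent, failAfter int) error {
	s.checkpoint = append([]ackIntent(nil), intents...) // 1. Checkpoint.
	return applyIntents(j, intents, failAfter)          // 2. Apply.
}

// replayCheckpoint re-applies every checkpointed intent after a fault.
func replayCheckpoint(s *stableStore, j journals) error {
	return applyIntents(j, s.checkpoint, -1)
}

func main() {
	var store stableStore
	var j = journals{}
	var intents = []ackIntent{
		{journal: "topic/part-000", payload: "ack-1"},
		{journal: "topic/part-001", payload: "ack-1"},
	}

	if err := commit(&store, j, intents, 1); err != nil {
		fmt.Println("fault:", err)
	}
	fmt.Println("journals holding an ACK after the fault:", len(j))

	if err := replayCheckpoint(&store, j); err != nil {
		fmt.Println("replay failed:", err)
	}
	fmt.Println("journals holding an ACK after replay:", len(j))
}
```

Had the intents been applied without the checkpoint, the fault would leave one journal acknowledged and the other not, with nothing left to replay from.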

                                                                                          func (*Publisher) PublishCommitted

                                                                                          func (p *Publisher) PublishCommitted(mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

                                                                                            PublishCommitted maps the Message to a Journal and begins an AsyncAppend of its marshaled content, with a UUID sequenced for immediate consumption. An error is returned if:

                                                                                            * The Message implements Validator, and it returns an error.
                                                                                            * The MappingFunc returns an error while mapping the Message to a journal.
                                                                                            * The journal's Framing returns an error while marshaling the Message,
                                                                                              or an os.PathError occurs while spooling the frame to a temporary file
                                                                                              (eg, because local disk is full).
                                                                                            

                                                                                            A particular MappingFunc error to be aware of is ErrEmptyListResponse, returned by mapping routines of this package when there are no journals that currently match the mapping's selector. The caller may wish to retry at a later time in the case of ErrEmptyListResponse or os.PathError.
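One way a caller might classify publish errors as worth retrying is sketched below. The errEmptyListResponse sentinel is a hypothetical stand-in for this package's ErrEmptyListResponse, and retryable and sampleErrors are illustrative helpers, not part of the package. Whether the real errors arrive wrapped or bare is not stated here, so the sketch uses errors.Is and errors.As, which handle both cases.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errEmptyListResponse is a hypothetical stand-in for the package's
// ErrEmptyListResponse sentinel.
var errEmptyListResponse = errors.New("empty list response")

// retryable reports whether a publish error is plausibly transient:
// no journals currently match the selector, or local spooling failed.
func retryable(err error) bool {
	var pathErr *os.PathError
	return errors.Is(err, errEmptyListResponse) || errors.As(err, &pathErr)
}

// sampleErrors returns one illustrative error of each class.
func sampleErrors() []error {
	return []error{
		fmt.Errorf("mapping message: %w", errEmptyListResponse),
		&os.PathError{Op: "write", Path: "spool", Err: errors.New("no space left on device")},
		errors.New("validation failed"),
	}
}

func main() {
	for _, err := range sampleErrors() {
		fmt.Printf("retry=%t\t%v\n", retryable(err), err)
	}
}
```

Validation and framing errors are deliberately treated as permanent in this sketch, since retrying the same Message would fail the same way.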

                                                                                            Note that the message UUID will not yet be set when Validator or MappingFunc is invoked. This is because generation of UUIDs must be synchronized over the journal to which the Message is written to preserve ordering, and this cannot be known until mapping has been done.

                                                                                            If desired, the caller may select on Done of the returned *AsyncAppend to be notified as soon as this particular Message has committed to the journal. This might be appropriate when publishing as part of an HTTP request, where status is to be reported to the client.

                                                                                            Callers are also free to altogether ignore the returned *AsyncAppend, perhaps within a non-blocking "fire and forget" of collected logs or metrics.

                                                                                            Another option is to issue a periodic "write barrier", where the caller uses PendingExcept of the underlying AsyncJournalClient and waits over the returned OpFutures. At that time the caller is assured that all prior publishes have committed, without having to track or wait for them individually.

                                                                                            PublishCommitted is safe for concurrent use.

                                                                                            func (*Publisher) PublishUncommitted

                                                                                            func (p *Publisher) PublishUncommitted(mapping MappingFunc, msg Message) (*client.AsyncAppend, error)

                                                                                              PublishUncommitted is like PublishCommitted but sequences the Message as part of an open transaction. The Message must later be acknowledged before it will be visible to read-committed readers. The Journal is tracked and included in the results of the next BuildAckIntents. PublishUncommitted is *not* safe for concurrent use.

                                                                                              type ReadCommittedIter

                                                                                              type ReadCommittedIter struct {
                                                                                              	// contains filtered or unexported fields
                                                                                              }

                                                                                                ReadCommittedIter is an Iterator over read-committed messages. It's little more than the composition of a provided Sequencer with an underlying ReadUncommittedIter.

                                                                                                If a dequeue of the Sequencer returns ErrMustStartReplay, then ReadCommittedIter will automatically start the appropriate replay in order to continue its iteration.

                                                                                                func NewReadCommittedIter

                                                                                                func NewReadCommittedIter(rr *client.RetryReader, newMsg NewMessageFunc, seq *Sequencer) *ReadCommittedIter

                                                                                                  NewReadCommittedIter returns a ReadCommittedIter over message Envelopes read from the RetryReader. The provided Sequencer is used to sequence committed messages. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

                                                                                                  func (*ReadCommittedIter) Next

                                                                                                  func (it *ReadCommittedIter) Next() (Envelope, error)

                                                                                                    Next returns the next read-committed message Envelope in the sequence. It returns io.EOF if none remain, or any other encountered error.

                                                                                                    type ReadUncommittedIter

                                                                                                    type ReadUncommittedIter struct {
                                                                                                    	// contains filtered or unexported fields
                                                                                                    }

                                                                                                      ReadUncommittedIter is an Iterator over read-uncommitted messages.

                                                                                                      func NewReadUncommittedIter

                                                                                                      func NewReadUncommittedIter(rr *client.RetryReader, newMsg NewMessageFunc) *ReadUncommittedIter

                                                                                                        NewReadUncommittedIter returns a ReadUncommittedIter over message Envelopes read from the RetryReader. The reader's journal must have an appropriate labels.ContentType label, which is used to determine the message Framing.

                                                                                                        func (*ReadUncommittedIter) Next

                                                                                                        func (it *ReadUncommittedIter) Next() (Envelope, error)

                                                                                                          Next reads and returns the next Envelope or error.

                                                                                                          type Sequencer

                                                                                                          type Sequencer struct {
                                                                                                          	// contains filtered or unexported fields
                                                                                                          }

                                                                                                            Sequencer observes read-uncommitted messages from journals and sequences them into acknowledged, read-committed messages. Read-uncommitted messages are fed to QueueUncommitted, after which the client must repeatedly call DequeCommitted to drain all acknowledged messages until io.EOF is returned.

                                                                                                            In more detail, messages observed by QueueUncommitted may acknowledge one or more pending messages previously observed by QueueUncommitted. For example, a non-duplicate message with Flag_OUTSIDE_TXN acknowledges itself, and a message with Flag_ACK_TXN also acknowledges messages having a lower clock. DequeCommitted will drain the complete set of now-committed messages, and then return io.EOF.

                                                                                                            An advantage of the design is that no head-of-line blocking occurs: committed messages are immediately dequeued upon observing their corresponding ACK_TXN, even if they're interleaved with still-pending messages of other producers.
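The queue-then-drain protocol and the absence of head-of-line blocking can be seen in a toy model. The toySequencer below is a deliberately simplified, hypothetical stand-in and not this package's Sequencer: it uses integer clocks, ignores duplicates and replay, and does not surface the ACK message itself. It only shows that one producer's ACK releases that producer's pending messages while another producer's messages stay pending.

```go
package main

import (
	"fmt"
	"io"
)

type txnFlag int

const (
	outsideTxn  txnFlag = iota // Commits itself immediately.
	continueTxn                // Pending until acknowledged.
	ackTxn                     // Acknowledges lower-clock messages of its producer.
)

type envelope struct {
	producer string
	clock    int
	flag     txnFlag
	body     string
}

// toySequencer is a simplified model, not the package's Sequencer.
type toySequencer struct {
	pending   map[string][]envelope
	committed []envelope
}

func newToySequencer() *toySequencer {
	return &toySequencer{pending: make(map[string][]envelope)}
}

func (s *toySequencer) queueUncommitted(env envelope) {
	if len(s.committed) != 0 {
		panic("committed messages remain to dequeue")
	}
	switch env.flag {
	case outsideTxn:
		s.committed = append(s.committed, env)
	case continueTxn:
		s.pending[env.producer] = append(s.pending[env.producer], env)
	case ackTxn:
		var keep []envelope
		for _, p := range s.pending[env.producer] {
			if p.clock < env.clock {
				s.committed = append(s.committed, p)
			} else {
				keep = append(keep, p)
			}
		}
		s.pending[env.producer] = keep
	}
}

func (s *toySequencer) dequeCommitted() (envelope, error) {
	if len(s.committed) == 0 {
		return envelope{}, io.EOF
	}
	var env = s.committed[0]
	s.committed = s.committed[1:]
	return env, nil
}

// drain feeds each envelope, then dequeues until io.EOF before the next.
func drain(s *toySequencer, in []envelope) []string {
	var out []string
	for _, env := range in {
		s.queueUncommitted(env)
		for {
			var c, err = s.dequeCommitted()
			if err == io.EOF {
				break
			}
			out = append(out, c.body)
		}
	}
	return out
}

// sampleStream interleaves producers A and B with a self-committing C.
func sampleStream() []envelope {
	return []envelope{
		{"A", 1, continueTxn, "a1"},
		{"B", 1, continueTxn, "b1"},
		{"B", 2, ackTxn, ""},
		{"A", 2, continueTxn, "a2"},
		{"C", 1, outsideTxn, "c1"},
		{"A", 3, ackTxn, ""},
	}
}

func main() {
	// b1 and c1 are released while A's transaction is still open.
	fmt.Println(drain(newToySequencer(), sampleStream()))
}
```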

                                                                                                            Sequencer maintains an internal ring buffer of messages, which is usually sufficient to directly read committed messages. When recovering from a checkpoint, or if a very long sequence or old producer is acknowledged, it may be necessary to start a replay of already-read messages. In this case:

                                                                                                            * DequeCommitted will return ErrMustStartReplay.
                                                                                                            * ReplayRange will return the exact offset range required.
                                                                                                            * The client must then supply an appropriate Iterator to StartReplay.
                                                                                                            

                                                                                                            Having done this, calls to DequeCommitted may resume to drain messages.
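The replay handshake described above can be sketched as a control-flow loop. The fakeSequencer, errMustStartReplay and journalIterator names are hypothetical stand-ins, not this package's Sequencer, ErrMustStartReplay or Iterator. Offsets are modeled as slice indices, and the fake consumes the replay iterator eagerly, which the real type need not do.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// errMustStartReplay is a hypothetical stand-in for ErrMustStartReplay.
var errMustStartReplay = errors.New("must start replay")

// fakeSequencer models only the replay handshake.
type fakeSequencer struct {
	begin, end  int64    // Range which must be replayed.
	needsReplay bool     // True until startReplay is called.
	ready       []string // Messages available to dequeue.
}

func (s *fakeSequencer) dequeCommitted() (string, error) {
	if s.needsReplay {
		return "", errMustStartReplay
	}
	if len(s.ready) == 0 {
		return "", io.EOF
	}
	var m = s.ready[0]
	s.ready = s.ready[1:]
	return m, nil
}

func (s *fakeSequencer) replayRange() (begin, end int64) { return s.begin, s.end }

// startReplay reads the iterator to its end and queues what it yields
// ahead of the messages which were already buffered.
func (s *fakeSequencer) startReplay(next func() (string, error)) {
	var replayed []string
	for {
		var m, err = next()
		if err != nil {
			break
		}
		replayed = append(replayed, m)
	}
	s.ready = append(replayed, s.ready...)
	s.needsReplay = false
}

// journalIterator iterates log over [begin, end), then returns io.EOF.
func journalIterator(log []string, begin, end int64) func() (string, error) {
	return func() (string, error) {
		if begin >= end {
			return "", io.EOF
		}
		var m = log[begin]
		begin++
		return m, nil
	}
}

// drainWithReplay dequeues until io.EOF, starting a replay when asked.
func drainWithReplay(s *fakeSequencer, log []string) ([]string, error) {
	var out []string
	for {
		var m, err = s.dequeCommitted()
		switch {
		case err == io.EOF:
			return out, nil
		case errors.Is(err, errMustStartReplay):
			var begin, end = s.replayRange()
			s.startReplay(journalIterator(log, begin, end))
		case err != nil:
			return out, err
		default:
			out = append(out, m)
		}
	}
}

func main() {
	var log = []string{"m0", "m1", "m2", "m3"}
	var s = &fakeSequencer{begin: 1, end: 3, needsReplay: true, ready: []string{"m3"}}
	fmt.Println(drainWithReplay(s, log))
}
```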

                                                                                                            func NewSequencer

                                                                                                            func NewSequencer(states []ProducerState, buffer int) *Sequencer

                                                                                                              NewSequencer returns a new Sequencer initialized from the given ProducerStates, and with an internal ring buffer of the given size.

                                                                                                              func (*Sequencer) DequeCommitted

                                                                                                              func (w *Sequencer) DequeCommitted() (env Envelope, err error)

                                                                                                                DequeCommitted returns the next acknowledged message, or io.EOF if no acknowledged messages remain. It must be called repeatedly after each QueueUncommitted until it returns io.EOF. If messages are no longer within the Sequencer's buffer, it returns ErrMustStartReplay and the caller must first StartReplay before trying again.

                                                                                                                func (*Sequencer) HasPending

                                                                                                                func (w *Sequencer) HasPending(since pb.Offsets) bool

                                                                                                                  HasPending returns true if any partial sequence has a first offset larger than those of the Offsets (eg, the sequence started since |since| was read). Assuming liveness of producers, it hints that further messages are forthcoming.

                                                                                                                  func (*Sequencer) ProducerStates

                                                                                                                  func (w *Sequencer) ProducerStates(pruneHorizon time.Duration) []ProducerState

                                                                                                                    ProducerStates returns a snapshot of producers and their states, after pruning any producers having surpassed pruneHorizon in age relative to the most recent producer within their journal. If pruneHorizon is zero, no pruning is done. ProducerStates panics if messages still remain to dequeue.

                                                                                                                    func (*Sequencer) QueueUncommitted

                                                                                                                    func (w *Sequencer) QueueUncommitted(env Envelope)

                                                                                                                      QueueUncommitted applies the next read-uncommitted message Envelope to the Sequencer. It panics if called while messages remain to dequeue.

                                                                                                                      func (*Sequencer) ReplayRange

                                                                                                                      func (w *Sequencer) ReplayRange() (begin, end pb.Offset)

                                                                                                                        ReplayRange returns the half-open [begin, end) range of byte offsets to be replayed. It panics if ErrMustStartReplay was not returned by DequeCommitted.

                                                                                                                        func (*Sequencer) StartReplay

                                                                                                                        func (w *Sequencer) StartReplay(it Iterator)

                                                                                                                          StartReplay is called with a read-uncommitted Iterator over ReplayRange. Panics if ErrMustStartReplay was not returned by DequeCommitted.

                                                                                                                          type UUID

                                                                                                                          type UUID = uuid.UUID

                                                                                                                            UUID is an RFC 4122 v1 variant Universally Unique Identifier which uniquely identifies a message. As a v1 UUID, it incorporates a clock timestamp and sequence, as well as a node identifier (which, within the context of Gazette, is also known as a ProducerID).

                                                                                                                            Each sequence of UUIDs produced by Gazette uses a strongly random ProducerID, and as such the RFC 4122 purpose of the clock sequence isn't required. Instead, Gazette uses clock sequence bits of UUIDs it generates in the following way:

                                                                                                                            * The first 2 bits are reserved to represent the variant, as per RFC 4122.
                                                                                                                            * The next 4 bits extend the 60 bit timestamp with a counter, which allows
                                                                                                                              for a per-producer UUID generation rate of 160M UUIDs / second before
                                                                                                                              running ahead of wall-clock time. The timestamp and counter are monotonic,
                                                                                                                              and together provide a total ordering of UUIDs from each ProducerID.
                                                                                                                            * The remaining 10 bits are flags, eg for representing transaction semantics.
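The bit budget in the list above can be checked with a short sketch. The packClockSeq and unpackClockSeq helpers are hypothetical and follow only the bullets: 2 variant bits, then 4 counter bits, then 10 flag bits in a 16-bit field. They are not taken from BuildUUID. The rate falls out of the v1 timestamp resolution of 100 nanoseconds, or 10M ticks per second, multiplied by the 16 values of the 4-bit counter.

```go
package main

import "fmt"

const (
	variantRFC4122 = 0b10       // Top two bits of the clock-sequence field.
	counterBits    = 4          // Bits which extend the timestamp.
	flagBits       = 10         // Remaining low bits, used for flags.
	ticksPerSecond = 10_000_000 // v1 timestamps count 100ns intervals.
)

// packClockSeq lays out [variant:2][counter:4][flags:10] in 16 bits.
// Out-of-range counter or flag bits are masked off.
func packClockSeq(counter, flags uint16) uint16 {
	return variantRFC4122<<14 | (counter&0xf)<<flagBits | flags&0x3ff
}

// unpackClockSeq reverses packClockSeq.
func unpackClockSeq(v uint16) (variant, counter, flags uint16) {
	return v >> 14, (v >> flagBits) & 0xf, v & 0x3ff
}

// maxUUIDsPerSecond is the number of distinct (timestamp, counter)
// pairs available within one second of wall-clock time.
func maxUUIDsPerSecond() int {
	return ticksPerSecond << counterBits
}

func main() {
	var packed = packClockSeq(5, 0x2)
	var variant, counter, flags = unpackClockSeq(packed)
	fmt.Printf("packed=%#04x variant=%02b counter=%d flags=%#x\n",
		packed, variant, counter, flags)
	fmt.Println("max UUIDs per producer per second:", maxUUIDsPerSecond())
}
```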
                                                                                                                            

                                                                                                                            func BuildUUID

                                                                                                                            func BuildUUID(id ProducerID, clock Clock, flags Flags) UUID

                                                                                                                              BuildUUID builds v1 UUIDs per RFC 4122.

                                                                                                                              type UnmarshalFunc

                                                                                                                              type UnmarshalFunc func(Frameable) error

                                                                                                                                UnmarshalFunc is returned by a Framing's NewUnmarshalFunc. It unpacks and decodes Frameable instances from the underlying bufio.Reader. It must not read beyond the precise byte boundary of each message frame (eg, by internally buffering reads beyond the frame end).

                                                                                                                                type Validator

                                                                                                                                type Validator = pb.Validator

                                                                                                                                  Validator is an optional interface of a Message able to Validate itself. An attempt to publish a Message which does not Validate will error.