Package client implements a Go client for interacting with the gRPC Journal service of Gazette brokers. It concerns itself with operations over journal specifications, fragments and byte-streams. See package message for an implementation of messages layered atop journal streams.

    The package provides Reader and RetryReader, which adapt the broker Read RPC to the io.Reader interface. Reader will utilize just one Read RPC, while RetryReader will silently restart Read RPCs as needed:

    // Copy a journal byte range to os.Stdout.
    io.Copy(os.Stdout, NewReader(ctx, client, pb.ReadRequest{
        Journal:   "a/journal/name",
        Offset:    1234,
        EndOffset: 5678,
    }))

    It provides Appender, which adapts the Append RPC to an io.WriteCloser:

    // Copy os.Stdin to the journal.
    var a = NewAppender(ctx, client, pb.AppendRequest{
        Journal: "a/journal/name",
    })
    if _, err = io.Copy(a, os.Stdin); err == nil {
        err = a.Close() // Commit the append.
    } else {
        a.Abort() // Roll back: the broker discards content written so far.
    }

    Gazette appends are linearizable (atomic) per journal. Appender streams content to brokers as it's written, but no content of an Appender will be visible to any reader until Close is called and succeeds. An implication of this is that once brokers have begun to sequence an append into a journal, they expect the remaining content and Close of that Appender to be forthcoming, and will quickly time it out if it stalls. Uses of Appender should thus be limited to cases where its full content is readily available.

    Most clients should instead use an AppendService. It offers automatic retries, an asynchronous API, and supports constraints on the ordering of appends with respect to other ongoing operations (i.e., "append to journal Foo, but not before this append to journal Bar completes"). It also dynamically batches many co-occurring small writes into larger ones for efficiency.

    var as = NewAppendService(ctx, client)
    var op = as.StartAppend(pb.AppendRequest{
        Journal: "a/journal/name",
    }, myOtherOpsWhichMustCompleteFirst)
    // Produce content to append into the AsyncAppend's Writer.
    // We hold an exclusive lock over it until Release.
    op.Writer().WriteString("hello, ")
    op.Writer().WriteString("gazette: ")
    var _, err = io.Copy(op.Writer(), os.Stdin)
    op.Require(err)
    // If io.Copy error'd, it aborts the append and is returned by Release.
    if err = op.Release(); err == nil {
        err = op.Err() // Blocks until operation completes.
    }

    The package offers functions for listing Fragments & JournalSpecs and applying JournalSpecs, while accounting for pagination details. Also notable is PolledList, which is an important building-block for applications scaling to multiple journals.



    var (
    	// Map common broker error statuses into named errors.
    	ErrNotJournalBroker        = errors.New(pb.Status_NOT_JOURNAL_BROKER.String())
    	ErrNotJournalPrimaryBroker = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String())
    	ErrOffsetNotYetAvailable   = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String())
    	ErrRegisterMismatch        = errors.New(pb.Status_REGISTER_MISMATCH.String())
    	ErrWrongAppendOffset       = errors.New(pb.Status_WRONG_APPEND_OFFSET.String())
    	// ErrOffsetJump is returned by Reader.Read to indicate that the next byte
    	// available to be read is at a larger offset than that requested (eg,
    	// because a span of the Journal has been deleted). The Reader's ReadResponse
    	// should be inspected by the caller, and Read may be invoked again to continue.
    	ErrOffsetJump = errors.New("offset jump")
    	// ErrSeekRequiresNewReader is returned by Reader.Seek if it is unable to
    	// satisfy the requested Seek. A new Reader should be started instead.
    	ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader")
    	// ErrDidNotReadExpectedEOF is returned by FragmentReader.Read if the
    	// underlying file did not return EOF at the expected Fragment End offset.
    	ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")
    )


    func Append

    func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest,
    	content ...io.ReaderAt) (pb.AppendResponse, error)

      Append zero or more ReaderAts to a journal as a single Append transaction. Append retries on transport or routing errors, but fails on all other errors. Each ReaderAt is read from byte zero until EOF, and may be read multiple times. If no ReaderAts are provided, an Append RPC with no content is issued.

      func ApplyJournals

      func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)

        ApplyJournals applies journal changes detailed in the ApplyRequest via the broker Apply RPC. Changes are applied as a single Etcd transaction. If the change list is larger than an Etcd transaction can accommodate, ApplyJournalsInBatches should be used instead. ApplyResponse statuses other than OK are mapped to an error.

        func ApplyJournalsInBatches

        func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error)

          ApplyJournalsInBatches is like ApplyJournals, but chunks the ApplyRequest into batches of the given size, which should be less than Etcd's maximum configured transaction size (usually 128). If size is 0 all changes will be attempted in a single transaction. Be aware that ApplyJournalsInBatches may only partially succeed, with some batches having applied and others not. The final ApplyResponse is returned, unless an error occurs. ApplyResponse statuses other than OK are mapped to an error.
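
The chunking described above can be pictured with a small, self-contained sketch. This is not the package's implementation: `batches` is a hypothetical helper, and plain strings stand in for the changes of an ApplyRequest. It only illustrates how a change list divides into groups of at most `size`, with a size of zero meaning a single group.

```go
package main

import "fmt"

// batches splits items into consecutive groups of at most size elements.
// A size of zero (or less) yields a single group holding every item,
// mirroring the "size is 0" behavior described above.
// This is a hypothetical helper, not part of the client package.
func batches(items []string, size int) [][]string {
	if size <= 0 || size >= len(items) {
		return [][]string{items}
	}
	var out [][]string
	for len(items) > size {
		out = append(out, items[:size])
		items = items[size:]
	}
	return append(out, items)
}

func main() {
	var changes = []string{"a", "b", "c", "d", "e"}
	for i, b := range batches(changes, 2) {
		// Each batch would be applied as its own transaction. A failure here
		// leaves earlier batches applied and later ones not.
		fmt.Println(i, b)
	}
}
```

Because each group is applied independently, a caller that needs all-or-nothing behavior has to keep the whole change list within one transaction.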

          func GetJournal

          func GetJournal(ctx context.Context, jc pb.JournalClient, journal pb.Journal) (*pb.JournalSpec, error)

            GetJournal retrieves the JournalSpec of the named Journal, or returns an error.

            func InstallFileTransport

            func InstallFileTransport(root string) (remove func())

              InstallFileTransport registers a file:// protocol handler at the given root with the http.Client used by OpenFragmentURL. It's used in testing contexts, and is also appropriate when brokers share a common NAS file store to which fragments are persisted, and to which this client also has access. The returned cleanup function removes the handler and restores the prior http.Client.

              const root = "/mnt/shared-nas-array/path/to/fragment-root"
              defer client.InstallFileTransport(root)()
              var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
                  Journal: "a/journal/with/nas/fragment/store",
                  DoNotProxy: true,
              })
              // rr.Read will read Fragments directly from NAS.

              func ListAllFragments

              func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)

                ListAllFragments performs multiple Fragments RPCs, as required to join across multiple FragmentsResponse pages, and returns the complete FragmentsResponse. Any encountered error is returned.

                func ListAllJournals

                func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)

                  ListAllJournals performs multiple List RPCs, as required to join across multiple ListResponse pages, and returns the complete ListResponse of the ListRequest. Any encountered error is returned.
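
The page-joining loop that ListAllFragments and ListAllJournals describe can be sketched generically. Everything below is an assumption made for illustration: `fetchPage`, `listAll`, `fakePages`, and the use of a string token to request the next page are hypothetical and are not the package's API.

```go
package main

import "fmt"

// fetchPage is a hypothetical stand-in for a single paginated RPC. It returns
// one page of results and a token for the next page ("" when exhausted).
type fetchPage func(token string) (items []string, next string, err error)

// listAll calls fetch repeatedly, joining pages until no next token remains.
// The first error encountered is returned and partial results are discarded.
func listAll(fetch fetchPage) ([]string, error) {
	var all []string
	var token string
	for {
		items, next, err := fetch(token)
		if err != nil {
			return nil, err
		}
		all = append(all, items...)
		if next == "" {
			return all, nil
		}
		token = next
	}
}

// fakePages serves a fixed sequence of pages keyed by token, for illustration.
func fakePages(token string) ([]string, string, error) {
	switch token {
	case "":
		return []string{"journal/a", "journal/b"}, "p2", nil
	case "p2":
		return []string{"journal/c"}, "", nil
	}
	return nil, "", fmt.Errorf("unknown page token %q", token)
}

func main() {
	all, err := listAll(fakePages)
	fmt.Println(all, err)
}
```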


                  type AppendService

                  type AppendService struct {
                  	// contains filtered or unexported fields
                  }

                    AppendService batches, dispatches, and (if needed) retries asynchronous Append RPCs. Use of an AppendService is appropriate for clients who make large numbers of small writes to a Journal, and where those writes may be pipelined and batched to amortize the cost of broker Append RPCs. It may also simplify implementations for clients who would prefer to simply have writes block until successfully committed, as opposed to handling errors and retries themselves.

                    For each journal, AppendService manages an ordered list of AsyncAppends, each having buffered content to be appended. The list is dispatched in FIFO order by a journal-specific goroutine.

                    AsyncAppends are backed by temporary files on the local disk rather than memory buffers. This minimizes the impact of buffering on the heap and garbage collector, and also makes AppendService tolerant to sustained service disruptions (up to the capacity of the disk).

                    AppendService implements the AsyncJournalClient interface.

                    func NewAppendService

                    func NewAppendService(ctx context.Context, client pb.RoutedJournalClient) *AppendService

                      NewAppendService returns an AppendService with the provided Context and BrokerClient.

                      func (*AppendService) PendingExcept

                      func (s *AppendService) PendingExcept(except pb.Journal) OpFutures

                        PendingExcept implements the AsyncJournalClient interface.

                        func (*AppendService) StartAppend

                        func (s *AppendService) StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend

                          StartAppend implements the AsyncJournalClient interface.

                          type Appender

                          type Appender struct {
                          	Request  pb.AppendRequest  // AppendRequest of the Append.
                          	Response pb.AppendResponse // AppendResponse sent by broker.
                          	// contains filtered or unexported fields
                          }

                            Appender adapts an Append RPC to the io.WriteCloser interface. The first byte written to the Appender initiates the RPC. Subsequent bytes are streamed to brokers as they are written. Writes to the Appender may stall as the RPC window fills, while waiting for brokers to sequence this Append into the journal. Once they do, brokers will expect the remaining content to be written to this Appender quickly (and may time out the RPC if it's not).

                            Content written to this Appender does not commit until Close is called, including cases where the application dies without calling Close. If a call to Close is started and the application dies before Close returns, the append may or may not commit.

                            The application can cleanly roll-back a started Appender by Aborting it.

                            func NewAppender

                            func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.AppendRequest) *Appender

                              NewAppender returns an initialized Appender of the given AppendRequest.

                              func (*Appender) Abort

                              func (a *Appender) Abort()

                                Abort the append, causing the broker to discard previously written content.

                                func (*Appender) Close

                                func (a *Appender) Close() (err error)

                                  Close the Append to complete the transaction, committing previously written content. If Close returns without an error, Appender.Response will hold the broker response.

                                  func (*Appender) Reset

                                  func (a *Appender) Reset()

                                    Reset the Appender to its post-construction state, allowing it to be re-used or re-tried. Reset without a prior Close or Abort will leak resources.

                                    func (*Appender) Write

                                    func (a *Appender) Write(p []byte) (n int, err error)

                                      Write to the Appender, starting an Append RPC if this is the first Write.

                                      type AsyncAppend

                                      type AsyncAppend struct {
                                      	// contains filtered or unexported fields
                                      }

                                        AsyncAppend represents an asynchronous Append RPC started and managed by an AppendService.

                                        func (*AsyncAppend) Done

                                        func (p *AsyncAppend) Done() <-chan struct{}

                                          Done returns a channel which selects when the AsyncAppend has committed or has been aborted along with the AppendService's Context.

                                          func (*AsyncAppend) Err

                                          func (p *AsyncAppend) Err() error

                                            Err blocks until Done, and returns the final operation error.

                                            func (*AsyncAppend) Release

                                            func (p *AsyncAppend) Release() error

                                              Release the AsyncAppend, allowing further writes to queue or for it to be dispatched to the brokers. Release first determines whether a previous Require failed, or if a Writer error occurred, in which case it will roll back all writes queued by the caller, aborting the append transaction, and return the non-nil error. A non-nil error is returned if and only if the Append was rolled back. Otherwise, the caller may then select on Done to determine when the AsyncAppend has committed and its Response may be examined.

                                              func (*AsyncAppend) Request

                                              func (p *AsyncAppend) Request() pb.AppendRequest

                                                Request returns the AppendRequest that was or will be made by this AsyncAppend. Request is safe to call at all times.

                                                func (*AsyncAppend) Require

                                                func (p *AsyncAppend) Require(err error) *AsyncAppend

                                                  Require the error to be nil. If Require is called with a non-nil error, the error is retained and later returned by Release, in which case it will also roll back any writes queued by the caller, aborting the append transaction. Require is valid for use only until Release is called. Require returns itself, allowing chained uses like op.Require(err).Release().


                                                  func (*AsyncAppend) Response

                                                  func (p *AsyncAppend) Response() pb.AppendResponse

                                                    Response returns the AppendResponse from the broker, and may be called only after Done selects.

                                                    func (*AsyncAppend) Writer

                                                    func (p *AsyncAppend) Writer() *bufio.Writer

                                                      Writer returns a bufio.Writer to which content may be appended. Writer is valid for use only until Release is called. Clients may ignore write errors of the Writer, preferring to "fire and forget" a sequence of writes which could fail: Release will itself Require that no error is set on the Writer.

                                                      type AsyncJournalClient

                                                      type AsyncJournalClient interface {
                                                      	// StartAppend begins a new asynchronous Append RPC. The caller holds exclusive access
                                                      	// to the returned AsyncAppend, and must then:
                                                      	//  * Write content to its Writer.
                                                      	//  * Optionally Require that one or more errors are nil.
                                                      	//  * Release the AsyncAppend, allowing queued writes to commit or,
                                                      	//    if an error occurred, to roll back.
                                                      	// For performance reasons, an Append will often be batched with other Appends
                                                      	// having identical AppendRequests which are dispatched to this AppendService,
                                                      	// and note the Response.Fragment will reflect the entire batch written to the
                                                      	// broker. In all cases, relative order of Appends is preserved. One or more
                                                      	// dependencies may optionally be supplied. The Append RPC will not begin
                                                      	// until all dependencies have completed without error. A failure of a
                                                      	// dependency will also permanently fail the returned AsyncAppend and prevent
                                                      	// any further appends to this journal from starting. For this reason, an
                                                      	// OpFuture should only fail if it also invalidates the AsyncJournalClient
                                                      	// (eg, because the client is scoped to a context which is invalidated by the
                                                      	// OpFuture failure).
                                                      	StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend
                                                      	// PendingExcept returns an OpFutures set of *AsyncAppend instances being
                                                      	// evaluated for all Journals other than |except|. It can be used to build
                                                      	// "barriers" which ensure that all pending appends have committed prior to the
                                                      	// commencement of an append which is about to be issued. Eg, given:
                                                      	//   var op = as.StartAppend(pb.AppendRequest{Journal: "target"}, as.PendingExcept("target"))
                                                      	//   op.Writer().WriteString("checkpoint")
                                                      	//   op.Release()
                                                      	// All ongoing appends to journals other than "target" are guaranteed to commit
                                                      	// before an Append RPC is begun which writes "checkpoint" to journal "target".
                                                      	// PendingExcept("") returns all pending AsyncAppends.
                                                      	// If a prior journal append failed (eg, because its dependency failed) a
                                                      	// resolved OpFuture with that error will be included in returned OpFutures.
                                                      	// This ensures the error will properly cascade to an operation which may
                                                      	// depend on these OpFutures.
                                                      	PendingExcept(except pb.Journal) OpFutures
                                                      }

                                                        AsyncJournalClient composes a RoutedJournalClient with an API for performing asynchronous Append operations.

                                                        type AsyncOperation

                                                        type AsyncOperation struct {
                                                        	// contains filtered or unexported fields
                                                        }

                                                          AsyncOperation is a simple, minimal implementation of the OpFuture interface.

                                                          func NewAsyncOperation

                                                          func NewAsyncOperation() *AsyncOperation

                                                            NewAsyncOperation returns a new AsyncOperation.

                                                            func (*AsyncOperation) Done

                                                            func (o *AsyncOperation) Done() <-chan struct{}

                                                              Done selects when Resolve is called.

                                                              func (*AsyncOperation) Err

                                                              func (o *AsyncOperation) Err() error

                                                                Err blocks until Resolve is called, then returns its error.

                                                                func (*AsyncOperation) Resolve

                                                                func (o *AsyncOperation) Resolve(err error)

                                                                  Resolve marks the AsyncOperation as completed with the given error.

                                                                  type FragmentReader

                                                                  type FragmentReader struct {
                                                                  	pb.Fragment       // Fragment being read.
                                                                  	Offset      int64 // Next journal offset to be read, in range [Begin, End).
                                                                  	// contains filtered or unexported fields
                                                                  }

                                                                    FragmentReader directly reads from an opened Fragment file.

                                                                    func NewFragmentReader

                                                                    func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error)

                                                                      NewFragmentReader wraps an io.ReadCloser of raw Fragment bytes with a returned *FragmentReader which has been pre-seeked to the given offset.

                                                                      func OpenFragmentURL

                                                                      func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error)

                                                                        OpenFragmentURL directly opens the Fragment, which must be available at the given URL, and returns a *FragmentReader which has been pre-seeked to the given offset.

                                                                        func (*FragmentReader) Close

                                                                        func (fr *FragmentReader) Close() error

                                                                          Close closes the underlying ReadCloser and associated decompressor (if any).

                                                                          func (*FragmentReader) Read

                                                                          func (fr *FragmentReader) Read(p []byte) (n int, err error)

                                                                            Read returns the next bytes of decompressed Fragment content. When Read returns, Offset has been updated to reflect the next byte to be read. Read returns EOF only if the underlying Reader returns EOF at precisely Offset == Fragment.End. If the underlying Reader is too short, io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF is returned.

                                                                            type OpFuture

                                                                            type OpFuture interface {
                                                                            	// Done selects when operation background execution has finished.
                                                                            	Done() <-chan struct{}
                                                                            	// Err blocks until Done() and returns the final error of the OpFuture.
                                                                            	Err() error
                                                                            }

                                                                              OpFuture represents an operation which is executing in the background. The operation has completed when Done selects. Err may be invoked to determine whether the operation succeeded or failed.

                                                                              func FinishedOperation

                                                                              func FinishedOperation(err error) OpFuture

                                                                                FinishedOperation is a convenience that returns an already-resolved AsyncOperation.

                                                                                type OpFutures

                                                                                type OpFutures map[OpFuture]struct{}

                                                                                  OpFutures is a set of OpFuture instances.

                                                                                  func (OpFutures) IsSubsetOf

                                                                                  func (s OpFutures) IsSubsetOf(other OpFutures) bool

                                                                                    IsSubsetOf is true if this OpFutures is a subset of the other.
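
For readers unfamiliar with map-backed sets, a subset test over such a set can be sketched as below. The `set` type with string keys and the `isSubsetOf` method are hypothetical stand-ins. The package's own method body is not shown in this documentation.

```go
package main

import "fmt"

// set is a hypothetical map-backed set, keyed on strings for illustration.
type set map[string]struct{}

// isSubsetOf reports whether every member of s is also a member of other.
func (s set) isSubsetOf(other set) bool {
	for k := range s {
		if _, ok := other[k]; !ok {
			return false
		}
	}
	return true
}

func main() {
	var small = set{"a": {}}
	var large = set{"a": {}, "b": {}}

	fmt.Println(small.isSubsetOf(large), large.isSubsetOf(small))
}
```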

                                                                                    type PolledList

                                                                                    type PolledList struct {
                                                                                    	// contains filtered or unexported fields
                                                                                    }

                                                                                      PolledList periodically polls the List RPC with a given ListRequest, making its most recent result available via List. It's a building block for applications which interact with dynamic journal sets and wish to react to changes in their set membership over time.

                                                                                      var partitions, _ = protocol.ParseLabelSelector("logs=clicks, source=mobile")
                                                                                      var pl, err = NewPolledList(ctx, client, time.Minute, protocol.ListRequest{
                                                                                          Selector: partitions,
                                                                                      })

                                                                                      func NewPolledList

                                                                                      func NewPolledList(ctx context.Context, client pb.JournalClient, dur time.Duration, req pb.ListRequest) (*PolledList, error)

                                                                                        NewPolledList returns a PolledList of the ListRequest which is initialized and ready for immediate use, and which will regularly refresh with the given Duration. An error encountered in the first List RPC is returned. Subsequent RPC errors will be logged as warnings and retried as part of regular refreshes.

                                                                                        func (*PolledList) List

                                                                                        func (pl *PolledList) List() *pb.ListResponse

                                                                                          List returns the most recent polled & merged ListResponse (see ListAllJournals).

                                                                                          func (*PolledList) UpdateCh

                                                                                          func (pl *PolledList) UpdateCh() <-chan struct{}

                                                                                            UpdateCh returns a channel which is signaled with each update of the PolledList. Only one channel is allocated and one signal sent per-update, so if multiple goroutines select from UpdateCh only one will wake.
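
One way to get "a single channel, one signal per update" semantics is a channel of capacity one with a non-blocking send. The sketch below assumes that mechanism purely for illustration: `notifier` and `signal` are hypothetical, and PolledList may be implemented differently.

```go
package main

import "fmt"

// notifier holds a single channel on which at most one signal is pending.
// It is a hypothetical illustration, not PolledList's implementation.
type notifier struct{ ch chan struct{} }

func newNotifier() *notifier { return &notifier{ch: make(chan struct{}, 1)} }

// signal performs a non-blocking send. If a signal is already pending,
// the new one is coalesced with it rather than queued.
func (n *notifier) signal() {
	select {
	case n.ch <- struct{}{}:
	default:
	}
}

func main() {
	var n = newNotifier()
	n.signal()
	n.signal() // Coalesced with the signal which is already pending.

	<-n.ch // Exactly one receiver is woken by the pending signal.

	select {
	case <-n.ch:
		fmt.Println("a second signal was pending")
	default:
		fmt.Println("no further signal is pending")
	}
}
```

An application with several goroutines interested in updates would therefore have one goroutine receive from the channel and fan the notification out itself.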

                                                                                            type Reader

                                                                                            type Reader struct {
                                                                                            	Request  pb.ReadRequest  // ReadRequest of the Reader.
                                                                                            	Response pb.ReadResponse // Most recent ReadResponse from broker.
                                                                                            	// contains filtered or unexported fields
                                                                                            }

                                                                                              Reader adapts a Read RPC to the io.Reader interface. The first byte read from the Reader initiates the RPC, and subsequent reads stream from it.

                                                                                              If DoNotProxy is true then the broker may close the RPC after sending a signed Fragment URL, and Reader will directly open the Fragment (decompressing if needed), seek to the requested offset, and read its content.

                                                                                              Reader returns EOF if:

                                                                                              * The broker closes the RPC, eg because its assignment has changed or it's shutting down.
                                                                                              * The requested EndOffset has been read through.
                                                                                              * A Fragment being read by the Reader reaches EOF.

                                                                                              If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable is returned upon reaching the journal write head.

                                                                                              A Reader is invalidated by its first returned error, with the exception of ErrOffsetJump: this error is returned to notify the client that the next Journal offset to be Read is not the offset that was requested (eg, because a portion of the Journal was deleted), but the Reader is prepared to continue at the updated, strictly larger offset.

                                                                                              func NewReader

                                                                                              func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader

                                                                                                NewReader returns an initialized Reader of the given ReadRequest.

                                                                                                func (*Reader) AdjustedOffset

                                                                                                func (r *Reader) AdjustedOffset(br *bufio.Reader) int64

                                                                                                  AdjustedOffset returns the current journal offset, adjusted for content which the bufio.Reader (which must wrap this Reader) has already read but which has not yet been consumed from its buffer.
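
The adjustment can be pictured as subtracting the buffered byte count from the offset of the underlying reader. The sketch below assumes that arithmetic; offsetReader, adjustedOffset and demoAdjusted are hypothetical names, and offsetReader is only a stand-in for Reader.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// offsetReader is a hypothetical stand-in for Reader: it tracks the journal
// offset of the next byte it will return.
type offsetReader struct {
	r      io.Reader
	offset int64
}

func (o *offsetReader) Read(p []byte) (int, error) {
	n, err := o.r.Read(p)
	o.offset += int64(n)
	return n, err
}

// adjustedOffset subtracts the bytes which br has pulled from this reader
// but has not yet handed to its own caller.
func (o *offsetReader) adjustedOffset(br *bufio.Reader) int64 {
	return o.offset - int64(br.Buffered())
}

// demoAdjusted consumes one 6-byte line starting from journal offset 100.
func demoAdjusted() (raw, adjusted int64) {
	var src = &offsetReader{r: strings.NewReader("hello\nworld\n"), offset: 100}
	var br = bufio.NewReader(src)

	// The bufio.Reader fills its buffer with all 12 bytes, but only the
	// first line is consumed here.
	_, _ = br.ReadString('\n')
	return src.offset, src.adjustedOffset(br)
}

func main() {
	raw, adjusted := demoAdjusted()
	fmt.Println(raw, adjusted) // 112 106
}
```

The raw offset runs ahead of what the application has seen, which is why a position recorded for later resumption should use the adjusted value.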

                                                                                                  func (*Reader) Read

                                                                                                  func (r *Reader) Read(p []byte) (n int, err error)

                                                                                                    Read from the journal. If this is the first Read of the Reader, a Read RPC is started.

                                                                                                    func (*Reader) Seek

                                                                                                    func (r *Reader) Seek(offset int64, whence int) (int64, error)

                                                                                                      Seek provides a limited form of seeking support. Specifically, if:

                                                                                                      * A Fragment URL is being directly read, and
                                                                                                      * The Seek offset is ahead of the current Reader offset, and
                                                                                                      * The Fragment also covers the desired Seek offset

                                                                                                      Then the seek is performed by reading and discarding content up to the requested offset. Otherwise, Seek returns ErrSeekRequiresNewReader.
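
The three conditions and the read-and-discard behavior can be sketched as follows. This is an illustration under assumptions, not the package's implementation: fragmentReader, seekForward, demoSeek and errSeekRequiresNewReader are hypothetical stand-ins, and the fragment is assumed to cover the half-open range [offset, end).

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

// errSeekRequiresNewReader is a hypothetical stand-in for the package's
// ErrSeekRequiresNewReader.
var errSeekRequiresNewReader = errors.New("seek requires new reader")

// fragmentReader is a hypothetical stand-in for a Reader which is directly
// reading a fragment.
type fragmentReader struct {
	r      io.Reader
	offset int64 // Journal offset of the next byte to be read.
	end    int64 // Exclusive end offset of the fragment (an assumption).
}

// seekForward discards content up to target, provided target is not behind
// the current offset and is covered by the fragment.
func (f *fragmentReader) seekForward(target int64) (int64, error) {
	if target < f.offset || target >= f.end {
		return f.offset, errSeekRequiresNewReader
	}
	n, err := io.CopyN(io.Discard, f.r, target-f.offset)
	f.offset += n
	return f.offset, err
}

// demoSeek seeks forward within a fragment covering offsets [100, 110), and
// then attempts a backward seek and a seek past the fragment.
func demoSeek() (pos int64, next byte, backErr, pastErr error) {
	var f = &fragmentReader{r: strings.NewReader("0123456789"), offset: 100, end: 110}
	pos, _ = f.seekForward(104)

	var b [1]byte
	_, _ = io.ReadFull(f.r, b[:])
	f.offset++

	_, backErr = f.seekForward(102)
	_, pastErr = f.seekForward(110)
	return pos, b[0], backErr, pastErr
}

func main() {
	pos, next, backErr, pastErr := demoSeek()
	fmt.Println(pos, string(next), backErr, pastErr)
}
```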

                                                                                                      type RetryReader

                                                                                                      type RetryReader struct {
                                                                                                      	// Context of the RetryReader, which parents the Context provided to
                                                                                                      	// underlying *Reader instances.
                                                                                                      	Context context.Context
                                                                                                      	// Client of the RetryReader.
                                                                                                      	Client pb.RoutedJournalClient
                                                                                                      	// Reader is the current underlying Reader of the RetryReader. This instance
                                                                                                      	// may change many times over the lifetime of a RetryReader, as Read RPCs
                                                                                                      	// finish or are cancelled and then restarted.
                                                                                                      	Reader *Reader
                                                                                                      	// Cancel Read operations of the current *Reader. Notably this will cause an
                                                                                                      	// ongoing blocked Read (as well as any future Reads) to return a "Cancelled"
                                                                                                      	// error. Restart may be called to re-initialize the RetryReader.
                                                                                                      	Cancel context.CancelFunc
                                                                                                      }

                                                                                                        RetryReader wraps Reader with error handling and retry behavior, as well as support for cancellation of an ongoing Read or Seek operation. RetryReader is not thread-safe, with one exception: Cancel may be called from one goroutine to abort an ongoing Read or Seek call in another.
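
The one permitted cross-goroutine interaction, cancelling a Read which is blocked in another goroutine, can be illustrated with a plain context. The sketch below is an assumption-laden stand-in: blockingReader and demoCancel are hypothetical names, and the real RetryReader is not used.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// blockingReader is a hypothetical stand-in for a reader whose Read blocks
// until data arrives or its context is cancelled.
type blockingReader struct {
	ctx  context.Context
	data <-chan []byte
}

func (b *blockingReader) Read(p []byte) (int, error) {
	select {
	case d := <-b.data:
		return copy(p, d), nil
	case <-b.ctx.Done():
		return 0, b.ctx.Err()
	}
}

// demoCancel starts a Read in one goroutine and cancels it from another.
// It reports whether the Read returned a cancellation error.
func demoCancel() bool {
	ctx, cancel := context.WithCancel(context.Background())
	var r = &blockingReader{ctx: ctx, data: make(chan []byte)}

	var errCh = make(chan error, 1)
	go func() {
		// No data is ever sent, so this Read can only return on cancel.
		_, err := r.Read(make([]byte, 1))
		errCh <- err
	}()

	cancel() // Called from a goroutine other than the one blocked in Read.
	return errors.Is(<-errCh, context.Canceled)
}

func main() {
	fmt.Println(demoCancel()) // true
}
```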

                                                                                                        func NewRetryReader

                                                                                                        func NewRetryReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *RetryReader

                                                                                                          NewRetryReader returns a RetryReader initialized with the given RoutedJournalClient and ReadRequest.

                                                                                                          func (*RetryReader) AdjustedOffset

                                                                                                          func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64

                                                                                                            AdjustedOffset delegates to the current Reader's AdjustedOffset.

                                                                                                            func (*RetryReader) AdjustedSeek

                                                                                                            func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)

                                                                                                              AdjustedSeek sets the offset for the next Read, accounting for buffered data and, where possible, accomplishing the AdjustedSeek by discarding from the bufio.Reader.

                                                                                                              func (*RetryReader) Journal

                                                                                                              func (rr *RetryReader) Journal() pb.Journal

                                                                                                                Journal being read by this RetryReader.

                                                                                                                func (*RetryReader) Offset

                                                                                                                func (rr *RetryReader) Offset() int64

                                                                                                                  Offset of the next Journal byte to be returned by Read.

                                                                                                                  func (*RetryReader) Read

                                                                                                                  func (rr *RetryReader) Read(p []byte) (n int, err error)

                                                                                                                    Read returns the next bytes of journal content. It will return a non-nil error in the following cases:

                                                                                                                    * Cancel is called, or the RetryReader context is cancelled.
                                                                                                                    * The broker returns OFFSET_NOT_YET_AVAILABLE (ErrOffsetNotYetAvailable)
                                                                                                                      for a non-blocking ReadRequest.
                                                                                                                    * An offset jump occurred (ErrOffsetJump), in which case the client
                                                                                                                      should inspect the new Offset and may continue reading if desired.
                                                                                                                    * The broker returns io.EOF upon reaching the requested EndOffset.

                                                                                                                    All other errors are retried.
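
One way to picture the split between returned and retried errors is a small classification function. The sketch below uses hypothetical stand-in names (surfaced, eofSurfaced and the two sentinel errors). It also assumes that an io.EOF is returned only once the offset has reached a non-zero EndOffset, which is an interpretation of the list above rather than a statement about the implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Hypothetical stand-ins for the package's sentinel errors.
var (
	errOffsetNotYetAvailable = errors.New("offset not yet available")
	errOffsetJump            = errors.New("offset jump")
)

// surfaced reports whether err would be returned to the caller rather than
// being retried, following the cases listed above.
func surfaced(err error, offset, endOffset int64) bool {
	switch {
	case errors.Is(err, context.Canceled):
		return true
	case errors.Is(err, errOffsetNotYetAvailable):
		return true
	case errors.Is(err, errOffsetJump):
		return true
	case errors.Is(err, io.EOF):
		// Assumption: an EOF before EndOffset (eg, the broker closed the
		// RPC) is retried, while an EOF at EndOffset is returned.
		return endOffset != 0 && offset >= endOffset
	default:
		return false
	}
}

// eofSurfaced is a convenience wrapper for the io.EOF case.
func eofSurfaced(offset, endOffset int64) bool {
	return surfaced(io.EOF, offset, endOffset)
}

func main() {
	fmt.Println(surfaced(errOffsetJump, 0, 0))            // true
	fmt.Println(eofSurfaced(5678, 5678))                  // true
	fmt.Println(eofSurfaced(1234, 5678))                  // false
	fmt.Println(surfaced(errors.New("unavailable"), 0, 0)) // false
}
```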

                                                                                                                    func (*RetryReader) Restart

                                                                                                                    func (rr *RetryReader) Restart(req pb.ReadRequest)

                                                                                                                      Restart the RetryReader with a new ReadRequest. Restart without a prior Cancel will leak resources.

                                                                                                                      func (*RetryReader) Seek

                                                                                                                      func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)

                                                                                                                        Seek sets the offset for the next Read. It returns an error if (and only if) whence is io.SeekEnd, which is not supported. Where possible Seek will delegate to the current Reader, but in most cases a new Read RPC must be started.
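
The whence handling described above amounts to resolving a target offset from the current one. A minimal sketch follows, with resolveSeek as a hypothetical helper name. How an invalid whence value is treated is not stated above; this sketch simply rejects it.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// resolveSeek computes the offset of the next Read from the current offset
// and the Seek arguments. io.SeekEnd is rejected as unsupported.
func resolveSeek(current, offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		return offset, nil
	case io.SeekCurrent:
		return current + offset, nil
	case io.SeekEnd:
		return current, errors.New("io.SeekEnd whence is not supported")
	default:
		// Assumption of this sketch: unknown whence values are an error.
		return current, errors.New("invalid whence")
	}
}

func main() {
	fmt.Println(resolveSeek(100, 50, io.SeekStart))   // 50 <nil>
	fmt.Println(resolveSeek(100, 50, io.SeekCurrent)) // 150 <nil>
	fmt.Println(resolveSeek(100, 0, io.SeekEnd))
}
```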

                                                                                                                        type RouteCache

                                                                                                                        type RouteCache struct {
                                                                                                                        	// contains filtered or unexported fields
                                                                                                                        }

                                                                                                                          RouteCache caches observed Routes for items (eg, Journals, or Shards). It implements the protocol.DispatchRouter interface, and where a cached Route of an item is available, it enables applications to dispatch RPCs directly to the most appropriate broker or consumer process. This reduces the overall number of network hops, and especially the number of hops crossing availability zones (which often cost more).

                                                                                                                          For example, RouteCache can direct an application to a broker in its same availability zone which is replicating a desired journal, and to which a long-lived Read RPC can be dispatched.

                                                                                                                          // Adapt a JournalClient to a RoutedJournalClient by using a RouteCache.
                                                                                                                          var jc protocol.JournalClient
                                                                                                                          var rjc = protocol.NewRoutedJournalClient(jc, NewRouteCache(256, time.Hour))

                                                                                                                          func NewRouteCache

                                                                                                                          func NewRouteCache(size int, ttl time.Duration) *RouteCache

                                                                                                                            NewRouteCache returns a RouteCache of the given size (which must be > 0) and caching Duration.

                                                                                                                            func (*RouteCache) IsNoopRouter

                                                                                                                            func (rc *RouteCache) IsNoopRouter() bool

                                                                                                                              IsNoopRouter returns false.

                                                                                                                              func (*RouteCache) Route

                                                                                                                              func (rc *RouteCache) Route(_ context.Context, item string) pb.Route

                                                                                                                                Route queries for a cached Route of the item.

                                                                                                                                func (*RouteCache) UpdateRoute

                                                                                                                                func (rc *RouteCache) UpdateRoute(item string, route *pb.Route)

                                                                                                                                  UpdateRoute caches the provided Route for the item, or invalidates it if the route is nil or empty.
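
The update, lookup and invalidate cycle can be sketched with a small TTL map. This is a simplified illustration and not RouteCache itself: route, ttlRouteCache, lookup and demoCache are hypothetical names, the size bound is omitted, and the clock is injected so the example is deterministic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// route is a hypothetical stand-in for pb.Route.
type route struct{ members []string }

type entry struct {
	route  route
	expiry time.Time
}

// ttlRouteCache is a simplified sketch: it expires entries after a TTL but,
// unlike the cache described above, places no bound on its size.
type ttlRouteCache struct {
	mu      sync.Mutex
	ttl     time.Duration
	now     func() time.Time // Injectable clock.
	entries map[string]entry
}

func newTTLRouteCache(ttl time.Duration, now func() time.Time) *ttlRouteCache {
	return &ttlRouteCache{ttl: ttl, now: now, entries: make(map[string]entry)}
}

// updateRoute caches r for item, or invalidates item if r is nil or empty.
func (c *ttlRouteCache) updateRoute(item string, r *route) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if r == nil || len(r.members) == 0 {
		delete(c.entries, item)
		return
	}
	c.entries[item] = entry{route: *r, expiry: c.now().Add(c.ttl)}
}

// lookup returns the cached route of item, if present and not expired.
func (c *ttlRouteCache) lookup(item string) (route, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	e, ok := c.entries[item]
	if !ok || !c.now().Before(e.expiry) {
		delete(c.entries, item)
		return route{}, false
	}
	return e.route, true
}

// demoCache exercises a hit, an expiry, and an explicit invalidation.
func demoCache() (hit, expired, invalidated bool) {
	var clock = time.Unix(0, 0)
	var c = newTTLRouteCache(time.Hour, func() time.Time { return clock })
	var r = &route{members: []string{"zone-a/broker-1"}}

	c.updateRoute("a/journal/name", r)
	_, hit = c.lookup("a/journal/name")

	clock = clock.Add(2 * time.Hour)
	_, ok := c.lookup("a/journal/name")
	expired = !ok

	c.updateRoute("a/journal/name", r)
	c.updateRoute("a/journal/name", nil)
	_, ok = c.lookup("a/journal/name")
	invalidated = !ok
	return hit, expired, invalidated
}

func main() {
	fmt.Println(demoCache()) // true true true
}
```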