v0.89.0

This package is not in the latest version of its module.

Published: Sep 2, 2021 License: MIT Imports: 20 Imported by: 14



Package client implements a Go client for interacting with the gRPC Journal service of Gazette brokers. It concerns itself with operations over journal specifications, fragments and byte-streams. See package message for an implementation of messages layered atop journal streams.

The package provides Reader and RetryReader, which adapt the broker Read RPC to the io.Reader interface. Reader uses just one Read RPC, while RetryReader silently restarts Read RPCs as needed:

// Copy a journal byte range to os.Stdout.
io.Copy(os.Stdout, NewReader(ctx, client, pb.ReadRequest{
    Journal:   "a/journal/name",
    Offset:    1234,
    EndOffset: 5678,
}))

It provides Appender, which adapts the Append RPC to an io.WriteCloser:

// Copy os.Stdin to the journal.
var a = NewAppender(ctx, client, pb.AppendRequest{
    Journal: "a/journal/name",
})
if _, err = io.Copy(a, os.Stdin); err == nil {
    err = a.Close() // Commit the append.
} else {
    a.Abort() // Roll back content written so far.
}

Gazette appends are linearizable (atomic) per journal. Appender streams content to brokers as it's written, but no content of an Appender will be visible to any reader until Close is called and succeeds. One implication is that once brokers have begun to sequence an append into a journal, they expect the remaining content and the Close of that Appender to be forthcoming, and will quickly time it out if it stalls. Uses of Appender should thus be limited to cases where its full content is readily available.
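To make the visibility rule concrete, here is a minimal in-memory model. The journal and stagedAppender types are hypothetical stand-ins, not Gazette APIs: writes accumulate privately and become visible to readers only when Close commits them in one step, while Abort discards them.

```go
package main

import (
	"bytes"
	"errors"
	"sync"
)

// journal is a hypothetical in-memory stand-in for a broker journal.
type journal struct {
	mu      sync.Mutex
	content bytes.Buffer
}

// Snapshot returns the committed content visible to readers.
func (j *journal) Snapshot() string {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.content.String()
}

// stagedAppender buffers writes and commits them atomically on Close.
type stagedAppender struct {
	j      *journal
	staged bytes.Buffer
	done   bool
}

var errFinished = errors.New("appender already closed or aborted")

func (a *stagedAppender) Write(p []byte) (int, error) {
	if a.done {
		return 0, errFinished
	}
	return a.staged.Write(p)
}

// Close commits all staged content in a single step under the journal lock,
// so readers observe either none or all of it.
func (a *stagedAppender) Close() error {
	if a.done {
		return errFinished
	}
	a.done = true
	a.j.mu.Lock()
	defer a.j.mu.Unlock()
	_, err := a.j.content.Write(a.staged.Bytes())
	return err
}

// Abort discards staged content without committing it.
func (a *stagedAppender) Abort() {
	a.done = true
	a.staged.Reset()
}
```

A real Appender also streams content to brokers before Close; this model keeps only the commit-on-Close visibility rule.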

Most clients should instead use an AppendService. It offers automatic retries, an asynchronous API, and support for constraints on the ordering of appends with respect to other ongoing operations (e.g., "append to journal Foo, but not before this append to journal Bar completes"). It also dynamically batches many co-occurring small writes into larger ones for efficiency.
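A rough sketch of the batching idea follows. It assumes, hypothetically, that queued writes are merged only when adjacent in FIFO order and addressed to the same journal, with each merged batch capped at maxBytes; the write type and coalesce function are illustrative and are not AppendService internals.

```go
package main

// write is a hypothetical queued append: a target journal and its content.
type write struct {
	Journal string
	Content []byte
}

// coalesce merges adjacent writes to the same journal into batches of at most
// maxBytes (a single write larger than maxBytes still forms its own batch).
// The relative order of writes is preserved.
func coalesce(writes []write, maxBytes int) []write {
	var out []write
	for _, w := range writes {
		if n := len(out); n > 0 &&
			out[n-1].Journal == w.Journal &&
			len(out[n-1].Content)+len(w.Content) <= maxBytes {
			out[n-1].Content = append(out[n-1].Content, w.Content...)
			continue
		}
		// Copy content so later merges never alias the caller's slice.
		out = append(out, write{Journal: w.Journal, Content: append([]byte(nil), w.Content...)})
	}
	return out
}
```

Merging only adjacent writes keeps the per-journal order intact, at the cost of missing some batching opportunities when writes to different journals interleave.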

var as = NewAppendService(ctx, client)
var op = as.StartAppend(pb.AppendRequest{
    Journal: "a/journal/name",
}, myOtherOpsWhichMustCompleteFirst)

// Produce content to append into the AsyncAppend's Writer.
// We hold an exclusive lock over it until Release.
op.Writer().WriteString("hello, ")
op.Writer().WriteString("gazette: ")
var _, copyErr = io.Copy(op.Writer(), os.Stdin)
op.Require(copyErr)

// If io.Copy failed, Release aborts the append and returns its error.
if err = op.Release(); err == nil {
    err = op.Err() // Blocks until the operation completes.
}

The package offers functions for listing Fragments & JournalSpecs and applying JournalSpecs, while accounting for pagination details. Also notable is PolledList, which is an important building-block for applications scaling to multiple journals.





var (
	// Map common broker error statuses into named errors.
	ErrNotJournalBroker           = errors.New(pb.Status_NOT_JOURNAL_BROKER.String())
	ErrNotJournalPrimaryBroker    = errors.New(pb.Status_NOT_JOURNAL_PRIMARY_BROKER.String())
	ErrOffsetNotYetAvailable      = errors.New(pb.Status_OFFSET_NOT_YET_AVAILABLE.String())
	ErrRegisterMismatch           = errors.New(pb.Status_REGISTER_MISMATCH.String())
	ErrWrongAppendOffset          = errors.New(pb.Status_WRONG_APPEND_OFFSET.String())
	ErrInsufficientJournalBrokers = errors.New(pb.Status_INSUFFICIENT_JOURNAL_BROKERS.String())

	// ErrOffsetJump is returned by Reader.Read to indicate that the next byte
	// available to be read is at a larger offset than that requested (eg,
	// because a span of the Journal has been deleted). The Reader's ReadResponse
	// should be inspected by the caller, and Read may be invoked again to continue.
	ErrOffsetJump = errors.New("offset jump")
	// ErrSeekRequiresNewReader is returned by Reader.Seek if it is unable to
	// satisfy the requested Seek. A new Reader should be started instead.
	ErrSeekRequiresNewReader = errors.New("seek offset requires new Reader")
	// ErrDidNotReadExpectedEOF is returned by FragmentReader.Read if the
	// underlying file did not return EOF at the expected Fragment End offset.
	ErrDidNotReadExpectedEOF = errors.New("did not read EOF at expected Fragment.End")
)


func Append

func Append(ctx context.Context, rjc pb.RoutedJournalClient, req pb.AppendRequest,
	content ...io.ReaderAt) (pb.AppendResponse, error)

Append zero or more ReaderAts to a journal as a single Append transaction. Append retries on transport or routing errors, but fails on all other errors. Each ReaderAt is read from byte zero until EOF, and may be read multiple times. If no ReaderAts are provided, an Append RPC with no content is issued.

func ApplyJournals

func ApplyJournals(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest) (*pb.ApplyResponse, error)

ApplyJournals applies journal changes detailed in the ApplyRequest via the broker Apply RPC. Changes are applied as a single Etcd transaction. If the change list is larger than an Etcd transaction can accommodate, ApplyJournalsInBatches should be used instead. ApplyResponse statuses other than OK are mapped to an error.

func ApplyJournalsInBatches

func ApplyJournalsInBatches(ctx context.Context, jc pb.JournalClient, req *pb.ApplyRequest, size int) (*pb.ApplyResponse, error)

ApplyJournalsInBatches is like ApplyJournals, but chunks the ApplyRequest into batches of the given size, which should be less than Etcd's maximum configured transaction size (usually 128). If size is 0, all changes will be attempted in a single transaction. Be aware that ApplyJournalsInBatches may only partially succeed, with some batches having applied and others not. The final ApplyResponse is returned, unless an error occurs. ApplyResponse statuses other than OK are mapped to an error.
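The batching rule can be sketched as a small generic helper. The name chunk is hypothetical, and it follows the text in treating a size of 0 as a single batch; it is not the library's implementation.

```go
package main

// chunk splits changes into consecutive batches of at most size elements.
// A size of 0 (or less) returns all changes as a single batch.
func chunk[T any](changes []T, size int) [][]T {
	if size <= 0 || len(changes) <= size {
		return [][]T{changes}
	}
	var out [][]T
	for len(changes) > size {
		// The full slice expression caps capacity, so appending to one batch
		// cannot overwrite the start of the next.
		out = append(out, changes[:size:size])
		changes = changes[size:]
	}
	return append(out, changes)
}
```

Each batch would be applied as its own transaction, so a failure on one batch leaves the earlier batches applied. That is the partial-success case described above.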

func GetJournal added in v0.83.1

func GetJournal(ctx context.Context, jc pb.JournalClient, journal pb.Journal) (*pb.JournalSpec, error)

GetJournal retrieves the JournalSpec of the named Journal, or returns an error.

func InstallFileTransport

func InstallFileTransport(root string) (remove func())

InstallFileTransport registers a file:// protocol handler at the given root with the http.Client used by OpenFragmentURL. It's used in testing contexts, and is also appropriate when brokers share a common NAS file store to which fragments are persisted, and to which this client also has access. The returned cleanup function removes the handler and restores the prior http.Client.

const root = "/mnt/shared-nas-array/path/to/fragment-root"
defer InstallFileTransport(root)()

var rr = NewRetryReader(ctx, client, protocol.ReadRequest{
    Journal:    "a/journal/with/nas/fragment/store",
    DoNotProxy: true,
})
// rr.Read will read Fragments directly from NAS.

func ListAllFragments

func ListAllFragments(ctx context.Context, client pb.RoutedJournalClient, req pb.FragmentsRequest) (*pb.FragmentsResponse, error)

ListAllFragments performs multiple Fragments RPCs, as required to join across multiple FragmentsResponse pages, and returns the completed FragmentsResponse. Any encountered error is returned.
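Joining pages amounts to a loop that follows continuation tokens until none remain. The page type, its string NextPageToken, and listAll are hypothetical simplifications of FragmentsRequest/FragmentsResponse paging rather than their actual definitions.

```go
package main

// page is a hypothetical paginated response: items plus a token for the next
// page, where an empty token means there are no further pages.
type page struct {
	Items         []string
	NextPageToken string
}

// listAll calls fetch repeatedly, passing each returned NextPageToken, and
// joins all items into one slice. Any fetch error is returned immediately.
func listAll(fetch func(token string) (page, error)) ([]string, error) {
	var all []string
	var token string
	for {
		p, err := fetch(token)
		if err != nil {
			return nil, err
		}
		all = append(all, p.Items...)
		if p.NextPageToken == "" {
			return all, nil
		}
		token = p.NextPageToken
	}
}
```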

func ListAllJournals

func ListAllJournals(ctx context.Context, client pb.JournalClient, req pb.ListRequest) (*pb.ListResponse, error)

ListAllJournals performs a broker journal listing. Any encountered error is returned.


type AppendService

type AppendService struct {
	// contains filtered or unexported fields
}

AppendService batches, dispatches, and (if needed) retries asynchronous Append RPCs. Use of an AppendService is appropriate for clients who make large numbers of small writes to a Journal, and where those writes may be pipelined and batched to amortize the cost of broker Append RPCs. It may also simplify clients that would prefer to have writes block until successfully committed, rather than handle errors and retries themselves.

For each journal, AppendService manages an ordered list of AsyncAppends, each having buffered content to be appended. The list is dispatched in FIFO order by a journal-specific goroutine.

AsyncAppends are backed by temporary files on the local disk rather than memory buffers. This minimizes the impact of buffering on the heap and garbage collector, and also makes AppendService tolerant to sustained service disruptions (up to the capacity of the disk).

AppendService implements the AsyncJournalClient interface.

func NewAppendService

func NewAppendService(ctx context.Context, client pb.RoutedJournalClient) *AppendService

NewAppendService returns an AppendService with the provided Context and RoutedJournalClient.

func (*AppendService) PendingExcept

func (s *AppendService) PendingExcept(except pb.Journal) OpFutures

PendingExcept implements the AsyncJournalClient interface.

func (*AppendService) StartAppend

func (s *AppendService) StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend

StartAppend implements the AsyncJournalClient interface.

type Appender

type Appender struct {
	Request  pb.AppendRequest  // AppendRequest of the Append.
	Response pb.AppendResponse // AppendResponse sent by broker.
	// contains filtered or unexported fields
}

Appender adapts an Append RPC to the io.WriteCloser interface. The first byte written to the Appender initiates the RPC. Subsequent bytes are streamed to brokers as they are written. Writes to the Appender may stall as the RPC window fills, while waiting for brokers to sequence this Append into the journal. Once they do, brokers expect the remaining content of the append to be written quickly to this Appender, and may time out the RPC if it is not.

Content written to this Appender does not commit until Close is called, including cases where the application dies without calling Close. If a call to Close is started and the application dies before Close returns, the append may or may not commit.

The application can cleanly roll back a started Appender by calling Abort.

func NewAppender

func NewAppender(ctx context.Context, client pb.RoutedJournalClient, req pb.AppendRequest) *Appender

NewAppender returns an initialized Appender of the given AppendRequest.

func (*Appender) Abort

func (a *Appender) Abort()

Abort the append, causing the broker to discard previously written content.

func (*Appender) Close

func (a *Appender) Close() (err error)

Close the Append to complete the transaction, committing previously written content. If Close returns without an error, Appender.Response will hold the broker response.

func (*Appender) Reset

func (a *Appender) Reset()

Reset the Appender to its post-construction state, allowing it to be re-used or re-tried. Reset without a prior Close or Abort will leak resources.

func (*Appender) Write

func (a *Appender) Write(p []byte) (n int, err error)

Write to the Appender, starting an Append RPC if this is the first Write.

type AsyncAppend

type AsyncAppend struct {
	// contains filtered or unexported fields
}

AsyncAppend represents an asynchronous Append RPC started and managed by an AppendService.

func (*AsyncAppend) Done

func (p *AsyncAppend) Done() <-chan struct{}

Done returns a channel which selects when the AsyncAppend has committed or has been aborted along with the AppendService's Context.

func (*AsyncAppend) Err

func (p *AsyncAppend) Err() error

Err blocks until Done, and returns the final operation error.

func (*AsyncAppend) Release

func (p *AsyncAppend) Release() error

Release the AsyncAppend, allowing further writes to queue or for it to be dispatched to the brokers. Release first determines whether a previous Require failed, or if a Writer error occurred, in which case it will roll back all writes queued by the caller, aborting the append transaction, and return the non-nil error. A non-nil error is returned if and only if the Append was rolled back. Otherwise, the caller may then select on Done to determine when the AsyncAppend has committed and its Response may be examined.

func (*AsyncAppend) Request

func (p *AsyncAppend) Request() pb.AppendRequest

Request returns the AppendRequest that was or will be made by this AsyncAppend. Request is safe to call at all times.

func (*AsyncAppend) Require

func (p *AsyncAppend) Require(err error) *AsyncAppend

Require the error to be nil. If Require is called with a non-nil error, the error is retained and later returned by Release, in which case it will also roll back any writes queued by the caller, aborting the append transaction. Require is valid for use only until Release is called. Require returns itself, allowing chained uses like op.Require(err).Release().


func (*AsyncAppend) Response

func (p *AsyncAppend) Response() pb.AppendResponse

Response returns the AppendResponse from the broker, and may be called only after Done selects.

func (*AsyncAppend) Writer

func (p *AsyncAppend) Writer() *bufio.Writer

Writer returns a bufio.Writer to which content may be appended. Writer is valid for use only until Release is called. Clients may ignore write errors of the Writer, preferring to "fire and forget" a sequence of writes which could fail: Release will itself Require that no error is set on the Writer.

type AsyncJournalClient

type AsyncJournalClient interface {

	// StartAppend begins a new asynchronous Append RPC. The caller holds exclusive access
	// to the returned AsyncAppend, and must then:
	//  * Write content to its Writer.
	//  * Optionally Require that one or more errors are nil.
	//  * Release the AsyncAppend, allowing queued writes to commit or,
	//    if an error occurred, to roll back.
	// For performance reasons, an Append will often be batched with other Appends
	// having identical AppendRequests which are dispatched to this AppendService,
	// and note the Response.Fragment will reflect the entire batch written to the
	// broker. In all cases, relative order of Appends is preserved. One or more
	// dependencies may optionally be supplied. The Append RPC will not begin
	// until all dependencies have completed without error. A failure of a
	// dependency will also permanently fail the returned AsyncAppend and prevent
	// any further appends to this journal from starting. For this reason, an
	// OpFuture should only fail if it also invalidates the AsyncJournalClient
	// (eg, because the client is scoped to a context which is invalidated by the
	// OpFuture failure).
	StartAppend(req pb.AppendRequest, dependencies OpFutures) *AsyncAppend

	// PendingExcept returns an OpFutures set of *AsyncAppend instances being
	// evaluated for all Journals other than |except|. It can be used to build
	// "barriers" which ensure that all pending appends have committed prior to the
	// commencement of an append which is about to be issued. Eg, given:
	//   var op = as.StartAppend(pb.AppendRequest{Journal: "target"}, as.PendingExcept("target"))
	//   op.Writer().WriteString("checkpoint")
	//   op.Release()
	// All ongoing appends to journals other than "target" are guaranteed to commit
	// before an Append RPC is begun which writes "checkpoint" to journal "target".
	// PendingExcept("") returns all pending AsyncAppends.
	// If a prior journal append failed (eg, because its dependency failed) a
	// resolved OpFuture with that error will be included in returned OpFutures.
	// This ensures the error will properly cascade to an operation which may
	// depend on these OpFutures.
	PendingExcept(except pb.Journal) OpFutures
}

AsyncJournalClient composes a RoutedJournalClient with an API for performing asynchronous Append operations.

type AsyncOperation added in v0.83.1

type AsyncOperation struct {
	// contains filtered or unexported fields
}

AsyncOperation is a simple, minimal implementation of the OpFuture interface.

func NewAsyncOperation added in v0.83.1

func NewAsyncOperation() *AsyncOperation

NewAsyncOperation returns a new AsyncOperation.

func (*AsyncOperation) Done added in v0.83.1

func (o *AsyncOperation) Done() <-chan struct{}

Done selects when Resolve is called.

func (*AsyncOperation) Err added in v0.83.1

func (o *AsyncOperation) Err() error

Err blocks until Resolve is called, then returns its error.

func (*AsyncOperation) Resolve added in v0.83.1

func (o *AsyncOperation) Resolve(err error)

Resolve marks the AsyncOperation as completed with the given error.
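A minimal sketch of an OpFuture-style operation: Done exposes a channel that Resolve closes, and Err waits on it. The asyncOp and finished names are hypothetical; they mirror the methods described for AsyncOperation and FinishedOperation, assuming Resolve is called exactly once, but are not the library's source.

```go
package main

// asyncOp is a hypothetical OpFuture implementation.
type asyncOp struct {
	done chan struct{}
	err  error
}

func newAsyncOp() *asyncOp { return &asyncOp{done: make(chan struct{})} }

// Done returns a channel that is closed (and so selects) once Resolve is called.
func (o *asyncOp) Done() <-chan struct{} { return o.done }

// Err blocks until Resolve is called, then returns its error.
func (o *asyncOp) Err() error {
	<-o.done
	return o.err
}

// Resolve records err and then closes done. Because err is written before the
// close, any goroutine that observes the closed channel also sees err.
// Calling Resolve twice panics (close of a closed channel).
func (o *asyncOp) Resolve(err error) {
	o.err = err
	close(o.done)
}

// finished returns an already-resolved operation, like FinishedOperation.
func finished(err error) *asyncOp {
	var o = newAsyncOp()
	o.Resolve(err)
	return o
}
```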

type FragmentReader

type FragmentReader struct {
	pb.Fragment       // Fragment being read.
	Offset      int64 // Next journal offset to be read, in range [Begin, End).
	// contains filtered or unexported fields
}

FragmentReader directly reads from an opened Fragment file.

func NewFragmentReader

func NewFragmentReader(rc io.ReadCloser, fragment pb.Fragment, offset int64) (*FragmentReader, error)

NewFragmentReader wraps an io.ReadCloser of raw Fragment bytes with a returned *FragmentReader which has been pre-seeked to the given offset.

func OpenFragmentURL

func OpenFragmentURL(ctx context.Context, fragment pb.Fragment, offset int64, url string) (*FragmentReader, error)

OpenFragmentURL directly opens the Fragment, which must be available at the given URL, and returns a *FragmentReader which has been pre-seeked to the given offset.

func (*FragmentReader) Close

func (fr *FragmentReader) Close() error

Close closes the underlying ReadCloser and associated decompressor (if any).

func (*FragmentReader) Read

func (fr *FragmentReader) Read(p []byte) (n int, err error)

Read returns the next bytes of decompressed Fragment content. When Read returns, Offset has been updated to reflect the next byte to be read. Read returns EOF only if the underlying Reader returns EOF at precisely Offset == Fragment.End. If the underlying Reader is too short, io.ErrUnexpectedEOF is returned. If it's too long, ErrDidNotReadExpectedEOF is returned.

type OpFuture added in v0.83.1

type OpFuture interface {
	// Done selects when operation background execution has finished.
	Done() <-chan struct{}
	// Err blocks until Done() and returns the final error of the OpFuture.
	Err() error
}

OpFuture represents an operation which is executing in the background. The operation has completed when Done selects. Err may be invoked to determine whether the operation succeeded or failed.

func FinishedOperation added in v0.83.1

func FinishedOperation(err error) OpFuture

FinishedOperation is a convenience that returns an already-resolved AsyncOperation.

type OpFutures added in v0.83.1

type OpFutures map[OpFuture]struct{}

OpFutures is a set of OpFuture instances.

func (OpFutures) IsSubsetOf added in v0.83.1

func (s OpFutures) IsSubsetOf(other OpFutures) bool

IsSubsetOf is true if this OpFutures is a subset of the other.
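Subset testing on a map-backed set is a single loop over the candidate subset. This generic isSubsetOf is an illustrative sketch of the documented semantics, not the library's source.

```go
package main

// isSubsetOf reports whether every element of s is also in other.
// The empty set is a subset of every set.
func isSubsetOf[K comparable](s, other map[K]struct{}) bool {
	if len(s) > len(other) {
		return false // Fast path: a larger set cannot be contained.
	}
	for k := range s {
		if _, ok := other[k]; !ok {
			return false
		}
	}
	return true
}
```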

type PolledList

type PolledList struct {
	// contains filtered or unexported fields
}

PolledList periodically polls the List RPC with a given ListRequest, making its most recent result available via List. It's a building block for applications which interact with dynamic journal sets and wish to react to changes in their set membership over time.

var partitions, _ = protocol.ParseLabelSelector("logs=clicks, source=mobile")
var pl, err = NewPolledList(ctx, client, time.Minute, protocol.ListRequest{
    Selector: partitions,
})

func NewPolledList

func NewPolledList(ctx context.Context, client pb.JournalClient, dur time.Duration, req pb.ListRequest) (*PolledList, error)

NewPolledList returns a PolledList of the ListRequest which is initialized and ready for immediate use, and which will regularly refresh with the given Duration. An error encountered in the first List RPC is returned. Subsequent RPC errors will be logged as warnings and retried as part of regular refreshes.

func (*PolledList) List

func (pl *PolledList) List() *pb.ListResponse

List returns the most recent polled & merged ListResponse (see ListAllJournals).

func (*PolledList) UpdateCh

func (pl *PolledList) UpdateCh() <-chan struct{}

UpdateCh returns a channel which is signaled with each update of the PolledList. Only one channel is allocated and one signal sent per-update, so if multiple goroutines select from UpdateCh only one will wake.

type Reader

type Reader struct {
	Request  pb.ReadRequest  // ReadRequest of the Reader.
	Response pb.ReadResponse // Most recent ReadResponse from broker.
	// contains filtered or unexported fields
}

Reader adapts a Read RPC to the io.Reader interface. The first byte read from the Reader initiates the RPC, and subsequent reads stream from it.

If DoNotProxy is true then the broker may close the RPC after sending a signed Fragment URL, and Reader will directly open the Fragment (decompressing if needed), seek to the requested offset, and read its content.

Reader returns EOF if:

  • The broker closes the RPC, e.g., because its assignment has changed or it is shutting down.
  • The requested EndOffset has been read through.
  • A Fragment being read by the Reader reaches EOF.

If Block is true, Read may block indefinitely. Otherwise, ErrOffsetNotYetAvailable is returned upon reaching the journal write head.

A Reader is invalidated by its first returned error, with the exception of ErrOffsetJump: this error is returned to notify the client that the next Journal offset to be Read is not the offset that was requested (eg, because a portion of the Journal was deleted), but the Reader is prepared to continue at the updated, strictly larger offset.

func NewReader

func NewReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *Reader

NewReader returns an initialized Reader of the given ReadRequest.

func (*Reader) AdjustedOffset

func (r *Reader) AdjustedOffset(br *bufio.Reader) int64

AdjustedOffset returns the current journal offset adjusted for content read by the bufio.Reader (which must wrap this Reader), which has not yet been consumed from the bufio.Reader's buffer.

func (*Reader) Read

func (r *Reader) Read(p []byte) (n int, err error)

Read from the journal. If this is the first Read of the Reader, a Read RPC is started.

func (*Reader) Seek

func (r *Reader) Seek(offset int64, whence int) (int64, error)

Seek provides a limited form of seeking support. Specifically, if:

  • A Fragment URL is being directly read, and
  • The Seek offset is ahead of the current Reader offset, and
  • The Fragment also covers the desired Seek offset

Then a seek is performed by reading and discarding to the seeked offset. Seek will otherwise return ErrSeekRequiresNewReader.
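The three conditions translate directly into a predicate. The fragment type and canSeekByDiscard are hypothetical, and the sketch treats a seek to the current offset as a trivial, zero-byte discard.

```go
package main

// fragment is a hypothetical journal byte range [Begin, End).
type fragment struct{ Begin, End int64 }

// canSeekByDiscard reports whether a seek from cur to target can be satisfied
// by reading and discarding: the fragment must be read directly, target must
// not be behind cur, and the fragment must cover target.
func canSeekByDiscard(readingFragmentDirectly bool, frag fragment, cur, target int64) bool {
	return readingFragmentDirectly &&
		target >= cur &&
		target >= frag.Begin && target < frag.End
}
```

When the predicate holds, the discard itself could be performed with io.CopyN(io.Discard, r, target-cur); otherwise a new Reader would be started at the target offset.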

type RetryReader

type RetryReader struct {
	// Context of the RetryReader, which parents the Context provided to
	// underlying *Reader instances.
	Context context.Context
	// Client of the RetryReader.
	Client pb.RoutedJournalClient
	// Reader is the current underlying Reader of the RetryReader. This instance
	// may change many times over the lifetime of a RetryReader, as Read RPCs
	// finish or are cancelled and then restarted.
	Reader *Reader
	// Cancel Read operations of the current *Reader. Notably this will cause an
	// ongoing blocked Read (as well as any future Reads) to return a "Cancelled"
	// error. Restart may be called to re-initialize the RetryReader.
	Cancel context.CancelFunc
}

RetryReader wraps Reader with error handling and retry behavior, as well as support for cancellation of an ongoing Read or Seek operation. RetryReader is not thread-safe, with one exception: Cancel may be called from one goroutine to abort an ongoing Read or Seek call in another.

func NewRetryReader

func NewRetryReader(ctx context.Context, client pb.RoutedJournalClient, req pb.ReadRequest) *RetryReader

NewRetryReader returns a RetryReader initialized with the RoutedJournalClient and ReadRequest.

func (*RetryReader) AdjustedOffset

func (rr *RetryReader) AdjustedOffset(br *bufio.Reader) int64

AdjustedOffset delegates to the current Reader's AdjustedOffset.

func (*RetryReader) AdjustedSeek

func (rr *RetryReader) AdjustedSeek(offset int64, whence int, br *bufio.Reader) (int64, error)

AdjustedSeek sets the offset for the next Read, accounting for buffered data and, where possible, accomplishing the AdjustedSeek by discarding from the bufio.Reader.

func (*RetryReader) Journal

func (rr *RetryReader) Journal() pb.Journal

Journal being read by this RetryReader.

func (*RetryReader) Offset

func (rr *RetryReader) Offset() int64

Offset of the next Journal byte to be returned by Read.

func (*RetryReader) Read

func (rr *RetryReader) Read(p []byte) (n int, err error)

Read returns the next bytes of journal content. It will return a non-nil error in the following cases:

  • Cancel is called, or the RetryReader context is cancelled.
  • The broker returns OFFSET_NOT_YET_AVAILABLE (ErrOffsetNotYetAvailable) for a non-blocking ReadRequest.
  • An offset jump occurred (ErrOffsetJump), in which case the client should inspect the new Offset and may continue reading if desired.
  • The broker returns io.EOF upon reaching the requested EndOffset.

All other errors are retried.

func (*RetryReader) Restart

func (rr *RetryReader) Restart(req pb.ReadRequest)

Restart the RetryReader with a new ReadRequest. Restart without a prior Cancel will leak resources.

func (*RetryReader) Seek

func (rr *RetryReader) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next Read. It returns an error if (and only if) whence is io.SeekEnd, which is not supported. Where possible Seek will delegate to the current Reader, but in most cases a new Read RPC must be started.

type RouteCache

type RouteCache struct {
	// contains filtered or unexported fields
}

RouteCache caches observed Routes for items (e.g., Journals or Shards). It implements the protocol.DispatchRouter interface, and where a cached Route of an item is available, it enables applications to dispatch RPCs directly to the most appropriate broker or consumer process. This reduces the overall number of network hops, and especially the number of hops crossing availability zones (which often cost more).

For example, RouteCache can direct an application to a broker in its same availability zone which is replicating a desired journal, and to which a long-lived Read RPC can be dispatched.

// Adapt a JournalClient to a RoutedJournalClient by using a RouteCache.
var jc protocol.JournalClient
var rjc = protocol.NewRoutedJournalClient(jc, NewRouteCache(256, time.Hour))

func NewRouteCache

func NewRouteCache(size int, ttl time.Duration) *RouteCache

NewRouteCache returns a RouteCache of the given size (which must be > 0) and caching Duration.

func (*RouteCache) IsNoopRouter

func (rc *RouteCache) IsNoopRouter() bool

IsNoopRouter returns false.

func (*RouteCache) Route

func (rc *RouteCache) Route(_ context.Context, item string) pb.Route

Route queries for a cached Route of the item.

func (*RouteCache) UpdateRoute

func (rc *RouteCache) UpdateRoute(item string, route *pb.Route)

UpdateRoute caches the provided Route for the item, or invalidates it if the route is nil or empty.
