v0.99.0

Published: Apr 22, 2024 License: MIT Imports: 38 Imported by: 8



Package consumer is a framework for distributed, stateful consumption of Gazette journals.

Most users will want to use package to build complete, configured framework applications. This package focuses on interface definitions and implementation of the consumer runtime.

The primary entry point to this package is the Application interface, which users implement to power the event-driven callback logic of the consumer. Service is then the top-level runtime of a consumer process, cooperatively executing an Application across a number of distributed peer processes and servicing the gRPC shard API.

A number of convenience functions are also supplied for interfacing with a remote Shard server endpoint.



const MAX_ETCD_ERR_LEN = 2048

Etcd values should generally be small. We don't want to push arbitrary length error strings.


var ErrDeferToNextTransaction = fmt.Errorf("consumer application deferred message")

ErrDeferToNextTransaction may be returned by Application.ConsumeMessage to indicate that processing of the Envelope should be deferred to a following transaction. It's an error to return it on the very first message of the transaction (it cannot be empty).
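The deferral behavior can be sketched with standard-library Go only. Everything here is a stand-in: `errDefer` plays the role of ErrDeferToNextTransaction, `consume` plays the role of Application.ConsumeMessage, and `batch` is a hypothetical loop, not the runtime's actual transaction logic.

```go
package main

import (
	"errors"
	"fmt"
)

// errDefer stands in for consumer.ErrDeferToNextTransaction in this sketch.
var errDefer = errors.New("consumer application deferred message")

// batch groups msgs into transactions. consume is a stand-in for
// Application.ConsumeMessage, and is passed the number of messages
// already accepted into the current transaction.
func batch(msgs []string, consume func(msg string, inTxn int) error) ([][]string, error) {
	var txns [][]string
	var cur []string

	for i := 0; i < len(msgs); {
		var err = consume(msgs[i], len(cur))
		switch {
		case err == nil:
			cur = append(cur, msgs[i])
			i++
		case errors.Is(err, errDefer) && len(cur) == 0:
			// A transaction cannot be empty, so this deferral is an error.
			return nil, fmt.Errorf("deferred first message of transaction: %w", err)
		case errors.Is(err, errDefer):
			// Close the transaction; msgs[i] is offered again to the next one.
			txns, cur = append(txns, cur), nil
		default:
			return nil, err
		}
	}
	if len(cur) != 0 {
		txns = append(txns, cur)
	}
	return txns, nil
}

func main() {
	// Defer whenever the transaction already holds two messages.
	var txns, err = batch([]string{"a", "b", "c", "d", "e"}, func(_ string, inTxn int) error {
		if inTxn == 2 {
			return errDefer
		}
		return nil
	})
	fmt.Println(txns, err) // [[a b] [c d] [e]] <nil>
}
```

Note that the deferred message is presented again as the first message of the following transaction, where it must be accepted.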

var ErrResolverStopped = errors.New("resolver has stopped serving local shards")

ErrResolverStopped is returned by Resolver if a ShardID resolves to a local shard, but the Resolver has already been asked to stop serving local shards. This happens when the consumer is shutting down abnormally (eg, due to Etcd lease keep-alive failure) but its local KeySpace doesn't reflect a re-assignment of the Shard, and we therefore cannot hope to proxy the request to another server. Resolve fails in this case to encourage the immediate completion of related RPCs, as we're probably also trying to drain the gRPC server.


func ApplyShards

func ApplyShards(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

ApplyShards applies shard changes detailed in the ApplyRequest via the consumer Apply RPC. Changes are applied as a single Etcd transaction. If the change list is larger than an Etcd transaction can accommodate, ApplyShardsInBatches should be used instead. ApplyResponse statuses other than OK are mapped to an error.

func ApplyShardsInBatches

func ApplyShardsInBatches(ctx context.Context, sc pc.ShardClient, req *pc.ApplyRequest, size int) (*pc.ApplyResponse, error)

ApplyShardsInBatches is like ApplyShards, but chunks the ApplyRequest into batches of the given size, which should be less than Etcd's maximum configured transaction size (usually 128). If size is 0 all changes will be attempted in a single transaction. Be aware that ApplyShardsInBatches may only partially succeed, with some batches having applied and others not. The final ApplyResponse is returned, unless an error occurs. ApplyResponse statuses other than OK are mapped to an error.
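The chunking described above can be illustrated as follows. This is a minimal sketch, not the package's implementation: `chunk` is a hypothetical helper, and plain strings stand in for the changes of an ApplyRequest.

```go
package main

import "fmt"

// chunk splits changes into batches of at most size elements.
// A size of zero yields a single batch holding every change.
func chunk(changes []string, size int) [][]string {
	if size <= 0 || size >= len(changes) {
		return [][]string{changes}
	}
	var out [][]string
	for len(changes) > size {
		out = append(out, changes[:size])
		changes = changes[size:]
	}
	return append(out, changes)
}

func main() {
	var changes = []string{"a", "b", "c", "d", "e"}
	fmt.Println(chunk(changes, 2)) // [[a b] [c d] [e]]
	fmt.Println(chunk(changes, 0)) // [[a b c d e]]
}
```

Because each batch would be applied as its own transaction, a failure in a later batch leaves earlier batches applied, which is the partial-success caveat noted above.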

func FetchHints

func FetchHints(ctx context.Context, sc pc.ShardClient, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

FetchHints is a convenience for invoking the GetHints RPC, which maps a response validation or !OK status to an error.

func ListShards

func ListShards(ctx context.Context, sc pc.ShardClient, req *pc.ListRequest) (*pc.ListResponse, error)

ListShards is a convenience for invoking the List RPC, which maps a validation or !OK status to an error.

func NewKeySpace

func NewKeySpace(prefix string) *keyspace.KeySpace

NewKeySpace returns a consumer KeySpace suitable for use with an Allocator, which groups member processes over the given prefix. It decodes allocator Items as ShardSpec messages, Members as ConsumerSpecs, and Assignments as RecoveryStatus enums.

func PickFirstHints added in v0.99.0

func PickFirstHints(hints *pc.GetHintsResponse, log pb.Journal) recoverylog.FSMHints

PickFirstHints retrieves the first hints from |hints|. If there are no primary hints available the most recent backup hints will be returned. If there are no hints available an empty set of hints is returned.
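The selection order can be sketched with simplified stand-in types. The types `fsmHints` and `hintsResponse`, and the assumption that backups are ordered from most to least recent, are illustrative only and do not reproduce the real pc.GetHintsResponse layout.

```go
package main

import "fmt"

// fsmHints is a simplified stand-in for recoverylog.FSMHints.
type fsmHints struct {
	Log   string
	Nodes int // Stand-in for recorded hint content.
}

// hintsResponse is a simplified stand-in for pc.GetHintsResponse.
// Backups are assumed to be ordered from most to least recent.
type hintsResponse struct {
	Primary *fsmHints
	Backups []*fsmHints
}

// pickFirst prefers primary hints, then the first available backup,
// and otherwise returns empty hints for the given log.
func pickFirst(r hintsResponse, log string) fsmHints {
	if r.Primary != nil {
		return *r.Primary
	}
	for _, b := range r.Backups {
		if b != nil {
			return *b
		}
	}
	return fsmHints{Log: log}
}

func main() {
	var backup = &fsmHints{Log: "recovery/logs/shard-a", Nodes: 3}
	var resp = hintsResponse{Backups: []*fsmHints{nil, backup}}

	fmt.Println(pickFirst(resp, "recovery/logs/shard-a").Nodes)      // 3
	fmt.Println(pickFirst(hintsResponse{}, "recovery/logs/shard-a")) // {recovery/logs/shard-a 0}
}
```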

func ShardApply added in v0.89.0

func ShardApply(ctx context.Context, srv *Service, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

ShardApply is the default implementation of the ShardServer.Apply API.

func ShardGetHints added in v0.89.0

func ShardGetHints(ctx context.Context, srv *Service, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

ShardGetHints is the default implementation of the ShardServer.GetHints API.

func ShardIsConsistent added in v0.86.1

func ShardIsConsistent(shard allocator.Item, assignment keyspace.KeyValue, all keyspace.KeyValues) bool

ShardIsConsistent returns true if no replicas of the shard are currently back-filling.

func ShardList added in v0.89.0

func ShardList(ctx context.Context, srv *Service, req *pc.ListRequest) (*pc.ListResponse, error)

ShardList is the default implementation of the ShardServer.List API.

func ShardStat added in v0.89.0

func ShardStat(ctx context.Context, srv *Service, req *pc.StatRequest) (*pc.StatResponse, error)

ShardStat is the default implementation of the ShardServer.Stat API.

func ShardUnassign added in v0.99.0

func ShardUnassign(ctx context.Context, srv *Service, req *pc.UnassignRequest) (*pc.UnassignResponse, error)

func StatShard

func StatShard(ctx context.Context, rc pc.RoutedShardClient, req *pc.StatRequest) (*pc.StatResponse, error)

StatShard is a convenience for invoking the Stat RPC, which maps a validation or !OK status to an error.

func VerifyReferencedJournals added in v0.84.1

func VerifyReferencedJournals(ctx context.Context, jc pb.JournalClient, req *pc.ApplyRequest) error

VerifyReferencedJournals ensures the referential integrity of journals (sources and recovery logs, and their content types) referenced by Shards of the ApplyRequest. It returns a descriptive error if any invalid references are found.


type Application

type Application interface {
	// NewStore constructs a Store for the Shard around the initialized file-
	// system *Recorder. If the ShardSpec does not have a configured recovery
	// log, then *Recorder will be nil.
	NewStore(Shard, *recoverylog.Recorder) (Store, error)
	// NewMessage returns a zero-valued Message of an appropriate representation
	// for the JournalSpec.
	NewMessage(*pb.JournalSpec) (message.Message, error)
	// ConsumeMessage consumes a read-committed message within the scope of a
	// transaction. It should use the provided Publisher to PublishUncommitted
	// messages to other journals. Doing so enables Gazette to properly sequence
	// messages and ensure they are either acknowledged or rolled-back atomically
	// with this consumer transaction.
	ConsumeMessage(Shard, Store, message.Envelope, *message.Publisher) error
	// FinalizeTxn indicates a consumer transaction is ending, and that the
	// Application must flush any in-memory transaction state or caches, and
	// begin any deferred journal appends. At completion:
	//  * All transaction messages must have been published to the provided Publisher,
	//  * All raw transaction []byte content must have been appended via the shard
	//    JournalClient, and
	//  * All in-memory state must be marshalled to the active Store transaction.
	FinalizeTxn(Shard, Store, *message.Publisher) error
}

Application is the interface provided by user applications running as Gazette consumers. Only unrecoverable errors should be returned by Application. A returned error will abort processing of an assigned Shard, and will update the assignment's ReplicaStatus to FAILED.

Gazette consumers process messages within pipelined transactions. A transaction begins upon the first call to ConsumeMessage, which is invoked for each read-committed message of a source journal. In the course of the transaction many more messages may be passed to ConsumeMessage. When consuming a message the Application is free to:

  1. Begin or continue a transaction with its Store.
  2. Publish exactly-once Messages to other journals via the provided Publisher.
  3. Append raw at-least-once []bytes via the Shard's JournalClient.
  4. Keep in-memory-only aggregates such as counts.
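As an illustration of the fourth item, the following sketch keeps counts in memory while consuming and moves them into a store when the transaction ends. The `store` and `counter` types and their lower-case methods are hypothetical stand-ins, not the real Application or Store interfaces.

```go
package main

import "fmt"

// store is a stand-in for an application Store: a map of persisted counts.
type store map[string]int

// counter sketches an Application which keeps in-memory-only aggregates
// while messages are consumed, and flushes them when the transaction ends.
type counter struct {
	pending map[string]int
}

// consumeMessage is a stand-in for Application.ConsumeMessage.
func (c *counter) consumeMessage(key string) {
	c.pending[key]++
}

// finalizeTxn is a stand-in for Application.FinalizeTxn: in-memory state is
// moved into the store's transaction and the in-memory cache is cleared.
func (c *counter) finalizeTxn(s store) {
	for k, n := range c.pending {
		s[k] += n
		delete(c.pending, k)
	}
}

func main() {
	var app = &counter{pending: make(map[string]int)}
	var s = store{}

	for _, key := range []string{"a", "b", "a"} {
		app.consumeMessage(key)
	}
	app.finalizeTxn(s)

	fmt.Println(s["a"], s["b"], len(app.pending)) // 2 1 0
}
```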

Messages published via PublishUncommitted will be visible to read-committed readers once the consumer transaction completes. Read-uncommitted readers will see them while the transaction continues to run. Similarly writes issued directly through the shard JournalClient are also readable while the transaction runs.

A transaction *must* continue to run while asynchronous appends of the _prior_ transaction are ongoing (including appends to the shard recovery log and appends of post-commit acknowledgements). The transaction will continue to process messages during this time until the ShardSpec's MaxTxnDuration is reached, at which point the transaction will stop reading messages but continue to wait for prior appends to finish.

It must also continue to run until the ShardSpec's MinTxnDuration is reached.

Assuming both of those conditions are satisfied, the transaction will close upon encountering a stall in a buffered channel of decoded input messages. If a stall isn't forthcoming (as is frequent at high write rates), it will close upon reaching the ShardSpec's MaxTxnDuration.
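The closing rules of the last three paragraphs can be restated as a small decision function. This is only a reading aid under simplifying assumptions: `txnState`, `action`, and `next` are hypothetical names, and the real runtime tracks considerably more state.

```go
package main

import (
	"fmt"
	"time"
)

// action is what a running transaction does next.
type action string

const (
	keepReading  action = "keep reading"
	waitForPrior action = "stop reading, await prior appends"
	closeTxn     action = "close"
)

// txnState is a simplified view of a running transaction.
type txnState struct {
	elapsed   time.Duration
	priorDone bool // All appends of the prior transaction have finished.
	stalled   bool // No decoded input message is immediately ready.
}

// next applies the rules described above, given the ShardSpec's
// MinTxnDuration and MaxTxnDuration.
func next(s txnState, minDur, maxDur time.Duration) action {
	switch {
	case s.elapsed >= maxDur && !s.priorDone:
		return waitForPrior
	case s.elapsed >= maxDur:
		return closeTxn
	case !s.priorDone || s.elapsed < minDur:
		return keepReading
	case s.stalled:
		return closeTxn
	default:
		return keepReading
	}
}

func main() {
	var minDur, maxDur = 100 * time.Millisecond, time.Second

	fmt.Println(next(txnState{elapsed: 50 * time.Millisecond, priorDone: true, stalled: true}, minDur, maxDur))
	fmt.Println(next(txnState{elapsed: 200 * time.Millisecond, priorDone: true, stalled: true}, minDur, maxDur))
	fmt.Println(next(txnState{elapsed: 2 * time.Second, priorDone: false}, minDur, maxDur))
}
```

The three calls print "keep reading", "close", and "stop reading, await prior appends" respectively.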

Upon transaction close, FinalizeTxn is called. At this point the Application must publish any pending messages and/or begin related journal appends, and must flush any in-memory caches or aggregates into its Store transaction (simply starting appends is sufficient: the Application does _not_ need to wait for journal appends to complete).

StartCommit of the Store is then called with a Checkpoint. For correct exactly-once processing semantics, the Checkpoint must be written in a single transaction alongside all other Store mutations made within the scope of this consumer transaction:

  • Eg for `store-rocksdb`, all store mutations and the Checkpoint are written together within a single RocksDB WriteBatch.
  • For `SQLStore`, verification of the write fence, INSERTS, UPDATES, and the Checkpoint itself are written within a single BEGIN/COMMIT transaction.
  • Similarly, `store-sqlite` persists Checkpoints within the scope of the current SQL transaction.

Note that other, non-transactional Store mutations are permitted, but will have a weaker at-least-once processing guarantee with respect to Store state. This can make sense for applications filling caches in BigTable, Memcache, or other systems which support transactions only over a single key (ie, check-and-set). In this case the Store should apply all mutations, followed by a fenced CAS Checkpoint update. So long as the Checkpoint itself is fenced properly, messages will continue to have exactly-once semantics (though be cautious of publishing messages which are derived from other Store keys that may have been updated more than once).

Once the commit completes, acknowledgements of messages published during the transaction are written to applicable journals, which informs downstream readers that those messages have committed and may now be consumed.

Transactions are fully pipelined: once StartCommit has returned, the next consumer transaction immediately begins processing even though the prior transaction continues to commit in the background (and could even fail). In fact, at this point there's no guarantee that any journal writes of the previous transaction have even _started_, including those to the recovery log.

To make this work safely and correctly, transactions use barriers to ensure that background operations are started and complete in the correct order: for example, that the prior transaction doesn't commit until all its writes to other journals have also completed, and that writes of message acknowledgements don't start until mutations & the checkpoint have committed to the Store.

While those operations complete in the background, the next transaction will consume new messages concurrently. Its one constraint is that it may not itself start to commit until its predecessor transaction has fully completed. This means that transactions will almost always exhibit some degree of batching, which will depend on the rate of incoming messages, the latency to Gazette, and the latency and speed of a utilized external store. If the prior commit takes so long that the successor transaction reaches its maximum duration, then that successor will stall without processing further messages until its predecessor commits. This is the _only_ case under which a consumer can be blocked from processing ready messages.
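The ordering constraint between a transaction and its predecessor can be sketched with channels acting as barriers. This is an illustrative model only: `runPipeline` is hypothetical, and each "commit" is reduced to appending an ID to a slice.

```go
package main

import "fmt"

// runPipeline starts n transactions. Each commit runs in the background, but
// may not begin until its predecessor's commit has fully completed.
func runPipeline(n int) []int {
	var order []int
	var prior = make(chan struct{})
	close(prior) // The first transaction has no predecessor to wait for.

	for i := 0; i < n; i++ {
		var done = make(chan struct{})

		go func(id int, prior <-chan struct{}, done chan<- struct{}) {
			<-prior // Barrier: wait for the predecessor's commit.
			// The barrier chain orders these appends, so no lock is needed.
			order = append(order, id)
			close(done)
		}(i, prior, done)

		// The loop continues immediately, like a successor transaction
		// which consumes messages while the prior commit is in flight.
		prior = done
	}
	<-prior // Wait for the final commit to complete.
	return order
}

func main() {
	fmt.Println(runPipeline(4)) // [0 1 2 3]
}
```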

type BeginFinisher

type BeginFinisher interface {
	// BeginTxn is called to notify the Application that a transaction is beginning
	// (and a call to ConsumeMessage will be immediately forthcoming), allowing
	// the Application to perform any preparatory work. For consumers doing
	// extensive aggregation, it may be beneficial to focus available compute
	// resource on a small number of transactions while completely stalling
	// others: this can be accomplished by blocking in BeginTxn until a semaphore
	// is acquired. A call to BeginTxn is always paired with a call to FinishedTxn.
	BeginTxn(Shard, Store) error
	// FinishedTxn is notified that a previously begun transaction has started to
	// commit, or has errored. It allows the Application to perform related cleanup
	// (eg, releasing a previously acquired semaphore). Note transactions are
	// pipelined, and commit operations of this transaction may still be ongoing.
	// FinishedTxn can await the provided OpFuture for its final status.
	FinishedTxn(Shard, Store, OpFuture)
}

BeginFinisher is an optional interface of Application which is informed when consumer transactions begin or finish.
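The semaphore pattern mentioned in the BeginTxn comment might look like the sketch below. The `limiter` type is hypothetical, and its methods take a context directly for brevity, whereas the real BeginTxn receives a Shard and Store (a Shard's Context could serve the same purpose).

```go
package main

import (
	"context"
	"fmt"
)

// limiter bounds the number of concurrently running transactions.
type limiter struct{ sem chan struct{} }

func newLimiter(n int) *limiter { return &limiter{sem: make(chan struct{}, n)} }

// beginTxn stands in for BeginFinisher.BeginTxn. It blocks until a slot
// is free, or until the context is done.
func (l *limiter) beginTxn(ctx context.Context) error {
	select {
	case l.sem <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// finishedTxn stands in for BeginFinisher.FinishedTxn, releasing the slot.
func (l *limiter) finishedTxn() { <-l.sem }

// demo exercises a limiter having a single slot.
func demo() (first, second, third error) {
	var l = newLimiter(1)
	var cancelled, cancel = context.WithCancel(context.Background())
	cancel()

	first = l.beginTxn(context.Background()) // Acquires the only slot.
	second = l.beginTxn(cancelled)           // Slot is held: returns ctx.Err().
	l.finishedTxn()                          // Releases the slot.
	third = l.beginTxn(context.Background()) // Acquires it again.
	return
}

func main() {
	fmt.Println(demo()) // <nil> context canceled <nil>
}
```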

type EnvelopeOrError added in v0.89.0

type EnvelopeOrError struct {
	message.Envelope
	Error error
}

EnvelopeOrError composes an Envelope with its read error.

type JSONFileStore

type JSONFileStore struct {
	// State is a user-provided instance which is un/marshal-able to JSON.
	State interface{}
	// contains filtered or unexported fields
}

JSONFileStore is a simple Store implementation which materializes itself as a JSON-encoded file, which is re-written at the commit of every consumer transaction.

func NewJSONFileStore

func NewJSONFileStore(rec *recoverylog.Recorder, state interface{}) (*JSONFileStore, error)

NewJSONFileStore returns a new JSONFileStore. |state| is the runtime instance of the Store's state, which is decoded into, encoded from, and retained as JSONFileStore.State.

func (*JSONFileStore) Destroy

func (s *JSONFileStore) Destroy()

Destroy the JSONFileStore directory and state file.

func (*JSONFileStore) RestoreCheckpoint added in v0.83.1

func (s *JSONFileStore) RestoreCheckpoint(Shard) (pc.Checkpoint, error)

RestoreCheckpoint returns the checkpoint encoded in the recovered JSON state file.

func (*JSONFileStore) StartCommit added in v0.83.1

func (s *JSONFileStore) StartCommit(_ Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

StartCommit marshals the in-memory state and Checkpoint into a recorded JSON state file.

type MessageProducer added in v0.89.0

type MessageProducer interface {
	// StartReadingMessages identifies journals and messages to be processed
	// by this consumer Shard, and dispatches them to the provided channel.
	// Any terminal error encountered during initialization or while reading
	// messages should also be delivered over |intoCh|. Reads of journals
	// included in |from| should begin from the given offset.
	// If |intoCh| closes without having sent an error then a current transaction,
	// if any, is completed and committed. A Store checkpoint is next restored and
	// StartReadingMessages is called again, effectively restarting processing.
	// Implementations can use this behavior to update the joint read and
	// processing context of a shard at a transaction boundary.
	StartReadingMessages(_ Shard, _ Store, _ pc.Checkpoint, intoCh chan<- EnvelopeOrError)
	// ReplayRange builds and returns a read-uncommitted Iterator over the
	// identified byte-range of the journal. The Journal and offset range are
	// guaranteed to be a journal segment which was previously produced via
	// StartReadingMessages. The returned Iterator must re-produce the exact
	// set and ordering of Messages from the identified Journal. If an error
	// occurs while initializing the replay, it should be returned via
	// Next() of the returned message.Iterator.
	ReplayRange(_ Shard, _ Store, journal pb.Journal, begin, end pb.Offset) message.Iterator
	// ReadThrough is used by Resolver to identify a set of Offsets
	// (eg, a sub-set of the Offsets present in ResolveArgs.ReadThrough)
	// which must be read-through before resolution may complete.
	ReadThrough(Shard, Store, ResolveArgs) (pb.Offsets, error)
}

MessageProducer is an optional interface of Application which controls the means by which messages to process are identified and produced into the provided channel, for processing by consumer transactions.
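The channel contract of StartReadingMessages can be sketched as follows. All names are stand-ins: `envelopeOrError` mimics EnvelopeOrError with a string payload, `produce` mimics a producer, and `drain` mimics the runtime's reading side in a much reduced form.

```go
package main

import (
	"errors"
	"fmt"
)

// errTerminal is an example terminal read error.
var errTerminal = errors.New("terminal read error")

// envelopeOrError is a simplified stand-in for EnvelopeOrError.
type envelopeOrError struct {
	Envelope string
	Error    error
}

// produce stands in for MessageProducer.StartReadingMessages. A terminal
// error is delivered over the channel; otherwise the channel is closed.
func produce(msgs []string, terminal error, into chan<- envelopeOrError) {
	for _, m := range msgs {
		into <- envelopeOrError{Envelope: m}
	}
	if terminal != nil {
		into <- envelopeOrError{Error: terminal}
		return
	}
	close(into) // Clean close: the runtime commits and restarts reading.
}

// drain consumes the channel. restart is true if the channel closed without
// an error, in which case processing would begin again from a checkpoint.
func drain(from <-chan envelopeOrError) (consumed []string, restart bool, err error) {
	for eoe := range from {
		if eoe.Error != nil {
			return consumed, false, eoe.Error
		}
		consumed = append(consumed, eoe.Envelope)
	}
	return consumed, true, nil
}

// run wires a producer to the draining loop.
func run(msgs []string, terminal error) ([]string, bool, error) {
	var ch = make(chan envelopeOrError)
	go produce(msgs, terminal, ch)
	return drain(ch)
}

func main() {
	fmt.Println(run([]string{"a", "b"}, nil))      // [a b] true <nil>
	fmt.Println(run([]string{"a"}, errTerminal)) // [a] false terminal read error
}
```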

type OpFuture added in v0.83.1

type OpFuture = client.OpFuture

OpFuture represents an operation which is executing in the background. Aliased for brevity from the "client" package.

type OpFutures added in v0.83.1

type OpFutures = client.OpFutures

type Resolution

type Resolution struct {
	Status pc.Status
	// Header captures the resolved consumer ProcessId, effective Etcd Revision,
	// and Route of the shard resolution.
	Header pb.Header
	// Spec of the Shard at the current Etcd revision.
	Spec *pc.ShardSpec
	// Shard processing context, or nil if this process is not primary for the ShardID.
	Shard Shard
	// Store of the Shard, or nil if this process is not primary for the ShardID.
	Store Store
	// Done releases Shard & Store, and must be called when no longer needed.
	// Iff Shard & Store are nil, so is Done.
	Done func()
}

Resolution is the result of resolving a ShardID to a responsible consumer process.

type ResolveArgs

type ResolveArgs struct {
	Context context.Context
	// ShardID to be resolved.
	ShardID pc.ShardID
	// Whether we may resolve to another consumer peer. If false and this
	// process is not assigned as shard primary, Resolve returns status
	MayProxy bool
	// If this request is a proxy from a consumer process peer, Header is
	// the forwarded Header of that peer's Resolution. ProxyHeader is used
	// to ensure this resolver first reads through the Etcd revision which
	// was effective at the peer at time of resolution, and to sanity check
	// the consistency of those resolutions.
	ProxyHeader *pb.Header
	// ReadThrough is journals and offsets which must have been read through in a
	// completed consumer transaction before Resolve may return (blocking if
	// required). Offsets of journals not read by this shard are ignored.
	// ReadThrough aids in the implementation of services which "read their
	// own writes", where those writes propagate through one or more consumer
	// applications: callers pass offsets of their appends to Resolve, which
	// will block until those appends have been processed.
	// See also: Shard.Progress.
	ReadThrough pb.Offsets
}

ResolveArgs parametrize a request to resolve a ShardID to a current, responsible consumer process.
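The ReadThrough condition amounts to comparing requested offsets against a shard's progress. The sketch below uses plain maps in place of pb.Offsets, and treats a journal missing from `progress` as one the shard does not read, which is a simplification made for this example. `covered` is a hypothetical helper.

```go
package main

import "fmt"

// covered reports whether progress has reached every offset of readThrough.
// Journals absent from progress are treated as not read by the shard,
// and are ignored.
func covered(progress, readThrough map[string]int64) bool {
	for journal, offset := range readThrough {
		if read, ok := progress[journal]; ok && read < offset {
			return false
		}
	}
	return true
}

func main() {
	var progress = map[string]int64{"orders/part-000": 100}

	fmt.Println(covered(progress, map[string]int64{"orders/part-000": 80}))  // true
	fmt.Println(covered(progress, map[string]int64{"orders/part-000": 120})) // false
	fmt.Println(covered(progress, map[string]int64{"other/journal": 999}))   // true
}
```

A resolver following this idea would block, re-checking after each completed transaction, until the condition holds.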

type Resolver

type Resolver struct {
	// contains filtered or unexported fields
}

Resolver applies the current allocator.State to map journals to shards, and shards to responsible consumer processes. It also monitors the State to manage the lifetime of local shards which are assigned to this process.

func NewResolver

func NewResolver(state *allocator.State, newShard func(keyspace.KeyValue) *shard) *Resolver

NewResolver returns a Resolver derived from the allocator.State, which will use the newShard closure to instantiate a *shard for each local Assignment of the local ConsumerSpec.

func (*Resolver) Collect added in v0.89.0

func (r *Resolver) Collect(ch chan<- prometheus.Metric)

Collect implements prometheus.Collector

func (*Resolver) Describe added in v0.89.0

func (r *Resolver) Describe(ch chan<- *prometheus.Desc)

Describe implements prometheus.Collector

func (*Resolver) Resolve

func (r *Resolver) Resolve(args ResolveArgs) (res Resolution, err error)

Resolve resolves a ShardID to a responsible consumer process. If this process is the assigned primary for a ShardID but its Store is not yet ready (eg, because it's still playing back from its recovery log), then Resolve will block until the Store is ready.

func (*Resolver) ShardsWithSource added in v0.84.1

func (r *Resolver) ShardsWithSource(journal pb.Journal) []*pc.ShardSpec

ShardsWithSource returns specs having the journal as a source, in ShardID order. APIs presented by consumer applications often want to map an app-defined key to an associated journal through a message.MappingFunc, and from there to a responsible shard against which the API request is ultimately dispatched. "Responsible" is up to the application: while it's common to have 1:1 assignment between shards and source journals, other patterns are possible and the application must make an appropriate selection from among the returned ShardSpecs for its use case.

var mapping message.MappingFunc = ...
var mappedID pc.ShardID

if journal, _, err := mapping(key); err != nil {
    // Handle error.
} else if specs := resolver.ShardsWithSource(journal); len(specs) == 0 {
    err = fmt.Errorf("no ShardSpec is consuming mapped journal %s", journal)
    // Handle error.
} else {
    // Select the first returned ShardSpec (the common 1:1 case).
    mappedID = specs[0].Id
}

var resolution, err = svc.Resolver.Resolve(consumer.ResolveArgs{
    ShardID:     mappedID,
    // Remaining ResolveArgs fields are set as the request requires.
})
type SQLStore added in v0.83.1

type SQLStore struct {
	DB *sql.DB

	// Cache is provided for application use in the temporary storage of
	// in-memory state associated with a SQLStore. Eg, Cache might store records
	// which have been read this transaction and modified in-memory, and which
	// will be written out during FinalizeTxn.
	// The representation of Cache is up to the application; it is not directly
	// used by SQLStore.
	Cache interface{}
	// contains filtered or unexported fields
}

SQLStore is a Store implementation which utilizes a remote database having a "database/sql" compatible driver. For each consumer transaction, SQLStore begins a corresponding SQL transaction which may be obtained via Transaction, through which reads and writes to the store should be issued. A table "gazette_checkpoints" is also required, to which consumer checkpoints are persisted, and having a schema like:

CREATE TABLE gazette_checkpoints (
  shard_fqn  TEXT    PRIMARY KEY NOT NULL,
  fence      INTEGER NOT NULL,
  checkpoint BLOB    NOT NULL
);

The user is responsible for creating this table. Exact data types may vary with the store dialect, but "shard_fqn" must be its PRIMARY KEY. StartCommit writes the checkpoint to this table before committing the transaction.

func NewSQLStore added in v0.83.1

func NewSQLStore(db *sql.DB) *SQLStore

NewSQLStore returns a new SQLStore using the *DB.

func (*SQLStore) Destroy added in v0.83.1

func (s *SQLStore) Destroy()

Destroy rolls back an existing transaction, but does not close the *DB instance previously passed to NewSQLStore.

func (*SQLStore) RestoreCheckpoint added in v0.83.1

func (s *SQLStore) RestoreCheckpoint(shard Shard) (cp pc.Checkpoint, _ error)

RestoreCheckpoint issues a SQL transaction which SELECTS the most recent Checkpoint of this shard FQN and also increments its "fence" column.

func (*SQLStore) StartCommit added in v0.83.1

func (s *SQLStore) StartCommit(shard Shard, cp pc.Checkpoint, waitFor OpFutures) OpFuture

StartCommit starts a background commit of the current transaction. While it runs, a new transaction may be started. The returned OpFuture will fail if the "fence" column has been further modified from the result previously read and updated by RestoreCheckpoint.
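The interplay of RestoreCheckpoint and StartCommit around the "fence" column can be modeled in memory. This sketch does not use "database/sql": `table`, `restore`, `commit`, and `errFenced` are hypothetical stand-ins for the table and for the two SQLStore operations.

```go
package main

import (
	"errors"
	"fmt"
)

// errFenced is returned when the fence changed since it was restored.
var errFenced = errors.New("checkpoint fence was modified")

type row struct {
	fence      int64
	checkpoint string
}

// table is an in-memory stand-in for the "gazette_checkpoints" table.
type table map[string]*row

// restore stands in for RestoreCheckpoint: it reads the checkpoint and
// increments the fence, returning the fence value it installed.
func (t table) restore(fqn string) (fence int64, checkpoint string) {
	var r, ok = t[fqn]
	if !ok {
		r = &row{}
		t[fqn] = r
	}
	r.fence++
	return r.fence, r.checkpoint
}

// commit stands in for StartCommit: the checkpoint is written only if
// the fence is unchanged from the value installed by restore.
func (t table) commit(fqn string, fence int64, checkpoint string) error {
	var r, ok = t[fqn]
	if !ok || r.fence != fence {
		return errFenced
	}
	r.checkpoint = checkpoint
	return nil
}

// demo has an older and a newer process contend over one shard.
func demo() (oldErr, newErr error, final string) {
	var t = table{}
	var oldFence, _ = t.restore("app/shard-a") // Older process restores.
	var newFence, _ = t.restore("app/shard-a") // Shard is re-assigned.

	oldErr = t.commit("app/shard-a", oldFence, "stale")
	newErr = t.commit("app/shard-a", newFence, "fresh")
	return oldErr, newErr, t["app/shard-a"].checkpoint
}

func main() {
	fmt.Println(demo()) // checkpoint fence was modified <nil> fresh
}
```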

func (*SQLStore) Transaction added in v0.83.1

func (s *SQLStore) Transaction(ctx context.Context, txOpts *sql.TxOptions) (_ *sql.Tx, err error)

Transaction returns or (if not already begun) begins a SQL transaction. Optional *TxOptions have an effect only if a transaction has not already been started.

type Service

type Service struct {
	// Application served by the Service.
	App Application
	// Resolver of Service shards.
	Resolver *Resolver
	// Distributed allocator state of the service.
	State *allocator.State
	// Loopback connection which defaults to the local server, but is wired with
	// a pb.DispatchBalancer. Consumer applications may use Loopback to proxy
	// application-specific RPCs to peer consumer instance, after performing
	// shard resolution.
	Loopback *grpc.ClientConn
	// Journal client for use by consumer applications.
	Journals pb.RoutedJournalClient
	// Etcd client for use by consumer applications.
	Etcd *clientv3.Client
	// Delta to apply to message.Clocks used by Shards to sequence published
	// messages, with respect to real time. This should almost always be left
	// as zero, but is helpful for test workflows which require fine-grain
	// control over the write timestamps encoded within message UUIDs.
	// Never decrease this value once the Service is running, only increase it,
	// as a decrement will cause Publisher sequencing invariants to be violated.
	// This is an EXPERIMENTAL API.
	PublishClockDelta time.Duration
	// ShardAPI holds function delegates which power the ShardServer API.
	// They're exposed to allow consumer applications to wrap or alter their behavior.
	ShardAPI struct {
		Stat     func(context.Context, *Service, *pc.StatRequest) (*pc.StatResponse, error)
		List     func(context.Context, *Service, *pc.ListRequest) (*pc.ListResponse, error)
		Apply    func(context.Context, *Service, *pc.ApplyRequest) (*pc.ApplyResponse, error)
		GetHints func(context.Context, *Service, *pc.GetHintsRequest) (*pc.GetHintsResponse, error)
		Unassign func(context.Context, *Service, *pc.UnassignRequest) (*pc.UnassignResponse, error)
	}
	// contains filtered or unexported fields
}

Service is the top-level runtime entity of a Gazette Consumer process. It drives local shard processing in response to allocator.State, powers shard resolution, and is also an implementation of ShardServer.
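Because the ShardAPI fields are plain function values, an application can wrap one by capturing the existing delegate and installing a replacement. The sketch below shows the pattern with reduced stand-in types: `service`, `statRequest`, `statResponse`, `defaultStat`, and `wrapStat` are all hypothetical, and the context argument of the real delegates is omitted.

```go
package main

import "fmt"

type statRequest struct{ Shard string }
type statResponse struct{ Status string }

// service is a reduced stand-in for Service and its ShardAPI delegates.
type service struct {
	shardAPI struct {
		Stat func(*service, *statRequest) (*statResponse, error)
	}
}

// defaultStat plays the role of a default delegate such as ShardStat.
func defaultStat(_ *service, _ *statRequest) (*statResponse, error) {
	return &statResponse{Status: "OK"}, nil
}

// wrapStat replaces the Stat delegate with one that counts calls
// and then defers to the previously installed delegate.
func wrapStat(svc *service, calls *int) {
	var inner = svc.shardAPI.Stat
	svc.shardAPI.Stat = func(s *service, req *statRequest) (*statResponse, error) {
		*calls++
		return inner(s, req)
	}
}

// demo installs the default delegate, wraps it, and invokes it once.
func demo() (string, int) {
	var svc = new(service)
	svc.shardAPI.Stat = defaultStat

	var calls int
	wrapStat(svc, &calls)

	var resp, _ = svc.shardAPI.Stat(svc, &statRequest{Shard: "shard-a"})
	return resp.Status, calls
}

func main() {
	fmt.Println(demo()) // OK 1
}
```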

func NewService

func NewService(app Application, state *allocator.State, rjc pb.RoutedJournalClient, lo *grpc.ClientConn, etcd *clientv3.Client) *Service

NewService constructs a new Service of the Application, driven by allocator.State.

func (*Service) Apply

func (svc *Service) Apply(ctx context.Context, req *pc.ApplyRequest) (*pc.ApplyResponse, error)

Apply calls its ShardAPI delegate.

func (*Service) GetHints

func (svc *Service) GetHints(ctx context.Context, req *pc.GetHintsRequest) (*pc.GetHintsResponse, error)

GetHints calls its ShardAPI delegate.

func (*Service) List

func (svc *Service) List(ctx context.Context, req *pc.ListRequest) (*pc.ListResponse, error)

List calls its ShardAPI delegate.

func (*Service) QueueTasks

func (svc *Service) QueueTasks(tasks *task.Group, server *server.Server)

QueueTasks to watch the Service KeySpace and serve any local assignments reflected therein, until the Context is cancelled or an error occurs. All local shards are gracefully stopped prior to return, even when exiting due to an error.

func (*Service) Stat

func (svc *Service) Stat(ctx context.Context, req *pc.StatRequest) (*pc.StatResponse, error)

Stat calls its ShardAPI delegate.

func (*Service) Stopping

func (svc *Service) Stopping() <-chan struct{}

Stopping returns a channel which signals when the Service is in the process of shutting down. Consumer applications with long-lived RPCs should use this signal to begin graceful cleanup of outstanding RPCs.
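A long-lived handler might select over the stopping signal alongside its regular work. In this sketch, `serve` is a hypothetical handler loop and the `stopping` channel stands in for the value returned by Service.Stopping.

```go
package main

import "fmt"

// serve sketches a long-lived RPC handler. It handles work until the
// stopping channel signals, and then returns to allow graceful cleanup.
func serve(stopping <-chan struct{}, work <-chan string) (handled int) {
	for {
		select {
		case <-stopping:
			return handled
		case <-work:
			handled++
		}
	}
}

// demo sends three work items and then signals that the service is stopping.
func demo() int {
	var (
		stopping = make(chan struct{})
		work     = make(chan string) // Unbuffered: a send completes on receipt.
		result   = make(chan int)
	)
	go func() { result <- serve(stopping, work) }()

	for _, w := range []string{"a", "b", "c"} {
		work <- w
	}
	close(stopping)
	return <-result
}

func main() {
	fmt.Println(demo()) // 3
}
```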

func (*Service) Unassign added in v0.99.0

func (svc *Service) Unassign(ctx context.Context, req *pc.UnassignRequest) (*pc.UnassignResponse, error)

Unassign calls its ShardAPI delegate.

type Shard

type Shard interface {
	// Context of this shard. Notably, the Context will be cancelled when this
	// process is no longer responsible for the shard.
	Context() context.Context
	// Fully-qualified name of this shard, AKA the Etcd key of the ShardSpec.
	// That key is used as the Shard FQN because it composes the consumer
	// application prefix with this ShardID, and is universally unique
	// within the backing Etcd cluster. An FQN can only conflict if another
	// consumer deployment, utilizing another Etcd cluster, also reuses the
	// same application prefix and ShardID.
	FQN() string
	// Current Spec of the shard. Fields of a returned Spec instance will never
	// change, but the instance returned by Spec may change over time to reflect
	// updated Etcd states.
	Spec() *pc.ShardSpec
	// Assignment of the shard to this process. Fields of a returned KeyValue
	// instance will never change, but the instance returned by Assignment may
	// change over time to reflect updated Etcd states.
	Assignment() keyspace.KeyValue
	// JournalClient to be used for raw journal []byte appends made on behalf
	// of this Shard. Consistent use of this client enables Gazette to ensure
	// that all writes issued within a consumer transaction have completed prior
	// to that transaction being committed. Put differently, if a consumed message
	// triggers a raw append to a journal, Gazette can guarantee that append will
	// occur at-least-once no matter how this Shard or brokers may fail.
	JournalClient() client.AsyncJournalClient
	// RecoveredHints returns the GetHintsResponse which was used in the
	// recovery of this Shard from its recovery log.
	// If the Shard doesn't use a recovery log, RecoveredHints is nil.
	RecoveredHints() *pc.GetHintsResponse
	// Progress of the Shard as-of its most recent completed transaction.
	// readThrough is offsets of source journals which have been read
	// through. publishAt is journals and offsets this Shard has published
	// through, including acknowledgements. If a read message A results in this
	// Shard publishing messages B, and A falls within readThrough, then all
	// messages B (& their acknowledgements) fall within publishAt.
	// While readThrough is comprehensive and persists across Shard faults,
	// note that publishAt is *advisory* and not necessarily complete: it
	// includes only journals written to since this Shard was assigned to
	// this process.
	Progress() (readThrough, publishAt pb.Offsets)
	// PrimaryLoop returns an OpFuture corresponding to this Shard
	// assignment's primary processing loop.
	// The returned future will resolve with the primary loop's returned error
	// upon its exit. This will often be context.Canceled, or a wrapped instance
	// thereof, and the caller is responsible for detecting errors due to cancellation
	// vs other kinds of processing errors.
	PrimaryLoop() client.OpFuture
}

Shard is the processing context of a ShardSpec which is assigned to the local consumer process.

type Store

type Store interface {
	// StartCommit starts an asynchronous, atomic "commit" to the Store of state
	// updates from this transaction along with the Checkpoint. If Store uses an
	// external transactional system, then StartCommit must fail if another
	// process has invoked RestoreCheckpoint after *this* Store instance did
	// so. Put differently, Store cannot commit a Checkpoint that another Store
	// may never see (because it continues from an earlier Checkpoint that it
	// restored).
	// In general terms, this means StartCommit must verify a "write fence"
	// previously installed by RestoreCheckpoint as part of its transaction.
	// Embedded Stores which use recovery logs can rely on the write fence of
	// the log itself, implemented via journal registers.
	// StartCommit may immediately begin a transaction with the external system
	// (if one hasn't already been started from ConsumeMessage), but cannot
	// allow it to commit until all waitFor OpFutures have completed
	// successfully. A failure of one of these OpFutures must also fail this
	// commit.
	// StartCommit will never be called if the OpFuture returned by a previous
	// StartCommit hasn't yet completed. However, ConsumeMessage *will* be
	// called while a previous StartCommit continues to run. Stores should
	// account for this, typically by starting a new transaction which runs
	// alongside the old.
	StartCommit(_ Shard, _ pc.Checkpoint, waitFor OpFutures) OpFuture
	// RestoreCheckpoint recovers the most recent Checkpoint previously committed
	// to the Store. It is called at Shard start-up, and may be called again
	// if a MessageProducer drains its message channel. If an external system
	// is used, it should install a transactional "write fence" to ensure
	// that an older Store instance of another process cannot successfully
	// StartCommit after this RestoreCheckpoint returns.
	RestoreCheckpoint(Shard) (pc.Checkpoint, error)
	// Destroy releases all resources associated with the Store (eg, local files).
	// It is guaranteed that the Store is no longer being used or referenced at
	// the onset of this call.
	Destroy()
}

Store is a durable and transactional storage backend used to persist arbitrary Application-defined states alongside the consumer Checkpoints which produced those states. The particular means by which the Store represents transactions varies from implementation to implementation, and is not modeled by this interface. However for correct exactly-once processing semantics, it must be the case that Store modifications made by Applications are made in transactions which are committed by StartCommit, and which incorporate the Checkpoint provided to StartCommit.

Often Stores are implemented as embedded databases which record their file operations into a provided `recoverylog.Recorder`. Stores which instead utilize an external transactional system (eg, an RDBMS) are also supported.

Application implementations control the selection, initialization, and usage of an appropriate Store for their use case.


Path Synopsis
Package protocol defines the consumer datamodel, validation behaviors, and gRPC APIs which are shared across clients and consumer application servers.
Package recoverylog specifies a finite state machine for recording and replaying observed filesystem operations into a Gazette journal.
Package shardspace provides mechanisms for mapping a collection of ShardSpecs into a minimally-described, semi hierarchical structure, and for mapping back again.
Package store_rocksdb implements the consumer.Store interface via an embedded RocksDB instance.
Package store_sqlite implements the consumer.Store interface via an embedded SQLite instance.
