v0.1.20

Published: Sep 25, 2021 License: BSD-3-Clause Imports: 9 Imported by: 40



Package watch defines interfaces for watching a sequence of changes.

API Overview

The Watcher service allows a client to watch for updates that match a query. For each watched query, the client receives a reliable stream of watch events without re-ordering.

The watching is done by starting a streaming RPC. The argument to the RPC contains the query. The result stream consists of a never-ending sequence of Change messages (until the call fails or is cancelled).

Root Entity

The Object name that receives the Watch RPC is called the root entity. The root entity is the parent of all entities that the client cares about. Therefore, the query is confined to children of the root entity, and the names in the Change messages are all relative to the root entity.

Watch Request

When a client makes a watch request, it can indicate whether it wants to receive the initial states of the entities that match the query, just new changes to the entities, or resume watching from a particular point in a previous watch stream. On receiving a watch request, the server sends one or more messages to the client. The first message informs the client that the server has registered the client's request; the instant of time when the client receives the event is referred to as the client's "watch point" for that query.

Atomic Delivery

The response stream consists of a sequence of Change messages. Each Change message contains an optional continued bit (default=false). A sub-sequence of Change messages with continued=true followed by a Change message with continued=false forms an "atomic group". Systems that support multi-entity atomic updates may guarantee that all changes resulting from a single atomic update are delivered in the same "atomic group". It is up to the documentation of a particular system that implements the Watch API to document whether or not it supports such grouping. We expect that most callers will ignore the notion of atomic delivery and the continued bit, i.e., they will just process each Change message as it is received.
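For callers that do care about atomic delivery, the grouping rule above can be sketched as follows. This is a minimal illustration, not part of the package: the Change type is a local stand-in carrying only the fields needed here, and groupAtomic is a hypothetical helper name.

```go
package main

import "fmt"

// Change is a local stand-in for watch.Change, trimmed to the fields this
// sketch needs. It is not the real v.io type.
type Change struct {
	Name      string
	Continued bool
}

// groupAtomic (hypothetical helper) splits a received sequence into atomic
// groups: a run of Changes with Continued == true closed by one Change with
// Continued == false. A trailing run that has not been closed yet is
// returned separately as pending.
func groupAtomic(changes []Change) (groups [][]Change, pending []Change) {
	for _, c := range changes {
		pending = append(pending, c)
		if !c.Continued {
			groups = append(groups, pending)
			pending = nil // start a fresh slice for the next group
		}
	}
	return groups, pending
}

func main() {
	in := []Change{
		{Name: "a", Continued: true},
		{Name: "b", Continued: false},
		{Name: "c", Continued: false},
		{Name: "d", Continued: true},
	}
	groups, pending := groupAtomic(in)
	fmt.Println(len(groups), len(pending))
}
```

A caller that ignores atomic delivery would skip this step and handle each Change as it arrives.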

Initial State

The first atomic group delivered by a watch call is special. It is delivered as soon as possible and contains the initial state of the entities being watched. The client should consider itself caught up after processing this first atomic group. The messages in this first atomic group depend on the value of ResumeMarker.

(1) ResumeMarker is "" or not specified: For every entity P that
    matches the query and exists, there will be at least one message
    delivered with entity == P and the last such message will contain
    the current state of P.  For every entity Q (including the entity
    itself) that matches the query but does not exist, either no
    message will be delivered, or the last message for Q will have
    state == DOES_NOT_EXIST. At least one message for entity="" will
    be delivered.

(2) ResumeMarker == "now": there will be exactly one message with
    entity == "" and state == INITIAL_STATE_SKIPPED.  The client cannot
    assume whether or not the entity exists after receiving this
    message.

(3) ResumeMarker has a value R from a preceding watch call on this
    entity: The same messages as described in (1) will be delivered
    to the client except that any information implied by messages
    received on the preceding call up to and including R may not be
    delivered. The expectation is that the client will start with
    state it had built up from the preceding watch call, apply the
    changes received from this call and build an up-to-date view of
    the entities without having to fetch a potentially large amount
    of information that has not changed.  Note that some information
    that had already been delivered by the preceding call might be
    delivered again.

Ordering and Reliability

The Change messages that apply to a particular element of the entity will eventually be delivered in order and without loss for the duration of the RPC. Note, however, that if multiple Changes apply to the same element, the implementation is free to suppress the earlier ones and deliver just the last one. The underlying system must provide the guarantee that any relevant update received for an entity E after a client's watch point for E MUST be delivered to that client.

These tight guarantees allow for the following simplifications in the client:

(1) The client does not need to have a separate polling loop to
    make up for missed updates.

(2) The client does not need to manage timestamps/versions
    manually; the last update delivered corresponds to the
    eventual state of the entity.
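Because the last delivered update reflects the eventual state, a client can keep a simple name-to-value view and overwrite it as Changes arrive. The following is a minimal sketch under stated assumptions: Change is a local stand-in whose Value is a plain string rather than *vom.RawBytes, the constants mirror the state constants documented in this package, and apply is a hypothetical helper.

```go
package main

import "fmt"

// State values mirroring the package constants Exists, DoesNotExist and
// InitialStateSkipped.
const (
	Exists              = int32(0)
	DoesNotExist        = int32(1)
	InitialStateSkipped = int32(2)
)

// Change is a local stand-in for watch.Change. Value is a string here to
// keep the sketch free of external dependencies.
type Change struct {
	Name  string
	State int32
	Value string
}

// apply (hypothetical helper) folds one Change into the client's view.
// No version bookkeeping is needed: a later Change for the same name
// simply replaces the earlier one.
func apply(view map[string]string, c Change) {
	switch c.State {
	case Exists:
		view[c.Name] = c.Value
	case DoesNotExist:
		delete(view, c.Name)
	case InitialStateSkipped:
		// Nothing is known about the entities yet; leave the view as is.
	}
}

func main() {
	view := map[string]string{}
	for _, c := range []Change{
		{Name: "a", State: Exists, Value: "1"},
		{Name: "b", State: Exists, Value: "x"},
		{Name: "a", State: Exists, Value: "2"},
		{Name: "b", State: DoesNotExist},
	} {
		apply(view, c)
	}
	fmt.Println(view)
}
```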



const DoesNotExist = int32(1)

The entity does not exist.

const Exists = int32(0)

The entity exists and its full value is included in Value.

const InitialStateSkipped = int32(2)

The root entity and its children may or may not exist. Used only for initial state delivery when the client is not interested in fetching the initial state. See the "Initial State" section above.


var (
	ErrUnknownResumeMarker = verror.NewIDAction("", verror.NoRetry)
)

var GlobWatcherDesc rpc.InterfaceDesc = descGlobWatcher

GlobWatcherDesc describes the GlobWatcher interface.


func ErrorfUnknownResumeMarker added in v0.1.10

func ErrorfUnknownResumeMarker(ctx *context.T, format string) error

ErrorfUnknownResumeMarker calls ErrUnknownResumeMarker.Errorf with the supplied arguments.

func MessageUnknownResumeMarker added in v0.1.10

func MessageUnknownResumeMarker(ctx *context.T, message string) error

MessageUnknownResumeMarker calls ErrUnknownResumeMarker.Message with the supplied arguments.

func ParamsErrUnknownResumeMarker added in v0.1.10

func ParamsErrUnknownResumeMarker(argumentError error) (verrorComponent string, verrorOperation string, returnErr error)

ParamsErrUnknownResumeMarker extracts the expected parameters from the error's ParameterList.


type Change

type Change struct {
	// Name is the Object name of the entity that changed.  This name is relative
	// to the root entity (i.e. the name of the Watcher service).
	Name string
	// State must be one of Exists, DoesNotExist, or InitialStateSkipped.
	State int32
	// Value contains the service-specific data for the entity.
	Value *vom.RawBytes
	// If present, provides a compact representation of all the messages
	// that have been received by the caller for the given Watch call.
	// For example, it could be a sequence number or a multi-part
	// timestamp/version vector. This marker can be provided in the
	// Request message to allow the caller to resume the stream watching
	// at a specific point without fetching the initial state.
	ResumeMarker ResumeMarker
	// If true, this Change is followed by more Changes that are in the
	// same group as this Change.
	Continued bool
}

Change is the new value for a watched entity.

func (Change) VDLIsZero

func (x Change) VDLIsZero() bool

func (*Change) VDLRead

func (x *Change) VDLRead(dec vdl.Decoder) error

func (Change) VDLReflect

func (Change) VDLReflect(struct {
	Name string `vdl:""`

func (Change) VDLWrite

func (x Change) VDLWrite(enc vdl.Encoder) error

type GlobRequest

type GlobRequest struct {
	// Pattern specifies the subset of the children of the root entity
	// for which the client wants updates.
	Pattern string
	// ResumeMarker specifies how to resume from a previous Watch call.
	// See the ResumeMarker type for detailed comments.
	ResumeMarker ResumeMarker
}

GlobRequest specifies which entities should be watched and, optionally, how to resume from a previous Watch call.

func (GlobRequest) VDLIsZero

func (x GlobRequest) VDLIsZero() bool

func (*GlobRequest) VDLRead

func (x *GlobRequest) VDLRead(dec vdl.Decoder) error

func (GlobRequest) VDLReflect

func (GlobRequest) VDLReflect(struct {
	Name string `vdl:""`

func (GlobRequest) VDLWrite

func (x GlobRequest) VDLWrite(enc vdl.Encoder) error

type GlobWatcherClientMethods

type GlobWatcherClientMethods interface {
	// WatchGlob returns a stream of changes that match a pattern.
	WatchGlob(_ *context.T, req GlobRequest, _ ...rpc.CallOpt) (GlobWatcherWatchGlobClientCall, error)
}

GlobWatcherClientMethods is the client interface containing GlobWatcher methods.

GlobWatcher allows a client to receive updates for changes to objects that match a pattern. See the package comments for details.

type GlobWatcherClientStub

type GlobWatcherClientStub interface {
	GlobWatcherClientMethods
}

GlobWatcherClientStub embeds GlobWatcherClientMethods and is a placeholder for additional management operations.

func GlobWatcherClient

func GlobWatcherClient(name string) GlobWatcherClientStub

GlobWatcherClient returns a client stub for GlobWatcher.

type GlobWatcherServerMethods

type GlobWatcherServerMethods interface {
	// WatchGlob returns a stream of changes that match a pattern.
	WatchGlob(_ *context.T, _ GlobWatcherWatchGlobServerCall, req GlobRequest) error
}

GlobWatcherServerMethods is the interface a server writer implements for GlobWatcher.

GlobWatcher allows a client to receive updates for changes to objects that match a pattern. See the package comments for details.

type GlobWatcherServerStub

type GlobWatcherServerStub interface {
	// Describe__ returns a description of the GlobWatcher interfaces.
	Describe__() []rpc.InterfaceDesc

GlobWatcherServerStub adds universal methods to GlobWatcherServerStubMethods.

func GlobWatcherServer

func GlobWatcherServer(impl GlobWatcherServerMethods) GlobWatcherServerStub

GlobWatcherServer returns a server stub for GlobWatcher. It converts an implementation of GlobWatcherServerMethods into an object that may be used by rpc.Server.

type GlobWatcherServerStubMethods

type GlobWatcherServerStubMethods interface {
	// WatchGlob returns a stream of changes that match a pattern.
	WatchGlob(_ *context.T, _ *GlobWatcherWatchGlobServerCallStub, req GlobRequest) error
}

GlobWatcherServerStubMethods is the server interface containing GlobWatcher methods, as expected by rpc.Server. The only difference between this interface and GlobWatcherServerMethods is in the streaming methods.

type GlobWatcherWatchGlobClientCall

type GlobWatcherWatchGlobClientCall interface {
	// Finish blocks until the server is done, and returns the positional return
	// values for call.
	// Finish returns immediately if the call has been canceled; depending on the
	// timing the output could either be an error signaling cancelation, or the
	// valid positional return values from the server.
	// Calling Finish is mandatory for releasing stream resources, unless the call
	// has been canceled or any of the other methods return an error.  Finish should
	// be called at most once.
	Finish() error

GlobWatcherWatchGlobClientCall represents the call returned from GlobWatcher.WatchGlob.

type GlobWatcherWatchGlobClientStream

type GlobWatcherWatchGlobClientStream interface {
	// RecvStream returns the receiver side of the GlobWatcher.WatchGlob client stream.
	RecvStream() interface {
		// Advance stages an item so that it may be retrieved via Value.  Returns
		// true iff there is an item to retrieve.  Advance must be called before
		// Value is called.  May block if an item is not available.
		Advance() bool
		// Value returns the item that was staged by Advance.  May panic if Advance
		// returned false or was not called.  Never blocks.
		Value() Change
		// Err returns any error encountered by Advance.  Never blocks.
		Err() error
	}
}

GlobWatcherWatchGlobClientStream is the client stream for GlobWatcher.WatchGlob.
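The Advance/Value/Err contract described in the comments above suggests a receive loop like the one below. This is a sketch only: recvStream restates the three documented methods locally, while fakeStream and drain are hypothetical names used to exercise the loop without a real RPC.

```go
package main

import (
	"errors"
	"fmt"
)

// Change is a local stand-in for watch.Change.
type Change struct{ Name string }

// recvStream has the same three methods as the receiver returned by
// RecvStream.
type recvStream interface {
	Advance() bool
	Value() Change
	Err() error
}

// fakeStream is a hypothetical in-memory stream used only for this sketch.
type fakeStream struct {
	items []Change
	cur   Change
	err   error
}

func (f *fakeStream) Advance() bool {
	if len(f.items) == 0 {
		return false
	}
	f.cur, f.items = f.items[0], f.items[1:]
	return true
}
func (f *fakeStream) Value() Change { return f.cur }
func (f *fakeStream) Err() error    { return f.err }

// drain calls handle for every staged item. Value is only called after
// Advance returned true, and Err is checked once Advance returns false.
func drain(s recvStream, handle func(Change)) error {
	for s.Advance() {
		handle(s.Value())
	}
	return s.Err()
}

func main() {
	s := &fakeStream{
		items: []Change{{Name: "a"}, {Name: "b"}},
		err:   errors.New("stream closed"),
	}
	var names []string
	err := drain(s, func(c Change) { names = append(names, c.Name) })
	fmt.Println(names, err)
}
```

With a real client call, the caller would additionally invoke Finish as described for GlobWatcherWatchGlobClientCall.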

type GlobWatcherWatchGlobServerCall

type GlobWatcherWatchGlobServerCall interface {

GlobWatcherWatchGlobServerCall represents the context passed to GlobWatcher.WatchGlob.

type GlobWatcherWatchGlobServerCallStub

type GlobWatcherWatchGlobServerCallStub struct {

GlobWatcherWatchGlobServerCallStub is a wrapper that converts rpc.StreamServerCall into a typesafe stub that implements GlobWatcherWatchGlobServerCall.

func (*GlobWatcherWatchGlobServerCallStub) Init

Init initializes GlobWatcherWatchGlobServerCallStub from rpc.StreamServerCall.

func (*GlobWatcherWatchGlobServerCallStub) SendStream

func (s *GlobWatcherWatchGlobServerCallStub) SendStream() interface {
	Send(item Change) error
}

SendStream returns the send side of the GlobWatcher.WatchGlob server stream.

type GlobWatcherWatchGlobServerStream

type GlobWatcherWatchGlobServerStream interface {
	// SendStream returns the send side of the GlobWatcher.WatchGlob server stream.
	SendStream() interface {
		// Send places the item onto the output stream.  Returns errors encountered
		// while sending.  Blocks if there is no buffer space; will unblock when
		// buffer space is available.
		Send(item Change) error
	}
}

GlobWatcherWatchGlobServerStream is the server stream for GlobWatcher.WatchGlob.
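A server that supports atomic groups might set the Continued bit while sending, along the lines of the sketch below. The sender interface restates the documented Send method locally; recorder and sendGroup are hypothetical names, and a real implementation would send on the stream returned by SendStream.

```go
package main

import "fmt"

// Change is a local stand-in for watch.Change.
type Change struct {
	Name      string
	Continued bool
}

// sender has the same Send method as the send side of the server stream.
type sender interface {
	Send(item Change) error
}

// recorder is a hypothetical sender that just remembers what was sent.
type recorder struct{ sent []Change }

func (r *recorder) Send(item Change) error {
	r.sent = append(r.sent, item)
	return nil
}

// sendGroup (hypothetical helper) sends group as one atomic group: every
// Change except the last is marked Continued, and the last one closes
// the group.
func sendGroup(s sender, group []Change) error {
	for i, c := range group {
		c.Continued = i < len(group)-1
		if err := s.Send(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	r := &recorder{}
	if err := sendGroup(r, []Change{{Name: "a"}, {Name: "b"}, {Name: "c"}}); err != nil {
		fmt.Println("send failed:", err)
		return
	}
	for _, c := range r.sent {
		fmt.Println(c.Name, c.Continued)
	}
}
```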

type ResumeMarker

type ResumeMarker []byte

ResumeMarker specifies how much of the existing underlying state is delivered to the client when the watch request is received by the system. The client can set this marker in one of the following ways to get different semantics:

(A) Parameter is left empty.
    Semantics: Fetch initial state.
    The client wants the entities' initial states to be delivered.
    See the description in "Initial State".

(B) Parameter is set to the string "now" (UTF-8 encoding).
    Semantics: Fetch new changes only.
    The client just wants to get the changes received by the
    system after the watch point. The system may deliver changes
    from before the watch point as well.

(C) Parameter is set to a value received in an earlier
    Change.ResumeMarker field while watching the same entity with
    the same query.
    Semantics: Resume from a specific point.
    The client wants to receive the changes from a specific point
    - this value must correspond to a value received in the
    Change.ResumeMarker field. The system may deliver changes
    from before the ResumeMarker as well.  If the system cannot
    resume the stream from this point (e.g., if it is too far
    behind in the stream), it can return the
    ErrUnknownResumeMarker error.

An implementation MUST support the empty string "" marker (initial state fetching) and the "now" marker. It need not support resuming from a specific point.
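A client that wants to resume later could remember the most recent marker it received, as in this sketch. The types are local stand-ins, lastMarker is a hypothetical helper, and falling back to an empty marker (initial state fetching) when no marker was seen is an assumption of this example rather than a rule of the API.

```go
package main

import "fmt"

// ResumeMarker and Change are local stand-ins for the package types.
type ResumeMarker []byte

type Change struct {
	Name         string
	ResumeMarker ResumeMarker
}

// lastMarker (hypothetical helper) returns the most recent non-empty
// marker found in changes, or fallback if no Change carried one.
func lastMarker(changes []Change, fallback ResumeMarker) ResumeMarker {
	m := fallback
	for _, c := range changes {
		if len(c.ResumeMarker) > 0 {
			m = c.ResumeMarker
		}
	}
	return m
}

func main() {
	received := []Change{
		{Name: "a", ResumeMarker: ResumeMarker("m1")},
		{Name: "b"}, // the marker is optional on each Change
		{Name: "c", ResumeMarker: ResumeMarker("m2")},
	}
	// A nil fallback corresponds to case (A): fetch the initial state.
	next := lastMarker(received, nil)
	fmt.Printf("%s\n", next)
}
```

Since support for case (C) is optional, a caller using such a marker should be prepared for ErrUnknownResumeMarker and may then retry with an empty marker.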

func (ResumeMarker) VDLIsZero

func (x ResumeMarker) VDLIsZero() bool

func (*ResumeMarker) VDLRead

func (x *ResumeMarker) VDLRead(dec vdl.Decoder) error

func (ResumeMarker) VDLReflect

func (ResumeMarker) VDLReflect(struct {
	Name string `vdl:""`

func (ResumeMarker) VDLWrite

func (x ResumeMarker) VDLWrite(enc vdl.Encoder) error
