Package coordinator implements a minimal interface to the Coordinator service that is sufficient for Collector usage.

The interface also serves as an API abstraction boundary between the current Coordinator service definition and the Collector's logic.


Coordinator methods are called heavily during Collector operation. In production, the Coordinator instance should be wrapped in a cache (see NewCache) so that the Coordinator's known state is cached locally.

The cache is responsible for two things. First, it coalesces multiple pending requests for the same stream state into a single Coordinator request. Second, it retains completed responses so that repeated lookups are answered locally without contacting the Coordinator.

Stream state is stored internally as a Promise, which is resolved by querying the Coordinator. This mechanism is hidden from callers.



const (
	// DefaultSize is the default (maximum) size of the LRU cache.
	DefaultSize = 1024 * 1024

	// DefaultExpiration is the default expiration value.
	DefaultExpiration = 10 * time.Minute
)




type Coordinator

type Coordinator interface {
	// RegisterStream registers a log stream state.
	RegisterStream(c context.Context, s *LogStreamState, desc []byte) (*LogStreamState, error)
	// TerminateStream registers the terminal index of a log stream state.
	TerminateStream(c context.Context, s *TerminateRequest) error
}

Coordinator is an interface to a remote LogDog Coordinator service. This is a simplified version of the Coordinator's Service API tailored specifically to the Collector's usage.

All Coordinator methods will return transient-wrapped errors if appropriate.

func NewCache

func NewCache(c Coordinator, size int, expiration time.Duration) Coordinator

NewCache creates a new Coordinator instance that wraps another Coordinator instance with a cache that retains the latest remote Coordinator state in a client-side LRU cache.

If size is <= 0, DefaultSize will be used. If expiration is <= 0, DefaultExpiration will be used.

func NewCoordinator

func NewCoordinator(s logdog.ServicesClient) Coordinator

NewCoordinator returns a Coordinator implementation that uses a logdog.ServicesClient.

type LogStreamState

type LogStreamState struct {
	// Project is the log stream project.
	Project string
	// Path is the log stream path.
	Path types.StreamPath

	// ID is the stream's Coordinator ID. This is returned by the Coordinator.
	ID string
	// ProtoVersion is the stream protocol version string.
	ProtoVersion string
	// Secret is the log stream's prefix secret.
	Secret types.PrefixSecret
	// TerminalIndex is an optional terminal index to register alongside the
	// stream. If this is <0, the stream will be registered without a terminal
	// index.
	TerminalIndex types.MessageIndex

	// Archived is true if the log stream has been archived. This is returned by
	// the remote service.
	Archived bool
	// Purged is true if the log stream has been purged. This is returned by
	// the remote service.
	Purged bool
}

LogStreamState is a local representation of a remote stream's state. It is a subset of the remote state with the necessary elements for the Collector to operate and update.

type TerminateRequest

type TerminateRequest struct {
	Project string           // Project name.
	Path    types.StreamPath // Stream path. Needed for cache lookup.

	// ID is the stream's Coordinator ID, as indicated by the Coordinator.
	ID string
	// TerminalIndex is the terminal index to register.
	// This must be >= 0, else there is no point in sending the TerminateStream
	// request.
	TerminalIndex types.MessageIndex
	// Secret is the log stream's prefix secret.
	Secret types.PrefixSecret
}

TerminateRequest is a local representation of a Coordinator stream termination request.
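One way a caller might derive a termination request from a registered stream's state is sketched below. The streamState and terminateRequest types are simplified local stand-ins (plain string, []byte, and int64 fields are assumptions), and terminateRequestFor is a hypothetical helper. It returns nil when the state has no terminal index, since the request would be pointless in that case.

```go
package main

import "fmt"

// Simplified stand-ins for LogStreamState and TerminateRequest (assumptions).
type streamState struct {
	Project       string
	Path          string
	ID            string
	Secret        []byte
	TerminalIndex int64
}

type terminateRequest struct {
	Project       string
	Path          string
	ID            string
	Secret        []byte
	TerminalIndex int64
}

// terminateRequestFor (hypothetical) builds a request from a stream state,
// or returns nil if the state carries no terminal index (< 0).
func terminateRequestFor(s *streamState) *terminateRequest {
	if s.TerminalIndex < 0 {
		return nil
	}
	return &terminateRequest{
		Project:       s.Project,
		Path:          s.Path,
		ID:            s.ID,
		Secret:        s.Secret,
		TerminalIndex: s.TerminalIndex,
	}
}

func main() {
	open := &streamState{Project: "proj", Path: "prefix/+/name", ID: "abc", TerminalIndex: -1}
	done := &streamState{Project: "proj", Path: "prefix/+/name", ID: "abc", TerminalIndex: 99}

	fmt.Println("open stream needs request:", terminateRequestFor(open) != nil)
	fmt.Println("done stream needs request:", terminateRequestFor(done) != nil)
}
```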