v0.0.0-...-51f9457 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Jul 9, 2021 License: Apache-2.0 Imports: 22 Imported by: 0



Package bundler is responsible for efficiently transforming aggregate stream data into Butler messages for export.

A process will instantiate a single Bundler instance. The Bundler manages an elastic set of Stream instances, each of which contains state for a single log Stream.

Each Stream instance will have sequential stream binary data appended to it via Append, which it will collect and organize for export as a series of ButlerLogBundle_Entry protobufs. Streams operate independently and buffer data until it is consumed by their Bundler instance. If a Stream's buffer is full, the Stream will block on appending data, which will, in turn, block its data source.

The Bundler owns the various Stream instances. When its Next() method is called, it will sort through the stream instances to prepare an optimally-sized ButlerLogBundle protobuf for export. The construction of this bundle may block pending data, and may be subject to various data urgency requests.

The Bundler acknowledges the following constraints:

  • Data enqueued into a Stream should be exported within a specific period of time from its introduction
  • The exported ButlerLogBundle protobuf must not exceed a maximum bundle size constraint.
  • Stream data may be added during the bundling process, and should be acknowledged if possible.

When a Stream is finished, its Close method should be called. This alerts the Stream that it will receive no more data, causing it to export a terminal ButlerLogBundle and unregister from the Bundler.

The Bundler may block via its CloseAndFinish() method until all Streams are drained and cleared.



This section is empty.


This section is empty.


This section is empty.


type Bundler

type Bundler struct {
	// contains filtered or unexported fields

Bundler is the main Bundler instance. It exposes goroutine-safe endpoints for stream registration and bundle consumption.

func New

func New(c Config) *Bundler

New instantiates a new Bundler instance.

func (*Bundler) CloseAndFlush

func (b *Bundler) CloseAndFlush()

CloseAndFlush closes the Bundler, alerting it that no more streams will be added and that existing data may be aggressively output.

CloseAndFlush will block until all buffered data has been consumed.

func (*Bundler) GetStreamDescs

func (b *Bundler) GetStreamDescs() map[string]*logpb.LogStreamDescriptor

GetStreamDescs returns the set of registered stream names mapped to their descriptors.

This is intended for testing purposes. DO NOT modify the resulting descriptors.

func (*Bundler) Next

func (b *Bundler) Next() *logpb.ButlerLogBundle

Next returns the next bundle, blocking until it is available.

func (*Bundler) Register

func (b *Bundler) Register(d *logpb.LogStreamDescriptor) (Stream, error)

Register adds a new stream to the Bundler, returning a reference to the registered stream.

The Bundler takes ownership of the supplied Properties, and may modify them as needed.

type Config

type Config struct {
	// Clock is the clock instance that will be used for Bundler and stream
	// timing.
	Clock clock.Clock

	// MaxBufferedBytes is the maximum number of bytes to buffer in memory per
	// stream.
	MaxBufferedBytes int64

	// MaxBundleSize is the maximum bundle size in bytes that may be generated.
	// If this value is zero, no size constraint will be applied to generated
	// bundles.
	MaxBundleSize int

	// MaxBufferDelay is the maximum amount of time we're willing to buffer
	// bundled data. Other factors can cause the bundle to be sent before this,
	// but it is an upper bound.
	MaxBufferDelay time.Duration

Config is the Bundler configuration.

type Data

type Data interface {

	// Bind resizes the Chunk buffer and records a timestamp to associate with the
	// data chunk.
	Bind(int, time.Time) Data

	// Timestamp returns the bound timestamp. This will be zero if no timestamp
	// has been bound.
	Timestamp() time.Time

Data is a reusable data buffer that is used by Stream instances to ingest data.

Data is initially an empty buffer. Once data is loaded into it, the buffer is resized to the bound data and a timestamp is attached via Bind.

type Stream

type Stream interface {
	// LeaseData allocates and returns a Data block that stream data can be
	// loaded into. The caller should Release() the Data, or transfer ownership to
	// something that will (e.g., Append()).
	// If the leased data is not Released, it is merely inefficient, not fatal.
	LeaseData() Data

	// Append adds a sequential chunk of data to the Stream. Append may block if
	// the data isn't ready to be consumed.
	// Append takes ownership of the data regardless of whether or not it returns
	// an error. The supplied Data must not be referenced after calling Append.
	Append(Data) error

	// Close closes the Stream, flushing any remaining data.

Stream is an individual Bundler Stream. Data is added to the Stream as a series of ordered binary chunks.

A Stream is not goroutine-safe.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL