stream_sum

package
v0.89.0 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 2, 2021 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package stream_sum is an example application consisting of three stages:

1) A `chunker` job randomly generates a number of unique "streams", with stream content emitted across a number of interleaved data chunks.

2) A `summer` consumer accumulates stream chunks and computes a running CRC64 sum of each stream's content (see the Sum type, whose Value is a uint64). When the stream is completed, the `summer` consumer emits a final sum to an output journal.

3) Having written a complete stream, the `chunker` job confirms that the correct sum is written to the output journal.

The `chunker` and `summer` tasks may be scaled independently, and their results are invariant to process failures and restarts.

The stream-sum example application is also a sneaky integration test: it actively verifies processing guarantees provided by Gazette, such as exactly-once semantics and bounds on end-to-end latency, and fails if those properties are not met. stream-sum is used in Gazette's continuous integration and chaos-testing suites.

Constants

const FinalSumsJournal pb.Journal = "examples/stream-sum/sums"

FinalSumsJournal is the journal to which final stream sums are written.

Variables

This section is empty.

Functions

func GenerateAndVerifyStreams

func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error

GenerateAndVerifyStreams is the main routine of the `chunker` job. It generates and verifies streams based on the ChunkerConfig.

Types

type Chunk

type Chunk struct {
	UUID  message.UUID
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // Monotonic sequence number, starting from 1.
	Data  []byte   // Raw data included in the Value. If empty, this is the stream's final chunk.
}

Chunk is an ordered slice of stream content.
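As a rough illustration of the field comments above, the sketch below cuts a byte slice into Chunks numbered from 1 and terminates the stream with an empty-Data Chunk. The `split` helper is hypothetical, and the types here are local copies with the UUID field omitted so that the example has no Gazette dependency.

```go
package main

import "fmt"

// StreamID and Chunk mirror the documented types, minus the UUID field.
type StreamID [16]byte

type Chunk struct {
	ID    StreamID
	SeqNo int
	Data  []byte
}

// split is a hypothetical helper which cuts content into Chunks of at most
// size bytes (size must be positive), numbered from 1, and appends a final
// empty Chunk to mark the end of the stream.
func split(id StreamID, content []byte, size int) []Chunk {
	var out []Chunk
	for len(content) > 0 {
		var n = size
		if n > len(content) {
			n = len(content)
		}
		out = append(out, Chunk{ID: id, SeqNo: len(out) + 1, Data: content[:n]})
		content = content[n:]
	}
	return append(out, Chunk{ID: id, SeqNo: len(out) + 1})
}

func main() {
	for _, c := range split(StreamID{1}, []byte("hello, world"), 5) {
		fmt.Printf("seq=%d data=%q final=%t\n", c.SeqNo, c.Data, len(c.Data) == 0)
	}
}
```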

func (*Chunk) GetUUID added in v0.83.1

func (c *Chunk) GetUUID() message.UUID

func (*Chunk) NewAcknowledgement added in v0.83.1

func (c *Chunk) NewAcknowledgement(pb.Journal) message.Message

func (*Chunk) SetUUID added in v0.83.1

func (c *Chunk) SetUUID(uuid message.UUID)

type ChunkerConfig

type ChunkerConfig struct {
	Chunker struct {
		mbp.ZoneConfig
		Streams int `long:"streams" default:"-1" description:"Number of streams each worker should create. <0 for infinite"`
		Chunks  int `long:"chunks" default:"100" description:"Number of chunks per stream"`
		Workers int `long:"workers" default:"4" description:"Number of parallel workers"`
	} `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"`

	Broker      mbp.ClientConfig      `group:"Broker" namespace:"broker" env-namespace:"BROKER"`
	Log         mbp.LogConfig         `group:"Logging" namespace:"log" env-namespace:"LOG"`
	Diagnostics mbp.DiagnosticsConfig `group:"Debug" namespace:"debug" env-namespace:"DEBUG"`
}

ChunkerConfig is the configuration used by the `chunker` job binary.
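The struct tags on ChunkerConfig determine the command-line flags of the `chunker` binary. The sketch below reads the same tags with the standard `reflect` package and prints the flag names they would plausibly produce. It assumes the flag parser joins a group's `namespace` and a field's `long` name with a "." delimiter; the `config` type and `flagNames` helper are hypothetical, and the embedded mbp.ZoneConfig is left out.

```go
package main

import (
	"fmt"
	"reflect"
)

// config copies the tags of the documented Chunker group.
type config struct {
	Chunker struct {
		Streams int `long:"streams" default:"-1" description:"Number of streams each worker should create. <0 for infinite"`
		Chunks  int `long:"chunks" default:"100" description:"Number of chunks per stream"`
		Workers int `long:"workers" default:"4" description:"Number of parallel workers"`
	} `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"`
}

// flagNames derives "--namespace.long=default" strings from struct tags.
// The "." delimiter is an assumption about the flag parser's defaults.
func flagNames(v interface{}) []string {
	var out []string
	var t = reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		var group = t.Field(i)
		if group.Type.Kind() != reflect.Struct {
			continue
		}
		var ns = group.Tag.Get("namespace")
		for j := 0; j < group.Type.NumField(); j++ {
			var f = group.Type.Field(j)
			if long := f.Tag.Get("long"); long != "" {
				out = append(out, fmt.Sprintf("--%s.%s=%s", ns, long, f.Tag.Get("default")))
			}
		}
	}
	return out
}

func main() {
	for _, name := range flagNames(config{}) {
		fmt.Println(name)
	}
}
```

Under that assumption, four workers each creating an unbounded number of 100-chunk streams is the default behavior.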

type StreamID

type StreamID [16]byte

StreamID uniquely identifies a stream.

type Sum

type Sum struct {
	UUID  message.UUID
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // SeqNo of last Chunk summed over.
	Value uint64   // Computed sum through SeqNo.
}

Sum represents a partial or final CRC64 sum of a stream.

func (*Sum) GetUUID added in v0.83.1

func (s *Sum) GetUUID() message.UUID

func (*Sum) NewAcknowledgement added in v0.83.1

func (s *Sum) NewAcknowledgement(pb.Journal) message.Message

func (*Sum) SetUUID added in v0.83.1

func (s *Sum) SetUUID(uuid message.UUID)

func (*Sum) Update

func (s *Sum) Update(chunk Chunk) (done bool, err error)

Update folds a Chunk into this Sum, returning whether this is the last Chunk of the Stream.

type Summer

type Summer struct{}

Summer consumes stream chunks, aggregates chunk data, and emits final sums. It implements the runconsumer.Application interface.

func (Summer) ConsumeMessage

func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope, pub *message.Publisher) error

ConsumeMessage folds a Chunk into its respective partial stream sum. If the Chunk represents a stream EOF, it emits a final sum. consumer.Application implementation.

func (Summer) FinalizeTxn

func (Summer) FinalizeTxn(shard consumer.Shard, store consumer.Store, _ *message.Publisher) error

FinalizeTxn marshals partial stream sums to the |store| to ensure persistence across consumer transactions. consumer.Application implementation.

func (Summer) InitApplication

func (Summer) InitApplication(args runconsumer.InitArgs) error

InitApplication is a no-op, as Summer provides no client-facing APIs.

func (Summer) NewConfig

func (Summer) NewConfig() runconsumer.Config

NewConfig returns a new BaseConfig.

func (Summer) NewMessage

func (Summer) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns a Chunk message. consumer.Application implementation.

func (Summer) NewStore

func (Summer) NewStore(shard consumer.Shard, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore builds a RocksDB or SQLite store for the Shard. consumer.Application implementation.

Directories

Path Synopsis
Package summer runs the stream_sum.Summer consumer.
