goka


README


Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace, for example, the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high level overview of the ideas behind Goka. A more detailed introduction to the project can be found in this blog post.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speed up recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic, counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To locally start dockerized Zookeeper and Kafka instances, execute make start with the Makefile in the examples folder.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// emits a single message and leaves
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	err = emitter.EmitSync("some-key", "some-value")
	if err != nil {
		log.Fatalf("error emitting message: %v", err)
	}
	fmt.Println("message emitted")
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets the value stored in the group table for the
		// message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	go func() {
		if err = p.Start(); err != nil {
			log.Fatalf("error running processor: %v", err)
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	p.Stop() // gracefully stop processor
}

func main() {
	runEmitter()   // emits one message and stops
	runProcessor() // press ctrl-c to stop
}

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.
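
The counter can also be served from a separate process with a view. The following is a minimal sketch, assuming the same brokers and group as above (it also needs the time import); it reads the value the processor stored for "some-key":

// runView is a sketch: it opens a read-only view on the group table and
// queries the counter once the view has caught up with Kafka.
func runView() {
	view, err := goka.NewView(brokers, goka.GroupTable(group), new(codec.Int64))
	if err != nil {
		log.Fatalf("error creating view: %v", err)
	}
	go func() {
		if err := view.Start(); err != nil {
			log.Fatalf("error running view: %v", err)
		}
	}()

	// wait until the view has recovered before querying it
	for !view.Recovered() {
		time.Sleep(time.Second)
	}
	value, err := view.Get("some-key")
	if err != nil {
		log.Fatalf("error getting value: %v", err)
	}
	fmt.Printf("current count for some-key: %v\n", value)
	view.Stop()
}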

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

Documentation

Overview

Package goka is a stateful stream processing library for Apache Kafka (version 0.9+) that eases the development of microservices. Goka extends the concept of consumer group with a group table, which represents the state of the group. A microservice modifies and serves the content of a table employing two complementary object types: processors and views.

Processors

A processor is a set of callback functions that modify the group table when messages arrive and may also emit messages into other topics. Messages as well as rows in the group table are key-value pairs. Callbacks receive the arriving message and the row addressed by the message's key.

In Kafka, keys are used to partition topics. A goka processor consumes from a set of co-partitioned topics (topics with the same number of partitions and the same key range). A group topic keeps track of the group table updates, allowing for recovery and rebalancing of processors: When multiple processor instances start in the same consumer group, the instances split the co-partitioned input topics and load the respective group table partitions from the group topic. A local disk storage minimizes recovery time by caching partitions of the group table.

Views

A view is a materialized (i.e., persistent) cache of a group table. A view subscribes to the updates of all partitions of a group table and keeps its local disk storage in sync with the group topic. With a view, one can easily serve up-to-date content of the group table via, for example, gRPC.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultHasher

func DefaultHasher() func() hash.Hash32

DefaultHasher returns an FNV hasher builder to assign keys to partitions.

func DefaultProcessorStoragePath

func DefaultProcessorStoragePath(group Group) string

DefaultProcessorStoragePath is the default path where processor state will be stored.

func DefaultUpdate

func DefaultUpdate(s storage.Storage, partition int32, key string, value []byte) error

DefaultUpdate is the default callback used to update the local storage with messages from the table topic in Kafka. It is called for every message received during recovery of processors and during the normal operation of views. DefaultUpdate can be used in the function passed to WithUpdateCallback and WithViewCallback.

func DefaultViewStoragePath

func DefaultViewStoragePath() string

DefaultViewStoragePath returns the default path where view state will be stored.

func NewConstHasher

func NewConstHasher(part uint32) hash.Hash32

func NewMockController

func NewMockController(t gomock.TestReporter) *gomock.Controller

NewMockController returns a *gomock.Controller using a wrapped testing.T (or whatever) which panics on a Fatalf. This is necessary when using a mock in kafkamock. Otherwise it will freeze on an unexpected call.

Types

type Codec

type Codec interface {
	Encode(value interface{}) (data []byte, err error)
	Decode(data []byte) (value interface{}, err error)
}

Codec decodes and encodes from and to []byte
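
For illustration, a hypothetical JSON codec for a user struct could satisfy the interface as follows (a sketch; user and userCodec are made-up names, and the encoding/json and fmt imports are assumed):

// user is a hypothetical value type stored in a stream or table.
type user struct {
	Name   string `json:"name"`
	Visits int    `json:"visits"`
}

// userCodec encodes and decodes a *user as JSON, satisfying the Codec interface.
type userCodec struct{}

// Encode marshals a *user into bytes.
func (c *userCodec) Encode(value interface{}) ([]byte, error) {
	u, ok := value.(*user)
	if !ok {
		return nil, fmt.Errorf("expected *user, got %T", value)
	}
	return json.Marshal(u)
}

// Decode unmarshals bytes into a *user.
func (c *userCodec) Decode(data []byte) (interface{}, error) {
	var u user
	if err := json.Unmarshal(data, &u); err != nil {
		return nil, fmt.Errorf("error unmarshaling user: %v", err)
	}
	return &u, nil
}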

type Context

type Context interface {
	// Topic returns the topic of input message.
	Topic() Stream

	// Key returns the key of the input message.
	Key() string

	// Value returns the value of the key in the group table.
	Value() interface{}

	// SetValue updates the value of the key in the group table.
	SetValue(value interface{})

	// Delete deletes a value from the group table. IMPORTANT: this deletes the
	// value associated with the key from both the local cache and the persisted
	// table in Kafka.
	Delete()

	// Timestamp returns the timestamp of the input message. If the timestamp is
	// invalid, a zero time will be returned.
	Timestamp() time.Time

	// Join returns the value of key in the copartitioned table.
	Join(topic Table) interface{}

	// Lookup returns the value of key in the view of table.
	Lookup(topic Table, key string) interface{}

	// Emit asynchronously writes a message into a topic.
	Emit(topic Stream, key string, value interface{})

	// Loopback asynchronously sends a message to another key of the group
	// table. Value passed to loopback is encoded via the codec given in the
	// Loop subscription.
	Loopback(key string, value interface{})

	// Fail stops execution and shuts down the processor
	Fail(err error)
}

Context provides access to the processor's table and emit capabilities to arbitrary topics in kafka. Upon arrival of a message from subscribed topics, the respective ConsumeCallback is invoked with a context object along with the input message.
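
A sketch of a ProcessCallback using a few of these methods; the "notifications" topic and the threshold are made up for illustration, and the output topic would need to be declared with Output in the group graph:

// clickCount is a sketch of a callback: it counts messages per key in the
// group table and emits a notification into another topic at a threshold.
clickCount := func(ctx Context, msg interface{}) {
	var clicks int64
	if v := ctx.Value(); v != nil { // current row of the group table, if any
		clicks = v.(int64)
	}
	clicks++
	ctx.SetValue(clicks) // persist the updated count in the group table

	if clicks == 100 {
		// asynchronously emit into a topic declared as Output in the group graph
		ctx.Emit("notifications", ctx.Key(), fmt.Sprintf("%s reached %d clicks", ctx.Key(), clicks))
	}
}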

type Edge

type Edge interface {
	String() string
	Topic() string
	Codec() Codec
}

func Input

func Input(topic Stream, c Codec, cb ProcessCallback) Edge

Input returns a subscription for a co-partitioned stream topic. The processor subscribing for a stream topic will start reading from the newest offset of the partition.

func Inputs

func Inputs(topics Streams, c Codec, cb ProcessCallback) Edge

Inputs creates Edges for multiple input streams sharing the same codec and callback.

func Join

func Join(topic Table, c Codec) Edge

Join returns a subscription for a co-partitioned, log-compacted table topic. The processor subscribing for a table topic will start reading from the oldest offset of the partition.

func Lookup

func Lookup(topic Table, c Codec) Edge

func Loop

func Loop(c Codec, cb ProcessCallback) Edge

Loop defines a consume callback on the loop topic

func Output

func Output(topic Stream, c Codec) Edge

func Persist

func Persist(c Codec) Edge
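
Taken together, these edge constructors are combined with DefineGroup to form a group graph. A sketch with placeholder topic names (clickCount stands for a ProcessCallback like the one sketched under Context, and codec refers to github.com/lovoo/goka/codec):

// Sketch of a group graph using several edge types; all topic names are
// placeholders for illustration.
g := DefineGroup("example-group",
	Input("user-clicks", new(codec.String), clickCount), // consume a stream
	Join(Table("user-profiles"), new(codec.String)),     // co-partitioned table, read with ctx.Join
	Lookup(Table("config"), new(codec.String)),          // table read with ctx.Lookup
	Output("notifications", new(codec.String)),          // topic written with ctx.Emit
	Loop(new(codec.String), clickCount),                 // messages re-keyed with ctx.Loopback
	Persist(new(codec.Int64)),                           // codec of the group table
)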

type Edges

type Edges []Edge

func (Edges) Topics

func (e Edges) Topics() []string

type EmitHandler

type EmitHandler func(topic string, key string, value []byte) *kafka.Promise

EmitHandler abstracts a function that allows overwriting kafkamock's Emit function to simulate producer errors.

type Emitter

type Emitter struct {
	// contains filtered or unexported fields
}

func NewEmitter

func NewEmitter(brokers []string, topic Stream, codec Codec, options ...EmitterOption) (*Emitter, error)

NewEmitter creates a new emitter using the passed brokers, topic, codec, and options.

func (*Emitter) Emit

func (e *Emitter) Emit(key string, msg interface{}) (*kafka.Promise, error)

Emit sends a message for passed key using the emitter's codec.

func (*Emitter) EmitSync

func (e *Emitter) EmitSync(key string, msg interface{}) error

EmitSync synchronously sends a message for the passed key to the emitter's topic.

func (*Emitter) Finish

func (e *Emitter) Finish()

Finish waits until the emitter is finished producing all pending messages

type EmitterOption

type EmitterOption func(*eoptions)

EmitterOption defines a configuration option to be used when creating an emitter.

func WithEmitterClientID

func WithEmitterClientID(clientID string) EmitterOption

WithEmitterClientID defines the client ID used to identify with kafka.

func WithEmitterHasher

func WithEmitterHasher(hasher func() hash.Hash32) EmitterOption

WithEmitterHasher sets the hash function that assigns keys to partitions.

func WithEmitterLogger

func WithEmitterLogger(log logger.Logger) EmitterOption

WithEmitterLogger sets the logger the emitter should use. By default, emitters use the standard library logger.

func WithEmitterProducerBuilder

func WithEmitterProducerBuilder(pb kafka.ProducerBuilder) EmitterOption

WithEmitterProducerBuilder replaces the default producer builder.

func WithEmitterTopicManagerBuilder

func WithEmitterTopicManagerBuilder(tmb kafka.TopicManagerBuilder) EmitterOption

WithEmitterTopicManagerBuilder replaces the default topic manager builder.

type Getter

type Getter func(string) (interface{}, error)

Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.

type Group

type Group string

type GroupGraph

type GroupGraph struct {
	// contains filtered or unexported fields
}

func DefineGroup

func DefineGroup(group Group, edges ...Edge) *GroupGraph

func (*GroupGraph) Group

func (gg *GroupGraph) Group() Group

func (*GroupGraph) GroupTable

func (gg *GroupGraph) GroupTable() Edge

func (*GroupGraph) InputStreams

func (gg *GroupGraph) InputStreams() Edges

func (*GroupGraph) JointTables

func (gg *GroupGraph) JointTables() Edges

func (*GroupGraph) LookupTables

func (gg *GroupGraph) LookupTables() Edges

func (*GroupGraph) LoopStream

func (gg *GroupGraph) LoopStream() Edge

func (*GroupGraph) OutputStreams

func (gg *GroupGraph) OutputStreams() Edges

func (*GroupGraph) Validate

func (gg *GroupGraph) Validate() error

type InputStats

type InputStats struct {
	Count uint
	Bytes int
	Delay time.Duration
}

Input Streams/Table

type Iterator

type Iterator interface {
	Next() bool
	Key() string
	Value() (interface{}, error)
	Release()
	Seek(key string) bool
}

type KafkaMock

type KafkaMock struct {
	// contains filtered or unexported fields
}

KafkaMock allows interacting with a test processor

func NewKafkaMock

func NewKafkaMock(t Tester, groupName Group) *KafkaMock

NewKafkaMock returns a new test processor mock, mocking every external service.

func (*KafkaMock) Consume

func (km *KafkaMock) Consume(topic string, key string, msg []byte)

func (*KafkaMock) ConsumeProto

func (km *KafkaMock) ConsumeProto(topic string, key string, msg proto.Message)

ConsumeProto simulates a message on kafka in a topic with a key.

func (*KafkaMock) ConsumeString

func (km *KafkaMock) ConsumeString(topic string, key string, msg string)

func (*KafkaMock) ExpectAllEmitted

func (km *KafkaMock) ExpectAllEmitted(handler func(topic string, key string, value []byte))

ExpectAllEmitted calls passed expected-emit-handler function for all emitted values and clears the emitted values

func (*KafkaMock) ExpectEmit

func (km *KafkaMock) ExpectEmit(topic string, key string, expecter func(value []byte))

ExpectEmit ensures a message exists in passed topic and key. The message may be inspected/unmarshalled by a passed expecter function.

func (*KafkaMock) Finish

func (km *KafkaMock) Finish(fail bool)

Finish marks the kafkamock so that no further emits are expected. Set fail to true if kafkamock should fail the test case when emits remain. The list of emits is cleared in either case. This should always be called at the end of a test case to make sure no emits from prior test cases are stuck in the list and mess with the test results.

func (*KafkaMock) ProcessorOptions

func (km *KafkaMock) ProcessorOptions() []ProcessorOption

ProcessorOptions returns the options that must be passed to NewProcessor to use the Mock. It essentially replaces the consumer/producer/topicmanager with a mock. For convenience, the storage is also mocked. For example, a normal call to NewProcessor like this

NewProcessor(brokers, group, subscriptions,
	option_a,
	option_b,
	option_c,
)

would become the following in the unit test:

kafkaMock := NewKafkaMock(t)
NewProcessor(brokers, group, subscriptions,
	append(kafkaMock.ProcessorOptions(),
		option_a,
		option_b,
		option_c,
	)...,
)

func (*KafkaMock) ReplaceEmitHandler

func (km *KafkaMock) ReplaceEmitHandler(emitter EmitHandler)

func (*KafkaMock) SetCodec

func (km *KafkaMock) SetCodec(codec Codec) *KafkaMock

func (*KafkaMock) SetGroupTableCreator

func (km *KafkaMock) SetGroupTableCreator(creator func() (string, []byte))

func (*KafkaMock) SetValue

func (km *KafkaMock) SetValue(key string, value interface{})

SetValue sets a value in the storage.

func (*KafkaMock) ValueForKey

func (km *KafkaMock) ValueForKey(key string) interface{}

ValueForKey attempts to get a value from KafkaMock's storage.
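
A rough sketch of a unit test that wires KafkaMock into the counting processor from the README; cb is the callback defined there, the testing import is assumed, and the startup/synchronization details are simplified and may need adjustment:

// Sketch of a processor unit test using KafkaMock.
func TestCounter(t *testing.T) {
	km := NewKafkaMock(t, "example-group")

	g := DefineGroup("example-group",
		Input("example-stream", new(codec.String), cb),
		Persist(new(codec.Int64)),
	)
	p, err := NewProcessor([]string{"localhost:9092"}, g, km.ProcessorOptions()...)
	if err != nil {
		t.Fatalf("error creating processor: %v", err)
	}
	go p.Start()
	defer p.Stop()

	// simulate an incoming message and inspect the stored counter
	km.ConsumeString("example-stream", "some-key", "some-value")
	if counter := km.ValueForKey("some-key"); counter.(int64) != 1 {
		t.Errorf("unexpected counter: %v", counter)
	}

	// fail the test if unexpected emits remain
	km.Finish(true)
}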

type NilHandling

type NilHandling int
const (
	// NilIgnore drops any message with nil value.
	NilIgnore NilHandling = 0 + iota
	// NilProcess passes the nil value to ProcessCallback.
	NilProcess
	// NilDecode passes the nil value to decoder before calling ProcessCallback.
	NilDecode
)

type OutputStats

type OutputStats struct {
	Count uint
	Bytes int
}

Output Streams/Table

type PartitionStats

type PartitionStats struct {
	Now time.Time

	Table struct {
		Status  PartitionStatus
		Stalled bool

		Offset int64 // last offset processed or recovered
		Hwm    int64 // next offset to be written

		StartTime    time.Time
		RecoveryTime time.Time
	}
	Input  map[string]InputStats
	Output map[string]OutputStats
}

type PartitionStatus

type PartitionStatus int
const (
	PartitionRecovering PartitionStatus = iota
	PartitionPreparing
	PartitionRunning
)

type ProcessCallback

type ProcessCallback func(ctx Context, msg interface{})

ProcessCallback function is called for every message received by the processor.

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is a set of stateful callback functions that, on the arrival of messages, modify the content of a table (the group table) and emit messages into other topics. Messages as well as rows in the group table are key-value pairs. A group is composed by multiple processor instances.

Example (Simplest)

Example shows how to use a callback. For each partition of the topics, a new goroutine will be created. Topics should be co-partitioned (they should have the same number of partitions and be partitioned by the same key).

var (
	brokers        = []string{"127.0.0.1:9092"}
	group   Group  = "group"
	topic   Stream = "topic"
)

consume := func(ctx Context, m interface{}) {
	fmt.Printf("Hello world: %v", m)
}

c, err := NewProcessor(brokers, DefineGroup(group, Input(topic, rawCodec, consume)))
if err != nil {
	log.Fatalln(err)
}

// start consumer with a goroutine (blocks)
go func() {
	err := c.Start()
	panic(err)
}()

// wait for bad things to happen
wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
<-wait
c.Stop()
Output:

func NewProcessor

func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error)

NewProcessor creates a processor instance in a group given the address of Kafka brokers, the consumer group name, a list of subscriptions (topics, codecs, and callbacks), and a series of options.

func (*Processor) Get

func (g *Processor) Get(key string) (interface{}, error)

Get returns a read-only copy of a value from the group table if the respective partition is owned by the processor instance. Get can be called by multiple goroutines concurrently. Get can only be used with stateful processors (i.e., when the group table is enabled) and after Recovered returns true.
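
For example, a processor instance could serve its local share of the group table over HTTP; a sketch (net/http and fmt imports assumed, p is the running processor):

// Sketch: serve values owned by this processor instance over HTTP.
http.HandleFunc("/counter", func(w http.ResponseWriter, r *http.Request) {
	if !p.Recovered() {
		http.Error(w, "processor not recovered yet", http.StatusServiceUnavailable)
		return
	}
	val, err := p.Get(r.URL.Query().Get("key"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "%v\n", val)
})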

func (*Processor) Graph

func (g *Processor) Graph() *GroupGraph

func (*Processor) Recovered

func (g *Processor) Recovered() bool

Recovered returns true when the processor has caught up with events from kafka.

func (*Processor) Start

func (g *Processor) Start() error

Start starts receiving messages from Kafka for the subscribed topics. For each partition, a recovery will be attempted.

func (*Processor) Stats

func (g *Processor) Stats() *ProcessorStats

func (*Processor) Stop

func (g *Processor) Stop()

Stop gracefully stops the consumer

type ProcessorOption

type ProcessorOption func(*poptions)

ProcessorOption defines a configuration option to be used when creating a processor.

func WithClientID

func WithClientID(clientID string) ProcessorOption

WithClientID defines the client ID used to identify with Kafka.

func WithConsumerBuilder

func WithConsumerBuilder(cb kafka.ConsumerBuilder) ProcessorOption

WithConsumerBuilder replaces the default consumer builder.

func WithHasher

func WithHasher(hasher func() hash.Hash32) ProcessorOption

WithHasher sets the hash function that assigns keys to partitions.

func WithLogger

func WithLogger(log logger.Logger) ProcessorOption

WithLogger sets the logger the processor should use. By default, processors use the standard library logger.

func WithNilHandling

func WithNilHandling(nh NilHandling) ProcessorOption

WithNilHandling configures how the processor should handle messages with nil value. By default the processor ignores nil messages.
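
For example, a processor configured with NilProcess can treat nil messages as tombstones; a sketch (topic, group, and broker names are placeholders):

// Sketch: treat nil messages as tombstones that delete the table row.
tombstone := func(ctx Context, msg interface{}) {
	if msg == nil {
		ctx.Delete() // remove the row for this key from the group table
		return
	}
	ctx.SetValue(msg)
}

g := DefineGroup("tombstone-group",
	Input("tombstone-stream", new(codec.String), tombstone),
	Persist(new(codec.String)),
)
p, err := NewProcessor(brokers, g, WithNilHandling(NilProcess))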

func WithPartitionChannelSize

func WithPartitionChannelSize(size int) ProcessorOption

WithPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithProducerBuilder

func WithProducerBuilder(pb kafka.ProducerBuilder) ProcessorOption

WithProducerBuilder replaces the default producer builder.

func WithStorageBuilder

func WithStorageBuilder(sb storage.Builder) ProcessorOption

WithStorageBuilder defines a builder for the storage of each partition.

func WithTopicManagerBuilder

func WithTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ProcessorOption

WithTopicManagerBuilder replaces the default topic manager builder.

func WithUpdateCallback

func WithUpdateCallback(cb UpdateCallback) ProcessorOption

WithUpdateCallback defines the callback called upon recovering a message from the log.

type ProcessorStats

type ProcessorStats struct {
	Group  map[int32]*PartitionStats
	Joined map[int32]map[string]*PartitionStats
	Lookup map[string]*ViewStats
}

type Stream

type Stream string

type Streams

type Streams []Stream

type Table

type Table string

func GroupTable

func GroupTable(group Group) Table

GroupTable returns the name of the group table of group.

type Tester

type Tester interface {
	Errorf(format string, args ...interface{})
	Fatalf(format string, args ...interface{})
	Fatal(a ...interface{})
}

Tester abstracts the interface we assume from the test case. Will most likely be *testing.T

type UpdateCallback

type UpdateCallback func(s storage.Storage, partition int32, key string, value []byte) error

UpdateCallback is invoked upon arrival of a message for a table partition. The partition storage shall be updated in the callback.
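
For instance, a custom update callback might wrap DefaultUpdate to log progress during recovery; a sketch (storage is github.com/lovoo/goka/storage, and brokers and graph are assumed to exist):

// Sketch: log every restored message, then delegate to DefaultUpdate.
logging := func(s storage.Storage, partition int32, key string, value []byte) error {
	log.Printf("restoring partition %d: key=%s (%d bytes)", partition, key, len(value))
	return DefaultUpdate(s, partition, key, value)
}

p, err := NewProcessor(brokers, graph, WithUpdateCallback(logging))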

type View

type View struct {
	// contains filtered or unexported fields
}

View is a materialized (i.e. persistent) cache of a group table.

Example (Simple)
var (
	brokers       = []string{"localhost:9092"}
	group   Group = "group-name"
)
sr, err := NewView(brokers, GroupTable(group), nil)
if err != nil {
	panic(err)
}
errs := sr.Start()
if errs != nil {
	panic(errs)
}
Output:

func NewView

func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption) (*View, error)

NewView creates a new View object from a group.

func (*View) Evict

func (v *View) Evict(key string) error

Evict removes the given key only from the local cache. In order to delete a key from Kafka and other Views, context.Delete should be used on a Processor.

func (*View) Get

func (v *View) Get(key string) (interface{}, error)

Get returns the value for the key in the view, if it exists, and nil otherwise. Get can be called by multiple goroutines concurrently. Get can only be called after Recovered returns true.

func (*View) Has

func (v *View) Has(key string) (bool, error)

Has checks whether a value for passed key exists in the view.

func (*View) Iterator

func (v *View) Iterator() (Iterator, error)

Iterator returns an iterator that iterates over the state of the View.
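
A sketch of dumping a view's local content with an iterator (v is a recovered view; log and fmt imports assumed):

// Sketch: iterate over all rows cached by the view.
it, err := v.Iterator()
if err != nil {
	log.Fatalf("error creating iterator: %v", err)
}
defer it.Release()
for it.Next() {
	val, err := it.Value()
	if err != nil {
		log.Printf("error decoding value for key %s: %v", it.Key(), err)
		continue
	}
	fmt.Printf("%s = %v\n", it.Key(), val)
}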

func (*View) IteratorWithRange

func (v *View) IteratorWithRange(start, limit string) (Iterator, error)

IteratorWithRange returns an iterator that iterates over the state of the View. This iterator is built using the given range.

func (*View) Recovered

func (v *View) Recovered() bool

Recovered returns true when the view has caught up with events from kafka.

func (*View) Start

func (v *View) Start() error

Start starts consuming the view's topic.

func (*View) Stats

func (v *View) Stats() *ViewStats

func (*View) Stop

func (v *View) Stop()

Stop stops the view, closes storage partitions, and frees resources

func (*View) Topic

func (v *View) Topic() string

Topic returns the view's topic

type ViewOption

type ViewOption func(*voptions)

ViewOption defines a configuration option to be used when creating a view.

func WithViewCallback

func WithViewCallback(cb UpdateCallback) ViewOption

WithViewCallback defines the callback called upon recovering a message from the log.

func WithViewClientID

func WithViewClientID(clientID string) ViewOption

WithViewClientID defines the client ID used to identify with Kafka.

func WithViewConsumerBuilder

func WithViewConsumerBuilder(cb kafka.ConsumerBuilder) ViewOption

WithViewConsumerBuilder replaces the default view consumer builder.

func WithViewHasher

func WithViewHasher(hasher func() hash.Hash32) ViewOption

WithViewHasher sets the hash function that assigns keys to partitions.

func WithViewLogger

func WithViewLogger(log logger.Logger) ViewOption

WithViewLogger sets the logger the view should use. By default, views use the standard library logger.

func WithViewPartitionChannelSize

func WithViewPartitionChannelSize(size int) ViewOption

WithViewPartitionChannelSize replaces the default partition channel size. This is mostly used for testing by setting it to 0 to have synchronous behavior of goka.

func WithViewRestartable

func WithViewRestartable() ViewOption

WithViewRestartable defines that the view can be restarted, even when Start() returns errors. If the view is restartable, the client must call Stop() to release all resources.

func WithViewStorageBuilder

func WithViewStorageBuilder(sb storage.Builder) ViewOption

WithViewStorageBuilder defines a builder for the storage of each partition.

func WithViewTopicManagerBuilder

func WithViewTopicManagerBuilder(tmb kafka.TopicManagerBuilder) ViewOption

WithViewTopicManagerBuilder replaces the default topic manager builder.

type ViewStats

type ViewStats struct {
	Partitions map[int32]*PartitionStats
}
