github.com/movio/kasper

Package kasper

v0.0.0 (1cd831c)
Published: Jul 23, 2017 | License: MIT | Module: github.com/movio/kasper

Overview

Kasper is a lightweight library for processing Kafka topics. It is heavily inspired by Apache Samza (see http://samza.apache.org). Kasper processes Kafka messages in small batches and is designed to work with centralized key-value stores such as Redis, Cassandra or Elasticsearch for maintaining state during processing. Kasper is a good fit for high-throughput applications (> 10k messages per second) that can tolerate a moderate amount of processing latency (~1000ms). Please note that Kasper is designed for idempotent processing of streams with at-least-once semantics. If you require exactly-once semantics or need to perform non-idempotent operations, Kasper is likely not a good choice.
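
To illustrate why idempotence matters under at-least-once delivery, here is a minimal sketch that is not part of Kasper. The names followerEvent and applyEvent are hypothetical. The update overwrites state with the absolute value carried by the event instead of incrementing a counter, so a redelivered message leaves the state unchanged:

```go
package main

// followerEvent is a hypothetical message payload used only for this sketch.
type followerEvent struct {
	User      string
	Followers int // absolute follower count carried by the event
}

// applyEvent stores the absolute value from the event. Because it overwrites
// rather than increments, applying the same event twice yields the same state.
func applyEvent(state map[string]int, e followerEvent) {
	state[e.User] = e.Followers
}
```

An increment such as state[e.User]++ would instead count a redelivered message twice, which is the kind of operation the paragraph above warns against.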

Step 1 - Create a sarama Client

Kasper uses Shopify's excellent sarama library (see https://github.com/Shopify/sarama) for consuming and producing messages to Kafka. Every Kasper application must begin by instantiating a sarama Client. Choose the parameters in sarama.Config carefully; the performance, reliability, and correctness of your application are all highly sensitive to these settings. We recommend setting sarama.Config.Producer.RequiredAcks to WaitForAll.

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
client, err := sarama.NewClient([]string{"kafka-broker.local:9092"}, saramaConfig)

Step 2 - Create a Config

TopicProcessorName is used for logging and labeling metrics, and as a suffix to the Kafka consumer group. InputTopics and InputPartitions are the lists of topics and partitions to consume. Please note that Kasper currently does not support consuming topics with differing numbers of partitions. This limitation can be alleviated by manually adding an extra fan-out step in your processing pipeline to a new topic with the desired number of partitions.

config := &kasper.Config{
	TopicProcessorName:    "twitter-reach",
	Client:                client,
	InputTopics:           []string{"tweets", "twitter-followers"},
	InputPartitions:       []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
	BatchSize:             10000,
	BatchWaitDuration:     5 * time.Second,
	Logger:                kasper.NewJSONLogger("twitter-reach-0", false),
	MetricsProvider:       kasper.NewPrometheus("twitter-reach-0"),
	MetricsUpdateInterval: 60 * time.Second,
}

Kasper is instrumented with a number of useful metrics so we recommend setting MetricsProvider for production applications. Kasper includes an implementation for collecting metrics in Prometheus and adapting the interface to other tools should be easy.

Step 3 - Create a MessageProcessor per input partition

You need to create a map[int]MessageProcessor keyed by partition number. MessageProcessor instances can safely be shared across partitions. Each MessageProcessor must implement a single function:

func (*TweetProcessor) Process(messages []*sarama.ConsumerMessage, sender Sender) error {
	// process messages here
	return nil
}

All messages for the input topics on the specified partition will be passed to the appropriate MessageProcessor instance. This is useful for implementing partition-wise joins of different topics. The Sender instance must be used to produce messages to output topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing. If Process returns a non-nil error value, Kasper stops all processing.
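
The ordering described above (process, produce, then advance offsets) can be sketched without Kafka. This is an illustration under stated assumptions, not Kasper's implementation: step, errNotAcked and the process and flush callbacks are hypothetical names, and offsets are modeled as plain integers.

```go
package main

import "errors"

// errNotAcked is a hypothetical error for output that Kafka did not
// acknowledge.
var errNotAcked = errors.New("output messages not acknowledged")

// step runs one cycle: process the batch, send the collected output, and only
// then advance the committed offset. batch holds the offsets of the consumed
// messages; process and flush stand in for MessageProcessor.Process and the
// producer.
func step(batch []int64, committed int64, process func([]int64) error, flush func() error) (int64, error) {
	if len(batch) == 0 {
		return committed, nil
	}
	if err := process(batch); err != nil {
		return committed, err // processing stops; the offset is not advanced
	}
	if err := flush(); err != nil {
		return committed, err // output not acknowledged; the offset is not advanced
	}
	// The next offset to consume is one past the last processed message.
	return batch[len(batch)-1] + 1, nil
}
```

Because the offset moves only after the output has been acknowledged, a failure between the two steps causes the batch to be processed again on restart, which is why processing should be idempotent.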

Step 4 - Create a TopicProcessor

To start processing messages, call TopicProcessor.RunLoop(). Kasper does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop() will block the current goroutine and will run forever until an error occurs or until Close() is called. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.
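
One way to give each TopicProcessor instance a disjoint set of input partitions is a round-robin split. The helper below is a hypothetical sketch (splitPartitions is not part of Kasper); each returned slice could serve as Config.InputPartitions for one instance.

```go
package main

// splitPartitions assigns partitions 0..numPartitions-1 to the given number
// of instances in round-robin order. No partition appears in more than one
// slice, so the resulting assignments never overlap.
func splitPartitions(numPartitions, instances int) [][]int {
	if numPartitions <= 0 || instances <= 0 {
		return nil
	}
	out := make([][]int, instances)
	for p := 0; p < numPartitions; p++ {
		i := p % instances
		out[i] = append(out[i], p)
	}
	return out
}
```

For 12 partitions and 3 instances, the second instance receives partitions 1, 4, 7 and 10.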

type Config

type Config struct {
	// Used for logging, metrics, and Kafka consumer group
	TopicProcessorName string
	// Used for consuming and producing messages
	Client sarama.Client
	// Input topics (all topics need to have the same number of partitions)
	InputTopics []string
	// Input partitions (cannot overlap between TopicProcessor instances)
	InputPartitions []int
	// Maximum number of messages processed in one go
	BatchSize int
	// Maximum amount of time spent waiting for a batch to be filled
	BatchWaitDuration time.Duration
	// Use NewBasicLogger() or any other Logger
	Logger Logger
	// Use NewPrometheus() or any other MetricsProvider
	MetricsProvider MetricsProvider
	// 15 seconds is a sensible value
	MetricsUpdateInterval time.Duration
}

Config contains the configuration settings for a TopicProcessor.
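
The interplay of BatchSize and BatchWaitDuration can be pictured as "collect until full or until the wait expires". The following sketch is an assumption about the general technique, not Kasper's actual consumer code; collectBatch and its channel-based input are hypothetical.

```go
package main

import "time"

// collectBatch reads from in until maxSize items have been collected or
// maxWait has elapsed since the call started, whichever comes first. It also
// returns early if in is closed.
func collectBatch(in <-chan string, maxSize int, maxWait time.Duration) []string {
	if maxSize <= 0 {
		return nil
	}
	batch := make([]string, 0, maxSize)
	timer := time.NewTimer(maxWait)
	defer timer.Stop()
	for len(batch) < maxSize {
		select {
		case m, ok := <-in:
			if !ok {
				return batch // input closed: return what we have
			}
			batch = append(batch, m)
		case <-timer.C:
			return batch // wait expired: return a partial batch
		}
	}
	return batch
}
```

A larger batch size amortizes round trips to the key-value store, while the wait duration bounds how long a partially filled batch is held back.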

type Counter

type Counter interface {
	Inc(labelValues ...string)
	Add(value float64, labelValues ...string)
}

Counter is a single float metric that can be incremented by one or added to.
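
As a sketch of what a custom Counter might look like, the following in-memory implementation keeps one total per combination of label values. memCounter, newMemCounter and the Value accessor are hypothetical and not part of Kasper; only the Inc and Add signatures come from the interface above.

```go
package main

import (
	"strings"
	"sync"
)

// memCounter is a hypothetical in-memory Counter keyed by label values.
type memCounter struct {
	mu     sync.Mutex
	values map[string]float64
}

func newMemCounter() *memCounter {
	return &memCounter{values: make(map[string]float64)}
}

// seriesKey joins label values with a NUL byte, which is unlikely to appear
// inside a label value.
func seriesKey(labelValues []string) string {
	return strings.Join(labelValues, "\x00")
}

// Inc adds one to the series identified by labelValues.
func (c *memCounter) Inc(labelValues ...string) {
	c.Add(1, labelValues...)
}

// Add adds value to the series identified by labelValues.
func (c *memCounter) Add(value float64, labelValues ...string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.values[seriesKey(labelValues)] += value
}

// Value returns the current total for the given label values.
func (c *memCounter) Value(labelValues ...string) float64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.values[seriesKey(labelValues)]
}
```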

type Elasticsearch

type Elasticsearch struct {
	// contains filtered or unexported fields
}

Elasticsearch is an implementation of Store that uses Elasticsearch. Each instance provides key-value access to a given index and a given document type. This implementation supports Elasticsearch 5.x and uses Oliver Eilhard's Go Elasticsearch client. See https://github.com/olivere/elastic

func NewElasticsearch

func NewElasticsearch(config *Config, client *elastic.Client, indexName, typeName string) *Elasticsearch

NewElasticsearch creates Elasticsearch instances. All documents read and written will correspond to the URL:

https://{cluster}:9200/{indexName}/{typeName}/{key}

func (*Elasticsearch) Delete

func (s *Elasticsearch) Delete(key string) error

Delete removes a document from the store. It does not return an error if the document was not present. It is implemented using the Elasticsearch Delete API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html

func (*Elasticsearch) Flush

func (s *Elasticsearch) Flush() error

Flush flushes the Elasticsearch translog to disk. It is implemented using the Elasticsearch Flush API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-flush.html

func (*Elasticsearch) Get

func (s *Elasticsearch) Get(key string) ([]byte, error)

Get gets a document by key (i.e. the Elasticsearch _id). It is implemented by using the Elasticsearch Get API. The returned byte slice contains the UTF8-encoded JSON document (i.e., _source). This function returns (nil, nil) if the document does not exist. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-get.html

func (*Elasticsearch) GetAll

func (s *Elasticsearch) GetAll(keys []string) (map[string][]byte, error)

GetAll gets multiple documents from the store. It is implemented using the Elasticsearch MultiGet API. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-multi-get.html

func (*Elasticsearch) GetClient

func (s *Elasticsearch) GetClient() *elastic.Client

GetClient returns the underlying elastic.Client.

func (*Elasticsearch) Put

func (s *Elasticsearch) Put(key string, value []byte) error

Put inserts or updates a document in the store (key is used as the document _id). It is implemented using the Elasticsearch Index API. The value byte slice must contain the UTF8-encoded JSON document (i.e., _source). See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html

func (*Elasticsearch) PutAll

func (s *Elasticsearch) PutAll(kvs map[string][]byte) error

PutAll inserts or updates a number of documents in the store. It is implemented using the Elasticsearch Bulk and Index APIs. It returns an error if any operation fails. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

type ElasticsearchTenancy

type ElasticsearchTenancy interface {
	TenantIndexAndType(tenant string) (indexName, typeName string)
}

ElasticsearchTenancy defines how tenanted keys are mapped to an index and type. Here is a simple example:

type CustomerTenancy struct{}
func (CustomerTenancy) TenantIndexAndType(tenant string) (indexName, typeName string) {
	indexName = fmt.Sprintf("sales-service~%s", tenant)
	typeName = "customer"
	return
}

type Gauge

type Gauge interface {
	Set(value float64, labelValues ...string)
}

Gauge is a single float metric that can be set to a specific value.

type Logger

type Logger interface {
	Debug(...interface{})
	Debugf(string, ...interface{})

	Info(...interface{})
	Infof(string, ...interface{})

	Error(...interface{})
	Errorf(string, ...interface{})

	Panic(...interface{})
	Panicf(string, ...interface{})
}

Logger is a logging interface for Kasper.
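
Any type with these eight methods can be used as a Logger. As a hedged example, the sketch below records lines in memory, which may be convenient in unit tests. memLogger is a hypothetical type, not something Kasper provides, and the "LEVEL message" line format is an arbitrary choice.

```go
package main

import "fmt"

// memLogger is a hypothetical Logger that records lines in memory.
// It is not safe for concurrent use.
type memLogger struct {
	debug bool // when false, Debug and Debugf are dropped
	Lines []string
}

func (l *memLogger) add(level, msg string) {
	l.Lines = append(l.Lines, level+" "+msg)
}

func (l *memLogger) Debug(v ...interface{}) {
	if l.debug {
		l.add("DEBUG", fmt.Sprint(v...))
	}
}

func (l *memLogger) Debugf(f string, v ...interface{}) {
	if l.debug {
		l.add("DEBUG", fmt.Sprintf(f, v...))
	}
}

func (l *memLogger) Info(v ...interface{}) { l.add("INFO", fmt.Sprint(v...)) }

func (l *memLogger) Infof(f string, v ...interface{}) { l.add("INFO", fmt.Sprintf(f, v...)) }

func (l *memLogger) Error(v ...interface{}) { l.add("ERROR", fmt.Sprint(v...)) }

func (l *memLogger) Errorf(f string, v ...interface{}) { l.add("ERROR", fmt.Sprintf(f, v...)) }

// Panic records the message and then panics with it.
func (l *memLogger) Panic(v ...interface{}) {
	msg := fmt.Sprint(v...)
	l.add("PANIC", msg)
	panic(msg)
}

// Panicf records the formatted message and then panics with it.
func (l *memLogger) Panicf(f string, v ...interface{}) {
	msg := fmt.Sprintf(f, v...)
	l.add("PANIC", msg)
	panic(msg)
}
```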

func NewBasicLogger

func NewBasicLogger(debug bool) Logger

NewBasicLogger uses the Go standard library logger. See https://golang.org/pkg/log/

func NewJSONLogger

func NewJSONLogger(label string, debug bool) Logger

NewJSONLogger uses the logrus JSON formatter. See https://github.com/sirupsen/logrus

func NewTextLogger

func NewTextLogger(label string, debug bool) Logger

NewTextLogger uses the logrus text formatter. See https://github.com/sirupsen/logrus

type Map

type Map struct {
	// contains filtered or unexported fields
}

Map wraps a map[string][]byte value and implements the Store interface.

func NewMap

func NewMap(size int) *Map

NewMap creates a new map of the given size.

func (*Map) Delete

func (s *Map) Delete(key string) error

Delete removes a single value by key. Does not return an error if the key is not present.

func (*Map) Flush

func (s *Map) Flush() error

Flush does nothing.

func (*Map) Get

func (s *Map) Get(key string) ([]byte, error)

Get gets a value by key. Returns (nil, nil) if the key is not present.

func (*Map) GetAll

func (s *Map) GetAll(keys []string) (map[string][]byte, error)

GetAll returns multiple values by key. The returned map does not contain entries for missing keys.

func (*Map) GetMap

func (s *Map) GetMap() map[string][]byte

GetMap returns the underlying map.

func (*Map) Put

func (s *Map) Put(key string, value []byte) error

Put inserts or updates a value by key.

func (*Map) PutAll

func (s *Map) PutAll(kvs map[string][]byte) error

PutAll inserts or updates multiple key-value pairs.

type MessageProcessor

type MessageProcessor interface {
	// Process receives a slice of incoming Kafka messages and a Sender to send messages to output topics.
	// References to the byte slice or Sender interface cannot be held between calls.
	// If Process returns a non-nil error value, Kasper stops all processing.
	// This error value is then returned by TopicProcessor.RunLoop().
	Process([]*sarama.ConsumerMessage, Sender) error
}

MessageProcessor is the interface that encapsulates application business logic. It receives all messages of a single partition of the TopicProcessor's input topics.

type MetricsProvider

type MetricsProvider interface {
	NewCounter(name string, help string, labelNames ...string) Counter
	NewGauge(name string, help string, labelNames ...string) Gauge
	NewSummary(name string, help string, labelNames ...string) Summary
}

MetricsProvider is a facility to create metrics instances.

type MultiElasticsearch

type MultiElasticsearch struct {
	// contains filtered or unexported fields
}

MultiElasticsearch is an implementation of MultiStore that uses Elasticsearch. Each instance provides key-value access to a subset of the Elasticsearch documents defined by a tenancy instance (see ElasticsearchTenancy). This implementation supports Elasticsearch 5.x.

func NewMultiElasticsearch

func NewMultiElasticsearch(config *Config, client *elastic.Client, tenancy ElasticsearchTenancy) *MultiElasticsearch

NewMultiElasticsearch creates MultiElasticsearch instances. All documents read and written will correspond to the URL:

https://{cluster}:9200/{indexName}/{typeName}/{key}

where indexName and typeName depend on the tenant and the tenancy instance.

func (*MultiElasticsearch) AllTenants

func (s *MultiElasticsearch) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiElasticsearch) Fetch

func (s *MultiElasticsearch) Fetch(keys []TenantKey) (*MultiMap, error)

Fetch performs a single MultiGet operation on the Elasticsearch cluster across multiple tenants (i.e. indexes).

func (*MultiElasticsearch) Push

func (s *MultiElasticsearch) Push(m *MultiMap) error

Push performs a single Bulk index request with all documents provided. It returns an error if any operation fails.

func (*MultiElasticsearch) Tenant

func (s *MultiElasticsearch) Tenant(tenant string) Store

Tenant returns an Elasticsearch Store for the given tenant. Created instances are cached for future invocations.

type MultiMap

type MultiMap struct {
	// contains filtered or unexported fields
}

MultiMap is a multitenanted version of Map that implements the MultiStore interface.

func NewMultiMap

func NewMultiMap(size int) *MultiMap

NewMultiMap creates a new MultiMap. Each underlying Map instance is initialized to the given size.

func (*MultiMap) AllTenants

func (mtkv *MultiMap) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiMap) Fetch

func (mtkv *MultiMap) Fetch(tenantKeys []TenantKey) (*MultiMap, error)

Fetch reads multiple values from the MultiStore.

func (*MultiMap) Push

func (mtkv *MultiMap) Push(store *MultiMap) error

Push inserts or updates multiple values in the MultiStore.

func (*MultiMap) Tenant

func (mtkv *MultiMap) Tenant(tenant string) Store

Tenant returns a Map for the given tenant. Created instances are cached for future invocations.

type MultiRedis

type MultiRedis struct {
	// contains filtered or unexported fields
}

MultiRedis is an implementation of MultiStore that uses Redis. Each instance provides multitenant key-value access to keys of the form {tenant}/{keyPrefix}/{key}. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo

func NewMultiRedis

func NewMultiRedis(config *Config, conn redis.Conn, keyPrefix string) *MultiRedis

NewMultiRedis creates MultiRedis instances. All keys read and written will be of the form:

{tenant}/{keyPrefix}/{key}
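
For illustration, the key layout above can be reproduced with a small helper. tenantRedisKey is a hypothetical function written for this sketch; Kasper builds these keys internally and may do so differently.

```go
package main

import "strings"

// tenantRedisKey builds a key of the form {tenant}/{keyPrefix}/{key}.
func tenantRedisKey(tenant, keyPrefix, key string) string {
	return strings.Join([]string{tenant, keyPrefix, key}, "/")
}
```

With tenant "acme", prefix "followers" and key "alice", the resulting Redis key is acme/followers/alice.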

func (*MultiRedis) AllTenants

func (s *MultiRedis) AllTenants() []string

AllTenants returns the list of tenants known to this instance.

func (*MultiRedis) Fetch

func (s *MultiRedis) Fetch(keys []TenantKey) (*MultiMap, error)

Fetch performs a single MULTI GET Redis command across multiple tenants.

func (*MultiRedis) Push

func (s *MultiRedis) Push(entries *MultiMap) error

Push performs a single MULTI SET Redis command across multiple tenants.

func (*MultiRedis) Tenant

func (s *MultiRedis) Tenant(tenant string) Store

Tenant returns a Redis Store for the given tenant. Created instances are cached for future invocations.

type MultiStore

type MultiStore interface {
	// Tenant returns the underlying store for a tenant.
	Tenant(tenant string) Store
	// AllTenants returns the list of tenants known to this instance.
	AllTenants() []string
	// Fetch is a multitenant version of GetAll.
	Fetch(keys []TenantKey) (*MultiMap, error)
	// Push is a multitenant version of PutAll
	Push(store *MultiMap) error
}

MultiStore is a multitenant version of Store. Tenants are represented as strings. Each tenant has an underlying Store.

type NoopMetricsProvider

type NoopMetricsProvider struct{}

NoopMetricsProvider is a dummy implementation of MetricsProvider that does nothing. Useful for testing, not recommended in production.

func (*NoopMetricsProvider) NewCounter

func (m *NoopMetricsProvider) NewCounter(name string, help string, labelNames ...string) Counter

NewCounter creates a new no-op Counter.

func (*NoopMetricsProvider) NewGauge

func (m *NoopMetricsProvider) NewGauge(name string, help string, labelNames ...string) Gauge

NewGauge creates a new no-op Gauge.

func (*NoopMetricsProvider) NewSummary

func (m *NoopMetricsProvider) NewSummary(name string, help string, labelNames ...string) Summary

NewSummary creates a new no-op Summary.

type Prometheus

type Prometheus struct {
	Registry *prometheus.Registry
	// contains filtered or unexported fields
}

Prometheus is an implementation of MetricsProvider that uses Prometheus. See https://github.com/prometheus/client_golang

func NewPrometheus

func NewPrometheus(label string) *Prometheus

NewPrometheus creates a new Prometheus instance.

func (*Prometheus) NewCounter

func (provider *Prometheus) NewCounter(name string, help string, labelNames ...string) Counter

NewCounter creates a new Prometheus CounterVec.

func (*Prometheus) NewGauge

func (provider *Prometheus) NewGauge(name string, help string, labelNames ...string) Gauge

NewGauge creates a new Prometheus GaugeVec.

func (*Prometheus) NewSummary

func (provider *Prometheus) NewSummary(name string, help string, labelNames ...string) Summary

NewSummary creates a new Prometheus SummaryVec.

type Redis

type Redis struct {
	// contains filtered or unexported fields
}

Redis is an implementation of Store that uses Redis. Each instance provides key-value access to keys with a specific prefix. This implementation uses Gary Burd's Go Redis client. See https://github.com/garyburd/redigo

func NewRedis

func NewRedis(config *Config, conn redis.Conn, keyPrefix string) *Redis

NewRedis creates Redis instances. All keys read and written in Redis are of the form:

{keyPrefix}/{key}

func (*Redis) Delete

func (s *Redis) Delete(key string) error

Delete deletes a value by key. It is implemented using the Redis DEL command. See https://redis.io/commands/del

func (*Redis) Flush

func (s *Redis) Flush() error

Flush executes the SAVE command. See https://redis.io/commands/save

func (*Redis) Get

func (s *Redis) Get(key string) ([]byte, error)

Get gets a value by key. Returns (nil, nil) if the key is missing. It is implemented using the Redis GET command. See https://redis.io/commands/get

func (*Redis) GetAll

func (s *Redis) GetAll(keys []string) (map[string][]byte, error)

GetAll gets multiple values by key. It is implemented by using the MULTI and GET commands. See https://redis.io/commands/multi

func (*Redis) Put

func (s *Redis) Put(key string, value []byte) error

Put inserts or updates a value by key. It is implemented using the Redis SET command. See https://redis.io/commands/set

func (*Redis) PutAll

func (s *Redis) PutAll(entries map[string][]byte) error

PutAll inserts or updates multiple values by key. It is implemented by using the MULTI and SET commands. See https://redis.io/commands/multi

type Sender

type Sender interface {

	// Send appends a message to a slice held by the sender instance.
	// These messages are sent in bulk when Process() returns or when Flush() is called.
	Send(msg *sarama.ProducerMessage)

	// Flush immediately sends all messages held in the sender slice in bulk, and empties the slice. See Send() above.
	Flush() error
}

Sender instances are given to MessageProcessor.Process to send messages to Kafka topics. Messages passed to Sender are not sent directly but are collected in an array instead. When Process returns, the messages are sent to Kafka and Kasper waits for the configured number of acks. When all messages have been successfully produced, Kasper updates the consumer offsets of the input partitions and resumes processing.
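
The buffering behaviour can be pictured with a small stand-in that does not use sarama or Kasper. In this sketch, message, bufferedSender and the deliver callback are hypothetical names; Kasper's real Sender may be implemented differently.

```go
package main

// message is a hypothetical stand-in for *sarama.ProducerMessage.
type message struct {
	Topic string
	Value string
}

// bufferedSender collects messages in a slice and hands them to deliver in
// one call when Flush is invoked.
type bufferedSender struct {
	pending []message
	deliver func([]message) error
}

// Send only appends to the pending slice; nothing is delivered yet.
func (s *bufferedSender) Send(m message) {
	s.pending = append(s.pending, m)
}

// Flush delivers all pending messages in bulk. In this sketch the slice is
// emptied only when delivery succeeds, so a failed batch stays available.
func (s *bufferedSender) Flush() error {
	if len(s.pending) == 0 {
		return nil
	}
	if err := s.deliver(s.pending); err != nil {
		return err
	}
	s.pending = nil
	return nil
}
```

Keeping the messages until delivery succeeds is a design choice of this sketch; the source only states that Flush sends the held messages in bulk and empties the slice.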

type Store

type Store interface {
	// Get gets a value by key.
	Get(key string) ([]byte, error)
	// GetAll gets multiple values by key.
	GetAll(keys []string) (map[string][]byte, error)
	// Put inserts or updates a value by key.
	Put(key string, value []byte) error
	// PutAll inserts or updates multiple key-value pairs.
	PutAll(map[string][]byte) error
	// Delete deletes a key from the store.
	Delete(key string) error
	// Flush indicates that the underlying storage must be made persistent.
	Flush() error
}

Store is a universal interface for a key-value store. Keys are strings, and values are byte slices.
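
To show what implementing Store for another backend involves, here is a minimal in-memory sketch. memStore and newMemStore are hypothetical names; the Store interface is repeated so the example compiles on its own. The behaviour for missing keys follows what this page documents for Map: Get returns (nil, nil) and GetAll omits the entry.

```go
package main

// Store mirrors the interface documented above.
type Store interface {
	Get(key string) ([]byte, error)
	GetAll(keys []string) (map[string][]byte, error)
	Put(key string, value []byte) error
	PutAll(map[string][]byte) error
	Delete(key string) error
	Flush() error
}

// memStore is a hypothetical map-backed Store. It is not safe for
// concurrent use.
type memStore struct {
	data map[string][]byte
}

func newMemStore() *memStore {
	return &memStore{data: make(map[string][]byte)}
}

// Get returns (nil, nil) when the key is absent.
func (s *memStore) Get(key string) ([]byte, error) {
	return s.data[key], nil
}

// GetAll omits keys that are not present.
func (s *memStore) GetAll(keys []string) (map[string][]byte, error) {
	out := make(map[string][]byte, len(keys))
	for _, k := range keys {
		if v, ok := s.data[k]; ok {
			out[k] = v
		}
	}
	return out, nil
}

func (s *memStore) Put(key string, value []byte) error {
	s.data[key] = value
	return nil
}

func (s *memStore) PutAll(kvs map[string][]byte) error {
	for k, v := range kvs {
		s.data[k] = v
	}
	return nil
}

// Delete does not fail when the key is absent.
func (s *memStore) Delete(key string) error {
	delete(s.data, key)
	return nil
}

// Flush is a no-op because nothing is persisted.
func (s *memStore) Flush() error { return nil }

// Compile-time check that memStore satisfies Store.
var _ Store = (*memStore)(nil)
```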

type Summary

type Summary interface {
	Observe(value float64, labelValues ...string)
}

Summary is a float value metric that provides a history of observations.

type TenantKey

type TenantKey struct {
	Tenant string
	Key    string
}

TenantKey is a pair of tenant and key. Used by MultiStore.Fetch.
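
When preparing or inspecting a list of TenantKey values, it can help to group the keys per tenant. The helper below is a hypothetical sketch (groupByTenant is not part of Kasper); TenantKey is repeated so the example compiles on its own.

```go
package main

// TenantKey mirrors the struct documented above.
type TenantKey struct {
	Tenant string
	Key    string
}

// groupByTenant collects keys per tenant, preserving the order in which the
// keys appear for each tenant.
func groupByTenant(keys []TenantKey) map[string][]string {
	out := make(map[string][]string)
	for _, tk := range keys {
		out[tk.Tenant] = append(out[tk.Tenant], tk.Key)
	}
	return out
}
```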

type TopicProcessor

type TopicProcessor struct {
	// contains filtered or unexported fields
}

TopicProcessor is the main entity in Kasper. It implements a single-threaded processing loop for a set of topics and partitions.

func NewTopicProcessor

func NewTopicProcessor(config *Config, messageProcessors map[int]MessageProcessor) *TopicProcessor

NewTopicProcessor creates a new instance of TopicProcessor. For parallel processing, run multiple TopicProcessor instances in different goroutines or processes (the input partitions cannot overlap). You should set Config.TopicProcessorName to the same value on all instances in order to easily scale the processing up or down.

func (*TopicProcessor) Close

func (tp *TopicProcessor) Close()

Close safely shuts down the TopicProcessor, which makes RunLoop() return.

func (*TopicProcessor) HasConsumedAllMessages

func (tp *TopicProcessor) HasConsumedAllMessages() bool

HasConsumedAllMessages returns true when all input topics have been entirely consumed. Kasper checks all high water marks and offsets for all topics before returning.

func (*TopicProcessor) RunLoop

func (tp *TopicProcessor) RunLoop() error

RunLoop is the main processing loop of Kasper. It does not spawn any goroutines and runs a single-threaded event loop instead. RunLoop will block the current goroutine and will run forever until an error occurs or until Close() is called. RunLoop propagates the error returned by MessageProcessor.Process if not nil.
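
Since RunLoop blocks, a common pattern is to run it in its own goroutine and call Close from elsewhere when it is time to stop. The sketch below assumes only the RunLoop and Close signatures shown on this page; runner, runUntil and fakeLoop are hypothetical names, and fakeLoop stands in for a TopicProcessor so the example runs without Kafka.

```go
package main

// runner captures the two TopicProcessor methods used in this sketch.
type runner interface {
	RunLoop() error
	Close()
}

// runUntil starts RunLoop in its own goroutine, calls Close once stop is
// closed, and returns the error RunLoop produced.
func runUntil(r runner, stop <-chan struct{}) error {
	done := make(chan error, 1)
	go func() { done <- r.RunLoop() }()
	select {
	case err := <-done:
		return err // the loop ended on its own, e.g. Process returned an error
	case <-stop:
		r.Close()
		return <-done // wait for RunLoop to return after Close
	}
}

// fakeLoop is a hypothetical stand-in whose RunLoop blocks until Close.
type fakeLoop struct {
	quit chan struct{}
}

func (f *fakeLoop) RunLoop() error {
	<-f.quit
	return nil
}

func (f *fakeLoop) Close() { close(f.quit) }
```

In a real program, stop would typically be closed by a signal handler, and r would be the value returned by NewTopicProcessor.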

Documentation was rendered with GOOS=linux and GOARCH=amd64.
