kinesis

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2019 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// AfterRecord is a checkpoint strategy
	// When set it stores checkpoint in every record.
	AfterRecord CheckpointStrategy = iota
	// AfterRecordBatch is a checkpoint strategy
	// When set it stores checkpoint in every record batch.
	AfterRecordBatch
	// IteratorTypeTail is a iterator type that defines that consumer starts reading from tail.
	IteratorTypeTail IteratorType = iota
	// IteratorTypeHead is a iterator type that defines that consumer starts reading from beginning.
	IteratorTypeHead
	// IteratorTypeSequence is a iterator type that defines that consumer starts reading from a sequence number.
	IteratorTypeSequence
	// IteratorTypeAfterSequence is a iterator type that defines that consumer starts reading from a sequence number + 1.
	IteratorTypeAfterSequence
)
View Source
const (
	// StreamCheckedTriggered event
	StreamCheckedTriggered = "stream_checker_triggered"
	// ShardManagerTriggered event
	ShardManagerTriggered = "shard_manager_triggered"
	// ShardIteratorTriggered event
	ShardIteratorTriggered = "shard_iterator_triggered"
	// RecordProcessedSuccess event
	RecordProcessedSuccess = "record_processed_success"
	// RecordProcessedFail event
	RecordProcessedFail = "record_processed_fail"
)
View Source
const (
	// LevelDebug LogLevel
	LevelDebug = "debug"
	// LevelInfo LogLevel
	LevelInfo = "info"
	// LevelError LogLevel
	LevelError = "error"
)

Variables

This section is empty.

Functions

func NewClient

func NewClient(config AWSConfig) (*kinesis.Kinesis, error)

NewClient creates a new kinesis client

Types

type AWSConfig

type AWSConfig struct {
	Endpoint string `json:"endpoint" mapstructure:"endpoint"`
	Region   string `json:"region" mapstructure:"region"`
}

AWSConfig is a aws configuration.

type AverageStats

type AverageStats struct {
	Count       int
	MaxDuration time.Duration
	SumDuration time.Duration
}

AverageStats holds average counters.

func (*AverageStats) Add

func (s *AverageStats) Add(duration time.Duration)

Add adds stat.

type Checkpoint

type Checkpoint interface {
	Get(key string) (string, error)
	Set(key, value string) error
}

Checkpoint manages last checkpoint.

type CheckpointStrategy

type CheckpointStrategy = int

CheckpointStrategy checkpoint behaviour.

type Consumer

type Consumer struct {
	ConsumerOptions
	// contains filtered or unexported fields
}

Consumer is a kinesis stream consumer.

func NewConsumer

func NewConsumer(config ConsumerConfig, handler MessageHandler, checkpoint Checkpoint, opts ...ConsumerOption) (*Consumer, error)

NewConsumer creates a new kinesis consumer

func (*Consumer) Log

func (c *Consumer) Log(level string, data map[string]interface{}, format string, args ...interface{})

Log main logger.

func (*Consumer) LogEvent

func (c *Consumer) LogEvent(event EventLog)

LogEvent main log event.

func (*Consumer) ResetIterators

func (c *Consumer) ResetIterators() error

ResetIterators resets iterator for active shards.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run runs the consumer.

func (*Consumer) SetEventLogger

func (c *Consumer) SetEventLogger(eventLogger EventLogger)

SetEventLogger allows you to set the event logger.

func (*Consumer) SetLogger

func (c *Consumer) SetLogger(logger Logger)

SetLogger allows you to set the logger.

func (*Consumer) Start

func (c *Consumer) Start() error

Start starts consumer.

func (*Consumer) Stats

func (c *Consumer) Stats() ConsumerStats

Stats returns consumer stats.

func (*Consumer) Stop

func (c *Consumer) Stop() error

Stop stops consumer

type ConsumerConfig

type ConsumerConfig struct {
	AWS                  AWSConfig     `json:"aws" mapstructure:"aws"`
	Group                string        `json:"group" mapstructure:"group" validate:"nonzero"`
	Stream               string        `json:"stream" mapstructure:"stream" validate:"nonzero"`
	StreamCheckTick      time.Duration `json:"stream_tick" mapstructure:"stream_tick"`
	RunnerFactoryTick    time.Duration `json:"runner_factory_tick" mapstructure:"runner_factory_tick"`
	RunnerTick           time.Duration `json:"runner_tick" mapstructure:"runner_tick"`
	RunnerGetRecordsRate time.Duration `json:"runner_get_records_rate" mapstructure:"runner_get_records_rate"`
}

ConsumerConfig is a kinesis consumer configuration.

type ConsumerIterator

type ConsumerIterator struct {
	Type     IteratorType
	ShardID  string
	Sequence string
}

ConsumerIterator is a iterator configuration by shard.

type ConsumerOption

type ConsumerOption func(*ConsumerOptions)

ConsumerOption is the abstract functional-parameter type used for worker configuration.

func SinceLatest

func SinceLatest() ConsumerOption

SinceLatest allows you to set kinesis iterator. Starts reading just after the most recent record in the shard

func SinceOldest

func SinceOldest() ConsumerOption

SinceOldest allows you to set kinesis iterator. Starts reading at the last untrimmed record in the shard in the system

func SinceSequence

func SinceSequence(shardID string, sequence string) ConsumerOption

SinceSequence allows you to set kinesis iterator. Starts reading since sequence number in a specific shard.

func SkipReshardingOrder

func SkipReshardingOrder() ConsumerOption

SkipReshardingOrder allows you to set consumer to start reading shards since detected.

func WithCheckpointStrategy

func WithCheckpointStrategy(strategy CheckpointStrategy) ConsumerOption

WithCheckpointStrategy allows you to configure checkpoint strategy.

func WithClient

func WithClient(client kinesisiface.KinesisAPI) ConsumerOption

WithClient allows you to set a kinesis client.

func WithShards

func WithShards(shardIDs ...string) ConsumerOption

WithShards allows you to set a filtered shards. Consumer only reads specified shards ids.

func WithSpecificIterators

func WithSpecificIterators(iterators map[string]ConsumerIterator) ConsumerOption

WithSpecificIterators allows you to set a specific iterators per shard.

type ConsumerOptions

type ConsumerOptions struct {
	// contains filtered or unexported fields
}

ConsumerOptions holds all consumer options.

type ConsumerStats

type ConsumerStats struct {
	RecordsFailed  AverageStats
	RecordsSuccess AverageStats
}

ConsumerStats structure collects all stats.

type EventLog

type EventLog struct {
	Event  string
	Elapse time.Duration
}

EventLog structure contains all information of what happened.

type EventLogger

type EventLogger interface {
	LogEvent(event EventLog)
}

EventLogger callback that is called every event.

type IteratorType

type IteratorType = int

IteratorType iterator type.

type Logger

type Logger interface {
	Log(level string, data map[string]interface{}, format string, args ...interface{})
}

Logger interface

type Message

type Message struct {
	Data         []byte
	PartitionKey string
	ShardID      string
}

Message is a kinesis message.

type MessageHandler

type MessageHandler func(ctx context.Context, msg Message) error

MessageHandler is the message handler.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer should be able to dispatch messages

func NewProducer

func NewProducer(config ProducerConfig) (*Producer, error)

NewProducer creates a new kinesis producer

func (Producer) Publish

func (producer Producer) Publish(message Message) error

Publish publishes content to kinesis.

func (Producer) PublishBatch

func (producer Producer) PublishBatch(messages []Message) error

PublishBatch publishes contents to kinesis.

func (Producer) PublishBatchWithContext

func (producer Producer) PublishBatchWithContext(ctx context.Context, messages []Message) error

PublishBatchWithContext publishes contents to kinesis.

func (Producer) PublishWithContext

func (producer Producer) PublishWithContext(ctx context.Context, message Message) error

PublishWithContext publishes content to kinesis.

type ProducerConfig

type ProducerConfig struct {
	AWS    AWSConfig `json:"aws" mapstructure:"aws" validate:"nonzero"`
	Stream string    `json:"stream" mapstructure:"stream" validate:"nonzero"`
}

ProducerConfig is a kinesis producer configuration.

type Runner

type Runner interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	ShardID() string
	Closed() bool
	RestartCursor()
}

Runner is a shard iterator.

type RunnerFactory

type RunnerFactory interface {
	Run(ctx context.Context) error
	ResetCursors()
}

RunnerFactory handler stream sharding.

type StreamChecker

type StreamChecker interface {
	Run(ctx context.Context) error
	SetDeletingCallback(cb func())
}

StreamChecker checks stream state and handles it.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL