producer

Published: Sep 5, 2019 License: GPL-3.0 Imports: 11 Imported by: 0

Documentation

Overview

Package producer is a generated protocol buffer package.

It is generated from these files:

messages.proto

It has these top-level messages:

AggregatedRecord
Tag
Record

Package producer is a KPL-like batch producer for Amazon Kinesis built on top of the official Go AWS SDK, using the same aggregation format that the KPL uses.

Note: This project started as a fork of `tj/go-kinesis`. If you are not interested in the KPL aggregation logic, you will probably want to check it out.

Index

Examples

Constants

This section is empty.

Variables

var (
	ErrStoppedProducer     = errors.New("unable to Put record, producer is already stopped")
	ErrIllegalPartitionKey = errors.New("invalid partition key, length must be at least 1 and at most 256")
	ErrRecordSizeExceeded  = errors.New("data must be less than or equal to 1MB in size")
)

Errors
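
These sentinel errors are returned by Producer.Put. A minimal sketch of handling them; the helper below is hypothetical and not part of the package API:

func putChecked(pr *Producer, data []byte, key string) {
	// Put is asynchronous; the sentinel errors above are returned immediately.
	switch err := pr.Put(data, key); err {
	case nil:
		// Record accepted into the backlog.
	case ErrStoppedProducer:
		// The producer was already stopped; no further records are accepted.
	case ErrIllegalPartitionKey:
		// Partition key length must be between 1 and 256.
	case ErrRecordSizeExceeded:
		// A single record must not exceed 1MB in size.
	default:
		// Any other error.
	}
}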

Functions

This section is empty.

Types

type AggregatedRecord

type AggregatedRecord struct {
	PartitionKeyTable    []string  `protobuf:"bytes,1,rep,name=partition_key_table" json:"partition_key_table,omitempty"`
	ExplicitHashKeyTable []string  `protobuf:"bytes,2,rep,name=explicit_hash_key_table" json:"explicit_hash_key_table,omitempty"`
	Records              []*Record `protobuf:"bytes,3,rep,name=records" json:"records,omitempty"`
	XXX_unrecognized     []byte    `json:"-"`
}

func (*AggregatedRecord) Descriptor

func (*AggregatedRecord) Descriptor() ([]byte, []int)

func (*AggregatedRecord) GetExplicitHashKeyTable

func (m *AggregatedRecord) GetExplicitHashKeyTable() []string

func (*AggregatedRecord) GetPartitionKeyTable

func (m *AggregatedRecord) GetPartitionKeyTable() []string

func (*AggregatedRecord) GetRecords

func (m *AggregatedRecord) GetRecords() []*Record

func (*AggregatedRecord) ProtoMessage

func (*AggregatedRecord) ProtoMessage()

func (*AggregatedRecord) Reset

func (m *AggregatedRecord) Reset()

func (*AggregatedRecord) String

func (m *AggregatedRecord) String() string

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator is used to aggregate records into a kinesis.PutRecordsRequestEntry.

func (*Aggregator) Count

func (a *Aggregator) Count() int

Count returns how many records are stored in the aggregator.

func (*Aggregator) Drain

func (a *Aggregator) Drain() (*k.PutRecordsRequestEntry, error)

Drain creates an aggregated `kinesis.PutRecordsRequestEntry` that is compatible with the KCL's deaggregation logic.

If you are interested in learning more about it, see: aggregation-format.md

func (*Aggregator) Put

func (a *Aggregator) Put(data []byte, partitionKey string)

Put adds a record to the aggregator using `data` and `partitionKey`. This method is not thread-safe.

func (*Aggregator) Size

func (a *Aggregator) Size() int

Size returns how many bytes are stored in the aggregator, including partition keys.
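
A minimal sketch of driving an Aggregator by hand (the Producer does this internally). The helper below is hypothetical; it assumes the zero value of Aggregator is ready to use, and the caller must provide its own synchronization:

func aggregateAndDrain(records map[string][]byte) (*k.PutRecordsRequestEntry, error) {
	var agg Aggregator
	for key, data := range records {
		// Put is not thread-safe; call it from a single goroutine.
		agg.Put(data, key)
	}
	// Count and Size report the buffered records and bytes (partition keys included).
	if agg.Count() == 0 {
		return nil, nil
	}
	return agg.Drain()
}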

type Config

type Config struct {
	// StreamName is the Kinesis stream.
	StreamName string

	// FlushInterval is a regular interval for flushing the buffer. Defaults to 5s.
	FlushInterval time.Duration

	// BatchCount determines the maximum number of items to pack into a batch.
	// Must not exceed 500. Defaults to 500.
	BatchCount int

	// BatchSize determines the maximum number of bytes to send with a PutRecords request.
	// Must not exceed 5MiB. Defaults to 5MiB.
	BatchSize int

	// AggregateBatchCount determines the maximum number of items to pack into an aggregated record.
	AggregateBatchCount int

	// AggregateBatchSize determines the maximum number of bytes to pack into an aggregated record.
	AggregateBatchSize int

	// BacklogCount determines the channel capacity before Put() will begin blocking. Defaults to `BatchCount`.
	BacklogCount int

	// MaxConnections is the number of requests to send concurrently. Defaults to 24.
	MaxConnections int

	// Logger is the logger used. Defaults to zap.L().
	Logger *zap.Logger

	// Verbose enables verbose logging. Defaults to false.
	Verbose bool

	// Client is the Putter interface implementation.
	Client Putter
}

Config is the Producer configuration.
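
A sketch of a more explicit configuration. The constructor wrapper and all values below are illustrative only; any field left unset falls back to the defaults documented above:

func newTunedProducer(client Putter) *Producer {
	return New(&Config{
		StreamName:     "events",        // illustrative stream name
		Client:         client,          // any Putter implementation
		FlushInterval:  5 * time.Second, // flush the buffer every 5s
		BatchCount:     500,             // at most 500 records per PutRecords request
		BatchSize:      5 * 1024 * 1024, // at most 5MiB per PutRecords request
		BacklogCount:   2000,            // Put blocks once the backlog is full
		MaxConnections: 24,              // concurrent PutRecords requests
		Verbose:        true,
		Logger:         zap.L(),
	})
}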

type FailureRecord

type FailureRecord struct {
	Data         []byte
	PartitionKey string
	// contains filtered or unexported fields
}

FailureRecord is a record that failed to be delivered; it carries the Data, the PartitionKey, and the error that caused the failure.

type Producer

type Producer struct {
	sync.RWMutex
	*Config
	// contains filtered or unexported fields
}

Producer batches records.

Example
log := zap.L()
s, err := session.NewSession(aws.NewConfig())
if err != nil {
	log.Fatal(err.Error())
}
client := kinesis.New(s)
pr := New(&Config{
	StreamName:   "test",
	BacklogCount: 2000,
	Client:       client,
	Logger:       log,
})

pr.Start()

// Handle failures
go func() {
	for r := range pr.NotifyFailures() {
		// r contains `Data`, `PartitionKey` and `Error()`
		log.Error(r.Error())
	}
}()

go func() {
	for i := 0; i < 5000; i++ {
		err := pr.Put([]byte("foo"), "bar")
		if err != nil {
			log.With(zap.Error(err)).Fatal("error producing")
		}
	}
}()

time.Sleep(3 * time.Second)
pr.Stop()
Output:

func New

func New(config *Config) *Producer

New creates a new producer with the given config.

func (*Producer) NotifyFailures

func (p *Producer) NotifyFailures() <-chan *FailureRecord

NotifyFailures registers and returns a listener for handling undeliverable messages. The incoming struct has a copy of the Data and the PartitionKey along with some error information about why the publishing failed.

func (*Producer) Put

func (p *Producer) Put(data []byte, partitionKey string) error

Put `data` using `partitionKey` asynchronously. This method is thread-safe.

Under the covers, the Producer will automatically retry puts in case of transient errors. When an unrecoverable error is detected (e.g. trying to put to a stream that doesn't exist), the message is handed back by the Producer. Add a listener with `Producer.NotifyFailures` to handle undeliverable messages.

func (*Producer) Start

func (p *Producer) Start()

Start the producer.

func (*Producer) Stop

func (p *Producer) Stop()

Stop the producer gracefully. Flushes any in-flight data.

type Putter

type Putter interface {
	PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error)
}

Putter is the interface that wraps the KinesisAPI.PutRecords method.
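
Because the Producer talks to Kinesis only through this interface, tests can substitute a stub. A minimal sketch with a hypothetical stubPutter that merely records its inputs:

type stubPutter struct {
	inputs []*k.PutRecordsInput
}

func (s *stubPutter) PutRecords(in *k.PutRecordsInput) (*k.PutRecordsOutput, error) {
	s.inputs = append(s.inputs, in)
	// Report zero failures so the producer treats every record as delivered.
	return &k.PutRecordsOutput{FailedRecordCount: aws.Int64(0)}, nil
}

Passing &stubPutter{} as Config.Client lets a test exercise the batching and flushing logic without calling AWS.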

type Record

type Record struct {
	PartitionKeyIndex    *uint64 `protobuf:"varint,1,req,name=partition_key_index" json:"partition_key_index,omitempty"`
	ExplicitHashKeyIndex *uint64 `protobuf:"varint,2,opt,name=explicit_hash_key_index" json:"explicit_hash_key_index,omitempty"`
	Data                 []byte  `protobuf:"bytes,3,req,name=data" json:"data,omitempty"`
	Tags                 []*Tag  `protobuf:"bytes,4,rep,name=tags" json:"tags,omitempty"`
	XXX_unrecognized     []byte  `json:"-"`
}

func (*Record) Descriptor

func (*Record) Descriptor() ([]byte, []int)

func (*Record) GetData

func (m *Record) GetData() []byte

func (*Record) GetExplicitHashKeyIndex

func (m *Record) GetExplicitHashKeyIndex() uint64

func (*Record) GetPartitionKeyIndex

func (m *Record) GetPartitionKeyIndex() uint64

func (*Record) GetTags

func (m *Record) GetTags() []*Tag

func (*Record) ProtoMessage

func (*Record) ProtoMessage()

func (*Record) Reset

func (m *Record) Reset()

func (*Record) String

func (m *Record) String() string

type Tag

type Tag struct {
	Key              *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
	Value            *string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
	XXX_unrecognized []byte  `json:"-"`
}

func (*Tag) Descriptor

func (*Tag) Descriptor() ([]byte, []int)

func (*Tag) GetKey

func (m *Tag) GetKey() string

func (*Tag) GetValue

func (m *Tag) GetValue() string

func (*Tag) ProtoMessage

func (*Tag) ProtoMessage()

func (*Tag) Reset

func (m *Tag) Reset()

func (*Tag) String

func (m *Tag) String() string
