gokini

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2020 License: MIT Imports: 23 Imported by: 4

README

Gokini

GoDoc Build Status

A Golang Kinesis Consumer Library with minimal dependencies. This library does not depend on the Java MultiLangDaemon but does use the AWS SDK.

Project Goals

This project aims to provide feature parity with the Kinesis Client Library including:

  • Enumerates shards

  • Coordinates shard associations with other workers

  • Instantiates a record processor for every shard it manages

  • Checkpoints processed records

  • Balances shard-worker associations when the worker instance count changes

  • Balances shard-worker associations when shards are split or merged

  • Instrumentation that supports CloudWatch (partial support)

  • Support enhanced fan-out consumers

  • Support aggregated records from Kinesis Producer library

Development Status

Beta - Ready to be used in non-critical Production environments.

Actively used (via a fork) by VMWare

Testing

Unit tests can be run with:

go test consumer_test.go consumer.go checkpointer_test.go checkpointer.go monitoring.go monitoring_test.go

Integration tests can be run in docker with:

make docker-integration

Documentation

Index

Examples

Constants

View Source
const (

	// ErrLeaseNotAquired is returned when we failed to get a lock on the shard
	ErrLeaseNotAquired = "Lease is already held by another node"
	// ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table
	ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created"
)
View Source
const (

	// ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords
	// But it's not a constant?
	ErrCodeKMSThrottlingException = "KMSThrottlingException"
)

Variables

View Source
var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")

ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found

Functions

This section is empty.

Types

type Checkpointer

type Checkpointer interface {
	Init() error
	GetLease(*shardStatus, string) error
	CheckpointSequence(*shardStatus) error
	FetchCheckpoint(*shardStatus) error
	ListActiveWorkers() (map[string][]string, error)
	ClaimShard(*shardStatus, string) error
}

Checkpointer handles checkpointing when a record has been processed

type DynamoCheckpoint

type DynamoCheckpoint struct {
	TableName          string
	LeaseDuration      int
	Retries            int
	ReadCapacityUnits  *int64
	WriteCapacityUnits *int64
	BillingMode        *string
	Session            *session.Session
	// contains filtered or unexported fields
}

DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend

func (*DynamoCheckpoint) CheckpointSequence

func (c *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error

CheckpointSequence writes a checkpoint at the designated sequence ID

func (*DynamoCheckpoint) ClaimShard added in v0.0.6

func (c *DynamoCheckpoint) ClaimShard(shard *shardStatus, claimID string) error

func (*DynamoCheckpoint) FetchCheckpoint

func (c *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error

FetchCheckpoint retrieves the checkpoint for the given shard

func (*DynamoCheckpoint) GetLease

func (c *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo string) error

GetLease attempts to gain a lock on the given shard

func (*DynamoCheckpoint) Init

func (c *DynamoCheckpoint) Init() error

Init initialises the DynamoDB Checkpoint

func (*DynamoCheckpoint) ListActiveWorkers added in v0.0.6

func (c *DynamoCheckpoint) ListActiveWorkers() (map[string][]string, error)

type KinesisConsumer

type KinesisConsumer struct {
	StreamName                  string
	ShardIteratorType           string
	RecordConsumer              RecordConsumer
	EmptyRecordBackoffMs        int
	LeaseDuration               int
	Monitoring                  MonitoringConfiguration
	DisableAutomaticCheckpoints bool
	Retries                     *int
	IgnoreShardOrdering         bool
	TableName                   string
	DynamoReadCapacityUnits     *int64
	DynamoWriteCapacityUnits    *int64
	DynamoBillingMode           *string
	Session                     *session.Session // Setting session means Retries is ignored

	sync.WaitGroup
	// contains filtered or unexported fields
}

KinesisConsumer contains all the configuration and functions necessary to start the Kinesis Consumer

func (*KinesisConsumer) Checkpoint added in v0.0.3

func (kc *KinesisConsumer) Checkpoint(shardID string, sequenceNumber string) error

Checkpoint records the sequence number for the given shard ID as being processed

func (*KinesisConsumer) Shutdown

func (kc *KinesisConsumer) Shutdown()

Shutdown stops consuming records gracefully

func (*KinesisConsumer) StartConsumer

func (kc *KinesisConsumer) StartConsumer() error

StartConsumer starts the RecordConsumer, calls Init and starts sending records to ProcessRecords

type MonitoringConfiguration

type MonitoringConfiguration struct {
	MonitoringService string // Type of monitoring to expose. Supported types are "prometheus"
	Prometheus        prometheusMonitoringService
	CloudWatch        cloudWatchMonitoringService
	// contains filtered or unexported fields
}

MonitoringConfiguration allows you to configure how record processing metrics are exposed

type RecordConsumer

type RecordConsumer interface {
	Init(string) error
	ProcessRecords([]*Records, *KinesisConsumer)
	Shutdown()
}

RecordConsumer is the interface consumers will implement

Example
package main

import (
	"fmt"
	"time"
)

type PrintRecordConsumer struct {
	shardID string
}

func (p *PrintRecordConsumer) Init(shardID string) error {
	fmt.Printf("Checkpointer initializing\n")
	p.shardID = shardID
	return nil
}

func (p *PrintRecordConsumer) ProcessRecords(records []*Records, consumer *KinesisConsumer) {
	if len(records) > 0 {
		fmt.Printf("%s\n", records[0].Data)
	}
}

func (p *PrintRecordConsumer) Shutdown() {
	fmt.Print("PrintRecordConsumer Shutdown\n")
}

func main() {
	// An implementation of the RecordConsumer interface that prints out records
	rc := &PrintRecordConsumer{}
	kc := &KinesisConsumer{
		StreamName:           "KINESIS_STREAM",
		ShardIteratorType:    "TRIM_HORIZON",
		RecordConsumer:       rc,
		TableName:            "gokini",
		EmptyRecordBackoffMs: 1000,
	}

	// Send records to our kinesis stream so we have something to process
	pushRecordToKinesis("KINESIS_STREAM", []byte("foo"), true)
	defer deleteStream("KINESIS_STREAM")
	defer deleteTable("gokini")

	err := kc.StartConsumer()
	if err != nil {
		fmt.Printf("Failed to start consumer: %s", err)
	}

	// Wait for it to do it's thing
	time.Sleep(200 * time.Millisecond)
	kc.Shutdown()

}
Output:

Checkpointer initializing
foo
PrintRecordConsumer Shutdown

type Records

type Records struct {
	Data           []byte `json:"data"`
	PartitionKey   string `json:"partitionKey"`
	SequenceNumber string `json:"sequenceNumber"`
	ShardID        string `json:"shardID"`
}

Records is structure for Kinesis Records

type Worker added in v0.0.6

type Worker struct {
	UUID   string
	Shards []string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL