connector

package module
v0.0.0-...-d7f8826
Warning

This package is not in the latest version of its module.

Published: May 5, 2021 License: MIT Imports: 9 Imported by: 0

README

Golang Kinesis Connectors

Kinesis connector applications written in Go

With the new release of Kinesis Firehose, I'd recommend using the Lambda Streams to Firehose project for loading data directly into S3 and Redshift.

Inspired by the Amazon Kinesis Connector Library. This library is intended to be a lightweight wrapper around the Kinesis API that handles batching records, setting checkpoints, respecting rate limits, and recovering from network errors.

Overview

The consumer expects a handler func that will process a buffer of incoming records.

package main

import (
  "flag"
  "fmt"

  connector "github.com/gametimesf/kinesis-connectors"
)

func main() {
  var (
    app    = flag.String("app", "", "The app name")
    stream = flag.String("stream", "", "The stream name")
  )
  flag.Parse()

  // create new consumer
  c := connector.NewConsumer(connector.Config{
    AppName:    *app,
    BufferSize: 400,
    StreamName: *stream,
  })

  // process records from the stream
  c.Start(connector.HandlerFunc(func(b connector.Buffer) {
    fmt.Println(b.GetRecords())
  }))

  // block forever so the consumer keeps running
  select {}
}

Config

By default, checkpointing uses Redis on localhost. To use a custom Redis URL, set RedisURL in the Config struct.

Logging

Apex Log is used for logging. To change the log format, set a different log handler. For example, using the "json" log handler:

import (
  "os"

  "github.com/apex/log"
  "github.com/apex/log/handlers/json"
)

func main() {
  // ...

  log.SetHandler(json.New(os.Stderr))
  log.SetLevel(log.DebugLevel)
}

Which will produce the following logs:

  INFO[0000] processing                app=test shard=shardId-000000000000 stream=test
  INFO[0008] emitted                   app=test count=500 shard=shardId-000000000000 stream=test
  INFO[0012] emitted                   app=test count=500 shard=shardId-000000000000 stream=test

Installation

Get the package source:

$ go get github.com/gametimesf/kinesis-connectors

Examples

Use the seed stream code to put sample data onto the stream.

Contributing

Please see CONTRIBUTING.md for more information. Thank you, contributors!

License

Copyright (c) 2015 Harlow Ward. It is free software, and may be redistributed under the terms specified in the LICENSE file.

www.hward.com  ·  GitHub @harlow  ·  Twitter @harlow_ward

Documentation

Index

Constants

const (
	ShardIteratorAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER"
	ShardIteratorAtSequenceNumber    = "AT_SEQUENCE_NUMBER"
	ShardIteratorAtTimestamp         = "AT_TIMESTAMP"
	ShardIteratorLatest              = "LATEST"
	ShardIteratorTrimHorizon         = "TRIM_HORIZON"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Buffer

type Buffer struct {
	MaxRecordCount int
	// contains filtered or unexported fields
}

Buffer holds records and reports when it should be flushed.

func (*Buffer) AddRecord

func (b *Buffer) AddRecord(r *kinesis.Record)

AddRecord adds a record to the buffer.

func (*Buffer) FirstSeq

func (b *Buffer) FirstSeq() string

FirstSeq returns the sequence number of the first record in the buffer.

func (*Buffer) Flush

func (b *Buffer) Flush()

Flush empties the buffer and resets the sequence counter.

func (*Buffer) GetRecords

func (b *Buffer) GetRecords() []*kinesis.Record

GetRecords returns the records in the buffer.

func (*Buffer) LastSeq

func (b *Buffer) LastSeq() string

LastSeq returns the sequence number of the last record in the buffer.

func (*Buffer) RecordCount

func (b *Buffer) RecordCount() int

RecordCount returns the number of records in the buffer.

func (*Buffer) ShardID

func (b *Buffer) ShardID() string

ShardID returns the shard ID watched by the consumer.

func (*Buffer) ShouldFlush

func (b *Buffer) ShouldFlush() bool

ShouldFlush determines if the buffer has reached its target size.
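The add, check, flush cycle described by these methods can be sketched with a stand-in type. The fields of the real Buffer are unexported, so `sketchBuffer` and `record` below are hypothetical and only model the documented behavior; they are not the package's implementation.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *kinesis.Record that carries only a
// sequence number.
type record struct{ seq string }

// sketchBuffer is a hypothetical model of the documented Buffer behavior.
type sketchBuffer struct {
	MaxRecordCount int
	records        []record
}

// AddRecord appends a record to the buffer.
func (b *sketchBuffer) AddRecord(r record) { b.records = append(b.records, r) }

// RecordCount returns how many records are currently buffered.
func (b *sketchBuffer) RecordCount() int { return len(b.records) }

// ShouldFlush reports whether the buffer has reached its target size.
func (b *sketchBuffer) ShouldFlush() bool { return len(b.records) >= b.MaxRecordCount }

// FirstSeq returns the first buffered sequence number, or "" when empty.
func (b *sketchBuffer) FirstSeq() string {
	if len(b.records) == 0 {
		return ""
	}
	return b.records[0].seq
}

// LastSeq returns the last buffered sequence number, or "" when empty.
func (b *sketchBuffer) LastSeq() string {
	if len(b.records) == 0 {
		return ""
	}
	return b.records[len(b.records)-1].seq
}

// Flush empties the buffer while keeping the allocated storage.
func (b *sketchBuffer) Flush() { b.records = b.records[:0] }

func main() {
	b := &sketchBuffer{MaxRecordCount: 3}
	for i := 1; i <= 7; i++ {
		b.AddRecord(record{seq: fmt.Sprint(i)})
		if b.ShouldFlush() {
			fmt.Printf("flush %s..%s (%d records)\n", b.FirstSeq(), b.LastSeq(), b.RecordCount())
			b.Flush()
		}
	}
	fmt.Println("left over:", b.RecordCount())
}
```

In this sketch the caller decides what a flush means (for example, handing the batch to a handler and then checkpointing LastSeq); the buffer itself only tracks size and sequence bounds.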

type Checkpoint

type Checkpoint interface {
	CheckpointExists(string) bool
	SequenceNumber(string) string
	SetCheckpoint(string, string)
}

Checkpoint is the interface that checkpoint stores must implement in order to track consumer progress.
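Because Checkpoint is a plain three-method interface, a store other than Redis can be plugged in through Config.Checkpoint. The following is a minimal sketch of an in-memory implementation, useful for tests; `MemoryCheckpoint` and `NewMemoryCheckpoint` are hypothetical names that do not exist in the package, and the interface is redeclared locally only so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpoint mirrors the interface documented above.
type Checkpoint interface {
	CheckpointExists(string) bool
	SequenceNumber(string) string
	SetCheckpoint(string, string)
}

// MemoryCheckpoint is a hypothetical in-memory checkpoint store. Progress is
// lost when the process exits, so it is only suitable for tests.
type MemoryCheckpoint struct {
	mu   sync.Mutex
	seqs map[string]string // shard ID -> last processed sequence number
}

// NewMemoryCheckpoint returns an empty store.
func NewMemoryCheckpoint() *MemoryCheckpoint {
	return &MemoryCheckpoint{seqs: make(map[string]string)}
}

// CheckpointExists reports whether a sequence number is stored for the shard.
func (m *MemoryCheckpoint) CheckpointExists(shardID string) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.seqs[shardID]
	return ok
}

// SequenceNumber returns the stored sequence number, or "" if none exists.
func (m *MemoryCheckpoint) SequenceNumber(shardID string) string {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.seqs[shardID]
}

// SetCheckpoint records the last processed sequence number for the shard.
func (m *MemoryCheckpoint) SetCheckpoint(shardID, sequenceNumber string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seqs[shardID] = sequenceNumber
}

// Compile-time check that MemoryCheckpoint satisfies Checkpoint.
var _ Checkpoint = (*MemoryCheckpoint)(nil)

func main() {
	cp := NewMemoryCheckpoint()
	shard := "shardId-000000000000"
	fmt.Println(cp.CheckpointExists(shard))
	cp.SetCheckpoint(shard, "12345")
	fmt.Println(cp.CheckpointExists(shard), cp.SequenceNumber(shard))
}
```

The mutex is there on the assumption that shards may be processed concurrently; whether the consumer actually calls the checkpoint from several goroutines is not stated in this documentation.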

type Config

type Config struct {
	// AppName is the application name and checkpoint namespace.
	AppName string

	// StreamName is the Kinesis stream.
	StreamName string

	// FlushInterval is a regular interval for flushing the buffer. Defaults to 1s.
	FlushInterval time.Duration

	// BufferSize determines the batch request size. Must not exceed 500. Defaults to 500.
	BufferSize int

	// MaxRetries sets the Kinesis client's retry limit. Defaults to 10.
	MaxRetries int

	// Logger is the logger used. Defaults to log.Log.
	Logger log.Interface

	// Checkpoint for tracking progress of consumer.
	Checkpoint Checkpoint

	// ShardIteratorType is the shard iterator type used when no checkpoint exists.
	ShardIteratorType string

	// RedisURL is the URL of the Redis instance used for checkpointing. Defaults to 127.0.0.1:6379.
	RedisURL string

	// KinesisURL is the URL for Kinesis, used for local development. Defaults to empty, in which case the Kinesis client discovers the endpoint.
	KinesisURL string
}

Config holds the configuration variables for the application.
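The defaults listed in the field comments can be summarized as a small function over zero values. This is only a sketch: `settings` and `withDefaults` are hypothetical, and how the package itself applies defaults or reacts to a BufferSize above 500 is not documented here, so the error return is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical subset of Config containing the fields with
// documented defaults.
type settings struct {
	FlushInterval time.Duration
	BufferSize    int
	MaxRetries    int
	RedisURL      string
}

// withDefaults fills zero-valued fields with the documented defaults.
// Rejecting BufferSize > 500 is an assumption based on the "Must not exceed
// 500" comment, not confirmed package behavior.
func withDefaults(s settings) (settings, error) {
	if s.FlushInterval == 0 {
		s.FlushInterval = time.Second
	}
	if s.BufferSize == 0 {
		s.BufferSize = 500
	}
	if s.BufferSize > 500 {
		return s, fmt.Errorf("BufferSize %d exceeds the limit of 500", s.BufferSize)
	}
	if s.MaxRetries == 0 {
		s.MaxRetries = 10
	}
	if s.RedisURL == "" {
		s.RedisURL = "127.0.0.1:6379"
	}
	return s, nil
}

func main() {
	s, err := withDefaults(settings{BufferSize: 400})
	if err != nil {
		fmt.Println("invalid settings:", err)
		return
	}
	fmt.Printf("%+v\n", s)
}
```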

type Consumer

type Consumer struct {
	Config
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config Config) *Consumer

NewConsumer creates a new consumer with an initialized Kinesis connection.

func (*Consumer) Start

func (c *Consumer) Start(handler Handler)

Start takes a handler and then loops over the shards, processing each one with the handler.

type Handler

type Handler interface {
	HandleRecords(b Buffer)
}

type HandlerFunc

type HandlerFunc func(b Buffer)

HandlerFunc is a convenience type that avoids having to declare a struct to implement the Handler interface. It can be used like this:

consumer.Start(connector.HandlerFunc(func(b connector.Buffer) {
  // ...
}))

func (HandlerFunc) HandleRecords

func (h HandlerFunc) HandleRecords(b Buffer)

HandleRecords implements the Handler interface.
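The function-adapter pattern behind HandlerFunc is the same one net/http uses for http.HandlerFunc. The sketch below redeclares Handler and HandlerFunc locally and uses a stand-in `Buffer` holding strings, so it runs without the package; the body of HandleRecords (calling the function itself) is the conventional implementation and is assumed here rather than taken from the package source.

```go
package main

import "fmt"

// Buffer is a stand-in for connector.Buffer; the real type holds
// *kinesis.Record values in unexported fields.
type Buffer struct{ records []string }

// RecordCount returns the number of records in the stand-in buffer.
func (b *Buffer) RecordCount() int { return len(b.records) }

// Handler mirrors the documented interface.
type Handler interface {
	HandleRecords(b Buffer)
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(b Buffer)

// HandleRecords calls the underlying function (assumed implementation).
func (h HandlerFunc) HandleRecords(b Buffer) { h(b) }

func main() {
	total := 0

	// A closure becomes a Handler without declaring a struct type.
	var h Handler = HandlerFunc(func(b Buffer) {
		total += b.RecordCount()
	})

	h.HandleRecords(Buffer{records: []string{"a", "b"}})
	h.HandleRecords(Buffer{records: []string{"c"}})
	fmt.Println("records handled:", total)
}
```

A struct that implements HandleRecords directly is still the better fit when the handler needs its own state or dependencies, such as an S3 client.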

type RedisCheckpoint

type RedisCheckpoint struct {
	AppName    string
	StreamName string
	// contains filtered or unexported fields
}

RedisCheckpoint implements the Checkpoint interface. Used to enable the Pipeline.ProcessShard to checkpoint its progress while reading records from a Kinesis stream.

func (*RedisCheckpoint) CheckpointExists

func (c *RedisCheckpoint) CheckpointExists(shardID string) bool

CheckpointExists determines if a checkpoint for a particular Shard exists. Typically used to determine whether we should start processing the shard with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).

func (*RedisCheckpoint) SequenceNumber

func (c *RedisCheckpoint) SequenceNumber(shardID string) string

SequenceNumber returns the current checkpoint stored for the specified shard.

func (*RedisCheckpoint) SetCheckpoint

func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string)

SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). Upon failover, record processing is resumed from this point.

Directories

Path          Synopsis
emitter/s3
examples/s3
