ddbstreams

Version: v0.0.0-...-76b4aa3
Published: Sep 24, 2018 License: Apache-2.0 Imports: 13 Imported by: 0

README

ddbstreams

ddbstreams is a DynamoDB Streams consumer.

Usage

When you subscribe to a table with ddbstreams, it spawns a goroutine that periodically polls the table's underlying DynamoDB stream. ddbstreams tracks stream splits for you. Records from a given stream shard are processed in order, but there is no ordering guarantee between peer stream shards.

Offsets

ddbstreams keeps track of offsets, which allows the stream processor to continue from where it left off, similar to a Kafka consumer.

Example

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodbstreams"
	"github.com/savaki/ddbstreams"
)

func main() {
	var (
		cfg       = aws.NewConfig().WithRegion("us-east-1")
		sess      = session.Must(session.NewSession(cfg))
		api       = dynamodbstreams.New(sess)
		processor = ddbstreams.New(api)
		ctx       = context.Background()
		tableName = "blah"
	)

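	// handler is called for each record read from the table's stream.
	handler := func(ctx context.Context, record *dynamodbstreams.StreamRecord) error {
		fmt.Println("received record")
		return nil
	}

	// Subscribe spawns a goroutine that polls the stream in the background.
	sub, err := processor.Subscribe(ctx, tableName, handler)
	if err != nil {
		log.Fatalln(err)
	}
	// Close the subscription on exit to free its resources.
	defer sub.Close()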

	// allow time for some records to be processed
	time.Sleep(5 * time.Second)
}

Options

  • WithOffsetManager - defines the storage used to persist offsets
  • WithAutoCommit - requests that offsets be saved after every commit. Only takes effect in conjunction with WithOffsetManager or WithOffsetManagerDynamoDB

Prerequisites

Requires Go 1.9 or later.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type HandlerFunc

type HandlerFunc func(ctx context.Context, record *dynamodbstreams.StreamRecord) error

HandlerFunc wraps custom stream processing code.

type Offset

type Offset struct {
	ShardID        string
	SequenceNumber string
}

type OffsetFinder

type OffsetFinder interface {
	Find(ctx context.Context, groupID, tableName string) ([]Offset, error)
}

type OffsetManager

type OffsetManager interface {
	OffsetSaver
	OffsetFinder
}

type OffsetSaver

type OffsetSaver interface {
	Save(ctx context.Context, groupID, tableName string, offsets ...Offset) error
}

type Option

type Option func(*Options)

func WithAutoCommit

func WithAutoCommit() Option

WithAutoCommit indicates that offsets should be saved after each successful commit. Use caution when enabling this: streams with high traffic will generate a significant number of commits.

func WithDebug

func WithDebug(logFunc func(args ...interface{})) Option

WithDebug sets the function used to log debug output.

func WithGroupID

func WithGroupID(groupID string) Option

func WithInitialOffsets

func WithInitialOffsets(offsets ...Offset) Option

func WithOffsetInterval

func WithOffsetInterval(interval time.Duration) Option

func WithOffsetManager

func WithOffsetManager(manager OffsetManager) Option

func WithOffsetManagerDynamoDB

func WithOffsetManagerDynamoDB(api dynamodbiface.DynamoDBAPI, tableName string) Option

func WithPollInterval

func WithPollInterval(pollInterval time.Duration) Option

WithPollInterval sets the delay between polling requests on open (i.e. not yet complete) shards.

func WithTrace

func WithTrace(printFunc func(args ...interface{})) Option

WithTrace sets the function used to print trace output.

type Options

type Options struct {
	// contains filtered or unexported fields
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

func (*Processor) Subscribe

func (p *Processor) Subscribe(ctx context.Context, tableName string, h HandlerFunc, opts ...Option) (*Subscriber, error)

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber references the subscription to the stream.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close closes the subscription, freeing any consumed resources.

func (*Subscriber) Flush

func (s *Subscriber) Flush()

Flush writes offsets to the persistent store.

func (*Subscriber) Wait

func (s *Subscriber) Wait() error

Directories

Path Synopsis
_examples
internal
