ddb

package module
v0.0.0-...-108f04c Latest
Published: Mar 15, 2017 License: MIT Imports: 12 Imported by: 0

README

ddb

A collection of DynamoDB helpers written in Go to assist with reading and writing data.

Installation

go get github.com/clearbit/go-ddb

Parallel Scan

To get maximum read throughput from a DynamoDB table, we can use DynamoDB's Parallel Scan functionality.

// structure of DDB item; the field must be exported so it can be unmarshaled
type message struct {
    Name string `json:"name"`
}

// new scanner with table name and total segments
scanner := ddb.NewScanner(ddb.Config{
    TableName:     "ddb-table-name",
    TotalSegments: 150, // calculate value: (table size GB / 2GB)
    SegmentOffset: 0,   // optional param for controlling offset
    SegmentCount:  150, // optional param for controlling how many routines get created
})

// start parallel scan w/ handler func
scanner.Start(ddb.HandlerFunc(func(items ddb.Items) {
    for _, item := range items {
        var msg message
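        // skip items that cannot be decoded instead of printing empty values
        if err := dynamodbattribute.UnmarshalMap(item, &msg); err != nil {
            log.Printf("unmarshal item: %v", err)
            continue
        }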
        fmt.Println(msg.Name)
    }
}))

// wait for all scans to complete
scanner.Wait()
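
The comment on TotalSegments above gives a rule of thumb (table size in GB divided by 2 GB). A minimal sketch of that arithmetic follows; the helper name estimateTotalSegments, the round-up behavior, and the minimum of one segment are assumptions made for illustration and are not part of this package.

```go
package main

import "fmt"

// estimateTotalSegments applies the README's rule of thumb: one segment per
// 2 GB of table data. Rounding up and returning at least 1 are assumptions
// of this sketch, not behavior of the ddb package.
func estimateTotalSegments(tableSizeBytes int64) int {
	const bytesPerSegment = 2 << 30 // 2 GB expressed in bytes
	if tableSizeBytes <= 0 {
		return 1
	}
	segments := (tableSizeBytes + bytesPerSegment - 1) / bytesPerSegment
	return int(segments)
}

func main() {
	// A 300 GB table yields the 150 segments used in the example above.
	fmt.Println(estimateTotalSegments(300 << 30))
}
```

The result could then be passed as TotalSegments when building the Config.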

Leverage a checkpoint table to store the last evaluated key of a scan:

scanner := ddb.NewScanner(ddb.Config{
    TableName:           "ddb-table-name",
    CheckpointTableName: "checkpoint-production",  // name of table to store last evaluated keys
    CheckpointNamespace: "my-sample-app",          // namespace to avoid collisions with other scripts
    TotalSegments:       150,
})

License

go-ddb is copyright © 2016 Clearbit. It is free software, and may be redistributed under the terms specified in the LICENSE file.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint struct {
	Svc       *dynamodb.DynamoDB
	Namespace string
	TableName string
}

Checkpoint wraps the interactions with dynamo for setting/getting checkpoints

func (*Checkpoint) Get

func (c *Checkpoint) Get(segment int) LastEvaluatedKey

Get returns the exclusive start key for current segment

func (*Checkpoint) Set

func (c *Checkpoint) Set(segment int, lastEvaluatedKey LastEvaluatedKey)

Set stores lastEvaluatedKey as the most recent checkpoint for the segment
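
To illustrate the Get/Set contract without a DynamoDB table, here is a hedged in-memory sketch. The type memCheckpoint, the "namespace-segment" key format, and the use of plain strings in place of *dynamodb.AttributeValue are all assumptions for illustration; the real Checkpoint persists to the table named in TableName.

```go
package main

import (
	"fmt"
	"sync"
)

// lastEvaluatedKey stands in for ddb.LastEvaluatedKey. Plain strings replace
// *dynamodb.AttributeValue so the sketch does not need the AWS SDK.
type lastEvaluatedKey map[string]string

// memCheckpoint is a hypothetical in-memory analogue of Checkpoint.
type memCheckpoint struct {
	Namespace string

	mu   sync.Mutex
	keys map[string]lastEvaluatedKey
}

// id builds a per-namespace, per-segment identifier (assumed format).
func (c *memCheckpoint) id(segment int) string {
	return fmt.Sprintf("%s-%d", c.Namespace, segment)
}

// Get returns the stored key for the segment, or nil if none was saved.
func (c *memCheckpoint) Get(segment int) lastEvaluatedKey {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.keys[c.id(segment)]
}

// Set records key as the most recent checkpoint for the segment.
func (c *memCheckpoint) Set(segment int, key lastEvaluatedKey) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.keys == nil {
		c.keys = make(map[string]lastEvaluatedKey)
	}
	c.keys[c.id(segment)] = key
}

func main() {
	c := &memCheckpoint{Namespace: "my-sample-app"}
	c.Set(3, lastEvaluatedKey{"id": "item-1000"})
	fmt.Println(c.Get(3)["id"])  // key saved for segment 3
	fmt.Println(c.Get(4) == nil) // segment 4 has no checkpoint yet
}
```

A nil result from Get means the segment starts from the beginning, which matches how an empty exclusive start key is treated by a DynamoDB scan.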

type Config

type Config struct {
	// Svc is the DynamoDB connection
	Svc *dynamodb.DynamoDB

	// AwsRegion is the region the database is in. Defaults to us-west-1
	AwsRegion string

	// TableName is name of table to scan
	TableName string

	// SegmentOffset determines where to start indexing from
	SegmentOffset int

	// SegmentCount determines how big a segment is
	SegmentCount int

	// TotalSegments determines the global amount of concurrency this will use
	TotalSegments int

	// Checkpoint is used to get and set checkpoints for the scan
	Checkpoint *Checkpoint

	// CheckpointTableName is the name of the checkpoint table
	CheckpointTableName string

	// CheckpointNamespace is the namespace for checkpoints. It must be unique so
	// that different scripts can maintain their own checkpoints.
	CheckpointNamespace string

	// Limit is the number of records to return during scan
	Limit int64
}

Config is a wrapper around the configuration variables

type Handler

type Handler interface {
	HandleItems(items Items)
}

Handler is the interface for handling items from a segment scan

type HandlerFunc

type HandlerFunc func(items Items)

HandlerFunc is a convenience type that avoids having to declare a struct to implement the Handler interface. It can be used like this:

scanner.Start(ddb.HandlerFunc(func(items ddb.Items) {
  // ...
}))

func (HandlerFunc) HandleItems

func (h HandlerFunc) HandleItems(items Items)

HandleItems implements the Handler interface
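
The adapter pattern behind HandlerFunc can be sketched with local stand-in types. Here Items uses plain strings instead of *dynamodb.AttributeValue, process is a hypothetical consumer, and the one-line HandleItems body is the conventional implementation for this pattern (assumed, since the source above shows only the signature).

```go
package main

import "fmt"

// Items stands in for ddb.Items; string values replace
// *dynamodb.AttributeValue so no AWS SDK is required.
type Items []map[string]string

// Handler mirrors the interface documented above.
type Handler interface {
	HandleItems(items Items)
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(items Items)

// HandleItems calls the underlying function (assumed implementation).
func (h HandlerFunc) HandleItems(items Items) { h(items) }

// process is a hypothetical consumer that accepts any Handler.
func process(h Handler, batches []Items) {
	for _, batch := range batches {
		h.HandleItems(batch)
	}
}

func main() {
	count := 0
	batches := []Items{
		{{"name": "a"}},
		{{"name": "b"}, {"name": "c"}},
	}
	// A closure satisfies Handler without declaring a struct.
	process(HandlerFunc(func(items Items) { count += len(items) }), batches)
	fmt.Println(count)
}
```

This is the same shape as http.HandlerFunc in the standard library: the function type carries the method, so callers can pass either a closure or a full struct implementation.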

type Items

type Items []map[string]*dynamodb.AttributeValue

Items is the response from DDB scan

type LastEvaluatedKey

type LastEvaluatedKey map[string]*dynamodb.AttributeValue

LastEvaluatedKey is the attribute value of the last evaluated key in a scan
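
As a rough illustration of how a last evaluated key drives pagination, the sketch below pages through an in-memory slice. fakeScan is a hypothetical stand-in for a DynamoDB Scan call, and the key is simplified to a map of strings; none of these names come from this package.

```go
package main

import "fmt"

// lastEvaluatedKey stands in for ddb.LastEvaluatedKey, with string values.
type lastEvaluatedKey map[string]string

// fakeScan returns up to limit ids that follow start, plus the key to resume
// from. A nil key means there are no more pages. Hypothetical helper.
func fakeScan(table []string, start lastEvaluatedKey, limit int) ([]string, lastEvaluatedKey) {
	if limit < 1 {
		limit = 1 // guard so every page makes progress
	}
	from := 0
	if start != nil {
		for i, id := range table {
			if id == start["id"] {
				from = i + 1
				break
			}
		}
	}
	to := from + limit
	if to >= len(table) {
		return table[from:], nil
	}
	return table[from:to], lastEvaluatedKey{"id": table[to-1]}
}

// scanAll keeps requesting pages, feeding each returned key back in as the
// exclusive start key, until no key is returned.
func scanAll(table []string, limit int) []string {
	var out []string
	var start lastEvaluatedKey
	for {
		items, last := fakeScan(table, start, limit)
		out = append(out, items...)
		if last == nil {
			return out
		}
		start = last
	}
}

func main() {
	table := []string{"a", "b", "c", "d", "e"}
	fmt.Println(scanAll(table, 2))
}
```

Storing the most recent key after each page (for example through a checkpoint) is what allows an interrupted scan to resume from that page rather than from the start.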

type Scanner

type Scanner struct {
	Config
	CompletedSegments *expvar.Int
	// contains filtered or unexported fields
}

Scanner is

func NewScanner

func NewScanner(config Config) *Scanner

NewScanner creates a new scanner with ddb connection

func (*Scanner) Start

func (s *Scanner) Start(handler Handler)

Start uses the handler to process items for each of the segments

func (*Scanner) Wait

func (s *Scanner) Wait()

Wait blocks until the wait group is done, that is, until all segment scans have completed
