rin

v1.3.2
Warning: This package is not in the latest version of its module.
Published: Jan 18, 2023 License: MIT Imports: 30 Imported by: 0

README

Rin

Rin is a Redshift data importer driven by SQS messages.

Architecture

  1. (Someone) creates an S3 object.
  2. S3 event notifications send a message to SQS.
  3. Rin fetches messages from SQS and issues a COPY query to Redshift.

Installation

Binary packages

Releases

Homebrew
$ brew install fujiwara/tap/rin
Docker

GitHub Packages

$ docker pull ghcr.io/fujiwara/rin:v1.1.3

Configuration

Configuring Amazon S3 Event Notifications.

  1. Create an SQS queue.
  2. Attach an SQS access policy to the queue (see Example Walkthrough 1).
  3. Enable event notifications on an S3 bucket.
  4. Run the rin process with a configuration that uses the SQS queue and S3 bucket.
config.yaml
queue_name: my_queue_name    # SQS queue name

credentials:
  aws_region: ap-northeast-1

redshift:
  host: localhost
  port: 5439
  dbname: test
  user: test_user
  password: '{{ must_env "REDSHIFT_PASSWORD" }}'
  schema: public
  reconnect_on_error: true # disconnect from Redshift when an error occurs

s3:
  bucket: test.bucket.test
  region: ap-northeast-1

sql_option: "JSON 'auto' GZIP"       # COPY SQL option

# define import target mappings
targets:
  - s3:
      key_prefix: test/foo/ignore
    discard: true  # Matched objects are not imported, and the following targets are not tried.

  - redshift:
      table: foo
    s3:
      key_prefix: test/foo

  - redshift:
      schema: xxx
      table: bar
    s3:
      key_prefix: test/bar
    break: true       # Do not try the following targets.

  - redshift:
      schema: $1      # expanded from the key_regexp capture groups
      table: $2
    s3:
      key_regexp: test/schema-([a-z]+)/table-([a-z]+)/

  - redshift:
      host: redshift.example.com       # overrides the default redshift section for this target
      port: 5439
      dbname: example
      user: example_user
      password: example_pass
      schema: public
      table: example
    s3:
      bucket: redshift.example.com
      region: ap-northeast-1
      key_prefix: logs/example/
    sql_option: "CSV DELIMITER ',' ESCAPE"

The configuration file is parsed by kayac/go-config.

go-config expands environment variables in the configuration file using the {{ env "FOO" }} or {{ must_env "FOO" }} syntax.

When the password for Redshift is empty, Rin calls the GetClusterCredentials API to obtain a temporary password for the cluster.

Credentials

Rin requires credentials for SQS and Redshift.

  1. credentials.aws_access_key_id and credentials.aws_secret_access_key
  • used for SQS and Redshift (the COPY query and Data API access).
  2. credentials.aws_iam_role
  • used for the Redshift COPY query only.
  • for SQS and the Redshift Data API, Rin tries to obtain instance credentials.
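Both credential options end up in the CREDENTIALS clause of the COPY query (see RedshiftCredential below). The sketch shows the two Redshift COPY credential strings; copyCredential is hypothetical, and preferring the IAM role when both options are set is an assumption, not documented rin behavior.

```go
package main

import "fmt"

// credentials mirrors a subset of the documented Credentials fields.
type credentials struct {
	AccessKeyID     string
	SecretAccessKey string
	IAMRole         string
}

// copyCredential builds the value for a COPY ... CREDENTIALS '...' clause.
// Preferring the IAM role when both are set is an assumption of this sketch.
func (c credentials) copyCredential() string {
	if c.IAMRole != "" {
		return "aws_iam_role=" + c.IAMRole
	}
	return fmt.Sprintf("aws_access_key_id=%s;aws_secret_access_key=%s",
		c.AccessKeyID, c.SecretAccessKey)
}

func main() {
	role := credentials{IAMRole: "arn:aws:iam::123456789012:role/rin-copy"} // placeholder ARN
	fmt.Println(role.copyCredential())
	// aws_iam_role=arn:aws:iam::123456789012:role/rin-copy
}
```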

Run

daemon mode

Rin waits for new SQS messages and processes them continuously.

$ rin -config config.yaml [-debug]

-config also accepts an HTTP, S3, or file URL specifying the location of the configuration file. For example:

$ rin -config s3://rin-config.my-bucket/config.yaml

batch mode

Rin processes new SQS messages and then exits.

$ rin -config config.yaml -batch [-debug]

Set max execution time

The -max-execution-time CLI option sets the maximum execution time for the SQS worker and for batch processing.

SQL Drivers

Rin has two ways to connect to Redshift.

postgres driver

The postgres driver is the default. Rin connects to Redshift using the PostgreSQL protocol over TCP within the VPC network.

The host, port, user, and password fields are required in the redshift section.

redshift:
  driver: postgres # default
  host: localhost
  port: 5439
  user: test_user
  password: '{{ must_env "REDSHIFT_PASSWORD" }}'
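For the postgres driver, these fields combine into a connection string. rin's own format comes from Redshift.DSN and is not shown here; the sketch below builds a standard postgres:// URL with net/url, which also escapes special characters in the password. pgConfig and dsn are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// pgConfig holds the fields the postgres driver needs (hypothetical type).
type pgConfig struct {
	Host     string
	Port     int
	DBName   string
	User     string
	Password string
}

// dsn builds a PostgreSQL connection URL; url.URL percent-escapes
// characters such as '@' and ':' in the user info.
func (c pgConfig) dsn() string {
	u := url.URL{
		Scheme: "postgres",
		User:   url.UserPassword(c.User, c.Password),
		Host:   c.Host + ":" + strconv.Itoa(c.Port),
		Path:   "/" + c.DBName,
	}
	return u.String()
}

func main() {
	cfg := pgConfig{Host: "localhost", Port: 5439, DBName: "test", User: "test_user", Password: "p@ss"}
	fmt.Println(cfg.dsn()) // postgres://test_user:p%40ss@localhost:5439/test
}
```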

redshift-data driver

The redshift-data driver connects to Redshift via the Redshift Data API.

The Redshift Data API does not require a connection from within the VPC network.

For a provisioned cluster, the driver, cluster, and user fields are required.

redshift:
  driver: redshift-data
  cluster: your-cluster-name
  user: test_user

For Redshift Serverless, the driver and workgroup fields are required.

redshift:
  driver: redshift-data
  workgroup: your-workgroup-name
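The required-field rules for the two redshift-data modes can be expressed as a small check. This validate function is hypothetical and only encodes the rules stated above; rin's actual validation may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// dataAPIConfig is a hypothetical subset of the redshift section.
type dataAPIConfig struct {
	Driver    string
	Cluster   string
	User      string
	Workgroup string
}

// validate applies the field requirements for the redshift-data driver.
func (c dataAPIConfig) validate() error {
	if c.Driver != "redshift-data" {
		return fmt.Errorf("unexpected driver %q", c.Driver)
	}
	switch {
	case c.Workgroup != "":
		return nil // Redshift Serverless
	case c.Cluster != "" && c.User != "":
		return nil // provisioned cluster
	case c.Cluster != "":
		return errors.New("user is required with cluster")
	default:
		return errors.New("either cluster (with user) or workgroup is required")
	}
}

func main() {
	for _, c := range []dataAPIConfig{
		{Driver: "redshift-data", Cluster: "your-cluster-name", User: "test_user"},
		{Driver: "redshift-data", Workgroup: "your-workgroup-name"},
		{Driver: "redshift-data", Cluster: "your-cluster-name"},
	} {
		fmt.Println(c.validate())
	}
}
```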

See also github.com/mashiike/redshift-data-sql-driver.

Documentation

Index

Constants

const (
	S3URITemplate = "s3://%s/%s"
	SQLTemplate   = "/* Rin */ COPY %s FROM %s CREDENTIALS '%s' REGION '%s' %s"

	DriverPostgres     = "postgres"
	DriverRedshiftData = "redshift-data"
)
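To see what a COPY statement built from these templates looks like, the sketch below fills S3URITemplate and SQLTemplate with values from the example configuration. quoteLiteral and copySQL are hypothetical helpers and the IAM role ARN is a placeholder; the exact quoting BuildCopySQL applies is not documented here.

```go
package main

import (
	"fmt"
	"strings"
)

// Copied from the package constants above.
const (
	S3URITemplate = "s3://%s/%s"
	SQLTemplate   = "/* Rin */ COPY %s FROM %s CREDENTIALS '%s' REGION '%s' %s"
)

// quoteLiteral wraps s in single quotes, doubling embedded quotes.
func quoteLiteral(s string) string {
	return "'" + strings.ReplaceAll(s, "'", "''") + "'"
}

// copySQL renders a COPY statement for one S3 object.
func copySQL(table, bucket, key, cred, region, option string) string {
	uri := fmt.Sprintf(S3URITemplate, bucket, key)
	return fmt.Sprintf(SQLTemplate, table, quoteLiteral(uri), cred, region, option)
}

func main() {
	query := copySQL(
		"public.foo",
		"test.bucket.test",
		"test/foo/data.json.gz",
		"aws_iam_role=arn:aws:iam::123456789012:role/rin-copy", // placeholder credential
		"ap-northeast-1",
		"JSON 'auto' GZIP", // sql_option
	)
	fmt.Println(query)
	// /* Rin */ COPY public.foo FROM 's3://test.bucket.test/test/foo/data.json.gz' CREDENTIALS 'aws_iam_role=arn:aws:iam::123456789012:role/rin-copy' REGION 'ap-northeast-1' JSON 'auto' GZIP
}
```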

Variables

var (
	DBPool      = make(map[string]*sql.DB, 0)
	DBPoolMutex sync.Mutex
)
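DBPool and DBPoolMutex suggest a mutex-guarded cache of *sql.DB handles. The sketch below shows that pattern with a placeholder conn type instead of *sql.DB, so it runs without a database driver; pool, get, and drop are hypothetical, and keying the cache by DSN is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// conn stands in for *sql.DB so this sketch runs without a database driver.
type conn struct{ dsn string }

// pool caches one handle per DSN behind a mutex (hypothetical type).
type pool struct {
	mu    sync.Mutex
	conns map[string]*conn
	open  func(dsn string) (*conn, error)
}

func newPool(open func(string) (*conn, error)) *pool {
	return &pool{conns: make(map[string]*conn), open: open}
}

// get returns the cached handle for dsn, opening one on first use.
// Holding the lock while opening serializes opens, which keeps it simple.
func (p *pool) get(dsn string) (*conn, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.conns[dsn]; ok {
		return c, nil
	}
	c, err := p.open(dsn)
	if err != nil {
		return nil, err
	}
	p.conns[dsn] = c
	return c, nil
}

// drop forgets the handle for dsn so the next get reconnects.
func (p *pool) drop(dsn string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.conns, dsn)
}

func main() {
	p := newPool(func(dsn string) (*conn, error) {
		fmt.Println("opening", dsn)
		return &conn{dsn: dsn}, nil
	})
	p.get("postgres://test_user@localhost:5439/test") // opens
	p.get("postgres://test_user@localhost:5439/test") // reuses the cached handle
}
```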
var MaxDeleteRetry = 8

Functions

func BoolValue added in v1.3.0

func BoolValue(b *bool) bool

func DryRun added in v1.1.1

func DryRun(configFile string, opt *Option) error

func Import

func Import(ctx context.Context, event Event) (int, error)

func Run

func Run(configFile string, opt *Option) error

func RunWithContext added in v1.0.0

func RunWithContext(ctx context.Context, configFile string, opt *Option) error

Types

type BatchItemFailureItem added in v1.3.0

type BatchItemFailureItem struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type Config

type Config struct {
	QueueName   string      `yaml:"queue_name"`
	Targets     []*Target   `yaml:"targets"`
	Credentials Credentials `yaml:"credentials"`
	Redshift    *Redshift   `yaml:"redshift"`
	S3          *S3         `yaml:"s3"`
	SQLOption   string      `yaml:"sql_option"`
}

func LoadConfig

func LoadConfig(ctx context.Context, path string) (*Config, error)

type Credentials

type Credentials struct {
	AWS_ACCESS_KEY_ID     string `yaml:"aws_access_key_id"`
	AWS_SECRET_ACCESS_KEY string `yaml:"aws_secret_access_key"`
	AWS_REGION            string `yaml:"aws_region"`
	AWS_IAM_ROLE          string `yaml:"aws_iam_role"`
}

func (Credentials) RedshiftCredential added in v0.1.2

func (c Credentials) RedshiftCredential() string

type Event

type Event struct {
	Records []*EventRecord `json:"Records"`
	Event   string
	Bucket  string
}

func ParseEvent

func ParseEvent(b []byte) (Event, error)

func (Event) IsTestEvent added in v1.1.2

func (e Event) IsTestEvent() bool
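When event notifications are first configured, S3 sends an s3:TestEvent message to the queue, carrying top-level Event and Bucket fields instead of Records. The sketch below shows one way to detect it; isTestEvent is hypothetical, and rin's IsTestEvent may check more than the Event field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// notification keeps only the fields needed here; Event and Bucket
// correspond to the untagged fields of the Event type above
// (encoding/json matches keys to field names case-insensitively).
type notification struct {
	Event  string
	Bucket string
}

// isTestEvent reports whether body is an S3 s3:TestEvent message.
func isTestEvent(body []byte) (bool, error) {
	var n notification
	if err := json.Unmarshal(body, &n); err != nil {
		return false, err
	}
	return n.Event == "s3:TestEvent", nil
}

func main() {
	body := []byte(`{"Service":"Amazon S3","Event":"s3:TestEvent","Time":"2023-01-18T00:00:00.000Z","Bucket":"test.bucket.test"}`)
	ok, err := isTestEvent(body)
	fmt.Println(ok, err) // true <nil>
}
```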

func (Event) String added in v0.0.2

func (e Event) String() string

type EventRecord

type EventRecord struct {
	EventVersion string  `json:"eventVersion"`
	EventName    string  `json:"eventName"`
	EventSource  string  `json:"eventSource"`
	EventTime    string  `json:"eventTime"`
	AWSRegion    string  `json:"awsRegion"`
	S3           S3Event `json:"s3"`
}

func (EventRecord) String

func (r EventRecord) String() string

type MaxExecutionTimeReachedError added in v1.3.2

type MaxExecutionTimeReachedError struct{}

func (MaxExecutionTimeReachedError) Error added in v1.3.2

type NoMessageError added in v0.0.6

type NoMessageError struct {
	// contains filtered or unexported fields
}

func (NoMessageError) Error added in v0.0.6

func (e NoMessageError) Error() string

type Option added in v1.3.2

type Option struct {
	MaxExecutionTime time.Duration `json:"max_execution_time"`
	BatchMode        bool          `json:"batch_mode"`
}

func (*Option) String added in v1.3.2

func (o *Option) String() string

type Redshift

type Redshift struct {
	Driver string `yaml:"driver"`

	// for postgres driver
	Host string `yaml:"host"`
	Port int    `yaml:"port"`

	// for redshift-data driver provisioned
	Cluster string `yaml:"cluster"`

	// for redshift-data driver serverless
	Workgroup string `yaml:"workgroup"`

	DBName           string `yaml:"dbname"`
	User             string `yaml:"user"`
	Password         string `yaml:"password"`
	Schema           string `yaml:"schema"`
	Table            string `yaml:"table"`
	ReconnectOnError *bool  `yaml:"reconnect_on_error"`
}

func (Redshift) DSN

func (r Redshift) DSN() string

func (Redshift) DSNWith added in v1.0.0

func (r Redshift) DSNWith(user string, password string) string

func (Redshift) String

func (r Redshift) String() string

func (Redshift) UseTransaction added in v1.3.0

func (r Redshift) UseTransaction() bool

func (Redshift) VisibleDSN added in v0.0.8

func (r Redshift) VisibleDSN() string

type S3

type S3 struct {
	Region    string `yaml:"region"`
	Bucket    string `yaml:"bucket"`
	KeyPrefix string `yaml:"key_prefix"`
	KeyRegexp string `yaml:"key_regexp"`
}

func (S3) String

func (s3 S3) String() string

type S3Bucket

type S3Bucket struct {
	Name string `json:"name"`
	ARN  string `json:"arn"`
}

type S3Event

type S3Event struct {
	S3SchemaVersion string   `json:"s3SchemaVersion"`
	ConfigurationID string   `json:"configurationId"`
	Bucket          S3Bucket `json:"bucket"`
	Object          S3Object `json:"object"`
}

type S3Object

type S3Object struct {
	Key  string `json:"key"`
	Size int64  `json:"size"`
	ETag string `json:"eTag"`
}

type SQLParam

type SQLParam struct {
	Table  string
	Option string
}

type SQSBatchResponse added in v1.3.0

type SQSBatchResponse struct {
	BatchItemFailures []BatchItemFailureItem `json:"batchItemFailures,omitempty"`
}

type SessionStore added in v1.0.0

type SessionStore struct {
	SQS            *aws.Config
	SQSOptFns      []func(*sqs.Options)
	Redshift       *aws.Config
	RedshiftOptFns []func(*redshift.Options)
	S3             *aws.Config
	S3OptFns       []func(*s3.Options)
}

var Sessions *SessionStore

type SnsEvent added in v1.2.0

type SnsEvent struct {
	Message *string
}

type Target

type Target struct {
	Redshift  *Redshift `yaml:"redshift"`
	S3        *S3       `yaml:"s3"`
	SQLOption string    `yaml:"sql_option"`
	Break     bool      `yaml:"break"`
	Discard   bool      `yaml:"discard"`
	// contains filtered or unexported fields
}

func (*Target) BuildCopySQL

func (t *Target) BuildCopySQL(key string, cred Credentials, capture *[]string) (string, error)

func (*Target) ConnectToRedshift added in v1.3.0

func (target *Target) ConnectToRedshift(ctx context.Context) (*sql.DB, error)

func (*Target) DisconnectToRedshift added in v1.3.0

func (target *Target) DisconnectToRedshift()

func (*Target) ImportRedshift added in v1.3.0

func (target *Target) ImportRedshift(ctx context.Context, record *EventRecord, cap *[]string) error

func (*Target) Match

func (t *Target) Match(bucket, key string) (bool, *[]string)

func (*Target) MatchEventRecord

func (t *Target) MatchEventRecord(r *EventRecord) (bool, *[]string)

func (*Target) String added in v0.0.8

func (t *Target) String() string

Directories

cmd/rin
