dynasc

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2024 License: MIT Imports: 15 Imported by: 0

README

dynasc

A cross-platform client tool for DynamoDB Streams written in Go.

Release GitHub Actions Go Report Card GoDoc Docker Pulls

Overview

Dynasc (DynamoDB Streams Client) is a client tool for processing stream records written by DynamoDB Streams.

Dynasc reads the stream records from the any shards in DynamoDB Streams and invokes any lambda functions with the payloads that contain the DynamoDB Streams record event. The primary use case is to emulate the integration of Amazon DynamoDB and AWS Lambda in your own development environment while combining it with tools such as DynamoDB local and AWS SAM.

[!NOTE]
Do not use this tool in a production environment. You can create triggers to integrate Amazon DynamoDB with AWS Lambda without this tool.

Installing

Pre-Build binaries

Prebuilt binaries are available for a variety of operating systems and architectures. Visit the latest release page, and scroll down to the Assets section.

  1. Download the archive for the desired operating system and architecture.
  2. Extract the archive.
  3. Move the executable to the desired directory.
  4. Add this directory to the PATH environment variable.
  5. Verify that you have execute permission on the file.

Please consult your operating system documentation if you need help setting file permissions or modifying your PATH environment variable.

If you do not see a prebuilt binary for the desired operating system and architecture, install Dynasc using one of the methods described below.

Build from source

To build from source you must:

  1. Install Git.
  2. Install Go version 1.22 or later.

Then build and test:

go install github.com/omoide/dynasc/cmd/dynasc@latest
dynasc -v

Usage

Start a client:

dynasc --dynamo-endpoint http://localhost:8000 --lambda-endpoint http://localhost:3000 --triggers TableA=Function1,TableB=Function2

You can configure Dynasc in in several ways, such as system or user environment variables, a local configuration file, or explicitly declared command line options.
Configuration settings take precedence in the following order. Each item takes precedence over the item below it:

  1. command line options
  2. environment variables
  3. configuration file
Command line options

You can use the following command line options to override any configuration file setting, or environment variable setting.

Option Description
--dynamo-endpoint <string> Specifies an Amazon DynamoDB endpoint to read stream records.
You need to specify this endpoint, if you are emulating Amazon DynamoDB in local using tools such as DynamoDB Local or Local Stack.
If this option is not specified, the standard service endpoint is used automatically based on the specified AWS Region.
--lambda-endpoint <string> Specifies an AWS Lambda endpoint to execute functions.
You need to specify this endpoint, if you are emulating AWS Lambda in local using tools such as AWS SAM CLI.
If this option is not specified, the standard service endpoint is used automatically based on the specified AWS Region.
--triggers <string> Specifies trigger definition consisting of a set of key-value pairs of Amazon DynamoDB table names and AWS Lambda function names.

Multiple key-value pairs are separated by commas, for example: TableA=Function1,TableB=Function2.
You can also define multiple functions for the same table, for example: TableA=Function1,TableA=Function2.

Based on this definition, Dynasc polls shards in the streams of the specified tables. When stream records are available, Dynasc invokes corresponding Lambda function.
--config <string> Specifies a path and file name of the configuration file containing default parameter values to use.
For more information about configuration files, see configuration file.
-d, --debug A Boolean switch that enables debug logging.
The --debug option provides the full Go logs. This includes additional stderr diagnostic information about the operation of the command that can be useful when troubleshooting why a command provides unexpected results.
-h, --help A Boolean switch that displays help for the options.
-v, --version A Boolean switch that displays the current version of Dynasc.
Ennvironment variables

Environment variables provide another way to specify configuration options.
Dynasc supports the following environment variables.

Option Description
AWS_REGION Specifies the AWS Region to send the request to.
AWS_ACCESS_KEY_ID Specifies an AWS access key associated with an IAM account.
AWS_SECRET_ACCESS_KEY Specifies the secret key associated with the access key.
DYNASC_DYNAMO_ENDPOINT Same as the command line --dynamo-endpoint option.
DYNASC_LAMBDA_ENDPOINT Same as the command line --lambad-endpoint option.
DYNASC_TRIGGERS Same as the command line --triggers option.
However, multiple key-value par must be separated by spaces, not commas, for example: TableA=Function1 TableB=Function2.
DYNASC_CONFIG Same as the command line --config option.

If you use a local DynamoDB or Lambda that cares about credentials, you will specify AWS_REGION, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

Configurationn file

Dynasc supports a configuration file that you can use to configure Dynasc command parameter values.
Supported formats are JSON, YAML, TOML, and HCL.

Samples of condfiguration file are as follows.

JSON
{
  "dynamo-endpoint": "http://localhost:8000",
  "lambda-endpoint": "http://localhost:3001",
  "triggers": [
    { "table": "Table1", "functions": ["Function1", "Function2"] },
    { "table": "Table2", "functions": "Function1" }
  ]
}
YAML
dynamo-endpoint: http://localhost:8000
lambda-endpoint: http://localhost:3001
triggers:
  - table: Table1
    functions:
      - Function1
      - Function2
  - table: Table2
    functions: Function1

As you can see from the samples above, functions element of trigger can be specified as string or array of strings.

License

Dynasc is released under the MIT license. See LICENSE.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func BoolPtr

func BoolPtr(v bool) *bool

BoolPtr returns a pointer to bool.

func BoolValue

func BoolValue(v *bool) bool

BoolValue returns the value of the bool pointer passed in or false if the pointer is nil.

func Float32Ptr

func Float32Ptr(v float32) *float32

Float32Ptr returns a pointer to float32.

func Float32Value

func Float32Value(v *float32) float32

Float32Value returns the value of the float32 pointer passed in or 0 if the pointer is nil.

func Float64Ptr

func Float64Ptr(v float64) *float64

Float64Ptr returns a pointer to float64.

func Float64Value

func Float64Value(v *float64) float64

Float64Value returns the value of the float64 pointer passed in or 0 if the pointer is nil.

func Int16Ptr

func Int16Ptr(v int16) *int16

Int16Ptr returns a pointer to int16.

func Int16Value

func Int16Value(v *int16) int16

Int16Value returns the value of the int16 pointer passed in or 0 if the pointer is nil.

func Int32Ptr

func Int32Ptr(v int32) *int32

Int32Ptr returns a pointer to int32.

func Int32Value

func Int32Value(v *int32) int32

Int32Value returns the value of the int32 pointer passed in or 0 if the pointer is nil.

func Int64Ptr

func Int64Ptr(v int64) *int64

Int64Ptr returns a pointer to int64.

func Int64Value

func Int64Value(v *int64) int64

Int64Value returns the value of the int64 pointer passed in or 0 if the pointer is nil.

func Int8Ptr

func Int8Ptr(v int8) *int8

Int8Ptr returns a pointer to int8.

func Int8Value

func Int8Value(v *int8) int8

Int8Value returns the value of the int8 pointer passed in or 0 if the pointer is nil.

func IntPtr

func IntPtr(v int) *int

IntPtr returns a pointer to int.

func IntValue

func IntValue(v *int) int

IntValue returns the value of the int pointer passed in or 0 if the pointer is nil.

func NewDBClient

func NewDBClient(ctx context.Context, endpoint string) (*dynamodb.Client, error)

NewDBClient returns a client of Amazon DynamoDB.

func NewDBStreamsClient

func NewDBStreamsClient(ctx context.Context, endpoint string) (*dynamodbstreams.Client, error)

NewDBStreamsClient returns a client of Amazon DynamoDB Streams.

func NewLambdaClient

func NewLambdaClient(ctx context.Context, endpoint string) (*lambda.Client, error)

NewLambdaClient returns a client of AWS Lambda.

func RandomString

func RandomString(n int) string

func StringPtr

func StringPtr(v string) *string

StringPtr returns a pointer to string.

func StringValue

func StringValue(v *string) string

StringValue returns the value of the string pointer passed in or empty string if the pointer is nil.

func Uint16Ptr

func Uint16Ptr(v uint16) *uint16

Uint16Ptr returns a pointer to uint16.

func Uint16Value

func Uint16Value(v *uint16) uint16

Uint16Value returns the value of the uint16 pointer passed in or 0 if the pointer is nil.

func Uint32Ptr

func Uint32Ptr(v uint32) *uint32

Uint32Ptr returns a pointer to uint32.

func Uint32Value

func Uint32Value(v *uint32) uint32

Uint32Value returns the value of the uint32 pointer passed in or 0 if the pointer is nil.

func Uint64Ptr

func Uint64Ptr(v uint64) *uint64

Uint64Ptr returns a pointer to uint64.

func Uint64Value

func Uint64Value(v *uint64) uint64

Uint64Value returns the value of the uint64 pointer passed in or 0 if the pointer is nil.

func Uint8Ptr

func Uint8Ptr(v uint8) *uint8

Uint8Ptr returns a pointer to uint8.

func Uint8Value

func Uint8Value(v *uint8) uint8

Uint8Value returns the value of the uint8 pointer passed in or 0 if the pointer is nil.

func UintPtr

func UintPtr(v uint) *uint

UintPtr returns a pointer to uint.

func UintValue

func UintValue(v *uint) uint

UintValue returns the value of the uint pointer passed in or 0 if the pointer is nil.

Types

type DBClient

type DBClient interface {
	BatchExecuteStatement(context.Context, *dynamodb.BatchExecuteStatementInput, ...func(*dynamodb.Options)) (*dynamodb.BatchExecuteStatementOutput, error)
	BatchGetItem(context.Context, *dynamodb.BatchGetItemInput, ...func(*dynamodb.Options)) (*dynamodb.BatchGetItemOutput, error)
	BatchWriteItem(context.Context, *dynamodb.BatchWriteItemInput, ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
	CreateBackup(context.Context, *dynamodb.CreateBackupInput, ...func(*dynamodb.Options)) (*dynamodb.CreateBackupOutput, error)
	CreateGlobalTable(context.Context, *dynamodb.CreateGlobalTableInput, ...func(*dynamodb.Options)) (*dynamodb.CreateGlobalTableOutput, error)
	CreateTable(context.Context, *dynamodb.CreateTableInput, ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteBackup(context.Context, *dynamodb.DeleteBackupInput, ...func(*dynamodb.Options)) (*dynamodb.DeleteBackupOutput, error)
	DeleteItem(context.Context, *dynamodb.DeleteItemInput, ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	DeleteTable(context.Context, *dynamodb.DeleteTableInput, ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
	DescribeBackup(context.Context, *dynamodb.DescribeBackupInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeBackupOutput, error)
	DescribeContinuousBackups(context.Context, *dynamodb.DescribeContinuousBackupsInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeContinuousBackupsOutput, error)
	DescribeContributorInsights(context.Context, *dynamodb.DescribeContributorInsightsInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeContributorInsightsOutput, error)
	DescribeEndpoints(context.Context, *dynamodb.DescribeEndpointsInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeEndpointsOutput, error)
	DescribeExport(context.Context, *dynamodb.DescribeExportInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeExportOutput, error)
	DescribeGlobalTable(context.Context, *dynamodb.DescribeGlobalTableInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeGlobalTableOutput, error)
	DescribeGlobalTableSettings(context.Context, *dynamodb.DescribeGlobalTableSettingsInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeGlobalTableSettingsOutput, error)
	DescribeImport(context.Context, *dynamodb.DescribeImportInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeImportOutput, error)
	DescribeKinesisStreamingDestination(context.Context, *dynamodb.DescribeKinesisStreamingDestinationInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeKinesisStreamingDestinationOutput, error)
	DescribeLimits(context.Context, *dynamodb.DescribeLimitsInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeLimitsOutput, error)
	DescribeTable(context.Context, *dynamodb.DescribeTableInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeTableOutput, error)
	DescribeTableReplicaAutoScaling(context.Context, *dynamodb.DescribeTableReplicaAutoScalingInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeTableReplicaAutoScalingOutput, error)
	DescribeTimeToLive(context.Context, *dynamodb.DescribeTimeToLiveInput, ...func(*dynamodb.Options)) (*dynamodb.DescribeTimeToLiveOutput, error)
	DisableKinesisStreamingDestination(context.Context, *dynamodb.DisableKinesisStreamingDestinationInput, ...func(*dynamodb.Options)) (*dynamodb.DisableKinesisStreamingDestinationOutput, error)
	EnableKinesisStreamingDestination(context.Context, *dynamodb.EnableKinesisStreamingDestinationInput, ...func(*dynamodb.Options)) (*dynamodb.EnableKinesisStreamingDestinationOutput, error)
	ExecuteStatement(context.Context, *dynamodb.ExecuteStatementInput, ...func(*dynamodb.Options)) (*dynamodb.ExecuteStatementOutput, error)
	ExecuteTransaction(context.Context, *dynamodb.ExecuteTransactionInput, ...func(*dynamodb.Options)) (*dynamodb.ExecuteTransactionOutput, error)
	ExportTableToPointInTime(context.Context, *dynamodb.ExportTableToPointInTimeInput, ...func(*dynamodb.Options)) (*dynamodb.ExportTableToPointInTimeOutput, error)
	GetItem(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	ImportTable(context.Context, *dynamodb.ImportTableInput, ...func(*dynamodb.Options)) (*dynamodb.ImportTableOutput, error)
	ListBackups(context.Context, *dynamodb.ListBackupsInput, ...func(*dynamodb.Options)) (*dynamodb.ListBackupsOutput, error)
	ListContributorInsights(context.Context, *dynamodb.ListContributorInsightsInput, ...func(*dynamodb.Options)) (*dynamodb.ListContributorInsightsOutput, error)
	ListExports(context.Context, *dynamodb.ListExportsInput, ...func(*dynamodb.Options)) (*dynamodb.ListExportsOutput, error)
	ListGlobalTables(context.Context, *dynamodb.ListGlobalTablesInput, ...func(*dynamodb.Options)) (*dynamodb.ListGlobalTablesOutput, error)
	ListImports(context.Context, *dynamodb.ListImportsInput, ...func(*dynamodb.Options)) (*dynamodb.ListImportsOutput, error)
	ListTables(context.Context, *dynamodb.ListTablesInput, ...func(*dynamodb.Options)) (*dynamodb.ListTablesOutput, error)
	ListTagsOfResource(context.Context, *dynamodb.ListTagsOfResourceInput, ...func(*dynamodb.Options)) (*dynamodb.ListTagsOfResourceOutput, error)
	PutItem(context.Context, *dynamodb.PutItemInput, ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	Query(context.Context, *dynamodb.QueryInput, ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
	RestoreTableFromBackup(context.Context, *dynamodb.RestoreTableFromBackupInput, ...func(*dynamodb.Options)) (*dynamodb.RestoreTableFromBackupOutput, error)
	RestoreTableToPointInTime(context.Context, *dynamodb.RestoreTableToPointInTimeInput, ...func(*dynamodb.Options)) (*dynamodb.RestoreTableToPointInTimeOutput, error)
	Scan(context.Context, *dynamodb.ScanInput, ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
	TagResource(context.Context, *dynamodb.TagResourceInput, ...func(*dynamodb.Options)) (*dynamodb.TagResourceOutput, error)
	TransactGetItems(context.Context, *dynamodb.TransactGetItemsInput, ...func(*dynamodb.Options)) (*dynamodb.TransactGetItemsOutput, error)
	TransactWriteItems(context.Context, *dynamodb.TransactWriteItemsInput, ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
	UntagResource(context.Context, *dynamodb.UntagResourceInput, ...func(*dynamodb.Options)) (*dynamodb.UntagResourceOutput, error)
	UpdateContinuousBackups(context.Context, *dynamodb.UpdateContinuousBackupsInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateContinuousBackupsOutput, error)
	UpdateContributorInsights(context.Context, *dynamodb.UpdateContributorInsightsInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateContributorInsightsOutput, error)
	UpdateGlobalTable(context.Context, *dynamodb.UpdateGlobalTableInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateGlobalTableOutput, error)
	UpdateGlobalTableSettings(context.Context, *dynamodb.UpdateGlobalTableSettingsInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateGlobalTableSettingsOutput, error)
	UpdateItem(context.Context, *dynamodb.UpdateItemInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	UpdateTable(context.Context, *dynamodb.UpdateTableInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateTableOutput, error)
	UpdateTableReplicaAutoScaling(context.Context, *dynamodb.UpdateTableReplicaAutoScalingInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateTableReplicaAutoScalingOutput, error)
	UpdateTimeToLive(context.Context, *dynamodb.UpdateTimeToLiveInput, ...func(*dynamodb.Options)) (*dynamodb.UpdateTimeToLiveOutput, error)
}

DBClient represents interfaces of Amazon DynamoDB client.

type DynamoDBAttributeValues

type DynamoDBAttributeValues map[string]dbstreamstypes.AttributeValue

DynamoDBAttributeValues represents a set of DynamoDBStreams AttributeValues.

func (DynamoDBAttributeValues) ToEventAttributeValues

func (dav DynamoDBAttributeValues) ToEventAttributeValues() map[string]events.DynamoDBAttributeValue

ToEventAttributeValues converts a set of DynamoDBStreams AttributeValues to a set of Lambda Event DynamoDBAttributeValues.

type LambdaClient

type LambdaClient interface {
	AddLayerVersionPermission(context.Context, *lambda.AddLayerVersionPermissionInput, ...func(*lambda.Options)) (*lambda.AddLayerVersionPermissionOutput, error)
	AddPermission(context.Context, *lambda.AddPermissionInput, ...func(*lambda.Options)) (*lambda.AddPermissionOutput, error)
	CreateAlias(context.Context, *lambda.CreateAliasInput, ...func(*lambda.Options)) (*lambda.CreateAliasOutput, error)
	CreateCodeSigningConfig(context.Context, *lambda.CreateCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.CreateCodeSigningConfigOutput, error)
	CreateEventSourceMapping(context.Context, *lambda.CreateEventSourceMappingInput, ...func(*lambda.Options)) (*lambda.CreateEventSourceMappingOutput, error)
	CreateFunction(context.Context, *lambda.CreateFunctionInput, ...func(*lambda.Options)) (*lambda.CreateFunctionOutput, error)
	CreateFunctionUrlConfig(context.Context, *lambda.CreateFunctionUrlConfigInput, ...func(*lambda.Options)) (*lambda.CreateFunctionUrlConfigOutput, error)
	DeleteAlias(context.Context, *lambda.DeleteAliasInput, ...func(*lambda.Options)) (*lambda.DeleteAliasOutput, error)
	DeleteCodeSigningConfig(context.Context, *lambda.DeleteCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.DeleteCodeSigningConfigOutput, error)
	DeleteEventSourceMapping(context.Context, *lambda.DeleteEventSourceMappingInput, ...func(*lambda.Options)) (*lambda.DeleteEventSourceMappingOutput, error)
	DeleteFunction(context.Context, *lambda.DeleteFunctionInput, ...func(*lambda.Options)) (*lambda.DeleteFunctionOutput, error)
	DeleteFunctionCodeSigningConfig(context.Context, *lambda.DeleteFunctionCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.DeleteFunctionCodeSigningConfigOutput, error)
	DeleteFunctionConcurrency(context.Context, *lambda.DeleteFunctionConcurrencyInput, ...func(*lambda.Options)) (*lambda.DeleteFunctionConcurrencyOutput, error)
	DeleteFunctionEventInvokeConfig(context.Context, *lambda.DeleteFunctionEventInvokeConfigInput, ...func(*lambda.Options)) (*lambda.DeleteFunctionEventInvokeConfigOutput, error)
	DeleteFunctionUrlConfig(context.Context, *lambda.DeleteFunctionUrlConfigInput, ...func(*lambda.Options)) (*lambda.DeleteFunctionUrlConfigOutput, error)
	DeleteLayerVersion(context.Context, *lambda.DeleteLayerVersionInput, ...func(*lambda.Options)) (*lambda.DeleteLayerVersionOutput, error)
	DeleteProvisionedConcurrencyConfig(context.Context, *lambda.DeleteProvisionedConcurrencyConfigInput, ...func(*lambda.Options)) (*lambda.DeleteProvisionedConcurrencyConfigOutput, error)
	GetAccountSettings(context.Context, *lambda.GetAccountSettingsInput, ...func(*lambda.Options)) (*lambda.GetAccountSettingsOutput, error)
	GetAlias(context.Context, *lambda.GetAliasInput, ...func(*lambda.Options)) (*lambda.GetAliasOutput, error)
	GetCodeSigningConfig(context.Context, *lambda.GetCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.GetCodeSigningConfigOutput, error)
	GetEventSourceMapping(context.Context, *lambda.GetEventSourceMappingInput, ...func(*lambda.Options)) (*lambda.GetEventSourceMappingOutput, error)
	GetFunction(context.Context, *lambda.GetFunctionInput, ...func(*lambda.Options)) (*lambda.GetFunctionOutput, error)
	GetFunctionCodeSigningConfig(context.Context, *lambda.GetFunctionCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.GetFunctionCodeSigningConfigOutput, error)
	GetFunctionConcurrency(context.Context, *lambda.GetFunctionConcurrencyInput, ...func(*lambda.Options)) (*lambda.GetFunctionConcurrencyOutput, error)
	GetFunctionConfiguration(context.Context, *lambda.GetFunctionConfigurationInput, ...func(*lambda.Options)) (*lambda.GetFunctionConfigurationOutput, error)
	GetFunctionEventInvokeConfig(context.Context, *lambda.GetFunctionEventInvokeConfigInput, ...func(*lambda.Options)) (*lambda.GetFunctionEventInvokeConfigOutput, error)
	GetFunctionUrlConfig(context.Context, *lambda.GetFunctionUrlConfigInput, ...func(*lambda.Options)) (*lambda.GetFunctionUrlConfigOutput, error)
	GetLayerVersion(context.Context, *lambda.GetLayerVersionInput, ...func(*lambda.Options)) (*lambda.GetLayerVersionOutput, error)
	GetLayerVersionByArn(context.Context, *lambda.GetLayerVersionByArnInput, ...func(*lambda.Options)) (*lambda.GetLayerVersionByArnOutput, error)
	GetLayerVersionPolicy(context.Context, *lambda.GetLayerVersionPolicyInput, ...func(*lambda.Options)) (*lambda.GetLayerVersionPolicyOutput, error)
	GetPolicy(context.Context, *lambda.GetPolicyInput, ...func(*lambda.Options)) (*lambda.GetPolicyOutput, error)
	GetProvisionedConcurrencyConfig(context.Context, *lambda.GetProvisionedConcurrencyConfigInput, ...func(*lambda.Options)) (*lambda.GetProvisionedConcurrencyConfigOutput, error)
	GetRuntimeManagementConfig(context.Context, *lambda.GetRuntimeManagementConfigInput, ...func(*lambda.Options)) (*lambda.GetRuntimeManagementConfigOutput, error)
	Invoke(context.Context, *lambda.InvokeInput, ...func(*lambda.Options)) (*lambda.InvokeOutput, error)
	InvokeAsync(context.Context, *lambda.InvokeAsyncInput, ...func(*lambda.Options)) (*lambda.InvokeAsyncOutput, error)
	InvokeWithResponseStream(context.Context, *lambda.InvokeWithResponseStreamInput, ...func(*lambda.Options)) (*lambda.InvokeWithResponseStreamOutput, error)
	ListAliases(context.Context, *lambda.ListAliasesInput, ...func(*lambda.Options)) (*lambda.ListAliasesOutput, error)
	ListCodeSigningConfigs(context.Context, *lambda.ListCodeSigningConfigsInput, ...func(*lambda.Options)) (*lambda.ListCodeSigningConfigsOutput, error)
	ListEventSourceMappings(context.Context, *lambda.ListEventSourceMappingsInput, ...func(*lambda.Options)) (*lambda.ListEventSourceMappingsOutput, error)
	ListFunctionEventInvokeConfigs(context.Context, *lambda.ListFunctionEventInvokeConfigsInput, ...func(*lambda.Options)) (*lambda.ListFunctionEventInvokeConfigsOutput, error)
	ListFunctionUrlConfigs(context.Context, *lambda.ListFunctionUrlConfigsInput, ...func(*lambda.Options)) (*lambda.ListFunctionUrlConfigsOutput, error)
	ListFunctions(context.Context, *lambda.ListFunctionsInput, ...func(*lambda.Options)) (*lambda.ListFunctionsOutput, error)
	ListFunctionsByCodeSigningConfig(context.Context, *lambda.ListFunctionsByCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.ListFunctionsByCodeSigningConfigOutput, error)
	ListLayerVersions(context.Context, *lambda.ListLayerVersionsInput, ...func(*lambda.Options)) (*lambda.ListLayerVersionsOutput, error)
	ListLayers(context.Context, *lambda.ListLayersInput, ...func(*lambda.Options)) (*lambda.ListLayersOutput, error)
	ListProvisionedConcurrencyConfigs(context.Context, *lambda.ListProvisionedConcurrencyConfigsInput, ...func(*lambda.Options)) (*lambda.ListProvisionedConcurrencyConfigsOutput, error)
	ListTags(context.Context, *lambda.ListTagsInput, ...func(*lambda.Options)) (*lambda.ListTagsOutput, error)
	ListVersionsByFunction(context.Context, *lambda.ListVersionsByFunctionInput, ...func(*lambda.Options)) (*lambda.ListVersionsByFunctionOutput, error)
	PublishLayerVersion(context.Context, *lambda.PublishLayerVersionInput, ...func(*lambda.Options)) (*lambda.PublishLayerVersionOutput, error)
	PublishVersion(context.Context, *lambda.PublishVersionInput, ...func(*lambda.Options)) (*lambda.PublishVersionOutput, error)
	PutFunctionCodeSigningConfig(context.Context, *lambda.PutFunctionCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.PutFunctionCodeSigningConfigOutput, error)
	PutFunctionConcurrency(context.Context, *lambda.PutFunctionConcurrencyInput, ...func(*lambda.Options)) (*lambda.PutFunctionConcurrencyOutput, error)
	PutFunctionEventInvokeConfig(context.Context, *lambda.PutFunctionEventInvokeConfigInput, ...func(*lambda.Options)) (*lambda.PutFunctionEventInvokeConfigOutput, error)
	PutProvisionedConcurrencyConfig(context.Context, *lambda.PutProvisionedConcurrencyConfigInput, ...func(*lambda.Options)) (*lambda.PutProvisionedConcurrencyConfigOutput, error)
	PutRuntimeManagementConfig(context.Context, *lambda.PutRuntimeManagementConfigInput, ...func(*lambda.Options)) (*lambda.PutRuntimeManagementConfigOutput, error)
	RemoveLayerVersionPermission(context.Context, *lambda.RemoveLayerVersionPermissionInput, ...func(*lambda.Options)) (*lambda.RemoveLayerVersionPermissionOutput, error)
	RemovePermission(context.Context, *lambda.RemovePermissionInput, ...func(*lambda.Options)) (*lambda.RemovePermissionOutput, error)
	TagResource(context.Context, *lambda.TagResourceInput, ...func(*lambda.Options)) (*lambda.TagResourceOutput, error)
	UntagResource(context.Context, *lambda.UntagResourceInput, ...func(*lambda.Options)) (*lambda.UntagResourceOutput, error)
	UpdateAlias(context.Context, *lambda.UpdateAliasInput, ...func(*lambda.Options)) (*lambda.UpdateAliasOutput, error)
	UpdateCodeSigningConfig(context.Context, *lambda.UpdateCodeSigningConfigInput, ...func(*lambda.Options)) (*lambda.UpdateCodeSigningConfigOutput, error)
	UpdateEventSourceMapping(context.Context, *lambda.UpdateEventSourceMappingInput, ...func(*lambda.Options)) (*lambda.UpdateEventSourceMappingOutput, error)
	UpdateFunctionCode(context.Context, *lambda.UpdateFunctionCodeInput, ...func(*lambda.Options)) (*lambda.UpdateFunctionCodeOutput, error)
	UpdateFunctionConfiguration(context.Context, *lambda.UpdateFunctionConfigurationInput, ...func(*lambda.Options)) (*lambda.UpdateFunctionConfigurationOutput, error)
	UpdateFunctionEventInvokeConfig(context.Context, *lambda.UpdateFunctionEventInvokeConfigInput, ...func(*lambda.Options)) (*lambda.UpdateFunctionEventInvokeConfigOutput, error)
	UpdateFunctionUrlConfig(context.Context, *lambda.UpdateFunctionUrlConfigInput, ...func(*lambda.Options)) (*lambda.UpdateFunctionUrlConfigOutput, error)
}

LambdaClient represents interfaces of AWS Lambda client.

type LambdaProcessor

type LambdaProcessor struct {
	// contains filtered or unexported fields
}

LambdaProcessor represents a processor that processes records in DynamoDB Streams by invoking Lambda functions.

func NewLambdaProcessor

func NewLambdaProcessor(client LambdaClient) *LambdaProcessor

NewLambdaProcessor returns an instance of the lambda processor.

func (*LambdaProcessor) Process

func (p *LambdaProcessor) Process(ctx context.Context, functionName string, records []dbstreamstypes.Record) error

Process processes records in DynamoDB Streams.

type Processor

type Processor interface {
	Process(ctx context.Context, functionName string, records []dbstreamstypes.Record) error
}

Processor represents a processor that processes records in DynamoDB Streams.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a DynamoDB Streams worker.

func NewWorker

func NewWorker(config *WorkerConfig, options ...WorkerConfigOption) *Worker

NewWorker returns an instance of a worker.

func (*Worker) Execute

func (w *Worker) Execute(ctx context.Context, triggers map[string][]string) error

Execute executes a worker.

type WorkerConfig

type WorkerConfig struct {
	DB        DBClient
	DBStreams DBStreamsClient
	Processor Processor
}

WorkerConfig represents a configuration to execute a worker.

type WorkerConfigOption

type WorkerConfigOption func(*Worker)

WorkerConfigOption represents a function to set optional configuration.

func WithBatchSize

func WithBatchSize(batchSize int32) WorkerConfigOption

WithBatchSize returns a function that set batch size to a worker.

func WithHooks

func WithHooks(hooks *WorkerHooks) WorkerConfigOption

WithBatchSize returns a function that set hooks to a worker.

type WorkerHooks

type WorkerHooks struct {
	PreSetup  func(ctx context.Context) error
	PostSetup func(ctx context.Context) error
}

WorkerHooks represents lifecycle hooks of a worker.

Directories

Path Synopsis
cmd
tools
lambda Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL