sqpulser

package module
v0.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2022 License: MIT Imports: 16 Imported by: 0

README

sqpulser

sqpulser is a tool for compiling SQS messages and emitting them in a pulsatile cycle

Usage

sqpulser -in sqpulser-in -out sqpulser-out -emit-interval 1h -offset 15m
2022/08/10 12:14:30 [info] try get incoming queue url: queue name `sqpulser-in`
2022/08/10 12:14:31 [info] try get outgoing queue url: queue name `sqpulser-out`
2022/08/10 12:14:31 [info] start polling: https://sqs.ap-northeast-1.amazonaws.com/012345678900/sqpulser-in

For example, with the options above (-emit-interval 1h -offset 15m), messages sent with SendMessage at 12:40, 12:41, or 12:43 can all be received with ReceiveMessage from the outgoing queue at 13:15. In other words, sqpulser is an application that chunks the messages sent to a specified incoming queue within a given time window and groups them so that they are received together at a specific time.

LICENSE

MIT

Documentation

Index

Constants

View Source
const (
	OriginalMessageSentTimestampAttributeKey = "OriginalSentTimestamp"
	OriginalMessageIDAttributeKey            = "OriginalMessageID"
)

Variables

This section is empty.

Functions

func ExtructSentTimestamp

func ExtructSentTimestamp(msg *types.Message) (int64, error)

Types

type App

type App struct {
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context, opt *Option, optFns ...func(*config.LoadOptions) error) (*App, error)

func NewWithClient

func NewWithClient(ctx context.Context, client SQSClient, opt *Option) (*App, error)

func (*App) HandleMessage

func (app *App) HandleMessage(ctx context.Context, msg *types.Message) error

func (*App) LambdaHandler

func (app *App) LambdaHandler(ctx context.Context, event *SQSEvent) (*SQSBatchResponse, error)

func (*App) Run

func (app *App) Run(ctx context.Context) error

type BatchItemFailureItem

type BatchItemFailureItem struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type Option

type Option struct {
	IncomingQueueURL  string
	OutgoingQueueURL  string
	IncomingQueueName string
	OutgoingQueueName string
	EmitInterval      time.Duration
	Offset            time.Duration
}

Option represents the option values of the app.

type OriginalAttributes

type OriginalAttributes struct {
	MessageID     string
	SentTimestamp int64
}

func ExtructOriginalAttribute

func ExtructOriginalAttribute(msg *types.Message) (*OriginalAttributes, error)

func (*OriginalAttributes) DelayDuration

func (attr *OriginalAttributes) DelayDuration(emitInterval, offset time.Duration) time.Duration

func (*OriginalAttributes) EmitTime

func (attr *OriginalAttributes) EmitTime(emitInterval, offset time.Duration) time.Time

func (*OriginalAttributes) SentTime

func (attr *OriginalAttributes) SentTime() time.Time

func (*OriginalAttributes) SetMessageAttribute

func (attr *OriginalAttributes) SetMessageAttribute(attributes map[string]types.MessageAttributeValue) map[string]types.MessageAttributeValue

type SQSBatchResponse

type SQSBatchResponse struct {
	BatchItemFailures []BatchItemFailureItem `json:"batchItemFailures,omitempty"`
}

type SQSClient

type SQSClient interface {
	SendMessage(ctx context.Context, params *sqs.SendMessageInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error)
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
}

type SQSEvent

type SQSEvent struct {
	Records []types.Message
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL