runsqs

package module
v1.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 9, 2022 License: Apache-2.0 Imports: 13 Imported by: 2

README

runsqs - Prepackaged Runtime Helper For AWS SQS

GoDoc Build Status codecov.io

Overview

This project is a tool bundle for running a service that interacts with AWS SQS written in go. It comes with an opinionated choice of logger, metrics client, and configuration parsing. The benefits are a simple abstraction around the aws-sdk api for consuming from an SQS or producing to an SQS.

Quick Start

package main

import (
    "net/http"
    "fmt"

    "github.com/asecurityteam/runsqs"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

type BasicConsumer string

func (m BasicConsumer) ConsumeMessage(ctx context.Context, message []byte) error {
    fmt.Println(string(message))
    fmt.Println(m)
    return nil
}

func main() {
    // create a new aws session, and establish a SQS instance to connect to.
    // aws new sessions by default reads AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as environment variables to use
	var sesh = session.Must(session.NewSession())
	queue := sqs.New(sesh, &aws.Config{
		Region:     aws.String("us-west-2"),
		HTTPClient: http.DefaultClient,
		Endpoint:   aws.String("www.aws.com"),
    })



    consumer := runsqs.DefaultSQSQueueConsumer{
        Queue: queue,
        QueueURL: "www.aws.com/url/to/queue",
        MessageConsumer: BasicConsumer{"consooooom"},
        LogFn: runsqs.LoggerFromContext,
    }

    producer := runsqs.DefaultSQSProducer{
        Queue: queue,
        QueueURL: "www.aws.com/url/to/queue",
    }

    go producer.ProduceMessage([]byte("incoming sqs message"))

    // Run the SQS consumer.
    if err := consumer.StartConsuming(); err != nil {
		panic(err.Error())
    }

    // expected output:
    // "incoming sqs message"
    // "consooooom"
}

Status

This project is in incubation which the interfaces and implementations are subject to change.

Contributing

Building And Testing

We publish a docker image called SDCLI that bundles all of our build dependencies. It is used by the included Makefile to help make building and testing a bit easier. The following actions are available through the Makefile:

  • make dep

    Install the project dependencies into a vendor directory

  • make lint

    Run our static analysis suite

  • make test

    Run unit tests and generate a coverage artifact

  • make integration

    Run integration tests and generate a coverage artifact

  • make coverage

    Report the combined coverage for unit and integration tests

License

This project is licensed under Apache 2.0. See LICENSE.txt for details.

Contributing Agreement

Atlassian requires signing a contributor's agreement before we can accept a patch. If you are an individual you can fill out the individual CLA. If you are contributing on behalf of your company then please fill out the corporate CLA.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var LoggerFromContext = logevent.FromContext

LoggerFromContext is the concrete implementation of LogFn that should be used at runtime.

View Source
var StatFromContext = xstats.FromContext

StatFromContext is the concrete implementation of StatFn that should be used at runtime.

Functions

This section is empty.

Types

type ConsumerChain added in v1.0.0

type ConsumerChain []ConsumerDecorator

ConsumerChain is an ordered collection of ConsumerDecorator.

func (ConsumerChain) Apply added in v1.0.0

Apply wraps the given SQSMessageConsumer with the Decorator chain.

type ConsumerDecorator added in v1.0.0

type ConsumerDecorator func(SQSMessageConsumer) SQSMessageConsumer

ConsumerDecorator is a named type for any function that takes a SQSMessageConsumer and returns a SQSMessageConsumer.

type DefaultSQSProducer

type DefaultSQSProducer struct {
	Queue    sqsiface.SQSAPI
	QueueURL string
}

DefaultSQSProducer is a basic sqs producer

func (*DefaultSQSProducer) ProduceMessage

func (producer *DefaultSQSProducer) ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error

ProduceMessage produces a message to the configured sqs queue, along with setting the queueURL to use

type DefaultSQSProducerComponent added in v0.1.0

type DefaultSQSProducerComponent struct {
}

DefaultSQSProducerComponent enables creating configured Component

func NewDefaultSQSProducerComponent added in v0.1.0

func NewDefaultSQSProducerComponent() *DefaultSQSProducerComponent

NewDefaultSQSProducerComponent generates a new DefaultSQSQueueConsumerComponent

func (*DefaultSQSProducerComponent) New added in v0.1.0

New creates a configured DefaultSQSQueueConsumer

func (*DefaultSQSProducerComponent) Settings added in v0.1.0

Settings generates the default configuration for DefaultSQSProducerComponent

type DefaultSQSProducerConfig added in v0.1.0

type DefaultSQSProducerConfig struct {
	AWSEndpoint string
	QueueURL    string
	QueueRegion string
}

DefaultSQSProducerConfig represents the configuration to configure DefaultSQSProducer

func (*DefaultSQSProducerConfig) Name added in v0.1.0

Name of the configuration

type DefaultSQSQueueConsumer

type DefaultSQSQueueConsumer struct {
	Queue    sqsiface.SQSAPI
	LogFn    LogFn
	QueueURL string

	MessageConsumer SQSMessageConsumer
	// contains filtered or unexported fields
}

DefaultSQSQueueConsumer is a naive implementation of an SQSConsumer. This implementation has no support for retries on nonpermanent failures; the result of every message consumption is followed by a deletion of the message. Furthermore, this implementation does not support concurrent processing of messages; messages are processed sequentially.

func (*DefaultSQSQueueConsumer) GetSQSMessageConsumer

func (m *DefaultSQSQueueConsumer) GetSQSMessageConsumer() SQSMessageConsumer

GetSQSMessageConsumer returns the MessageConsumer field. This function implies that DefaultSQSQueueConsumer MUST have a MessageConsumer defined.

func (*DefaultSQSQueueConsumer) StartConsuming

func (m *DefaultSQSQueueConsumer) StartConsuming(ctx context.Context) error

StartConsuming starts consuming from the configured SQS queue

func (*DefaultSQSQueueConsumer) StopConsuming

func (m *DefaultSQSQueueConsumer) StopConsuming(ctx context.Context) error

StopConsuming stops this DefaultSQSQueueConsumer consuming from the SQS queue

type DefaultSQSQueueConsumerComponent

type DefaultSQSQueueConsumerComponent struct {
}

DefaultSQSQueueConsumerComponent enables creating configured Component

func NewDefaultSQSQueueConsumerComponent

func NewDefaultSQSQueueConsumerComponent() *DefaultSQSQueueConsumerComponent

NewDefaultSQSQueueConsumerComponent generates a new DefaultSQSQueueConsumerComponent

func (*DefaultSQSQueueConsumerComponent) New

New creates a configured DefaultSQSQueueConsumer

func (*DefaultSQSQueueConsumerComponent) Settings

Settings generates the default configuration for DefaultSQSQueueConsumerComponent

type DefaultSQSQueueConsumerConfig

type DefaultSQSQueueConsumerConfig struct {
	AWSEndpoint string
	QueueURL    string
	QueueRegion string
}

DefaultSQSQueueConsumerConfig represents the configuration to configure DefaultSQSQueueConsumer

func (*DefaultSQSQueueConsumerConfig) Name

Name of the configuration

type LogFn

type LogFn func(context.Context) Logger

LogFn is the type that should be accepted by components that intend to log content using the context logger.

type Logger

type Logger = logevent.Logger

Logger is the project logging client interface. It is currently an alias to the logevent project.

type ProducerChain added in v1.0.0

type ProducerChain []ProducerDecorator

ProducerChain is an ordered collection of Decorators.

func (ProducerChain) Apply added in v1.0.0

func (c ProducerChain) Apply(base SQSProducer) SQSProducer

Apply wraps the given SQSProducer with the Decorator chain.

type ProducerDecorator added in v1.0.0

type ProducerDecorator func(SQSProducer) SQSProducer

ProducerDecorator is a named type for any function that takes a SQSProducer and returns a SQSProducer.

type RetryableConsumerError

type RetryableConsumerError struct {
	WrappedError      error
	VisibilityTimeout int64
}

RetryableConsumerError represents a possible error type an SQSMessageConsumer could return on a call to ConsumeMessage. Users can set VisibilityTimeout, and implementors of SQSConsumer can leverage VisibilityTimeout to change the visibility of an sqs message for retry purposes.

func (RetryableConsumerError) Error

func (e RetryableConsumerError) Error() string

Error implements type error

type SQSConsumer

type SQSConsumer interface {
	StartConsuming(ctx context.Context) error
	StopConsuming(ctx context.Context) error
	GetSQSMessageConsumer() SQSMessageConsumer
}

SQSConsumer is an interface that represents an aws sqs queue worker. Implementers of SQSConsumer are responsible for: - SQS connectivity - Start and Stop consumption - error handling

type SQSMessageConsumer

type SQSMessageConsumer interface {
	ConsumeMessage(ctx context.Context, message *sqs.Message) SQSMessageConsumerError
	// DeadLetter will be called when MaxRetries is exhausted, only in the SmartSQSConsumer
	DeadLetter(ctx context.Context, message *sqs.Message)
}

SQSMessageConsumer is an interface that defines how a message should be consumer. Users are responsible for unmarshalling messages themselves, and returning errors.

type SQSMessageConsumerError added in v0.3.0

type SQSMessageConsumerError interface {
	IsRetryable() bool
	Error() string
	RetryAfter() int64
}

SQSMessageConsumerError represents an error that can be used to indicate to the consumer that an error should be retried. Note: RetryAfter should be expressed in seconds

type SQSProducer

type SQSProducer interface {
	ProduceMessage(ctx context.Context, messageInput *sqs.SendMessageInput) error
}

SQSProducer is an interface for producing messages to an aws sqs instance. Implementors are responsible for placing messages on an sqs, and also: - SQS connectivity - error handling - constructing the input *sqs.SendMessageInput

type SmartSQSConsumer

type SmartSQSConsumer struct {
	Queue    sqsiface.SQSAPI
	LogFn    LogFn
	QueueURL string

	MessageConsumer SQSMessageConsumer
	NumWorkers      uint64
	MessagePoolSize uint64
	MaxRetries      uint64
	// contains filtered or unexported fields
}

SmartSQSConsumer is an implementation of an SQSConsumer. This implementation supports... - retryable and non-retryable errors. - a maximum number of retries to be placed on a retryable sqs message - concurrent workers

func (*SmartSQSConsumer) GetSQSMessageConsumer

func (m *SmartSQSConsumer) GetSQSMessageConsumer() SQSMessageConsumer

GetSQSMessageConsumer returns the MessageConsumer field. This function implies that DefaultSQSQueueConsumer MUST have a MessageConsumer defined.

func (*SmartSQSConsumer) StartConsuming

func (m *SmartSQSConsumer) StartConsuming(ctx context.Context) error

StartConsuming starts consuming from the configured SQS queue

func (*SmartSQSConsumer) StopConsuming

func (m *SmartSQSConsumer) StopConsuming(ctx context.Context) error

StopConsuming stops this DefaultSQSQueueConsumer consuming from the SQS queue

type SmartSQSQueueConsumerComponent added in v0.1.0

type SmartSQSQueueConsumerComponent struct {
}

SmartSQSQueueConsumerComponent enables creating configured Component

func NewSmartSQSQueueConsumerComponent added in v0.1.0

func NewSmartSQSQueueConsumerComponent() *SmartSQSQueueConsumerComponent

NewSmartSQSQueueConsumerComponent generates a new SmartSQSQueueConsumerComponent

func (*SmartSQSQueueConsumerComponent) New added in v0.1.0

New creates a configured SmartSQSConsumer

func (*SmartSQSQueueConsumerComponent) Settings added in v0.1.0

Settings generates the default configuration for DefaultSQSQueueConsumerComponent

type SmartSQSQueueConsumerConfig added in v0.1.0

type SmartSQSQueueConsumerConfig struct {
	AWSEndpoint     string
	QueueURL        string
	QueueRegion     string
	NumWorkers      uint64
	MessagePoolSize uint64
	MaxRetries      uint64
}

SmartSQSQueueConsumerConfig represents the configuration to configure SmartSQSQueueConsumer

func (*SmartSQSQueueConsumerConfig) Name added in v0.1.0

Name of the configuration

type Stat

type Stat = xstats.XStater

Stat is the project metrics client interface. it is currently an alias for xstats.XStater.

type StatFn

type StatFn func(context.Context) Stat

StatFn is the type that should be accepted by components that intend to emit custom metrics using the context stat client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL