sqsprocessor

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: MIT Imports: 6 Imported by: 0

README

SQS Processor

Attempt to wrap AWS SQS client with functionality similar to that of the gcloud pub/sub client.

Usage

The processor will simply run a given ProcessFunc over any messages found on a given SQS queue. ProcessFunc takes the sqs message body in as a string and decides how to decode and action on the message.

The ProcessFunc must return either ProcessResultAck or ProcessResultNack. Ack implies a success and leads to the message being deleted from the queue, whereas Nack will re-publish the message to the queue.

Example
import (
    "context"
    "time"

    "github.com/barrett370/sqs-processor/middleware"
    sqsprocessor "github.com/barrett370/sqs-processor"
    "github.com/aws/aws-sdk-go-v2/service/sqs"
)

type messageBody struct {
    ID string
    Action EnumType
}

func (s *service) process(ctx context.Context, message messageBody) (ret sqsprocessor.ProcessResult) {
    err := s.DoAction(message.ID, message.Action)
    if err == nil {
        ret = sqsprocessor.ProcessResultAck
    }
    return
}


func main() {
    // initialise v2 sqs client
    c := newClient() 

	config := sqsprocessor.ProcessorConfig{
		Receive: sqs.ReceiveMessageInput{
			WaitTimeSeconds:     10,
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   2,
		},
		NumWorkers: 10,
		Backoff:    time.Second,
	}

	p := sqsprocessor.NewProcessor(c, config)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})


    cleanup := func() {
        cancel()
        <- done
    }

	go func() {
		p.Process(ctx, middleware.JSONDecode(svc.process))
		close(done)
	}()

    // some other code

    cleanup()
}

Documentation

Overview

Package sqsprocessor contains an implementation of an sqs processor, similar in design to the provided pubsub client in the gcloud go sdk

The main structure is the Processor, which handles spawning, managing and feeding a pool of workers which execute a provided ProcessFunc over each message they receive.

Basic Usage

c := sqs.NewFromConfig(cfg)

p := NewProcessor(c, ProcessorConfig{
	NumWorkers: 10,
	Backoff: time.Second,
	Receive: sqs.ReceiveMessageInput{
		QueueUrl: sqsQueueURL,
		MaxNumberOfMessages: 10,
		VisibilityTimeout: 2,
		WaitTimeSeconds: 1,
	},
})

ctx, cancel := context.WithCancel(context.Background())

var wg sync.WaitGroup

wg.Add(1)
go func() {
	defer wg.Done()
	p.Process(ctx, func(ctx context.Context, message types.Message) ProcessResult {
		if message.Body == nil {
			// Delete bad messages from queue
			return ProcessResultAck
		}
		if *msg.Body == "good" {
			// Happy path
			return ProcessResultAck
		}
		// Sad path
		return ProcessResultNack
	})
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range p.Errors() {
		log.Printf("received error from processor, %v\n", err)
	}
}()

sigC := make(chan os.Signal, 1)
signal.Notify(sigC, syscall.SIGTERM, syscall.SIGINT)

<-sigC

log.Print("Receieved signal to quit, stopping processor")
cancel()
wg.Wait()
log.Print("Processor stopped, exiting")

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ErrMessageExpired

type ErrMessageExpired struct {
	// contains filtered or unexported fields
}

ErrMessageExpired is returned when a message with an expired deadline is encountered and processing is abandoned

func (ErrMessageExpired) Error

func (e ErrMessageExpired) Error() string

type Middleware added in v0.2.0

type Middleware func(ProcessFunc) ProcessFunc

type ProcessFunc

type ProcessFunc func(ctx context.Context, msg types.Message) ProcessResult

ProcessFunc is the signature of functions the user provides to process each message received off the queue

type ProcessResult

type ProcessResult uint8

ProcessResult is an enum used to signal success or failure when processing a message in a ProcessFunc

const (
	/*
		ProcessResultNack indicates that the
		ProcessFunc either does not want to
		process a message or has failed to,
		upon receiving this, the Processor
		expedites the re-processing of the
		message by making it visable in the queue
	*/
	ProcessResultNack ProcessResult = iota
	/*
				ProcessResultAck indicates that the
				ProcessFunc was successful in processing
		 		the message. Upon receiving this,
				the Processor deletes the message from
				the queue to prevent re-delivery
	*/
	ProcessResultAck
)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor is the struct which orchestrates polling for messages as well as starting and feeding a configured number of workers in a pool

func New added in v0.1.4

func New(c SQSClienter, config ProcessorConfig) *Processor

New returns a pointer to a new Processor given a config and sqs client

func (*Processor) Errors

func (p *Processor) Errors() <-chan error

Errors returns a channel to which any errors encountered during processing are sent to. If it has not been previously called, a new, blocking channel is created. If you call this method, make sure there is a goroutine cosuming from the returned channel to prevent deadlock

func (*Processor) Process

func (p *Processor) Process(ctx context.Context, pf ProcessFunc)

Process starts the processor and workers in the pool. It passes each message received to a worker in the pool which executes the given ProcessFunc. To stop processing, cancel the provided context

type ProcessorConfig

type ProcessorConfig struct {
	// NumWorkers is the number of worker
	// goroutines spawned, managed and
	// used by the Processor to process any
	// received messages
	NumWorkers int
	// Backoff is the amount of time the
	// Processor will block before polling
	// for new messages if none were received
	// in the previous call
	Backoff time.Duration
	// TODO abstract these?
	Receive        sqs.ReceiveMessageInput
	ReceiveOptions []func(*sqs.Options)
}

type SQSClienter

type SQSClienter interface {
	ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
	DeleteMessage(ctx context.Context, params *sqs.DeleteMessageInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageOutput, error)
	ChangeMessageVisibility(ctx context.Context, params *sqs.ChangeMessageVisibilityInput, optFns ...func(*sqs.Options)) (*sqs.ChangeMessageVisibilityOutput, error)
}

SQSClienter encapsulates all sqs methods a Processor will use

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL