
A high-performance, production-ready Go library for Amazon Web Services (AWS) that provides utilities for working with AWS services like SQS (Simple Queue Service). This toolkit is designed to simplify the development of AWS applications with features like structured logging, error handling, and concurrency management.
Installation
go get github.com/citadel2024/aws-toolkit
Quick Start
See examples folder.
SQS
A high-performance, production-ready Go library for Amazon SQS (Simple Queue Service) that provides both publishing and consuming capabilities with advanced features like batching, graceful shutdown, error handling, and comprehensive observability.
Inspired by https://github.com/awslabs/amazon-sqs-java-temporary-queues-client.
Features
π Publisher
- Batch Processing: Automatic message batching for optimal throughput
- Configurable Intervals: Time-based and size-based batch triggers
- Graceful Shutdown: Clean shutdown with message draining
- Error Handling: Comprehensive error handling with custom callbacks
- Thread Safety: Safe for concurrent use across multiple goroutines
- Observability: Structured logging with zerolog integration
π¨ Consumer
- Concurrent Processing: Multi-threaded message polling and processing
- Error Classification: Distinguishes between retryable and non-retryable errors
- Long Polling: Efficient long polling to reduce API calls
- Graceful Shutdown: Coordinated shutdown of all processing goroutines
- Custom Error Handling: Pluggable exception handlers
Configuration Options
Publisher Options
| Option |
Description |
Default |
WithLogger(logger) |
Custom zerolog logger |
zerolog.Nop() |
WithClient(client) |
Custom SQS client |
AWS default config |
WithBatchMaxMessages(limit) |
Max messages per batch |
10 |
WithPublishInterval(interval) |
Batch publish interval |
5 seconds |
WithSendingMessageTimeoutSeconds(timeout) |
Send timeout |
30 seconds |
WithOnSendMessageBatchComplete(handler) |
Batch completion callback |
Default error logging |
Consumer Options
| Option |
Description |
Default |
WithLogger(logger) |
Custom zerolog logger |
zerolog.Nop() |
WithMessageHandler(handler) |
Message processing function |
Required |
WithExceptionHandler(handler) |
Exception handling function |
Default error logging |
WithPollingGoroutines(count) |
Number of polling goroutines |
1 |
WithProcessingConcurrency(count) |
Max concurrent message processors |
10 |
WithMaxMessagesPerBatch(count) |
Messages per receive call (1-10) |
10 |
WithWaitTimeSeconds(seconds) |
Long polling wait time (0-20) |
20 |
WithPollIntervalMilliseconds(interval) |
Delay between polling attempts (>=0 ms) |
0 |
WithShutdownHook(hook) |
Function called on shutdown |
None |
Advanced Usage
Architecture
Publisher Architecture
βββββββββββββββββββ ββββββββββββββββ βββββββββββββββ
β Application βββββΆβ Publisher βββββΆβ SQS Queue β
βββββββββββββββββββ ββββββββββββββββ βββββββββββββββ
β
βΌ
ββββββββββββββββββββ
β Batch Worker β
β β’ Time-based β
β β’ Size-based β
β β’ Error handlingβ
ββββββββββββββββββββ
Consumer Architecture
βββββββββββββββ βββββββββββββββββββ ββββββββββββββββββββ
β SQS Queue ββββββ Polling Workers βββββΆβ Processing Pool β
βββββββββββββββ β (Configurable) β β (Configurable) β
βββββββββββββββββββ ββββββββββββββββββββ
β β
βΌ βΌ
βββββββββββββββββββ ββββββββββββββββββββ
β Long Polling β β Message Handler β
β β’ Batch fetch β β β’ Error handling β
β β’ Wait timeout β β β’ Retry logic β
βββββββββββββββββββ ββββββββββββββββββββ
Publisher Optimization
- Batch Size: Increase
WithBatchMaxMessages for higher throughput
- Publish Interval: Lower intervals for lower latency, higher for better batching
- Buffer Size: Channel buffer is automatically sized based on batch limit
Consumer Optimization
- Polling Workers: Increase for better queue draining with multiple consumers
- Processing Concurrency: Tune based on message processing time and resources
- Long Polling: Use maximum
WithWaitTimeSeconds(20) to reduce API calls
- Batch Size: Use maximum
WithMaxMessagesPerBatch(10) for efficiency
Testing
make rapid_test
make fuzz_test
make unit_test
License
This project is licensed under the MIT License - see the LICENSE file for details.
Acknowledgments