consumer

package
v0.0.0-...-a53e933 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2024 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	QueueURL string

	// processor configuration
	ProcessorWorkerPoolSize int32

	// poller configuration
	PollerWorkerPoolSize int32
	// MaxNumberOfMessages is the maximum number of messages to receive in a single poll request: 1-10
	MaxNumberOfMessages int32
	// WaitTimeSeconds is the duration (0 to 20 seconds) for which the call waits for a message to arrive
	WaitTimeSeconds int32
	// VisibilityTimeout is the duration (0 to 12 hours) that the received messages are hidden from subsequent retrieve requests
	VisibilityTimeout int32
	// MaxNumberOfRetries is the maximum number of retries for a message polling. -1 means infinite retries
	ErrorNumberThreshold int32
	// GracefulShutdownTimeout is the timeout for graceful shutdown.
	GracefulShutdownTimeout int32
}

func NewConfig

func NewConfig(
	queueURL string,
	handlerWorkerPoolSize int32,
	pollerWorkerPoolSize int32,
	maxNumberOfMessages int32,
	waitTimeSeconds int32,
	visibilityTimeout int32,
	errorNumberThreshold int32,
	gracefulShutdownTimeout int32,
) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig(queueURL string) *Config

NewDefaultConfig creates a new Config with default values

func (*Config) IsValid

func (c *Config) IsValid() (bool, error)

type DummyAdapter

type DummyAdapter[T sqstypes.Message] struct{}

func NewDummyAdapter

func NewDummyAdapter[T sqstypes.Message]() *DummyAdapter[T]

func (*DummyAdapter[T]) Transform

func (a *DummyAdapter[T]) Transform(_ context.Context, msg sqstypes.Message) (T, error)

type ErrWrongConfig

type ErrWrongConfig struct {
	Err error
}

func (*ErrWrongConfig) Error

func (e *ErrWrongConfig) Error() string

type Handler

type Handler[T any] interface {
	Handle(ctx context.Context, msg T) error
}

Handler is a generic interface for message handlers. The type parameter T specifies the type of message the handler accepts.

type HandlerFunc

type HandlerFunc[T any] func(ctx context.Context, msg T) error

func (HandlerFunc[T]) Handle

func (f HandlerFunc[T]) Handle(ctx context.Context, msg T) error

type JSONMessageAdapter

type JSONMessageAdapter[T any] struct{}

JSONMessageAdapter is a message adapter for json messages

func NewJSONMessageAdapter

func NewJSONMessageAdapter[T any]() *JSONMessageAdapter[T]

func (*JSONMessageAdapter[T]) Transform

func (a *JSONMessageAdapter[T]) Transform(_ context.Context, msg sqstypes.Message) (T, error)

type MessageAdapter

type MessageAdapter[T any] interface {
	Transform(ctx context.Context, msg sqstypes.Message) (T, error)
}

type MessageAdapterFunc

type MessageAdapterFunc[T any] func(ctx context.Context, msg sqstypes.Message) (T, error)

func (MessageAdapterFunc[T]) Transform

func (f MessageAdapterFunc[T]) Transform(ctx context.Context, msg sqstypes.Message) (T, error)

type Middleware

type Middleware[T any] func(next HandlerFunc[T]) HandlerFunc[T]

func MiddlewareAdapter

func MiddlewareAdapter[T any](mw Middleware[any]) Middleware[T]

MiddlewareAdapter adapts a middleware of any type to a middleware of a specific type T. It creates a new handler that operates on T and a new handler that matches the HandlerFunc[Message] type.

func NewIgnoreErrorsMiddleware

func NewIgnoreErrorsMiddleware[T any](l *slog.Logger) Middleware[T]

NewIgnoreErrorsMiddleware creates a new middleware that ignores errors that occur during message processing. If the logger is provided, it will log the error.

func NewPanicRecoverMiddleware

func NewPanicRecoverMiddleware[T any]() Middleware[T]

func NewTimeLimitMiddleware

func NewTimeLimitMiddleware[T any](timeout time.Duration) Middleware[T]

type Processor

type Processor[T any] interface {
	Process(ctx context.Context, ch <-chan sqstypes.Message, handler Handler[T]) error
}

type SQSConsumer

type SQSConsumer[T any] struct {
	// contains filtered or unexported fields
}

func NewSQSConsumer

func NewSQSConsumer[T any](
	cfg Config,
	sqsClient sqsConnector,
	messageAdapter MessageAdapter[T],
	middlewares []Middleware[T],
	logger *slog.Logger,
) *SQSConsumer[T]

func (*SQSConsumer[T]) Close

func (c *SQSConsumer[T]) Close() error

func (*SQSConsumer[T]) Consume

func (c *SQSConsumer[T]) Consume(ctx context.Context, queueURL string, messageHandler Handler[T]) error

func (*SQSConsumer[T]) IsRunning

func (c *SQSConsumer[T]) IsRunning() bool

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL