Documentation ¶
Index ¶
- type InTransactionFn
- type Message
- type MessageRepository
- type Option
- func WithBatchSize(size uint) Option
- func WithErrorHandler(handler func(error)) Option
- func WithMaxRetries(retries uint) Option
- func WithOtelMeter(meter metric.Meter) Option
- func WithOtelTracer(tracer trace.Tracer) Option
- func WithPeriod(period time.Duration) Option
- func WithSendTimeout(timeout time.Duration) Option
- type Outbox
- type SendStatus
- type Sender
- type TransactionalMessageRepository
Types ¶
type InTransactionFn ¶
type InTransactionFn func(ctx context.Context, s MessageRepository) error
type Message ¶
type Message struct {
ID uint64
Key string
Value []byte
Timestamp time.Time
Retries uint
// contains filtered or unexported fields
}
func (*Message) MarkAsFailed ¶
func (m *Message) MarkAsFailed()
func (*Message) MarkAsSent ¶
func (m *Message) MarkAsSent()
type MessageRepository ¶
type MessageRepository interface {
// FetchAndLock fetches a batch of ready-to-send messages from the outbox and locks them for processing.
// It fetches at most batchSize messages sorted by timestamp in ascending order.
FetchAndLock(ctx context.Context, batchSize uint) ([]Message, error)
// UpdateRetryCount updates the retry count for the messages with the given IDs.
// It's called when the messages are retried.
UpdateRetryCount(ctx context.Context, ids []uint64) error
// MarkAsDead marks the messages with the given IDs as dead so that they won't be retried anymore.
// It's called when a message cannot be sent after maxRetries.
MarkAsDead(ctx context.Context, ids []uint64) error
// MarkAsSent marks the messages with the given IDs as sent.
MarkAsSent(ctx context.Context, ids []uint64) error
}
MessageRepository describes an interface for interacting with the outbox storage (e.g., a database).
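The contract above can be illustrated with a minimal in-memory sketch. It is not the package's own storage code: `row`, `memoryRepository`, and `fetchedIDs` are hypothetical names, the local `Message` type only mirrors the documented exported fields, and releasing the lock inside `UpdateRetryCount` is an assumption about how a retry becomes fetchable again.

```go
package main

import (
	"context"
	"fmt"
	"sort"
	"time"
)

// Message mirrors the exported fields of the documented Message type.
type Message struct {
	ID        uint64
	Key       string
	Value     []byte
	Timestamp time.Time
	Retries   uint
}

// row is hypothetical bookkeeping for this sketch, not part of the package.
type row struct {
	msg                Message
	locked, sent, dead bool
}

// memoryRepository is a hypothetical in-memory stand-in for a database table.
// It is not safe for concurrent use.
type memoryRepository struct {
	rows map[uint64]*row
}

// FetchAndLock returns at most batchSize unlocked, unsent messages, oldest first.
func (r *memoryRepository) FetchAndLock(_ context.Context, batchSize uint) ([]Message, error) {
	var ready []*row
	for _, rw := range r.rows {
		if !rw.locked && !rw.sent && !rw.dead {
			ready = append(ready, rw)
		}
	}
	sort.Slice(ready, func(i, j int) bool {
		return ready[i].msg.Timestamp.Before(ready[j].msg.Timestamp)
	})
	if uint(len(ready)) > batchSize {
		ready = ready[:batchSize]
	}
	out := make([]Message, 0, len(ready))
	for _, rw := range ready {
		rw.locked = true
		out = append(out, rw.msg)
	}
	return out, nil
}

// update applies fn to every stored row whose ID appears in ids.
func (r *memoryRepository) update(ids []uint64, fn func(*row)) error {
	for _, id := range ids {
		if rw, ok := r.rows[id]; ok {
			fn(rw)
		}
	}
	return nil
}

// UpdateRetryCount increments Retries and (an assumption) releases the lock.
func (r *memoryRepository) UpdateRetryCount(_ context.Context, ids []uint64) error {
	return r.update(ids, func(rw *row) { rw.msg.Retries++; rw.locked = false })
}

func (r *memoryRepository) MarkAsDead(_ context.Context, ids []uint64) error {
	return r.update(ids, func(rw *row) { rw.dead = true })
}

func (r *memoryRepository) MarkAsSent(_ context.Context, ids []uint64) error {
	return r.update(ids, func(rw *row) { rw.sent = true })
}

// fetchedIDs seeds three messages and returns the IDs of one batch of two.
func fetchedIDs() []uint64 {
	repo := &memoryRepository{rows: map[uint64]*row{
		1: {msg: Message{ID: 1, Timestamp: time.Unix(30, 0)}},
		2: {msg: Message{ID: 2, Timestamp: time.Unix(10, 0)}},
		3: {msg: Message{ID: 3, Timestamp: time.Unix(20, 0)}},
	}}
	batch, _ := repo.FetchAndLock(context.Background(), 2)
	ids := make([]uint64, 0, len(batch))
	for _, m := range batch {
		ids = append(ids, m.ID)
	}
	return ids
}

func main() {
	fmt.Println(fetchedIDs()) // prints [2 3]
}
```

A database-backed implementation would typically replace the `locked` flag with row-level locking, but that choice depends on the storage engine and is outside what this page documents.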
type Option ¶
type Option func(*options)
func WithBatchSize ¶
WithBatchSize sets the batch size to fetch messages from the repository. The default batch size is 10.
func WithErrorHandler ¶
WithErrorHandler sets the error handler for the outbox. The error handler is called when an error occurs while sending messages. The default error handler logs the error to the standard logger. Note that the handler runs synchronously and blocks the main sending loop, so it should return quickly.
func WithMaxRetries ¶
WithMaxRetries sets the maximum number of retries for sending messages. The default maximum number of retries is 3.
func WithOtelMeter ¶
WithOtelMeter sets the OpenTelemetry meter for the outbox. By default, the default OpenTelemetry meter is used.
func WithOtelTracer ¶
WithOtelTracer sets the OpenTelemetry tracer for the outbox. By default, the default OpenTelemetry tracer is used.
func WithPeriod ¶
WithPeriod sets the period for sending messages. The default period is 5 seconds.
func WithSendTimeout ¶
WithSendTimeout sets the timeout for sending messages, including communication with the database. The default timeout is 2 seconds.
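The options above follow the functional options pattern. The sketch below shows how defaults and overrides could combine, using the defaults stated in this documentation (batch size 10, 3 retries, 5-second period, 2-second send timeout). The field names of `options` and the helpers `resolve` and `describe` are assumptions made for illustration; the package's real `options` struct is unexported and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// options is a hypothetical stand-in for the package's unexported options struct.
type options struct {
	batchSize   uint
	maxRetries  uint
	period      time.Duration
	sendTimeout time.Duration
}

// Option has the same shape as the documented Option type.
type Option func(*options)

func WithBatchSize(size uint) Option {
	return func(o *options) { o.batchSize = size }
}

func WithMaxRetries(retries uint) Option {
	return func(o *options) { o.maxRetries = retries }
}

func WithPeriod(period time.Duration) Option {
	return func(o *options) { o.period = period }
}

func WithSendTimeout(timeout time.Duration) Option {
	return func(o *options) { o.sendTimeout = timeout }
}

// resolve starts from the documented defaults and applies each option in order.
func resolve(opts ...Option) options {
	o := options{
		batchSize:   10,
		maxRetries:  3,
		period:      5 * time.Second,
		sendTimeout: 2 * time.Second,
	}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// describe overrides two settings and renders the resulting configuration.
func describe() string {
	o := resolve(WithBatchSize(50), WithPeriod(time.Second))
	return fmt.Sprintf("batch=%d retries=%d period=%s timeout=%s",
		o.batchSize, o.maxRetries, o.period, o.sendTimeout)
}

func main() {
	fmt.Println(describe()) // prints batch=50 retries=3 period=1s timeout=2s
}
```

Options that are not passed keep their defaults, which is why only the batch size and period change in the output.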
type Outbox ¶
type Outbox[T any] struct {
// contains filtered or unexported fields
}
func NewOutbox ¶
func NewOutbox[T any](storage TransactionalMessageRepository[T], sender Sender, opts ...Option) (*Outbox[T], error)
NewOutbox creates a new Outbox instance.
func (*Outbox[T]) AddMessage ¶
AddMessage adds a message to the outbox. It sets the timestamp to the current UTC time.
type SendStatus ¶
type SendStatus uint8
const (
StatusNone SendStatus = 0 // not tried to send
StatusSent SendStatus = 1 // sent successfully
StatusFail SendStatus = 2 // failed to send
StatusDead SendStatus = 3 // retries exceeded, message is dead
)
type Sender ¶
type Sender interface {
// Send sends a batch of messages to the message broker. The passed messages are
// sorted by timestamp in ascending order.
// It should return the same list of messages with updated statuses
// (set via the MarkAsSent or MarkAsFailed method of Message). An error should be
// returned if sending failed completely or partially.
Send(ctx context.Context, msg []Message) ([]Message, error)
}
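A Sender that honors this contract might look like the sketch below. It uses reduced local copies of `Message` and `SendStatus` so that it compiles on its own; the `status` field name, `publishFunc`, `brokerSender`, and `sendOutcome` are hypothetical, and the publish hook stands in for a real broker client call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type SendStatus uint8

const (
	StatusNone SendStatus = 0 // not tried to send
	StatusSent SendStatus = 1 // sent successfully
	StatusFail SendStatus = 2 // failed to send
)

// Message is a reduced stand-in; the status field name is hypothetical.
type Message struct {
	ID     uint64
	Key    string
	Value  []byte
	status SendStatus
}

func (m *Message) MarkAsSent()   { m.status = StatusSent }
func (m *Message) MarkAsFailed() { m.status = StatusFail }

// publishFunc is a hypothetical hook standing in for a broker client call.
type publishFunc func(ctx context.Context, key string, value []byte) error

type brokerSender struct {
	publish publishFunc
}

// Send tries every message, marks each outcome, and joins the individual errors.
// The returned error is nil only if every message was published.
func (s brokerSender) Send(ctx context.Context, msgs []Message) ([]Message, error) {
	var errs []error
	for i := range msgs {
		if err := s.publish(ctx, msgs[i].Key, msgs[i].Value); err != nil {
			msgs[i].MarkAsFailed()
			errs = append(errs, fmt.Errorf("message %d: %w", msgs[i].ID, err))
			continue
		}
		msgs[i].MarkAsSent()
	}
	return msgs, errors.Join(errs...)
}

// sendOutcome sends one valid and one empty message through a fake broker
// that rejects empty payloads, and reports the statuses and whether Send failed.
func sendOutcome() ([]SendStatus, bool) {
	s := brokerSender{publish: func(_ context.Context, _ string, value []byte) error {
		if len(value) == 0 {
			return errors.New("empty payload")
		}
		return nil
	}}
	out, err := s.Send(context.Background(), []Message{
		{ID: 1, Key: "a", Value: []byte("ok")},
		{ID: 2, Key: "b"},
	})
	statuses := make([]SendStatus, len(out))
	for i, m := range out {
		statuses[i] = m.status
	}
	return statuses, err != nil
}

func main() {
	fmt.Println(sendOutcome()) // prints [1 2] true
}
```

Indexing into the slice (`msgs[i]`) rather than ranging by value matters here: the Mark methods have pointer receivers, so the status must be set on the slice element itself for the caller to see it.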
type TransactionalMessageRepository ¶
type TransactionalMessageRepository[T any] interface {
MessageRepository
// AddMessage adds a message to the outbox.
AddMessage(ctx context.Context, tx T, msg Message) error
// WithTransaction executes a function within a transaction.
// The function should return an error if the transaction should be rolled back.
WithTransaction(ctx context.Context, cb InTransactionFn) error
}
TransactionalMessageRepository extends MessageRepository to support transactional operations.
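The commit-or-rollback rule of WithTransaction can be sketched without a database. The example below is an illustration under stated assumptions, not the package's implementation: the local `MessageRepository` is cut down to one method for brevity, `AddMessage` and the `T` transaction handle are left out, and `memoryStore` and `transactionOutcome` are hypothetical names. It emulates a transaction by running the callback against a private copy of the state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// MessageRepository is reduced to a single method to keep the sketch short.
type MessageRepository interface {
	MarkAsSent(ctx context.Context, ids []uint64) error
}

// InTransactionFn has the same shape as the documented callback type.
type InTransactionFn func(ctx context.Context, s MessageRepository) error

// memoryStore is a hypothetical in-memory store; sent records delivered IDs.
type memoryStore struct {
	sent map[uint64]bool
}

func (s *memoryStore) MarkAsSent(_ context.Context, ids []uint64) error {
	for _, id := range ids {
		s.sent[id] = true
	}
	return nil
}

// WithTransaction runs cb against a private copy of the state and publishes
// the copy only when cb returns nil, which emulates commit and rollback.
func (s *memoryStore) WithTransaction(ctx context.Context, cb InTransactionFn) error {
	draft := &memoryStore{sent: make(map[uint64]bool, len(s.sent))}
	for id, ok := range s.sent {
		draft.sent[id] = ok
	}
	if err := cb(ctx, draft); err != nil {
		return err // rollback: the draft is discarded
	}
	s.sent = draft.sent // commit
	return nil
}

// transactionOutcome reports whether ID 1 (callback succeeded) and
// ID 2 (callback returned an error) ended up stored as sent.
func transactionOutcome() (committed, rolledBack bool) {
	store := &memoryStore{sent: map[uint64]bool{}}
	ctx := context.Background()

	_ = store.WithTransaction(ctx, func(ctx context.Context, r MessageRepository) error {
		return r.MarkAsSent(ctx, []uint64{1})
	})
	_ = store.WithTransaction(ctx, func(ctx context.Context, r MessageRepository) error {
		if err := r.MarkAsSent(ctx, []uint64{2}); err != nil {
			return err
		}
		return errors.New("business logic failed")
	})
	return store.sent[1], store.sent[2]
}

func main() {
	fmt.Println(transactionOutcome()) // prints true false
}
```

With a SQL backend the same shape usually maps to beginning a transaction, passing a repository bound to it into the callback, and then committing or rolling back based on the returned error.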