gooutstore

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2024 License: AGPL-3.0 Imports: 5 Imported by: 3

README

gooutstore GoDoc

gooutstore is a library for managing outbox messages in a distributed system. It provides interfaces and implementations for dispatching and generating outbox messages using various database clients.

Installation

To install gooutstore, use go get:

go get github.com/khv1one/gooutstore

Usage

Dispatcher

The Dispatcher is responsible for dispatching outbox messages. It reads messages from the outbox, processes them, and updates their status.

Generator

The Generator is responsible for generating outbox messages and storing them in the database.

Examples
Example dispatcher
// Example dispatcher: builds a Dispatcher on top of the pgpq client and
// starts it, handing every outbox message to processMessage.
package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/khv1one/gooutstore"
	"github.com/khv1one/gooutstore/client/pgpq"
)

func main() {
	// NOTE: database/sql only knows the "postgres" driver name once a driver
	// package has registered it; a real program must also blank-import the
	// Postgres driver it uses.
	db, err := sql.Open("postgres", "your-database-connection-string")
	if err != nil {
		log.Fatal(err)
	}

	// The message type parameter is inferred from processMessage's signature.
	dispatcher := gooutstore.NewDispatcherWithClient(pgpq.NewClient(db), processMessage)

	ctx := context.Background()
	if err := dispatcher.Start(ctx); err != nil {
		log.Fatal(err)
	}
}

// processMessage is the callback the dispatcher invokes for each message.
// Returning nil reports the message as handled successfully.
func processMessage(ctx context.Context, msg gooutstore.IOutboxMessage) error {
	// Process the message
	return nil
}
Example generator
// Example generator: builds a Generator on top of the pgpq client and
// stores two example messages in the outbox.
package main

import (
	"context"
	"database/sql"
	"log"

	"github.com/khv1one/gooutstore"
	"github.com/khv1one/gooutstore/client/pgpq"
)

func main() {
	// NOTE: database/sql only knows the "postgres" driver name once a driver
	// package has registered it; a real program must also blank-import the
	// Postgres driver it uses.
	db, err := sql.Open("postgres", "your-database-connection-string")
	if err != nil {
		log.Fatal(err)
	}

	generator := gooutstore.NewGeneratorWithClient(pgpq.NewClient(db))

	ctx := context.Background()
	// Key and Type are declared on *ExampleMessage, so only the pointer type
	// satisfies gooutstore.IOutboxMessage; store pointers, not values.
	messages := []gooutstore.IOutboxMessage{
		&ExampleMessage{ID: 1},
		&ExampleMessage{ID: 2},
	}

	if err := generator.Send(ctx, messages...); err != nil {
		log.Fatal(err)
	}
}

// ExampleMessage is a minimal outbox message used by this example.
type ExampleMessage struct {
	ID int
}

// Key implements gooutstore.IOutboxMessage; this example returns a fixed key.
func (m *ExampleMessage) Key() string {
	return "key"
}

// Type implements gooutstore.IOutboxMessage; this example returns a fixed type name.
func (m *ExampleMessage) Type() string {
	return "type"
}

Interfaces

WIP

License

This project is licensed under the AGPL-3.0 License. See the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher[T IOutboxMessage] struct {
	// contains filtered or unexported fields
}

Dispatcher is responsible for dispatching outbox messages.

func NewDispatcher

func NewDispatcher[T IOutboxMessage](call func(context.Context, T) error, opts ...DispatcherOption[T]) *Dispatcher[T]

NewDispatcher creates a new Dispatcher with the provided options.

func NewDispatcherWithClient

func NewDispatcherWithClient[T IOutboxMessage](client IOutboxDispatcherClient, call func(context.Context, T) error, opts ...DispatcherOption[T]) *Dispatcher[T]

NewDispatcherWithClient creates a new Dispatcher with the provided client and options.

func (*Dispatcher[T]) Name

func (d *Dispatcher[T]) Name() string

Name returns the name of the dispatcher.

func (*Dispatcher[T]) Start

func (d *Dispatcher[T]) Start(startCtx context.Context) error

Start begins the dispatching process.

func (*Dispatcher[T]) Stop

func (d *Dispatcher[T]) Stop(_ context.Context) error

Stop halts the dispatching process.

type DispatcherOption

type DispatcherOption[T IOutboxMessage] func(*Dispatcher[T])

DispatcherOption represents a configuration option for the Dispatcher.

func WithBatchSize

func WithBatchSize[T IOutboxMessage](batchSize int) DispatcherOption[T]

WithBatchSize sets the batch size for reading messages.

func WithErrorLogger

func WithErrorLogger[T IOutboxMessage](logger Logger) DispatcherOption[T]

WithErrorLogger sets the logger for error messages.

func WithInterval

func WithInterval[T IOutboxMessage](interval time.Duration) DispatcherOption[T]

WithInterval sets the interval between dispatching attempts.

func WithMaxRetry

func WithMaxRetry[T IOutboxMessage](maxRetry int) DispatcherOption[T]

WithMaxRetry sets the maximum number of retries for a message.

type Generator

type Generator struct {
	// contains filtered or unexported fields
}

Generator is responsible for generating outbox messages.

func NewGenerator

func NewGenerator(opts ...GeneratorOption) *Generator

NewGenerator creates a new Generator with the provided options.

func NewGeneratorWithClient

func NewGeneratorWithClient(client IOutboxGeneratorClient, opts ...GeneratorOption) *Generator

NewGeneratorWithClient creates a new Generator with the provided client and options.

func (*Generator) Send

func (g *Generator) Send(ctx context.Context, messages ...IOutboxMessage) error

Send sends the provided messages using the Generator's client.

type GeneratorOption

type GeneratorOption func(*Generator)

GeneratorOption represents a configuration option for the Generator.

func WithGeneratorClient

func WithGeneratorClient(client IOutboxGeneratorClient) GeneratorOption

WithGeneratorClient sets the client for the Generator.

type IOutboxDispatcherClient

type IOutboxDispatcherClient interface {
	ReadBatch(ctx context.Context, messageType string, batchSize int) ([]Message, error)
	SetDone(ctx context.Context, m Message) error
	IncRetry(ctx context.Context, m Message) error
	SetBroken(ctx context.Context, m Message) error

	WithTransaction(ctx context.Context, fn func(context.Context) error) (err error)
}

IOutboxDispatcherClient defines the interface for the outbox dispatcher client.

type IOutboxGeneratorClient

type IOutboxGeneratorClient interface {
	Create(ctx context.Context, messages []Message) error
}

IOutboxGeneratorClient defines the interface for the outbox generator client.

type IOutboxMessage

type IOutboxMessage interface {
	Key() string
	Type() string
}

IOutboxMessage defines the interface for an outbox message.

type Logger

type Logger func(string, ...interface{})

Logger is a function type for logging messages.

type Message

type Message struct {
	ID           int64  // ID is the unique identifier of the message.
	MessageType  string // MessageType is the type of the message.
	Body         []byte // Body is the content of the message.
	AggregateKey string // AggregateKey is the key used to group related messages.
	Status       Status // Status is the current status of the message.
	RetryCount   int    // RetryCount is the number of times the message has been retried.
}

Message represents an outbox message.

type Status

type Status int

Status represents the status of an outbox message.

const (
	Pending  Status = iota // Pending indicates the message is pending processing.
	Retrying               // Retrying indicates the message is being retried.
	Done                   // Done indicates the message has been processed successfully.
	Broken                 // Broken indicates the message has failed and will not be retried.
)

Directories

Path Synopsis
gorm module
pgpq module
pgx module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL