ligo_microservices

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 17, 2026 License: MIT Imports: 14 Imported by: 0

README

ligo-microservices

RabbitMQ-based microservices messaging for Ligo, inspired by @nestjs/microservices.

Go Version License Tests Coverage

Install

go get github.com/linkeunid/ligo-microservices

Quick Start

package main

import (
    "context"
    "time"

    ligo_microservices "github.com/linkeunid/ligo-microservices"
    "github.com/linkeunid/ligo"
)

func AppModule() ligo.Module {
    return ligo.NewModule("app",
        ligo.Imports(
            ligo_microservices.RabbitMQModule(ligo_microservices.RabbitMQConfig{
                URL:      "amqp://guest:guest@localhost:5672/",
                Exchange: "ligo",
                Timeout:  5 * time.Second,
                Retry: ligo_microservices.RetryConfig{
                    MaxAttempts: 5,
                    Delay:       time.Second,
                    MaxDelay:    30 * time.Second,
                },
            }),
        ),
        ligo.Providers(
            ligo.HookedFactory(NewOrderService),
        ),
    )
}

RPC — Request/Response

Server (handler)
type OrderService struct {
    broker *ligo_microservices.Broker
}

func NewOrderService(b *ligo_microservices.Broker) *OrderService {
    return &OrderService{broker: b}
}

func (s *OrderService) Register(r *ligo.HookRegistry) {
    r.OnBootstrap(func() error {
        ligo_microservices.Handle[CreateOrderInput, *Order](
            s.broker, "orders.create", s.HandleCreate,
        )
        return nil
    })
}

func (s *OrderService) HandleCreate(ctx context.Context, input CreateOrderInput) (*Order, error) {
    return s.usecase.Create(ctx, input)
}
Client (caller)
order, err := ligo_microservices.Send[*Order](ctx, broker, "orders.create", CreateOrderInput{
    UserID: "123",
    Items:  []string{"widget"},
})

Events — Fire-and-Forget

Producer
err := ligo_microservices.Emit(ctx, broker, "user.created", UserCreated{
    UserID: "123",
    Name:   "Alice",
})
Consumer
ligo_microservices.On[UserCreated](broker, "user.created", func(ctx context.Context, input UserCreated) error {
    log.Printf("user created: %s", input.UserID)
    return nil
})
Wildcard Patterns
// Matches orders.create, orders.update, etc.
ligo_microservices.On[OrderEvent](broker, "orders.*", handler)

// Matches orders, orders.create, orders.item.create, etc.
ligo_microservices.On[OrderEvent](broker, "orders.#", handler)

Middleware Pipeline

Guards, pipes, interceptors, and exception filters — mirrors Ligo's HTTP middleware patterns.

hb := ligo_microservices.HandleBuilder[EchoInput, EchoOutput](broker, "test.echo").
    Guard(authGuard).
    Pipe(validationPipe).
    Intercept(loggingInterceptor).
    Filter(errorFilter)

ligo_microservices.BuilderAction(hb, func(ctx context.Context, input EchoInput) (EchoOutput, error) {
    return EchoOutput{Echo: input.Message}, nil
})

Execution order: Guards → Pipes → Interceptors → Handler → Exception Filters

For event handlers:

hb := ligo_microservices.OnBuilder[UserCreated](broker, "user.created").
    Guard(authGuard).
    Intercept(loggingInterceptor)

ligo_microservices.BuilderActionEvent(hb, func(ctx context.Context, input UserCreated) error {
    return processUser(input)
})
Types
type MessageGuard          func(ctx context.Context, msg Message) (bool, error)
type MessagePipe           func(ctx context.Context, msg Message) error
type MessageInterceptor    func(ctx context.Context, msg Message, next func() error) error
type MessageExceptionFilter func(err error, ctx context.Context, msg Message) error

Error Handling

// Return structured errors from handlers
return nil, ligo_microservices.NotFound("order not found")
return nil, ligo_microservices.Validation("user_id required")
return nil, ligo_microservices.Timeout("operation timed out")
return nil, ligo_microservices.Internal("database error")

// Inspect on the Send side
var brokerErr *ligo_microservices.BrokerError
if errors.As(err, &brokerErr) {
    switch brokerErr.Type {
    case "NotFound":
        // handle
    case "Validation":
        // handle
    }
}

Reconnection

Available since v0.2.0. On a server-initiated NotifyClose, the broker walks RetryConfig with exponential backoff, re-establishes the AMQP connection and channel, re-declares the exchange and queue, and re-binds every registered RPC and event pattern. Handlers survive the cycle without restart.

RabbitMQConfig{
    Retry: ligo_microservices.RetryConfig{
        MaxAttempts:  5,
        Delay:        time.Second,
        MaxDelay:     30 * time.Second,
        OnRetry:      func(attempt int, err error) {
            log.Printf("retry attempt %d: %v", attempt, err)
        },
        OnReconnect: func() {
            log.Println("reconnected to RabbitMQ")
        },
    },
}
In-flight Send calls

When the connection drops mid-request, Send returns ErrConnectionLost instead of waiting out the timeout (or panicking on the now-closed reply channel — fixed in v0.2.0). Callers should treat it the same way as a transient broker error:

result, err := microservices.Send[*Order](ctx, broker, "orders.get", req)
switch {
case errors.Is(err, microservices.ErrConnectionLost):
    // broker is reconnecting — retry with backoff
case errors.Is(err, microservices.ErrNotConnected):
    // broker has been shut down — do not retry
case err != nil:
    // protocol / handler error — surface to caller
}

User-initiated Shutdown flips an internal closed flag so the reconnect loop short-circuits — your app won't keep trying to dial a broker you're walking away from.

Codec

Default JSON. Optional Protobuf:

RabbitMQConfig{
    Codec: ligo_microservices.ProtobufCodec,
}

License

MIT

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotConnected is returned by Send when the broker has not yet
	// connected (or has been shut down).
	ErrNotConnected = errors.New("microservices: broker not connected")
	// ErrConnectionLost is returned to in-flight Send callers when the
	// broker's underlying AMQP connection drops mid-request. The reply
	// channel is drained as part of reconnect, so the caller gets a
	// determinate error instead of timing out.
	ErrConnectionLost = errors.New("microservices: connection lost during request")
)

Functions

func BuilderAction

func BuilderAction[T, R any](hb *handlerBuilder, handler func(ctx context.Context, input T) (R, error))

func BuilderActionEvent

func BuilderActionEvent[T any](hb *handlerBuilder, handler func(ctx context.Context, input T) error)

func Emit

func Emit(ctx context.Context, b *Broker, pattern string, payload any) error

func Handle

func Handle[T, R any](b *Broker, pattern string, handler func(ctx context.Context, input T) (R, error))

func HandleBuilder

func HandleBuilder[T, R any](b *Broker, pattern string) *handlerBuilder

func On

func On[T any](b *Broker, pattern string, handler func(ctx context.Context, input T) error)

On registers a typed event handler for the given pattern. Panics if a handler is already registered for the same pattern.

func OnBuilder

func OnBuilder[T any](b *Broker, pattern string) *handlerBuilder

func Provider

func Provider() ligo.Provider

Provider returns a ligo.Provider that registers a *Broker as a singleton. Use this when you need fine-grained control over module composition.

func RabbitMQModule

func RabbitMQModule(cfg RabbitMQConfig) ligo.Module

RabbitMQModule returns a ligo module that registers a *Broker provider with the given configuration. The broker connects during OnInit and disconnects during OnShutdown.

func Send

func Send[T any](ctx context.Context, b *Broker, pattern string, payload any) (T, error)

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

func NewBroker

func NewBroker(cfg RabbitMQConfig) *Broker

func (*Broker) Register

func (b *Broker) Register(r *ligo.HookRegistry)

type BrokerError

type BrokerError struct {
	Type    string
	Message string
}

BrokerError is a structured error that travels over the wire during RPC.

func Internal

func Internal(msg string) *BrokerError

Internal creates an "Internal" broker error.

func NotFound

func NotFound(msg string) *BrokerError

NotFound creates a "NotFound" broker error.

func Timeout

func Timeout(msg string) *BrokerError

Timeout creates a "Timeout" broker error.

func Validation

func Validation(msg string) *BrokerError

Validation creates a "Validation" broker error.

func (*BrokerError) Error

func (e *BrokerError) Error() string

type Codec

type Codec interface {
	Encode(any) ([]byte, error)
	Decode(data []byte, target any) error
}

Codec encodes and decodes message payloads.

var JSONCodec Codec = jsonCodec{}

JSONCodec uses encoding/json.

var ProtobufCodec Codec = protobufCodec{}

ProtobufCodec uses google.golang.org/protobuf. Values must implement proto.Message.

type HandlerBuilder

type HandlerBuilder interface {
	Guard(guards ...MessageGuard) *handlerBuilder
	Pipe(pipes ...MessagePipe) *handlerBuilder
	Intercept(interceptors ...MessageInterceptor) *handlerBuilder
	Filter(filters ...MessageExceptionFilter) *handlerBuilder
}

type Message

type Message struct {
	Pattern string
	Data    []byte
	ID      string
	Headers map[string]any
}

type MessageExceptionFilter

type MessageExceptionFilter func(err error, ctx context.Context, msg Message) error

type MessageGuard

type MessageGuard func(ctx context.Context, msg Message) (bool, error)

MessageGuard returns (true, nil) to allow, (false, error) to deny.

type MessageInterceptor

type MessageInterceptor func(ctx context.Context, msg Message, next func() error) error

MessageInterceptor wraps handler execution. Call next() to continue the chain.

type MessagePipe

type MessagePipe func(ctx context.Context, msg Message) error

type RabbitMQConfig

type RabbitMQConfig struct {
	URL      string
	Exchange string
	Queue    string
	Codec    Codec
	Timeout  time.Duration
	Retry    RetryConfig
}

RabbitMQConfig holds configuration for the RabbitMQ transport.

Queue controls the handler queue declaration:

  • "" (default): server-generated name, exclusive, auto-delete, non-durable. One queue per broker instance; gone when the process exits. Good for ephemeral RPC clients and single-process demos.
  • non-empty: the given name, durable, non-exclusive, non-auto-delete. Survives restarts and can be shared by multiple consumers (worker pool / competing consumers).

type RetryConfig

type RetryConfig struct {
	MaxAttempts int
	Delay       time.Duration
	MaxDelay    time.Duration
	OnRetry     func(attempt int, err error)
	OnReconnect func()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL