hop

v0.1.2
Published: Mar 19, 2026 License: MIT Imports: 7 Imported by: 0

README

Hop - RabbitMQ Connection Library for Go

Hop is a simple, resilient Go library for connecting to RabbitMQ, with support for automatic reconnection, graceful shutdown, Prometheus metrics, and message consumption.

🚀 Installation

go get github.com/KevenMarioN/hop

🔧 Basic Usage

Consuming Messages
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/KevenMarioN/hop"
	"github.com/KevenMarioN/hop/protocol"
)

func main() {
	// Cancel ctx on Ctrl+C so the program knows when to stop.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Connect to RabbitMQ.
	hopClient, err := hop.New(ctx, "amqp://user:pass@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to create connection: %v", err)
	}

	defer func() {
		if err := hopClient.Close(); err != nil {
			fmt.Println(err)
		}
	}()

	// Register a consumer. The handler receives a protocol.Message, which
	// embeds amqp091.Delivery, so Body and Ack are available directly.
	err = hopClient.Consume(protocol.Consumer{
		Name:    "my-consumer",
		AutoAck: false,
		Queue: protocol.Queue{
			Name:    "my-queue",
			Durable: true,
		},
		Exec: func(ctx context.Context, msg protocol.Message) error {
			defer func() {
				// Ack(false) acknowledges only this delivery.
				if err := msg.Ack(false); err != nil {
					fmt.Println(err)
				}
			}()

			fmt.Printf("Message received: %s\n", string(msg.Body))

			return nil
		},
	})
	if err != nil {
		fmt.Printf("Failed to register consumer: %v\n", err)
		return
	}

	// Start the consumers. StartConsumers returns immediately, so block
	// until the interrupt signal arrives.
	hopClient.StartConsumers(ctx)
	<-ctx.Done()

	// Shut down gracefully; the 10-second limit is an example value.
	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := hopClient.Shutdown(shutdownCtx); err != nil {
		fmt.Printf("Failed to shutdown: %v\n", err)
		return
	}
}

⚙️ Advanced Configuration

Connection Options
import "github.com/KevenMarioN/hop/conn"

// With a custom connection name
hopClient, err := hop.New(ctx, "amqp://user:pass@localhost:5672/",
	conn.WithConnectionName("my-app"),
)
Multiple Consumers
// Consumer 1
err = hopClient.Consume(protocol.Consumer{
	Name:  "consumer-1",
	Queue: protocol.Queue{Name: "queue-1"},
	Exec: func(ctx context.Context, msg protocol.Message) error {
		// Process the message
		return nil
	},
})

// Consumer 2
err = hopClient.Consume(protocol.Consumer{
	Name:  "consumer-2",
	Queue: protocol.Queue{Name: "queue-2"},
	Exec: func(ctx context.Context, msg protocol.Message) error {
		// Process the message
		return nil
	},
})

// Start all registered consumers
hopClient.StartConsumers(ctx)
Graceful Shutdown
ctx := context.Background()
hopClient, err := hop.New(ctx, "amqp://user:pass@localhost:5672/")
if err != nil {
	panic(err)
}

// Register consumers...

hopClient.StartConsumers(ctx)

// Wait for in-flight work to finish, then close the connection.
// log is assumed to be zerolog's global logger (github.com/rs/zerolog/log).
if err := hopClient.Shutdown(ctx); err != nil {
	log.Error().Err(err).Msg("Shutdown failed")
}

📚 API Reference

Client Interface
type Client interface {
	Publish(ctx context.Context, exchange, key string, body []byte) error // Not implemented yet
	Consume(args protocol.Consumer) error
	StartConsumers(ctx context.Context)
	Shutdown(ctx context.Context) error
	Close() error
}
Configuration Options
import "github.com/KevenMarioN/hop/conn"

// Available options:
conn.WithConnectionName("my-app")                         // Connection name
conn.WithBackoff(2, 100*time.Millisecond, 30*time.Second) // Reconnect backoff: multiplier, initial delay, max delay
conn.WithTLS(tlsConfig)                                   // TLS configuration (*tls.Config)
conn.WithServiceName("my-service")                        // Service name
conn.WithMetrics(collector)                               // Pluggable metrics (MetricsCollector interface)
conn.WithPrometheusMetrics(registry)                      // Prometheus metrics (convenience wrapper)
Functions
New(ctx context.Context, url string, opts ...conn.HopOption) (Client, error)

Creates a new connection to RabbitMQ.

Parameters:

  • ctx: Context that controls the connection lifecycle
  • url: RabbitMQ connection URL (e.g. amqp://user:pass@host:5672/)
  • opts: Connection options (optional)

Returns:

  • Client: The Hop client interface
  • error: Non-nil if the connection fails
Client.Consume(args protocol.Consumer) error

Registers a consumer. Message processing does not begin until StartConsumers is called.

Parameters:

  • args: Consumer configuration

Returns:

  • error: Non-nil if registration fails
Client.StartConsumers(ctx context.Context)

Starts all registered consumers. It spawns goroutines and returns immediately.

Parameters:

  • ctx: Context that controls the consumers' lifecycle
Client.Shutdown(ctx context.Context) error

Shuts the connection down gracefully, waiting for all goroutines to finish.

Parameters:

  • ctx: Context used to bound the shutdown time

Returns:

  • error: Non-nil if the shutdown fails
Client.Close() error

Closes the RabbitMQ connection immediately, without waiting for consumers. For a graceful shutdown, use Shutdown.

Returns:

  • error: Non-nil if closing fails
Structs
protocol.Consumer

Consumer configuration.

type Consumer struct {
	Name      string                 // Consumer name
	Key       string                 // Binding key (for exchanges)
	AutoAck   bool                   // Acknowledge deliveries automatically
	NoLocal   bool                   // Do not consume locally published messages
	Exclusive bool                   // Exclusive consumer
	NoWait    bool                   // Do not wait for server confirmation
	Headers   map[string]any         // Custom headers
	Queue     Queue                  // Queue configuration
	Exchange  *Exchange              // Exchange configuration (optional)
	Exec      Handler                // Message handler
}
protocol.Queue

Queue configuration.

type Queue struct {
	Durable           bool            // Durable queue
	AutoDelete        bool            // Delete automatically when no longer in use
	Exclusive         bool            // Queue exclusive to the connection
	NoWait            bool            // Do not wait for server confirmation
	Name              string          // Queue name
	Headers           map[string]any  // Custom headers
	ShouldCreateQueue bool            // Create the queue automatically
}
protocol.Exchange

Exchange configuration.

type Exchange struct {
	Durable    bool            // Durable exchange
	AutoDelete bool            // Delete automatically when no longer in use
	Exclusive  bool            // Exchange exclusive to the connection
	NoWait     bool            // Do not wait for server confirmation
	Internal   bool            // Internal exchange
	Kind       Kind            // Exchange type (fanout, topic, direct)
	Name       string          // Exchange name
	Headers    map[string]any  // Custom headers
}
protocol.Message

Wrapper around amqp091.Delivery that leaves room for future extensions to the message.

type Message struct {
    amqp091.Delivery
}

Main embedded fields:

  • Body: []byte holding the message payload
  • Headers: map[string]interface{} holding the message headers
  • ContentType: string describing the message format
  • DeliveryTag: uint64 identifying the delivery
  • Exchange: string naming the source exchange
  • RoutingKey: string routing key used for the delivery
  • ConsumerTag: string identifying the consumer
  • MessageCount: uint32 number of messages remaining in the queue
protocol.Handler

Message processing function.

type Handler func(ctx context.Context, msg Message) error
protocol.Kind

Available exchange types.

const (
	Fanout  Kind = "fanout"
	Topic   Kind = "topic"
	Direct  Kind = "direct"
	Default Kind = ""
)

🛡️ Features

Auto-Reconnect

The library monitors the connection and reconnects automatically when it fails.

Resilience

Reconnection attempts use exponential backoff, which avoids overloading the server.
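
To make the reconnect behaviour concrete, here is a minimal, stdlib-only sketch of a retry loop with exponential backoff. It is not hop's implementation: retry, demo, and the dial callback are hypothetical names, and the doubling factor is an assumption chosen for illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retry calls dial until it succeeds or ctx is cancelled. After each failure
// it waits, then doubles the wait, never exceeding maxDelay.
// It returns the number of failed attempts.
func retry(ctx context.Context, dial func() error, initial, maxDelay time.Duration) (int, error) {
	delay := initial
	failures := 0
	for {
		if err := dial(); err == nil {
			return failures, nil
		}
		failures++
		select {
		case <-ctx.Done():
			return failures, ctx.Err()
		case <-time.After(delay):
		}
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}

// demo simulates a broker that rejects the first two connection attempts.
func demo() (int, error) {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errors.New("broker unavailable")
		}
		return nil
	}
	return retry(context.Background(), dial, time.Millisecond, 10*time.Millisecond)
}

func main() {
	failures, err := demo()
	fmt.Println(failures, err) // prints "2 <nil>"
}
```

Selecting on ctx.Done() alongside the timer lets a cancelled context interrupt the wait instead of sleeping through it.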

Graceful Shutdown

Connections and goroutines are shut down safely, so every message already being processed is allowed to finish.
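
The idea of waiting for in-flight work with an upper time limit can be sketched with a sync.WaitGroup and a context deadline. This is only an illustration under assumed names (waitOrTimeout and runWorkers are hypothetical); it does not show how hop implements Shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// waitOrTimeout blocks until every worker tracked by wg has finished or ctx
// expires, whichever comes first.
func waitOrTimeout(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runWorkers starts n workers that each need `work` to finish, then waits for
// them for at most `limit`.
func runWorkers(n int, work, limit time.Duration) error {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(work) // stands in for processing one message
		}()
	}
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()
	return waitOrTimeout(ctx, &wg)
}

func main() {
	fmt.Println(runWorkers(3, 10*time.Millisecond, time.Second))  // workers finish in time
	fmt.Println(runWorkers(3, 200*time.Millisecond, 5*time.Millisecond)) // deadline hits first
}
```

The same shape applies when calling Shutdown: passing a context with a timeout bounds how long the caller waits for handlers to finish.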

Structured Logging

Uses zerolog for fast, structured logging.

Dynamic Metrics

Hop's metrics system is fully decoupled and supports multiple backends:

  • Prometheus: metrics.NewPrometheusCollector(registry)
  • Multi-collector: metrics.NewMultiCollector(collector1, collector2)
  • No-op: metrics.NopCollector (metrics disabled)

Enable metrics with WithMetrics(collector), or with WithPrometheusMetrics(registry), a convenience wrapper kept for backward compatibility.

Prometheus Example
import (
    "github.com/KevenMarioN/hop"
    "github.com/KevenMarioN/hop/conn"
    "github.com/prometheus/client_golang/prometheus"
)

registry := prometheus.NewRegistry()
hopClient, err := hop.New(ctx, "amqp://user:pass@localhost:5672/",
    conn.WithPrometheusMetrics(registry),
)
Example with multiple backends
import (
    "github.com/KevenMarioN/hop"
    "github.com/KevenMarioN/hop/conn"
    "github.com/KevenMarioN/hop/metrics"
    "github.com/prometheus/client_golang/prometheus"
)

promRegistry := prometheus.NewRegistry()
promCollector := metrics.NewPrometheusCollector(promRegistry)
// Per the documented signature, NewOpenTelemetryCollector takes a service name.
otelCollector := metrics.NewOpenTelemetryCollector("my-service")
multi := metrics.NewMultiCollector(promCollector, otelCollector)

hopClient, err := hop.New(ctx, "amqp://user:pass@localhost:5672/",
    conn.WithMetrics(multi),
)
Available metrics
  • hop_messages_consumed_total (Counter): Total messages consumed
  • hop_consumption_errors_total (Counter): Total consumption errors
  • hop_reconnects_total (Counter): Total reconnections
  • hop_connection_duration_seconds (Gauge): Duration of the current connection
  • hop_active_consumers (Gauge): Number of active consumers
  • hop_message_processing_duration_seconds (Histogram): Message processing duration
ConsumerBuilder

Fluent API for building consumers in a type-safe, immutable way:

consumer, err := protocol.NewConsumerBuilder("my-consumer").
    WithQueue(protocol.Queue{Name: "my-queue", Durable: true}).
    WithExchange(&protocol.Exchange{Name: "my-exchange", Kind: protocol.Direct}).
    WithHandler(func(ctx context.Context, msg protocol.Message) error {
        // Process the message
        return nil
    }).
    Build()
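
One common way to get an immutable fluent builder in Go is to give every method a value receiver, so each call works on a copy. The sketch below illustrates that technique with hypothetical types (consumerSpec, builder); it is not hop's ConsumerBuilder, and whether the real Build validates its input is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerSpec and builder are hypothetical stand-ins, not hop's real types.
type consumerSpec struct {
	name  string
	queue string
}

type builder struct{ spec consumerSpec }

func newBuilder(name string) builder {
	return builder{spec: consumerSpec{name: name}}
}

// withQueue has a value receiver, so it modifies and returns a copy and
// leaves the builder it was called on untouched.
func (b builder) withQueue(queue string) builder {
	b.spec.queue = queue
	return b
}

// build validates the accumulated configuration before returning it.
func (b builder) build() (consumerSpec, error) {
	if b.spec.queue == "" {
		return consumerSpec{}, errors.New("queue name is required")
	}
	return b.spec, nil
}

func main() {
	base := newBuilder("my-consumer")
	derived := base.withQueue("my-queue")

	_, err := base.build() // base still has no queue
	fmt.Println(err)       // prints "queue name is required"

	spec, _ := derived.build()
	fmt.Println(spec.name, spec.queue) // prints "my-consumer my-queue"
}
```

Because base is never mutated, it can be reused as a template for several consumers that share a name prefix or other settings.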

🧪 Tests

Run the unit tests:

go test ./...

Run the tests with coverage:

go test -cover ./...

📝 Examples

See the examples/basic/ directory for complete usage examples.

🤝 Contributing

Contributions are welcome! Please follow these guidelines:

  1. Fork the repository
  2. Create a branch for your feature (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Overview

Package hop provides a simple and resilient RabbitMQ client for Go.

It offers automatic reconnection, graceful shutdown, and message consumption with built-in metrics support via Prometheus.

Index

Constants

This section is empty.

Variables

var NopCollector = metrics.NopCollector

NopCollector is a no-op collector that discards all metrics. Useful for testing or when metrics are not needed.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Publish publishes a message to an exchange.
	// NOTE: Not implemented yet.
	Publish(ctx context.Context, exchange, key string, body []byte) error

	// Consume registers a consumer configuration.
	// The consumer will start processing when StartConsumers is called.
	Consume(args Consumer) error

	// StartConsumers begins message processing for all registered consumers.
	// This spawns goroutines and returns immediately.
	StartConsumers(ctx context.Context)

	// Close terminates the connection immediately without waiting for consumers.
	// For graceful shutdown, use Shutdown instead.
	Close() error

	// Shutdown gracefully stops all consumers and closes the connection.
	// It waits for active message processing to complete.
	Shutdown(ctx context.Context) error
}

Client is the main interface for interacting with RabbitMQ.

func New

func New(ctx context.Context, url string, opts ...HopOption) (Client, error)

New creates a new Hop client connected to RabbitMQ.

  • ctx: Context for connection lifecycle
  • url: AMQP connection URL (e.g., amqp://user:pass@host:5672/)
  • opts: Optional configuration (connection name, backoff, TLS, metrics)

Returns a Client implementation, or an error if the connection fails.

type Consumer added in v0.1.0

type Consumer = protocol.Consumer

type ConsumerBuilder added in v0.1.0

type ConsumerBuilder = protocol.ConsumerBuilder

type Counter added in v0.1.0

type Counter = metrics.Counter

Counter represents a counter metric.

type Exchange added in v0.1.0

type Exchange = protocol.Exchange

type Gauge added in v0.1.0

type Gauge = metrics.Gauge

Gauge represents a gauge metric.

type Handler added in v0.1.0

type Handler = protocol.Handler

type Histogram added in v0.1.0

type Histogram = metrics.Histogram

Histogram represents a histogram metric for measuring value distributions.

type HopOption added in v0.1.0

type HopOption = conn.HopOption

HopOption configures a hop connection using the functional options pattern.
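
For readers unfamiliar with the functional options pattern, here is a minimal stdlib-only sketch. The settings struct, the option type, and the default values are hypothetical and chosen for illustration; hop's internal configuration type is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// settings and option are hypothetical stand-ins, not hop's real types.
type settings struct {
	connectionName string
	maxDelay       time.Duration
}

// option mutates a settings value; constructors below return closures.
type option func(*settings)

func withConnectionName(name string) option {
	return func(s *settings) { s.connectionName = name }
}

func withMaxDelay(d time.Duration) option {
	return func(s *settings) { s.maxDelay = d }
}

// newSettings starts from defaults and applies each option in order, so
// later options override earlier ones.
func newSettings(opts ...option) settings {
	s := settings{connectionName: "default", maxDelay: 30 * time.Second}
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := newSettings(withConnectionName("my-app"))
	fmt.Println(s.connectionName, s.maxDelay) // prints "my-app 30s"
}
```

The variadic parameter is what lets callers pass zero options and still get sensible defaults, which matches how New accepts opts ...HopOption.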

func WithBackoff added in v0.1.0

func WithBackoff(multiplier float64, initialDelay, maxDelay time.Duration) HopOption

WithBackoff configures the reconnection backoff strategy.

  • multiplier: exponential factor (e.g., 2.0 doubles delay each attempt)
  • initialDelay: starting delay before first retry
  • maxDelay: maximum delay between retries (ceiling)
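
As a worked example of how these three parameters interact, the sketch below computes the delay schedule for multiplier 2, initial delay 100ms, and ceiling 30s. The formula initialDelay * multiplier^attempt, capped at maxDelay, is an assumption about the usual meaning of these parameters; backoffDelay is a hypothetical helper, not part of the hop API.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDelay returns the wait before retry number attempt (0-based):
// initialDelay * multiplier^attempt, capped at maxDelay.
func backoffDelay(multiplier float64, initialDelay, maxDelay time.Duration, attempt int) time.Duration {
	d := float64(initialDelay) * math.Pow(multiplier, float64(attempt))
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	// Schedule: 100ms, 200ms, 400ms, ... 25.6s, then 30s from attempt 9 on.
	for attempt := 0; attempt < 11; attempt++ {
		fmt.Println(attempt, backoffDelay(2, 100*time.Millisecond, 30*time.Second, attempt))
	}
}
```

With these values the ceiling is reached on the tenth retry (attempt 9), since 100ms * 2^9 = 51.2s exceeds 30s.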

func WithConnectionName added in v0.1.0

func WithConnectionName(connectionName string) HopOption

WithConnectionName sets a custom name for the AMQP connection. This name appears in RabbitMQ management UI and logs. Useful for identifying connections in a multi-service environment.

func WithMetrics added in v0.1.0

func WithMetrics(collector MetricsCollector) HopOption

WithMetrics enables metrics collection using the provided collector. Pass a MetricsCollector implementation (e.g., PrometheusCollector, MultiCollector). If nil is passed, metrics will be disabled.

func WithPrometheusMetrics added in v0.1.0

func WithPrometheusMetrics(registry prometheus.Registerer) HopOption

WithPrometheusMetrics is a convenience wrapper for backward compatibility. It creates a PrometheusCollector with the given registry.

func WithServiceName added in v0.1.0

func WithServiceName(serviceName string) HopOption

WithServiceName sets the service_name property on the AMQP connection. This metadata helps with monitoring and debugging in distributed systems.

func WithTLS added in v0.1.0

func WithTLS(tls *tls.Config) HopOption

WithTLS enables TLS encryption for the AMQP connection. Provide a configured *tls.Config for secure communication.

type Kind added in v0.1.0

type Kind = protocol.Kind
const (
	Fanout  Kind = "fanout" // Fanout exchange broadcasts to all bound queues
	Topic   Kind = "topic"  // Topic exchange routes based on pattern matching
	Direct  Kind = "direct" // Direct exchange routes by exact routing key
	Default Kind = ""       // Default exchange (amq.direct)
)

Supported exchange types.

type Message added in v0.1.0

type Message = protocol.Message

type MetricsCollector added in v0.1.0

type MetricsCollector = metrics.MetricsCollector

MetricsCollector defines the interface for metrics collection. Implementations can be Prometheus, OpenTelemetry, or any other system.

func NewMultiCollector added in v0.1.0

func NewMultiCollector(collectors ...MetricsCollector) MetricsCollector

NewMultiCollector combines multiple collectors into one. Metrics are sent to all provided collectors.
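
The fan-out idea behind a multi-collector can be sketched in a few lines. The one-method collector interface below is hypothetical, because the real MetricsCollector method set is not shown in this documentation and may differ; memoryCollector exists only to make the result observable.

```go
package main

import "fmt"

// collector is a hypothetical one-method interface used for illustration.
type collector interface {
	IncCounter(name string)
}

// memoryCollector counts increments in a map.
type memoryCollector struct{ counts map[string]int }

func newMemoryCollector() *memoryCollector {
	return &memoryCollector{counts: make(map[string]int)}
}

func (m *memoryCollector) IncCounter(name string) { m.counts[name]++ }

// multiCollector forwards every call to each wrapped collector, and itself
// satisfies collector, so callers cannot tell it from a single backend.
type multiCollector []collector

func (mc multiCollector) IncCounter(name string) {
	for _, c := range mc {
		c.IncCounter(name)
	}
}

func main() {
	a, b := newMemoryCollector(), newMemoryCollector()
	var multi collector = multiCollector{a, b}

	multi.IncCounter("hop_messages_consumed_total")
	fmt.Println(a.counts["hop_messages_consumed_total"], b.counts["hop_messages_consumed_total"]) // prints "1 1"
}
```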

func NewOpenTelemetryCollector added in v0.1.0

func NewOpenTelemetryCollector(serviceName string) MetricsCollector

NewOpenTelemetryCollector creates an OpenTelemetry-backed metrics collector. It uses the provided service name for metric identification.

func NewPrometheusCollector added in v0.1.0

func NewPrometheusCollector(registry prometheus.Registerer) MetricsCollector

NewPrometheusCollector creates a Prometheus-backed metrics collector. It registers metrics with the provided prometheus.Registerer.

type Queue added in v0.1.0

type Queue = protocol.Queue

Directories

Path Synopsis
cmd
examples
advanced command
basic command
internal
resilience
resilience/backoff.go
resilience/backoff.go
