grpcqueue

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2026 License: MIT Imports: 16 Imported by: 0

README

GRPCQueue - Async gRPC over Message Queues

GRPCQueue - Async gRPC over Message Queues

CI Go Reference Go Report Card

GRPCQueue transforms synchronous gRPC calls into asynchronous queue-based messages, enabling reliable fire-and-forget RPC patterns with any message queue backend.

Overview

GRPCQueue allows you to:

  • Make gRPC calls that are queued instead of executed immediately
  • Use any message queue (SQS, RabbitMQ, Kafka, Redis) as transport
  • Maintain standard gRPC interfaces and generated client code
  • Add reliability, buffering, and decoupling to microservices
  • Process messages concurrently with configurable workers

Inspiration: Based on ppg/grpc-queue

Features

  • Transport Agnostic: Works with any message queue via pluggable Transport interface
  • Standard gRPC: Use generated client code without modifications
  • FIFO Support: Optional message grouping for ordered processing
  • Interceptors: Full support for gRPC client and server interceptors (auth, logging, tracing)
  • Error Handling: Automatic Ack/Nack with retry support
  • Concurrency: Process multiple messages in parallel with errgroup
  • Type Safety: Uses Protocol Buffers for message serialization

Usage

Producer (Client Side)

The Producer implements grpc.ClientConnInterface, making it a drop-in replacement for grpc.ClientConn.

import (
    "github.com/Aryon-Security/grpcqueue"
    pb "your/proto/package"
)

// Create transport that implements grpcqueue.Transport
transport := myCustomTransport

// Create producer with options
producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithInterceptor(yourClientInterceptor),
    grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
        // Extract grouping key for FIFO queues
        return extractTenantID(msg), nil
    }),
)
defer producer.Close()

// Use with generated gRPC client (no code changes needed!)
client := pb.NewYourServiceClient(producer)
err := client.YourMethod(ctx, &pb.YourRequest{...})
Consumer (Server Side)

The Consumer implements grpc.ServiceRegistrar, allowing you to register standard gRPC service implementations.

import (
    "github.com/Aryon-Security/grpcqueue"
    pb "your/proto/package"
)

// Create transport
transport := myCustomTransport

// Create consumer with options
consumer := grpcqueue.NewConsumer(
    transport,
    grpcqueue.WithServerInterceptor(yourServerInterceptor),
)

// Register your gRPC service implementation (standard code!)
pb.RegisterYourServiceServer(consumer, &YourServiceImpl{})

// Start consuming messages
ctx := context.Background()
if err := consumer.Serve(ctx); err != nil {
    log.Fatal(err)
}

Transport Interface

GRPCQueue uses a simple, pluggable Transport interface:

type Transport interface {
    Sender
    Receiver
    io.Closer
}

type Sender interface {
    Send(ctx context.Context, msgs ...*Message) error
}

type Receiver interface {
    Receive(ctx context.Context) (<-chan *Message, error)
}
Custom Transports

Implement the Sender and Receiver interfaces for custom message queues:

type MyCustomTransport struct {
    // your queue client
}

func (t *MyCustomTransport) Send(ctx context.Context, msgs ...*grpcqueue.Message) error {
    // Send messages to your queue
    return nil
}

func (t *MyCustomTransport) Receive(ctx context.Context) (<-chan *grpcqueue.Message, error) {
    // Return channel that emits messages
    ch := make(chan *grpcqueue.Message)
    go t.pollMessages(ctx, ch)
    return ch, nil
}

func (t *MyCustomTransport) Close() error {
    // Cleanup
    return nil
}

Message Format

Messages are serialized as JSON using Protocol Buffers' standard JSON mapping:

{
  "@type": "type.googleapis.com/infra.QueueItem",
  "service": "pkg.YourService",
  "method": "YourMethod",
  "payload": {
    "@type": "type.googleapis.com/pkg.YourRequest",
    "field1": "value1",
    "field2": 42
  }
}

The payload uses google.protobuf.Any for type-safe deserialization.

FIFO Queues and Message Grouping

For FIFO queues (like AWS SQS FIFO), use WithMessageGroupIDExtractor:

producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithMessageGroupIDExtractor(func(ctx context.Context, msg proto.Message) (string, error) {
        // Group by tenant for per-tenant ordering
        if req, ok := msg.(*pb.TenantRequest); ok {
            return req.TenantId, nil
        }
        return "", nil
    }),
)

The group ID is set as metadata with key x-message-group-id (AWS SQS compatible).

Interceptors

GRPCQueue fully supports gRPC interceptors for cross-cutting concerns:

// Client interceptor (auth, retries, logging)
clientInterceptor := func(ctx context.Context, method string, req, reply interface{},
    cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    // Add authentication, logging, etc.
    log.Printf("Sending %s", method)
    return invoker(ctx, method, req, reply, cc, opts...)
}

producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithInterceptor(clientInterceptor),
)

// Server interceptor (auth validation, logging, metrics)
serverInterceptor := func(ctx context.Context, req interface{},
    info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    // Validate auth, log requests, etc.
    log.Printf("Processing %s", info.FullMethod)
    return handler(ctx, req)
}

consumer := grpcqueue.NewConsumer(
    transport,
    grpcqueue.WithServerInterceptor(serverInterceptor),
)

Metadata is automatically propagated through the queue.

Error Handling

  • Producer: Returns errors immediately if queue publish fails
  • Consumer: Automatically Nacks failed messages for retry
  • Panics: Recovered and converted to Nacks
  • Context Cancellation: Gracefully stops processing in-flight messages
func (s *YourService) YourMethod(ctx context.Context, req *pb.Request) (*pb.Response, error) {
    // If this returns an error, message is Nacked for retry
    // If this panics, message is Nacked
    // If context is canceled, processing stops gracefully
    return processRequest(req)
}

Testing

Create an in-memory transport for testing:

// Create in-memory transport
transport := NewInMemoryTransport()

// Use in tests
producer := grpcqueue.NewProducer(transport)
consumer := grpcqueue.NewConsumer(transport)

Limitations

  • Unary RPCs Only: Streaming RPCs are not supported (returns error)
  • Fire-and-Forget: Producer doesn't receive responses (replies are ignored)
  • No Load Balancing: Use queue consumer groups for horizontal scaling
  • Message Size: Limited by underlying queue.

Performance Considerations

  • Concurrency: Consumer uses errgroup to process messages in parallel
  • Batching: Some transports support batch sends (check transport docs)
  • Latency: Adds queue latency (~10-100ms depending on transport)
  • Throughput: Limited by queue throughput (SQS: 3000 msg/s, Kafka: much higher)

Dependencies

  • google.golang.org/grpc - gRPC framework
  • google.golang.org/protobuf - Protocol Buffers
  • golang.org/x/sync/errgroup - Concurrent message processing

Contributing

See CONTRIBUTING.md for guidelines.

License

MIT License

Documentation

Index

Constants

View Source
const (
	// MessageGroupIDKey is the metadata key for FIFO queue message grouping.
	// Compatible with AWS SQS FIFO queues (x-sqs-message-group-id).
	MessageGroupIDKey = "x-message-group-id"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a gRPC consumer to process gRPC requests from a queue. It implements grpc.ServiceRegistrar, allowing gRPC services to be registered and invoked.

func NewConsumer

func NewConsumer(receiver Receiver, opts ...ConsumerOption) *Consumer

NewConsumer creates a Consumer that processes gRPC calls from a queue.

The Consumer implements grpc.ServiceRegistrar and allows standard gRPC service implementations to be registered. Messages are processed concurrently using errgroup.

Example:

consumer := grpcqueue.NewConsumer(
    transport,
    grpcqueue.WithServerInterceptor(authInterceptor),
)

pb.RegisterMyServiceServer(consumer, &MyServiceImpl{})

if err := consumer.Serve(ctx); err != nil {
    log.Fatal(err)
}

func (*Consumer) Close

func (c *Consumer) Close() error

Close closes the underlying receiver.

func (*Consumer) RegisterService

func (c *Consumer) RegisterService(sd *grpc.ServiceDesc, ss any)

RegisterService registers a service and its implementation to the gRPC consumer. Called from IDL generated code. This must be called before invoking Serve.

func (*Consumer) Serve

func (c *Consumer) Serve(ctx context.Context) error

Serve starts consuming messages until the context is canceled.

Messages are processed concurrently in separate goroutines using errgroup. Each message is automatically Acked on success or Nacked on error/panic for retry. The method blocks until the context is canceled or an unrecoverable error occurs.

Returns nil when context is canceled, or an error if message receiving fails.

func (*Consumer) ServeOnce

func (c *Consumer) ServeOnce(ctx context.Context) error

ServeOnce handles only one message and then exits.

type ConsumerInterceptor

type ConsumerInterceptor grpc.UnaryServerInterceptor

ConsumerInterceptor is a gRPC unary server interceptor for the consumer.

type ConsumerOption

type ConsumerOption func(*Consumer)

ConsumerOption configures a Consumer.

func WithServerInterceptor

func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ConsumerOption

WithServerInterceptor adds a server interceptor to the consumer.

type Message

type Message struct {
	// UUID is the unique identifier of the message.
	UUID string

	// Payload is the message content.
	Payload []byte

	// Metadata contains key-value pairs for message attributes.
	Metadata map[string]string
	// contains filtered or unexported fields
}

Message represents a queue message. It provides context handling, metadata, and acknowledgment support.

func NewMessage

func NewMessage(uuid string, payload []byte) *Message

NewMessage creates a new Message with the given UUID and payload.

func (*Message) Ack

func (m *Message) Ack()

Ack acknowledges the message, indicating successful processing. Safe to call multiple times; only the first call has effect.

func (*Message) Acked

func (m *Message) Acked() <-chan struct{}

Acked returns a channel that is closed when the message is acknowledged.

func (*Message) Context

func (m *Message) Context() context.Context

Context returns the message's context.

func (*Message) Nack

func (m *Message) Nack()

Nack negatively acknowledges the message, indicating failed processing. Safe to call multiple times; only the first call has effect.

func (*Message) Nacked

func (m *Message) Nacked() <-chan struct{}

Nacked returns a channel that is closed when the message is negatively acknowledged.

func (*Message) SetContext

func (m *Message) SetContext(ctx context.Context)

SetContext sets the message's context.

type MessageGroupIDExtractor

type MessageGroupIDExtractor func(ctx context.Context, msg proto.Message) (string, error)

MessageGroupIDExtractor extracts a message group ID from a proto message. Used for FIFO queues or fair queuing in standard queues.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides a way to make RPC calls over a queue system. It implements grpc.ClientConnInterface, making it a drop-in replacement for grpc.ClientConn.

func NewProducer

func NewProducer(sender Sender, opts ...ProducerOption) *Producer

NewProducer creates a Producer that sends gRPC calls to a queue.

The Producer implements grpc.ClientConnInterface and can be used as a drop-in replacement for grpc.ClientConn with generated gRPC clients.

Example:

producer := grpcqueue.NewProducer(
    transport,
    grpcqueue.WithInterceptor(authInterceptor),
    grpcqueue.WithMessageGroupIDExtractor(extractTenantID),
)
defer producer.Close()

client := pb.NewMyServiceClient(producer)
err := client.MyMethod(ctx, &pb.MyRequest{})

func (*Producer) Close

func (c *Producer) Close() error

Close closes the underlying sender.

func (*Producer) Invoke

func (c *Producer) Invoke(ctx context.Context, method string, args, reply any, opts ...grpc.CallOption) error

Invoke implements the grpc.ClientConnInterface Invoke method.

func (*Producer) NewStream

NewStream implements the grpc.ClientConnInterface NewStream method. Streaming is not supported over queues.

type ProducerInterceptor

type ProducerInterceptor grpc.UnaryClientInterceptor

ProducerInterceptor is a gRPC unary client interceptor for the producer.

type ProducerOption

type ProducerOption func(*Producer)

ProducerOption configures a Producer.

func WithInterceptor

func WithInterceptor(interceptor grpc.UnaryClientInterceptor) ProducerOption

WithInterceptor adds a client interceptor to the producer.

func WithMessageGroupIDExtractor

func WithMessageGroupIDExtractor(extractor MessageGroupIDExtractor) ProducerOption

WithMessageGroupIDExtractor configures FIFO grouping. The extractor function is called for each message to determine its group ID.

func WithMessageGroupIDKey

func WithMessageGroupIDKey(key string) ProducerOption

WithMessageGroupIDKey sets the metadata key for message group ID. Default: "x-message-group-id"

type Receiver

type Receiver interface {
	// Receive returns a channel that emits messages until the context is canceled
	// or the receiver is closed.
	Receive(ctx context.Context) (<-chan *Message, error)
}

Receiver receives messages from a queue.

type Sender

type Sender interface {
	// Send sends one or more messages to the queue.
	// The context can be used for cancellation and timeout.
	Send(ctx context.Context, msgs ...*Message) error
}

Sender sends messages to a queue.

type Transport

type Transport interface {
	Sender
	Receiver
	io.Closer
}

Transport defines message queue transport abstraction. It combines Sender, Receiver, and Closer interfaces for complete transport functionality.

Directories

Path Synopsis
internal
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL