Documentation
¶
Index ¶
Constants ¶
const ( // MessageGroupIDKey is the metadata key for FIFO queue message grouping. // Compatible with AWS SQS FIFO queues (x-sqs-message-group-id). MessageGroupIDKey = "x-message-group-id" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a gRPC consumer to process gRPC requests from a queue. It implements grpc.ServiceRegistrar, allowing gRPC services to be registered and invoked.
func NewConsumer ¶
func NewConsumer(receiver Receiver, opts ...ConsumerOption) *Consumer
NewConsumer creates a Consumer that processes gRPC calls from a queue.
The Consumer implements grpc.ServiceRegistrar and allows standard gRPC service implementations to be registered. Messages are processed concurrently using errgroup.
Example:
consumer := grpcqueue.NewConsumer(
transport,
grpcqueue.WithServerInterceptor(authInterceptor),
)
pb.RegisterMyServiceServer(consumer, &MyServiceImpl{})
if err := consumer.Serve(ctx); err != nil {
log.Fatal(err)
}
func (*Consumer) RegisterService ¶
func (c *Consumer) RegisterService(sd *grpc.ServiceDesc, ss any)
RegisterService registers a service and its implementation to the gRPC consumer. Called from IDL generated code. This must be called before invoking Serve.
func (*Consumer) Serve ¶
Serve starts consuming messages until the context is canceled.
Messages are processed concurrently in separate goroutines using errgroup. Each message is automatically Acked on success or Nacked on error/panic for retry. The method blocks until the context is canceled or an unrecoverable error occurs.
Returns nil when context is canceled, or an error if message receiving fails.
type ConsumerInterceptor ¶
type ConsumerInterceptor grpc.UnaryServerInterceptor
ConsumerInterceptor is a gRPC unary server interceptor for the consumer.
type ConsumerOption ¶
type ConsumerOption func(*Consumer)
ConsumerOption configures a Consumer.
func WithServerInterceptor ¶
func WithServerInterceptor(interceptor grpc.UnaryServerInterceptor) ConsumerOption
WithServerInterceptor adds a server interceptor to the consumer.
type Message ¶
type Message struct {
// UUID is the unique identifier of the message.
UUID string
// Payload is the message content.
Payload []byte
// Metadata contains key-value pairs for message attributes.
Metadata map[string]string
// contains filtered or unexported fields
}
Message represents a queue message. It provides context handling, metadata, and acknowledgment support.
func NewMessage ¶
NewMessage creates a new Message with the given UUID and payload.
func (*Message) Ack ¶
func (m *Message) Ack()
Ack acknowledges the message, indicating successful processing. Safe to call multiple times; only the first call has effect.
func (*Message) Acked ¶
func (m *Message) Acked() <-chan struct{}
Acked returns a channel that is closed when the message is acknowledged.
func (*Message) Nack ¶
func (m *Message) Nack()
Nack negatively acknowledges the message, indicating failed processing. Safe to call multiple times; only the first call has effect.
func (*Message) Nacked ¶
func (m *Message) Nacked() <-chan struct{}
Nacked returns a channel that is closed when the message is negatively acknowledged.
func (*Message) SetContext ¶
SetContext sets the message's context.
type MessageGroupIDExtractor ¶
MessageGroupIDExtractor extracts a message group ID from a proto message. Used for FIFO queues or fair queuing in standard queues.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides a way to make RPC calls over a queue system. It implements grpc.ClientConnInterface, making it a drop-in replacement for grpc.ClientConn.
func NewProducer ¶
func NewProducer(sender Sender, opts ...ProducerOption) *Producer
NewProducer creates a Producer that sends gRPC calls to a queue.
The Producer implements grpc.ClientConnInterface and can be used as a drop-in replacement for grpc.ClientConn with generated gRPC clients.
Example:
producer := grpcqueue.NewProducer(
transport,
grpcqueue.WithInterceptor(authInterceptor),
grpcqueue.WithMessageGroupIDExtractor(extractTenantID),
)
defer producer.Close()
client := pb.NewMyServiceClient(producer)
err := client.MyMethod(ctx, &pb.MyRequest{})
func (*Producer) Invoke ¶
func (c *Producer) Invoke(ctx context.Context, method string, args, reply any, opts ...grpc.CallOption) error
Invoke implements the grpc.ClientConnInterface Invoke method.
func (*Producer) NewStream ¶
func (c *Producer) NewStream(context.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error)
NewStream implements the grpc.ClientConnInterface NewStream method. Streaming is not supported over queues.
type ProducerInterceptor ¶
type ProducerInterceptor grpc.UnaryClientInterceptor
ProducerInterceptor is a gRPC unary client interceptor for the producer.
type ProducerOption ¶
type ProducerOption func(*Producer)
ProducerOption configures a Producer.
func WithInterceptor ¶
func WithInterceptor(interceptor grpc.UnaryClientInterceptor) ProducerOption
WithInterceptor adds a client interceptor to the producer.
func WithMessageGroupIDExtractor ¶
func WithMessageGroupIDExtractor(extractor MessageGroupIDExtractor) ProducerOption
WithMessageGroupIDExtractor configures FIFO grouping. The extractor function is called for each message to determine its group ID.
func WithMessageGroupIDKey ¶
func WithMessageGroupIDKey(key string) ProducerOption
WithMessageGroupIDKey sets the metadata key for message group ID. Default: "x-message-group-id"
type Receiver ¶
type Receiver interface {
// Receive returns a channel that emits messages until the context is canceled
// or the receiver is closed.
Receive(ctx context.Context) (<-chan *Message, error)
}
Receiver receives messages from a queue.