nsq

v1.2.8
Published: Dec 3, 2021 License: MIT Imports: 21 Imported by: 0

README

nsq-go

Go package providing tools for building NSQ clients, servers and middleware.

Motivations

We ran into production issues with the standard go-nsq package: our workers would deadlock, stop consuming messages, and hang in endless termination loops. After digging through the code to figure out how to address the problem, it became clear that a rewrite would be faster than dealing with the amount of state synchronization done in go-nsq (multiple mutexes, channels and atomic variables involved).

This package is designed to offer fewer features than the standard go-nsq package and instead focus on simplicity and ease of maintenance and integration.

Consumer

package main

import (
    "log"

    "github.com/xenking/nsq-go"
)

func main() {
    // Create a new consumer, looking up nsqd nodes from the listed nsqlookupd
    // addresses, pulling messages from the 'world' channel of the 'hello' topic
    // with a maximum of 250 in-flight messages.
    consumer, err := nsq.StartConsumer(nsq.ConsumerConfig{
        Topic:   "hello",
        Channel: "world",
        Lookup: []string{
            "nsqlookup-001.service.local:4161",
            "nsqlookup-002.service.local:4161",
            "nsqlookup-003.service.local:4161",
        },
        MaxInFlight: 250,
    })
    if err != nil {
        // The configuration was rejected, so there is nothing to consume.
        log.Fatal(err)
    }

    // Consume messages. The consumer automatically connects to the nsqd nodes
    // it discovers and handles reconnections if something goes wrong.
    for msg := range consumer.Messages() {
        // Handle the message, then call msg.Finish or msg.Requeue.
        // ...
        msg.Finish()
    }
}

Producer

package main

import (
    "log"

    "github.com/xenking/nsq-go"
)

func main() {
    // Start a new producer that publishes to the TCP endpoint of an nsqd node.
    // The producer automatically handles connections in the background.
    producer, err := nsq.StartProducer(nsq.ProducerConfig{
        Topic:   "hello",
        Address: "localhost:4150",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Publish a message to the topic that this producer is configured for.
    // The method returns when the operation completes, potentially returning
    // an error if something went wrong.
    if err := producer.Publish([]byte("Hello World!")); err != nil {
        log.Print(err)
    }

    // Stop the producer. All in-flight requests are canceled and no more
    // messages can be published through this producer.
    producer.Stop()
}

Documentation

Constants

const (
	DefaultUserAgent       = "github.com/xenking/nsq-go"
	DefaultMaxConcurrency  = 1
	DefaultMaxInFlight     = 1
	DefaultDialTimeout     = 5 * time.Second
	DefaultReadTimeout     = 1 * time.Minute
	DefaultWriteTimeout    = 10 * time.Second
	DefaultLookupTimeout   = 10 * time.Second
	DefaultMaxRetryTimeout = 10 * time.Second
	DefaultMinRetryTimeout = 10 * time.Millisecond
	DefaultDrainTimeout    = 10 * time.Second

	NoTimeout = time.Duration(0)
)
const CommandIdentify = "IDENTIFY"

Variables

This section is empty.

Functions

func NewTLSConfig added in v1.2.8

func NewTLSConfig(config TLSConfig) (*tls.Config, error)

func RateLimit

func RateLimit(limit int, messages <-chan Message) <-chan Message

RateLimit consumes messages from the messages channel and limits the rate at which they are produced to the channel returned by this function.

The limit is the maximum number of messages per second that are produced. No rate limit is applied if limit is negative or zero.

The returned channel is closed when the messages channel is closed.
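
The behaviour described above can be approximated with a ticker-driven forwarding goroutine. The following is a minimal sketch under stated assumptions, not the package's implementation: `message` is a hypothetical stand-in for `nsq.Message` and `rateLimit` is an illustrative name, so that the example compiles with the standard library alone.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for nsq.Message.
type message struct{ Body string }

// rateLimit forwards messages from in to the returned channel, releasing at
// most limit messages per second. A limit of zero or less disables limiting.
func rateLimit(limit int, in <-chan message) <-chan message {
	if limit <= 0 {
		return in
	}
	interval := time.Second / time.Duration(limit)
	if interval <= 0 {
		// Limits above one message per nanosecond would give a zero interval,
		// which time.NewTicker rejects.
		interval = time.Nanosecond
	}
	out := make(chan message)
	go func() {
		defer close(out) // propagate closure of the input channel
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for m := range in {
			<-ticker.C // wait for the next slot before releasing a message
			out <- m
		}
	}()
	return out
}

func main() {
	in := make(chan message, 3)
	for _, body := range []string{"a", "b", "c"} {
		in <- message{Body: body}
	}
	close(in)
	for m := range rateLimit(100, in) {
		fmt.Println(m.Body)
	}
}
```

With the real package, the equivalent call would wrap the consumer's channel, for example `nsq.RateLimit(100, consumer.Messages())`, and the loop body would still need to call Finish or Requeue on each message.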

Types

type Auth

type Auth struct {
	// Secret set for authentication.
	Secret string
}

Auth represents the AUTH command.

func (Auth) Name

func (c Auth) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Auth) Write

func (c Auth) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Client

type Client struct {
	http.Client
	Address   string
	Scheme    string
	UserAgent string
}

func (*Client) CreateChannel

func (c *Client) CreateChannel(topic string, channel string) error

func (*Client) CreateTopic

func (c *Client) CreateTopic(topic string) error

func (*Client) DeleteChannel

func (c *Client) DeleteChannel(topic string, channel string) error

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(topic string) error

func (*Client) EmptyChannel

func (c *Client) EmptyChannel(topic string, channel string) error

func (*Client) EmptyTopic

func (c *Client) EmptyTopic(topic string) error

func (*Client) MutliPublish

func (c *Client) MutliPublish(topic string, messages ...[]byte) (err error)

func (*Client) PauseChannel

func (c *Client) PauseChannel(topic string, channel string) error

func (*Client) PauseTopic

func (c *Client) PauseTopic(topic string) error

func (*Client) Ping

func (c *Client) Ping() error

func (*Client) Publish

func (c *Client) Publish(topic string, message []byte) (err error)

func (*Client) UnpauseChannel

func (c *Client) UnpauseChannel(topic string, channel string) error

func (*Client) UnpauseTopic

func (c *Client) UnpauseTopic(topic string) error

type Cls

type Cls struct {
}

Cls represents the CLS command.

func (Cls) Name

func (c Cls) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Cls) Write

func (c Cls) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Command

type Command interface {
	// Name returns the name of the command.
	Name() string

	// Write serializes the command to the given buffered output.
	Write(*bufio.Writer) error
}

The Command interface is implemented by types that represent the different commands of the NSQ protocol.
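
To illustrate what satisfying this interface involves, here is a small standalone sketch. The `command` interface and the `sub` type are local, hypothetical copies written for this example (they are not the package's own types), and the output assumes the `SUB <topic> <channel>\n` line format of the NSQ TCP protocol.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// command mirrors the shape of the Command interface documented above.
type command interface {
	Name() string
	Write(*bufio.Writer) error
}

// sub is a hypothetical stand-in for a SUB command.
type sub struct {
	Topic   string
	Channel string
}

// Name returns the protocol name of the command.
func (c sub) Name() string { return "SUB" }

// Write emits the line "SUB <topic> <channel>\n" to the buffered writer.
func (c sub) Write(w *bufio.Writer) error {
	_, err := fmt.Fprintf(w, "%s %s %s\n", c.Name(), c.Topic, c.Channel)
	return err
}

// encode serializes cmd into a string, flushing the buffered writer so the
// bytes actually reach the underlying buffer.
func encode(cmd command) (string, error) {
	var buf bytes.Buffer
	w := bufio.NewWriter(&buf)
	if err := cmd.Write(w); err != nil {
		return "", err
	}
	if err := w.Flush(); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	s, err := encode(sub{Topic: "hello", Channel: "world"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%q\n", s)
}
```

Because Write takes a *bufio.Writer, nothing is sent until the writer is flushed; the sketch makes that step explicit in `encode`.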

func ReadCommand

func ReadCommand(r *bufio.Reader) (cmd Command, err error)

ReadCommand reads a command from the buffered input r, returning it or an error if something went wrong.

if cmd, err := nsq.ReadCommand(r); err != nil {
	// handle the error
	...
} else {
	switch c := cmd.(type) {
	case nsq.Pub:
		...
	}
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func Dial

func Dial(addr string, identify Identify) (c *Conn, err error)

func DialTimeout

func DialTimeout(addr string, timeout time.Duration, identify Identify) (*Conn, error)

func NewConn

func NewConn(conn net.Conn) *Conn

func (*Conn) Close

func (c *Conn) Close() (err error)

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

func (*Conn) Read

func (c *Conn) Read(b []byte) (n int, err error)

func (*Conn) ReadCommand

func (c *Conn) ReadCommand() (cmd Command, err error)

func (*Conn) ReadFrame

func (c *Conn) ReadFrame() (frame Frame, err error)

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) (err error)

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) (err error)

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) (err error)

func (*Conn) Write

func (c *Conn) Write(b []byte) (n int, err error)

func (*Conn) WriteCommand

func (c *Conn) WriteCommand(cmd Command) (err error)

func (*Conn) WriteFrame

func (c *Conn) WriteFrame(frame Frame) (err error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(config ConsumerConfig) (c *Consumer, err error)

NewConsumer configures a new consumer instance.

func StartConsumer

func StartConsumer(config ConsumerConfig) (c *Consumer, err error)

StartConsumer creates and starts consuming from NSQ right away. This is the fastest way to get up and running.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan Message

func (*Consumer) Start

func (c *Consumer) Start()

Start explicitly begins consumption in case the consumer was initialized with NewConsumer instead of StartConsumer.

func (*Consumer) Stop

func (c *Consumer) Stop()

type ConsumerConfig

type ConsumerConfig struct {
	Topic        string
	Channel      string
	Address      string
	Lookup       []string
	MaxInFlight  int
	Identify     Identify
	TLS          TLSConfig
	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	DrainTimeout time.Duration
}

type Error

type Error string

Error is a frame type representing error responses to commands.

const (
	ErrInvalid      Error = "E_INVALID"
	ErrBadBody      Error = "E_BAD_BODY"
	ErrBadTopic     Error = "E_BAD_TOPIC"
	ErrBadChannel   Error = "E_BAD_CHANNEL"
	ErrBadMessage   Error = "E_BAD_MESSAGE"
	ErrPubFailed    Error = "E_PUB_FAILED"
	ErrMPubFailed   Error = "E_MPUB_FAILED"
	ErrFinFailed    Error = "E_FIN_FAILED"
	ErrReqFailed    Error = "E_REQ_FAILED"
	ErrTouchFailed  Error = "E_TOUCH_FAILED"
	ErrAuthFailed   Error = "E_AUTH_FAILED"
	ErrUnauthorized Error = "E_UNAUTHORIZED"
)

func (Error) Error

func (e Error) Error() string

Error returns the error as a string, satisfies the error interface.

func (Error) FrameType

func (e Error) FrameType() FrameType

FrameType returns FrameTypeError, satisfies the Frame interface.

func (Error) String

func (e Error) String() string

String returns the error as a string.

func (Error) Write

func (e Error) Write(w *bufio.Writer) (err error)

Write serializes the frame to the given buffered output, satisfies the Frame interface.

type Fin

type Fin struct {
	// MessageID is the ID of the message to mark finished.
	MessageID MessageID
}

Fin represents the FIN command.

func (Fin) Name

func (c Fin) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Fin) Write

func (c Fin) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Frame

type Frame interface {
	// FrameType returns the code representing the frame type.
	FrameType() FrameType

	// Write serializes the frame to the given buffered output.
	Write(*bufio.Writer) error
}

The Frame interface is implemented by types that represent the different types of frames that a consumer may receive.

func ReadFrame

func ReadFrame(r *bufio.Reader) (frame Frame, err error)

ReadFrame reads a frame from the buffered input r, returning it or an error if something went wrong.

if frame, err := nsq.ReadFrame(r); err != nil {
	// handle the error
	...
} else {
	switch f := frame.(type) {
	case nsq.Message:
		...
	}
}
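
For readers unfamiliar with the wire format, the sketch below decodes a raw frame by hand. It assumes the layout described in the NSQ TCP protocol specification (a 4-byte big-endian size, a 4-byte big-endian frame type, then the payload); `readRawFrame` and `decode` are hypothetical helper names, and the package's own ReadFrame may differ in its details.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readRawFrame reads one frame: a 4-byte big-endian size, a 4-byte big-endian
// frame type, then size-4 bytes of payload. Production code would also cap
// size before allocating.
func readRawFrame(r *bufio.Reader) (frameType int32, data []byte, err error) {
	var header [8]byte
	if _, err = io.ReadFull(r, header[:]); err != nil {
		return 0, nil, err
	}
	size := binary.BigEndian.Uint32(header[:4])
	if size < 4 {
		return 0, nil, fmt.Errorf("frame size %d is smaller than the type field", size)
	}
	frameType = int32(binary.BigEndian.Uint32(header[4:]))
	data = make([]byte, size-4)
	if _, err = io.ReadFull(r, data); err != nil {
		return 0, nil, err
	}
	return frameType, data, nil
}

// decode is a convenience wrapper that parses a frame held in a byte slice.
func decode(raw []byte) (int32, string, error) {
	frameType, data, err := readRawFrame(bufio.NewReader(bytes.NewReader(raw)))
	return frameType, string(data), err
}

func main() {
	// Size 6 = 4 bytes of frame type + 2 bytes of payload; type 0 is a response.
	raw := []byte{0, 0, 0, 6, 0, 0, 0, 0, 'O', 'K'}
	frameType, data, err := decode(raw)
	fmt.Println(frameType, data, err)
}
```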

type FrameType

type FrameType int

FrameType is used to represent the different types of frames that a consumer may receive.

const (
	// FrameTypeResponse is the code for frames that carry success responses to
	// commands.
	FrameTypeResponse FrameType = 0

	// FrameTypeError is the code for frames that carry error responses to
	// commands.
	FrameTypeError FrameType = 1

	// FrameTypeMessage is the code for frames that carry messages.
	FrameTypeMessage FrameType = 2
)

func (FrameType) String

func (t FrameType) String() string

String returns a human-readable representation of the frame type.

type Identify

type Identify struct {
	// ClientID should be set to a unique identifier representing the client.
	ClientID string

	// Hostname represents the hostname of the client. By default it is set to
	// the value returned by os.Hostname.
	Hostname string

	// UserAgent represents the type of the client, by default it is set to
	// nsq.DefaultUserAgent.
	UserAgent string

	// TLSV1 can be set to secure the TCP connection with TLS v1. By default it
	// is set to false.
	TLSV1     bool
	TLSConfig *tls.Config

	// Compression Settings
	Deflate      bool
	DeflateLevel int
	Snappy       bool

	// MessageTimeout can be set to configure the server-side message timeout
	// for messages delivered to this consumer. By default it is not sent to
	// the server.
	MessageTimeout time.Duration
}

Identify represents the IDENTIFY command.

func (Identify) Name

func (c Identify) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Identify) Write

func (c Identify) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type IdentityResponse added in v1.2.8

type IdentityResponse struct {
	MaxRdyCount  int  `json:"max_rdy_count"`
	TLS          bool `json:"tls_v1"`
	Deflate      bool `json:"deflate"`
	Snappy       bool `json:"snappy"`
	AuthRequired bool `json:"auth_required"`
}

type LookupClient

type LookupClient struct {
	http.Client
	Addresses []string
	Scheme    string
	UserAgent string
}

func (*LookupClient) Lookup

func (c *LookupClient) Lookup(topic string) (result LookupResult, err error)

type LookupResult

type LookupResult struct {
	Channels  []string       `json:"channels"`
	Producers []ProducerInfo `json:"producers"`
}

type MPub

type MPub struct {
	// Topic must be set to the name of the topic to which the messages will be
	// published.
	Topic string

	// Messages is the list of raw messages to publish.
	Messages [][]byte
}

MPub represents the MPUB command.

func (MPub) Name

func (c MPub) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (MPub) Write

func (c MPub) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Message

type Message struct {
	// The ID of the message.
	ID MessageID

	// Attempts is set to the number of attempts made to deliver the message.
	Attempts uint16

	// Body contains the raw data of the message frame.
	Body []byte

	// Timestamp is the time at which the message was published.
	Timestamp time.Time
	// contains filtered or unexported fields
}

Message is a frame type representing a NSQ message.

func NewMessage

func NewMessage(id MessageID, body []byte, cmdChan chan<- Command) *Message

NewMessage is a helper for creating Message instances directly. A common use case is writing tests; generally you won't need to call it otherwise.

If you do use this, the Command channel is used internally to communicate message commands, such as "Finish" or "Requeue". When using this for testing, you can make a channel and inspect any message sent along it for assertions.

func (*Message) Complete

func (m *Message) Complete() bool

Complete will return a bool indicating whether Finish or Requeue has been called for this message.

func (*Message) Finish

func (m *Message) Finish()

Finish must be called on every message received from a consumer to let the NSQ server know that the message was successfully processed.

One of Finish or Requeue should be called on every message, and the methods will panic if they are called more than once.

func (Message) FrameType

func (m Message) FrameType() FrameType

FrameType returns FrameTypeMessage, satisfies the Frame interface.

func (*Message) Requeue

func (m *Message) Requeue(timeout time.Duration)

Requeue must be called on messages received from a consumer to let the NSQ server know that the message could not be processed and should be retried. The timeout is the amount of time the NSQ server waits before offering this message again to its consumers.

One of Finish or Requeue should be called on every message, and the methods will panic if they are called more than once.
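
Choosing the Requeue timeout is left to the caller. One common approach, sketched here as an assumption rather than anything the package prescribes, is to derive it from the message's Attempts field with a capped exponential backoff. `requeueDelay` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// requeueDelay returns minDelay for the first attempt and doubles it for each
// further attempt, never exceeding maxDelay.
func requeueDelay(attempts uint16, minDelay, maxDelay time.Duration) time.Duration {
	delay := minDelay
	for i := uint16(1); i < attempts; i++ {
		if delay > maxDelay/2 {
			return maxDelay // doubling again would pass the cap
		}
		delay *= 2
	}
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	for attempts := uint16(1); attempts <= 5; attempts++ {
		fmt.Println(attempts, requeueDelay(attempts, 10*time.Millisecond, 10*time.Second))
	}
}
```

A handler that fails to process a message could then call something like `msg.Requeue(requeueDelay(msg.Attempts, minDelay, maxDelay))`, with the bounds chosen to suit the workload.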

func (Message) Write

func (m Message) Write(w *bufio.Writer) (err error)

Write serializes the frame to the given buffered output, satisfies the Frame interface.

type MessageID

type MessageID uint64

MessageID is used to represent NSQ message IDs.

func ParseMessageID

func ParseMessageID(s string) (id MessageID, err error)

ParseMessageID attempts to parse s, which should be a hexadecimal representation of an 8 byte message ID.

func (MessageID) String

func (id MessageID) String() string

String returns the hexadecimal representation of the message ID as a string.
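
The round trip between the two representations can be sketched with the standard library. This example uses a local, hypothetical `messageID` type and assumes IDs are rendered as 16 zero-padded lowercase hexadecimal digits; the package's exact formatting and validation rules may differ.

```go
package main

import (
	"fmt"
	"strconv"
)

// messageID is a hypothetical stand-in for the package's MessageID type.
type messageID uint64

// parseMessageID converts a hexadecimal string into a 64-bit ID.
func parseMessageID(s string) (messageID, error) {
	v, err := strconv.ParseUint(s, 16, 64)
	if err != nil {
		return 0, fmt.Errorf("invalid message ID %q: %w", s, err)
	}
	return messageID(v), nil
}

// String renders the ID as 16 zero-padded hexadecimal digits (8 bytes).
func (id messageID) String() string {
	return fmt.Sprintf("%016x", uint64(id))
}

func main() {
	id, err := parseMessageID("000000000000002a")
	if err != nil {
		panic(err)
	}
	fmt.Println(uint64(id), id)
}
```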

func (MessageID) WriteTo

func (id MessageID) WriteTo(w io.Writer) (int64, error)

WriteTo writes the message ID to w.

This method satisfies the io.WriterTo interface.

type Nop

type Nop struct {
}

Nop represents the NOP command.

func (Nop) Name

func (c Nop) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Nop) Write

func (c Nop) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer provides an abstraction around using direct connections to nsqd nodes to send messages.

func NewProducer

func NewProducer(config ProducerConfig) (p *Producer, err error)

NewProducer configures a new producer instance.

func StartProducer

func StartProducer(config ProducerConfig) (p *Producer, err error)

StartProducer starts and returns a new producer p, configured with the variables from the config parameter, or returns a non-nil error if some of the configuration variables were invalid.

func (*Producer) Connected

func (p *Producer) Connected() bool

Connected returns true if the producer has successfully established a connection to nsqd, false otherwise.

func (*Producer) Publish

func (p *Producer) Publish(message []byte) (err error)

Publish sends a message using the producer p, returning an error if it was already closed or if an error occurred while publishing the message.

Note that no retry is done internally, the producer will fail after the first unsuccessful attempt to publish the message. It is the responsibility of the caller to retry if necessary.
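
Since retries are the caller's responsibility, a thin wrapper is often enough. The following is a sketch of one possible policy (a fixed number of attempts with a doubling pause), not part of the package. `publishWithRetry` and `errTemporary` are hypothetical names, and the `publish` argument stands in for a method value such as `producer.Publish`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTemporary is a hypothetical error used to simulate a failed publish.
var errTemporary = errors.New("temporary failure")

// publishWithRetry calls publish up to attempts times, sleeping between
// failures and doubling the pause each time. It returns the last error,
// wrapped, if every attempt fails.
func publishWithRetry(publish func([]byte) error, message []byte, attempts int, backoff time.Duration) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = publish(message); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return fmt.Errorf("publish failed after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// flaky fails twice and then succeeds, standing in for producer.Publish.
	flaky := func(message []byte) error {
		calls++
		if calls < 3 {
			return errTemporary
		}
		return nil
	}
	err := publishWithRetry(flaky, []byte("Hello World!"), 5, 10*time.Millisecond)
	fmt.Println(calls, err)
}
```

Whether retrying is appropriate depends on the application: a message that was written before the error was reported could be published twice.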

func (*Producer) PublishTo

func (p *Producer) PublishTo(topic string, message []byte) (err error)

PublishTo sends a message to a specific topic using the producer p, returning an error if it was already closed or if an error occurred while publishing the message.

Note that no retry is done internally, the producer will fail after the first unsuccessful attempt to publish the message. It is the responsibility of the caller to retry if necessary.

func (*Producer) Requests

func (p *Producer) Requests() chan<- ProducerRequest

Requests returns a write-only channel that can be used to submit requests to p.

This method is useful when the publish operation needs to be associated with other operations on channels in a select statement for example, or to publish in a non-blocking fashion.
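
The non-blocking case can be sketched with a select statement that has a default branch. The example below uses a local, hypothetical `request` type that mirrors the documented ProducerRequest fields and a plain buffered channel in place of the one returned by Requests, so it shows only the select pattern, not the producer's behaviour.

```go
package main

import (
	"fmt"
	"time"
)

// request mirrors the documented fields of ProducerRequest.
type request struct {
	Topic    string
	Message  []byte
	Response chan<- error
	Deadline time.Time
}

// trySubmit attempts to hand req to the producer without blocking and
// reports whether the request was accepted.
func trySubmit(reqs chan<- request, req request) bool {
	select {
	case reqs <- req:
		return true
	default:
		return false // the channel is full or nobody is receiving
	}
}

func main() {
	reqs := make(chan request, 1) // stands in for the channel from Requests
	res := make(chan error, 1)    // receives the outcome of the publish
	req := request{
		Topic:    "hello",
		Message:  []byte("Hello World!"),
		Response: res,
		Deadline: time.Now().Add(time.Second),
	}
	fmt.Println(trySubmit(reqs, req)) // the buffer has room
	fmt.Println(trySubmit(reqs, req)) // the buffer is full
}
```

Giving the Response channel a buffer of one is a defensive choice in this sketch: it lets the result be delivered even if the submitter has stopped waiting.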

func (*Producer) Start

func (p *Producer) Start()

Start explicitly begins the producer in case it was initialized with NewProducer instead of StartProducer.

func (*Producer) Stop

func (p *Producer) Stop()

Stop gracefully shuts down the producer, cancelling all in-flight requests and waiting for all backend connections to be closed.

It is safe to call the method multiple times and from multiple goroutines; all calls block until the producer has been completely shut down.

type ProducerConfig

type ProducerConfig struct {
	Address        string
	Topic          string
	MaxConcurrency int
	Identify       Identify
	TLS            TLSConfig
	DialTimeout    time.Duration
	ReadTimeout    time.Duration
	WriteTimeout   time.Duration
}

ProducerConfig carries the different variables to tune a newly started producer.

type ProducerInfo

type ProducerInfo struct {
	BroadcastAddress string `json:"broadcast_address,omitempty"`
	RemoteAddress    string `json:"remote_address,omitempty"`
	Hostname         string `json:"hostname,omitempty"`
	Version          string `json:"version,omitempty"`
	TcpPort          int    `json:"tcp_port"`
	HttpPort         int    `json:"http_port"`
}

type ProducerRequest

type ProducerRequest struct {
	Topic    string
	Message  []byte
	Response chan<- error
	Deadline time.Time
}

ProducerRequest is used to represent operations that are submitted to producers.

type Pub

type Pub struct {
	// Topic must be set to the name of the topic to which the message will be
	// published.
	Topic string

	// Message is the raw message to publish.
	Message []byte
}

Pub represents the PUB command.

func (Pub) Name

func (c Pub) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Pub) Write

func (c Pub) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Rdy

type Rdy struct {
	// Count is the number of messages that a consumer is ready to accept.
	Count int
}

Rdy represents the RDY command.

func (Rdy) Name

func (c Rdy) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Rdy) Write

func (c Rdy) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Req

type Req struct {
	// MessageID is the ID of the message to requeue.
	MessageID MessageID

	// Timeout is the duration NSQ will wait before sending this message
	// again to a client.
	Timeout time.Duration
}

Req represents the REQ command.

func (Req) Name

func (c Req) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Req) Write

func (c Req) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type Response

type Response string

Response is a frame type representing success responses to commands.

const (
	// OK is returned for most successful responses.
	OK Response = "OK"

	// CloseWait is the response sent to the CLS command.
	CloseWait Response = "CLOSE_WAIT"

	// Heartbeat is the response used by NSQ servers for health checks of the
	// connections.
	Heartbeat Response = "_heartbeat_"
)
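
In the NSQ TCP protocol a client is expected to answer each heartbeat with a NOP command, otherwise the server eventually closes the connection. The Consumer and Producer types presumably take care of this; the sketch below only illustrates the rule for code that works with Conn and frames directly. `replyTo` and `heartbeat` are hypothetical names.

```go
package main

import "fmt"

// heartbeat is the body of the response frame nsqd sends as a health check.
const heartbeat = "_heartbeat_"

// replyTo returns the raw command to send back for a response frame body and
// whether a reply is needed at all.
func replyTo(response string) (string, bool) {
	if response == heartbeat {
		return "NOP\n", true
	}
	return "", false
}

func main() {
	for _, body := range []string{"OK", heartbeat} {
		reply, needed := replyTo(body)
		fmt.Printf("%q -> %q %v\n", body, reply, needed)
	}
}
```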

func (Response) FrameType

func (r Response) FrameType() FrameType

FrameType returns FrameTypeResponse, satisfies the Frame interface.

func (Response) String

func (r Response) String() string

String returns the response as a string.

func (Response) Write

func (r Response) Write(w *bufio.Writer) (err error)

Write serializes the frame to the given buffered output, satisfies the Frame interface.

type Sub

type Sub struct {
	// Topic must be set to the name of the topic to subscribe to.
	Topic string

	// Channel must be set to the name of the channel to subscribe to.
	Channel string
}

Sub represents the SUB command.

func (Sub) Name

func (c Sub) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Sub) Write

func (c Sub) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type TLSConfig added in v1.2.8

type TLSConfig struct {
	RootFile string
	CertFile string
	KeyFile  string
}

type Touch

type Touch struct {
	// MessageID is the ID of the message whose timeout will be reset.
	MessageID MessageID
}

Touch represents the TOUCH command.

func (Touch) Name

func (c Touch) Name() string

Name returns the name of the command in order to satisfy the Command interface.

func (Touch) Write

func (c Touch) Write(w *bufio.Writer) (err error)

Write serializes the command to the given buffered output, satisfies the Command interface.

type UnknownFrame

type UnknownFrame struct {
	// Type is the type of the frame.
	Type FrameType

	// Data contains the raw data of the frame.
	Data []byte
}

UnknownFrame is used to represent frames of unknown types for which the library has no special implementation.

func (UnknownFrame) FrameType

func (f UnknownFrame) FrameType() FrameType

FrameType returns the code representing the frame type, satisfies the Frame interface.

func (UnknownFrame) Write

func (f UnknownFrame) Write(w *bufio.Writer) (err error)

Write serializes the frame to the given buffered output, satisfies the Frame interface.

Directories

Path Synopsis
cmd
