README
Apache Pulsar Golang Client Library
An alternative Golang client library for the Apache Pulsar project.
Benefits over other Pulsar Go libraries
- Faster message processing
- Pure Golang, works without Cgo
- Idiomatic and cleaner Go
- Better stability
- Allows specifying initial positions for topic pattern subscriptions
- Higher test coverage
- Pluggable logger interface
Status
The library is in an early state of development and the API is not yet stable. Any help or input is welcome.
Alternative libraries
- apache/pulsar-client-go: the official Golang client that inspired the creation of this alternative client.
- apache/pulsar/pulsar-client-go: a Cgo-based client library that will be deprecated.
- Comcast/pulsar-client-go: an older client that appears to be no longer maintained and lacks features such as batching.
Documentation
Overview ¶
Package pulsar implements an Apache Pulsar client.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) CloseConsumer(consumerID uint64) error
- func (c *Client) CloseProducer(producerID uint64) error
- func (c *Client) Dial(ctx context.Context) error
- func (c *Client) NewConsumer(ctx context.Context, config ConsumerConfig) (Consumer, error)
- func (c *Client) NewProducer(ctx context.Context, config ProducerConfig) (*Producer, error)
- func (c *Client) Topics(namespace string) ([]*Topic, error)
- type ClientOption
- type Consumer
- type ConsumerConfig
- type InitialPosition
- type InitialPositionCallback
- type Logger
- type Message
- type MessageID
- type Producer
- type ProducerConfig
- type SubscriptionType
- type Topic
Constants ¶
const (
	ExclusiveSubscription = SubscriptionType(pb.CommandSubscribe_Exclusive)
)
Subscription type options.
const (
	// LatestPosition starts reading from the topic end, only getting
	// messages published after the reader was created.
	LatestPosition = InitialPosition(pb.CommandSubscribe_Latest)
	// EarliestPosition starts reading from the earliest message
	// available in the topic.
	EarliestPosition = InitialPosition(pb.CommandSubscribe_Earliest)
)
Subscription initial position options.
const (
DefaultNamespace = "default"
)
...
Variables ¶
var ErrClientClosing = errors.New("client is closing connections")
ErrClientClosing is returned when creating a new consumer or producer is attempted while the client is closing all connections.
var ErrConsumerOfMessageNotFound = errors.New("consumer of message not found")
ErrConsumerOfMessageNotFound is returned when the consumer for a received message is not found.
var ErrNetClosing = errors.New("use of closed network connection")
ErrNetClosing is returned when a network descriptor is used after it has been closed.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client implements a Pulsar client.
func NewClient ¶
func NewClient(serverURL string, opts ...ClientOption) (*Client, error)
NewClient creates a new Pulsar client.
func (*Client) CloseConsumer ¶
func (c *Client) CloseConsumer(consumerID uint64) error
CloseConsumer closes a specific consumer.
func (*Client) CloseProducer ¶
func (c *Client) CloseProducer(producerID uint64) error
CloseProducer closes a specific producer.
func (*Client) Dial ¶
func (c *Client) Dial(ctx context.Context) error
Dial connects to the Pulsar server. It must be called before a Consumer or Producer can be created.
func (*Client) NewConsumer ¶
func (c *Client) NewConsumer(ctx context.Context, config ConsumerConfig) (Consumer, error)
NewConsumer creates a new Consumer, returning after the connection has been made.
func (*Client) NewProducer ¶
func (c *Client) NewProducer(ctx context.Context, config ProducerConfig) (*Producer, error)
NewProducer creates a new Producer, returning after the connection has been made.
type Consumer ¶
type Consumer interface {
	// Close closes the subscription and unregisters from the Client.
	Close() error
	AckMessage(*Message) error
	// ReadMessage reads and returns the next message from Pulsar.
	ReadMessage(context.Context) (*Message, error)
	SeekMessage(*Message) error
	// HasNext returns whether there is a message available to read.
	HasNext() bool
	// LastMessageID returns the last message ID of the topic.
	// If the topic is empty, EntryId will be math.MaxUint64.
	LastMessageID() (*MessageID, error)
}
Consumer provides a high-level API for consuming messages from Pulsar.
type ConsumerConfig ¶
type ConsumerConfig struct {
	// The topic name to read messages from.
	Topic string
	// A regular expression for topics to read messages from.
	TopicPattern string
	// Interval in ms in which the client checks for topic changes
	// that match the set topic pattern and updates the subscriptions.
	// Default is 30000
	TopicPatternDiscoveryInterval int
	// A unique name for the subscription. If not specified, a random name
	// will be used.
	Subscription string
	// A unique name for the Consumer. If not specified, a random name
	// will be used.
	Name string
	// Select the subscription type to be used when subscribing to the topic.
	// Default is `ExclusiveSubscription`
	Type SubscriptionType
	// Signal whether the subscription will initialize on latest
	// or earliest position.
	InitialPosition InitialPosition
	// Callback function for every discovered topic when using a topic
	// pattern to allow the client to specify an initial position and
	// start message ID for the topic.
	InitialPositionCallback InitialPositionCallback
	// If specified, the subscription will position the cursor
	// on the particular message id and will send messages from
	// that point.
	StartMessageID []byte
	// Include the message StartMessageID in the read messages.
	// If StartMessageID is not set but InitialPosition is set
	// to LatestPosition, the latest message ID of the topic
	// will be sent.
	StartMessageIDInclusive bool
	// Signal whether the subscription should be backed by a
	// durable cursor or not. For Readers, set to false, for
	// Consumers set Durable to true and specify a Subscription.
	// If Durable is true, StartMessageID will be ignored, as it
	// will be determined by the broker.
	Durable bool
	// If true, the subscribe operation will cause a topic to be
	// created if it does not exist already (and if topic auto-creation
	// is allowed by the broker).
	// If false, the subscribe operation will fail if the topic
	// does not exist.
	ForceTopicCreation bool
	// MessageChannel sets a channel that receives all messages that the
	// consumer receives. If not set, a default channel for 1000 messages
	// will be created.
	MessageChannel chan *Message
}
ConsumerConfig is a configuration object used to create new instances of Consumer.
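The field comments distinguish a durable consumer (Durable set to true, with a Subscription) from a reader (Durable set to false, optionally with a StartMessageID). The sketch below shows one way to build both variants. It is an illustration under assumptions: the struct mirrors only a subset of the documented fields so the code compiles standalone, durableConsumer and readerConfig are hypothetical helpers, and the start ID bytes are placeholders rather than a real serialized message ID.

```go
package main

import "fmt"

// ConsumerConfig mirrors a subset of the documented struct so that the
// sketch compiles without the library.
type ConsumerConfig struct {
	Topic                   string
	Subscription            string
	StartMessageID          []byte
	StartMessageIDInclusive bool
	Durable                 bool
}

// durableConsumer is a hypothetical helper: a durable cursor with a named
// subscription. StartMessageID is left empty because the documentation
// states it is ignored when Durable is true.
func durableConsumer(topic, subscription string) ConsumerConfig {
	return ConsumerConfig{
		Topic:        topic,
		Subscription: subscription,
		Durable:      true,
	}
}

// readerConfig is a hypothetical helper: a non-durable reader that starts at
// the given message ID and includes that message in the results.
func readerConfig(topic string, startID []byte) ConsumerConfig {
	return ConsumerConfig{
		Topic:                   topic,
		StartMessageID:          startID,
		StartMessageIDInclusive: true,
		Durable:                 false,
	}
}

func main() {
	fmt.Printf("%+v\n", durableConsumer("orders", "billing"))
	// The bytes below are placeholders, not a valid serialized message ID.
	fmt.Printf("%+v\n", readerConfig("orders", []byte{0x01, 0x02}))
}
```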
func (*ConsumerConfig) Validate ¶
func (config *ConsumerConfig) Validate() error
Validate method validates the config properties.
type InitialPositionCallback ¶ added in v0.1.3
type InitialPositionCallback func(topic string) (position InitialPosition, StartMessageID []byte, err error)
InitialPositionCallback declares a callback that allows a client to specify a start position or message for every discovered topic when using topic pattern subscriptions.
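As an illustration, a callback might pick the start position from the topic name. This is a minimal sketch with stated assumptions: InitialPosition and its constants are redeclared locally with placeholder values (the real ones map to protobuf enum values), and positionForTopic together with its "audit" naming rule is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// InitialPosition and its constants are local placeholders so the sketch
// compiles on its own; the library derives the real values from protobuf.
type InitialPosition int

const (
	LatestPosition InitialPosition = iota
	EarliestPosition
)

// InitialPositionCallback has the same shape as the documented type.
type InitialPositionCallback func(topic string) (position InitialPosition, StartMessageID []byte, err error)

// positionForTopic is a hypothetical callback: topics whose name contains
// "audit" are replayed from the start, all others begin at the latest message.
// It never returns a start message ID.
func positionForTopic(topic string) (InitialPosition, []byte, error) {
	switch {
	case topic == "":
		return LatestPosition, nil, errors.New("empty topic name")
	case strings.Contains(topic, "audit"):
		return EarliestPosition, nil, nil
	default:
		return LatestPosition, nil, nil
	}
}

func main() {
	var cb InitialPositionCallback = positionForTopic
	for _, topic := range []string{"audit-logins", "metrics-cpu"} {
		pos, _, err := cb(topic)
		fmt.Println(topic, "earliest:", pos == EarliestPosition, "err:", err)
	}
}
```

In real code a function like this would be assigned to the InitialPositionCallback field of ConsumerConfig together with a TopicPattern.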
type Logger ¶
type Logger interface {
	Debugf(format string, args ...interface{})
	Errorf(format string, args ...interface{})
}
Logger is the pluggable logging interface. Implementations must provide the Debugf and Errorf methods.
type Message ¶
type Message struct {
	Body  []byte
	Topic string
	ID    *MessageID
	// contains filtered or unexported fields
}
Message is a data structure representing Pulsar messages.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer provides a high-level API for sending messages to Pulsar.
func (*Producer) WriteMessage ¶
WriteMessage puts the message into the message queue and blocks until the message has been sent and an acknowledgement is received from Pulsar.
func (*Producer) WriteMessageAsync ¶
WriteMessageAsync puts the message into the message queue. If the message queue is full, this function will block until it can write to the queue. The queue size can be specified in the Producer options.
type ProducerConfig ¶
type ProducerConfig struct {
	// The topic to write messages to.
	Topic string
	// The name of the producer.
	Name string
	// Limit on how many messages will be buffered before being sent as a batch.
	//
	// The default is a batch size of 100 messages.
	BatchSize int
	// Time limit on how often a batch that is not full yet will be flushed and
	// sent to Pulsar.
	//
	// The default is to flush every second.
	BatchTimeout time.Duration
	// Capacity of the internal producer message queue.
	//
	// The default is to use a queue capacity of 1000 messages.
	QueueCapacity int
}
ProducerConfig is a configuration object used to create new instances of Producer.
func (*ProducerConfig) Validate ¶
func (config *ProducerConfig) Validate() error
Validate method validates the config properties.