pulsar

Version: v0.1.1
Published: Jan 25, 2019 License: Apache-2.0 Imports: 14 Imported by: 7

README

pulsar-client-go

A Go client library for the Apache Pulsar project.

Alternatives

The Pulsar project contains a Go client library that is a wrapper for the Pulsar C++ client library.

In comparison, this library is 100% Go (no cgo required). Outside the Go standard library, its only dependency is the golang/protobuf library.

Status & Goals

Status

This client is a work in progress and, as such, does not support all Pulsar features. It supports Pulsar 2.0 along with 1.22.

The following is an incomplete list of features that are not yet implemented:

  • Batch frame support
  • Payload compression support
  • Partitioned topics support
  • Athenz authentication support
  • Encryption support

Goals

  • 100% Go
  • Simplicity

Installation

go get -u github.com/Comcast/pulsar-client-go

Note: The package name is pulsar

Example

An example of a producer and a consumer can be found in the included CLI application.

Contributions

Contributions are welcome. Please create an issue before beginning work on major contributions. Refer to the CONTRIBUTING.md doc for more information.

Local Development

Integration Tests

Integration tests are provided that connect to a Pulsar server. They are best run against a local instance of Pulsar, since they expect the standalone properties to exist. See below for instructions on installing Pulsar locally.

Integration tests run only when the pulsar flag is provided with the address of the Pulsar server to connect to. Example:

go test -v -pulsar "localhost:6650"
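
As a rough illustration of how a flag like this can gate integration tests, the sketch below parses a `-pulsar` flag from an argument list and reports whether an address was supplied. The helper name `parsePulsarFlag` is hypothetical and is not taken from this package's test code, which may be wired differently.

```go
package main

import (
	"flag"
	"fmt"
	"io"
)

// parsePulsarFlag is a hypothetical helper: it reads a -pulsar flag from args
// and reports whether integration tests should run (an address was given).
func parsePulsarFlag(args []string) (addr string, enabled bool, err error) {
	fs := flag.NewFlagSet("test", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the output
	fs.StringVar(&addr, "pulsar", "", "address of the Pulsar server for integration tests")
	if err := fs.Parse(args); err != nil {
		return "", false, err
	}
	return addr, addr != "", nil
}

func main() {
	addr, enabled, err := parsePulsarFlag([]string{"-pulsar", "localhost:6650"})
	fmt.Println(addr, enabled, err)
}
```

A test would typically skip itself when `enabled` is false, so that a plain `go test` still passes without a running server.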

Protobuf

The Makefile target api/PulsarApi.pb.go will generate the required .go files using the Pulsar source's .proto files.

Usage:

$ make api/PulsarApi.pb.go

Local Pulsar

Notes on installing Pulsar locally.

Prereqs:

  • For Java 8 on OS X, use these instructions (Stack Overflow)

  • Check out the source from GitHub

    git clone git@github.com:apache/incubator-pulsar.git

  • Switch to the desired tag, e.g. v1.22.1-incubating

  • Install Maven

    brew install maven

  • Compile (full instructions)

    mvn install -DskipTests

Launch Pulsar from the Pulsar directory:

./bin/pulsar standalone --wipe-data --advertised-address localhost

Local Pulsar + TLS

The Makefile has various targets to support certificate generation, Pulsar TLS configuration, and topic setup:

  • Generate certificates for use by brokers, admin tool, and applications:

    make certificates

    This will create `broker`, `admin`, and `app` private/public pairs in the certs directory.

  • Generate configuration files for running Pulsar standalone and pulsar-admin with TLS enabled using the generated certificates:

    make pulsar-tls-conf

    This will generate `pulsar-conf/standalone.tls.conf` and `pulsar-conf/client.tls.conf` files that can be used as the configurations for the standalone server and `pulsar-admin` tools respectively. They'll use the certificates in the `certs` directory. The files should be placed in the appropriate locations for use with those tools (probably the `conf` directory within the Pulsar directory). It's recommended to use symbolic links to switch easily between configurations.

  • Set up a sample topic on the standalone server with TLS enabled:

    make standalone-tls-ns

    This will create a `sample/standalone/ns1` topic. The `app` certificate will have `produce` and `consume` rights on the topic.

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Documentation

Overview

Package pulsar implements a Pulsar client.

It exposes a lower-level Client type, and higher-level Managed* types that take care of reconnects, pings, topic lookup etc.

Constants

const (
	// ProtoVersion is the Pulsar protocol version
	// used by this client.
	ProtoVersion = int32(api.ProtocolVersion_v12)
	// ClientVersion is an opaque string sent
	// by the client to the server on connect, eg:
	// "Pulsar-Client-Java-v1.15.2"
	ClientVersion = "pulsar-client-go"
)

Variables

var ErrClosedProducer = errors.New("producer is closed")

ErrClosedProducer is returned when attempting to send from a closed Producer.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a Pulsar client, capable of sending and receiving messages and managing the associated state.

func NewClient

func NewClient(cfg ClientConfig) (*Client, error)

NewClient returns a Pulsar client for the given configuration options.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection. The channel returned from `Closed` will unblock. The client should no longer be used after calling Close.

func (*Client) Closed

func (c *Client) Closed() <-chan struct{}

Closed returns a channel that unblocks when the client's connection has been closed and is no longer usable. Users should monitor this channel and recreate the Client if closed. TODO: Rename to Done

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)

Connect sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. Connect should be called immediately after creating a client, before sending any other messages. The "auth method" is not set in the CONNECT message. See ConnectTLS for the TLS auth method. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw

func (*Client) ConnectTLS

func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)

ConnectTLS sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. ConnectTLS should be called immediately after creating a client, before sending any other messages. The "auth method" is set to tls in the CONNECT message. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw

func (*Client) LookupTopic

func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)

LookupTopic returns metadata about the given topic. Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.

The command has to be used in a connection that has already gone through the Connect / Connected initial handshake. See "Topic lookup" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-rxds6i

func (*Client) NewExclusiveConsumer

func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan Message) (*Consumer, error)

NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) NewFailoverConsumer

func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)

NewFailoverConsumer creates a new failover consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) NewProducer

func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*Producer, error)

NewProducer creates a new producer capable of sending messages to the given topic.

func (*Client) NewSharedConsumer

func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)

NewSharedConsumer creates a new shared consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping sends a PING message to the Pulsar server, then waits for either a PONG response or the context to time out.

type ClientConfig

type ClientConfig struct {
	Addr string // pulsar broker address. May start with pulsar://

	DialTimeout time.Duration // timeout to use when establishing TCP connection
	TLSConfig   *tls.Config   // TLS configuration. May be nil, in which case TLS will not be used
	Errs        chan<- error  // asynchronous errors will be sent here. May be nil
	// contains filtered or unexported fields
}

ClientConfig is used to configure a Pulsar client.
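
The Addr comment says the address may start with pulsar://. One plausible way to accept both forms is to strip the scheme before dialing, as in this sketch. The helper `stripScheme` is hypothetical, and the package's own handling of Addr may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// stripScheme is a hypothetical helper that removes an optional pulsar://
// prefix, leaving a host:port string suitable for a TCP dial.
func stripScheme(addr string) string {
	return strings.TrimPrefix(addr, "pulsar://")
}

func main() {
	fmt.Println(stripScheme("pulsar://localhost:6650"))
	fmt.Println(stripScheme("localhost:6650"))
}
```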

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer handles all consumer related state.

func (*Consumer) Ack

func (c *Consumer) Ack(msg Message) error

Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

Close closes the consumer. The channel returned from the Closed method will then unblock upon successful closure.

func (*Consumer) Closed

func (c *Consumer) Closed() <-chan struct{}

Closed returns a channel that blocks until the consumer has been closed, at which point the channel is closed and unblocks.

func (*Consumer) ConnClosed

func (c *Consumer) ConnClosed() <-chan struct{}

ConnClosed unblocks when the consumer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the consumer.

func (*Consumer) Flow

func (c *Consumer) Flow(permits uint32) error

Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accumulate these messages before the application is ready to consume them. After the consumer is ready, the client needs to give permission to the broker to push messages.
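
The permit mechanism can be illustrated with a small bookkeeping sketch. The `permitTracker` type and its refill-at-half policy are assumptions made for the example; they are not taken from this package. The idea is that permits are granted up front for the whole queue, then replenished as the application drains it.

```go
package main

import "fmt"

// permitTracker is a hypothetical helper, not part of the package. It decides
// how many permits to grant so that in-flight plus queued messages never
// exceed the local queue size.
type permitTracker struct {
	queueSize uint32 // capacity of the local message queue
	consumed  uint32 // messages taken off the queue since the last grant
}

// initial returns the permits to grant when the consumer starts.
func (p *permitTracker) initial() uint32 { return p.queueSize }

// consumedOne records that the application took one message off the queue and
// returns the permits to grant now (zero if no Flow command is needed yet).
func (p *permitTracker) consumedOne() uint32 {
	p.consumed++
	// Assumed policy: refill once half of the queue has been drained.
	if p.consumed >= p.queueSize/2 {
		grant := p.consumed
		p.consumed = 0
		return grant
	}
	return 0
}

func main() {
	p := &permitTracker{queueSize: 4}
	fmt.Println("initial permits:", p.initial())
	for i := 1; i <= 4; i++ {
		fmt.Printf("after consuming message %d: grant %d\n", i, p.consumedOne())
	}
}
```

Each non-zero value would be passed to a Flow call. Granting in batches rather than one permit per message keeps the number of Flow commands low.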

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan Message

Messages returns a read-only channel of messages received by the consumer. The channel will never be closed by the consumer.

func (*Consumer) ReachedEndOfTopic

func (c *Consumer) ReachedEndOfTopic() <-chan struct{}

ReachedEndOfTopic unblocks whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.

func (*Consumer) RedeliverOverflow

func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error)

RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because of a full message buffer. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. See https://github.com/apache/incubator-pulsar/issues/2003
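
To make the notion of "dropped because of a full buffer" concrete, the sketch below shows one way a bounded queue can drop and count messages without blocking. The `buffer` type is a hypothetical stand-in; the package's internal bookkeeping is not shown in this documentation.

```go
package main

import "fmt"

// buffer is a hypothetical stand-in for a consumer's bounded message queue.
type buffer struct {
	queue    chan string
	overflow int // messages dropped since the last redelivery request
}

// offer enqueues msg without blocking. If the queue is full, the message is
// dropped and counted instead.
func (b *buffer) offer(msg string) bool {
	select {
	case b.queue <- msg:
		return true
	default:
		b.overflow++
		return false
	}
}

// takeOverflow returns the number of dropped messages and resets the counter,
// as a caller might do just before asking the broker to redeliver.
func (b *buffer) takeOverflow() int {
	n := b.overflow
	b.overflow = 0
	return n
}

func main() {
	b := &buffer{queue: make(chan string, 2)}
	for _, m := range []string{"a", "b", "c"} {
		fmt.Println(m, "accepted:", b.offer(m))
	}
	fmt.Println("dropped:", b.takeOverflow())
}
```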

func (*Consumer) RedeliverUnacknowledged

func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error

RedeliverUnacknowledged uses the protocol option REDELIVER_UNACKNOWLEDGED_MESSAGES to re-retrieve unacked messages.

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(ctx context.Context) error

Unsubscribe the consumer from its topic.

type ErrUnexpectedMsg

type ErrUnexpectedMsg struct {
	// contains filtered or unexported fields
}

ErrUnexpectedMsg is returned when an unexpected message is received.

func (*ErrUnexpectedMsg) Error

func (e *ErrUnexpectedMsg) Error() string

Error satisfies the error interface.

type ManagedClient

type ManagedClient struct {
	// contains filtered or unexported fields
}

ManagedClient wraps a Client with re-connect and connection management logic.

func NewManagedClient

func NewManagedClient(cfg ManagedClientConfig) *ManagedClient

NewManagedClient returns a ManagedClient for the given address. The Client will be created and monitored in the background.

func (*ManagedClient) Done

func (m *ManagedClient) Done() <-chan struct{}

Done returns a channel that unblocks when the ManagedClient has been closed.

func (*ManagedClient) Get

func (m *ManagedClient) Get(ctx context.Context) (*Client, error)

Get returns the managed Client in a thread-safe way. If the client is temporarily unavailable, Get will block until either it becomes available or the context expires.

There is no guarantee that the returned Client will be connected or stay connected.

func (*ManagedClient) Stop

func (m *ManagedClient) Stop() error

Stop closes the Client if possible, and/or stops it from re-connecting. The ManagedClient shouldn't be used after calling Stop.

type ManagedClientConfig

type ManagedClientConfig struct {
	ClientConfig

	PingFrequency         time.Duration // how often to PING server
	PingTimeout           time.Duration // how long to wait for PONG response
	ConnectTimeout        time.Duration // how long to wait for CONNECTED response
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Client
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Client
}

ManagedClientConfig is used to configure a ManagedClient.

type ManagedClientPool

type ManagedClientPool struct {
	// contains filtered or unexported fields
}

ManagedClientPool provides a thread-safe store for ManagedClients, based on their address.

func NewManagedClientPool

func NewManagedClientPool() *ManagedClientPool

NewManagedClientPool initializes a ManagedClientPool.

func (*ManagedClientPool) ForTopic

ForTopic performs topic lookup for the given topic and returns the ManagedClient for the discovered topic information. See https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-6g0lo and incubator-pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java

func (*ManagedClientPool) Get

Get returns the ManagedClient for the given client configuration. First the cache is checked for an existing client. If one doesn't exist, a new one is created and cached, then returned.
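
The check-then-create behaviour can be sketched as a mutex-guarded map. The `pool` and `client` types below are stand-ins keyed by address, following the description of ManagedClientPool; the real cache key and types may differ.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a stand-in for a managed client.
type client struct{ addr string }

// pool caches one client per address and is safe for concurrent use.
type pool struct {
	mu      sync.Mutex
	clients map[string]*client
}

func newPool() *pool { return &pool{clients: make(map[string]*client)} }

// get returns the cached client for addr, creating and caching it on first use.
func (p *pool) get(addr string) *client {
	p.mu.Lock()
	defer p.mu.Unlock()
	if c, ok := p.clients[addr]; ok {
		return c
	}
	c := &client{addr: addr}
	p.clients[addr] = c
	return c
}

func main() {
	p := newPool()
	a := p.get("localhost:6650")
	b := p.get("localhost:6650")
	fmt.Println("same client reused:", a == b)
}
```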

type ManagedConsumer

type ManagedConsumer struct {
	// contains filtered or unexported fields
}

ManagedConsumer wraps a Consumer with reconnect logic.

func NewManagedConsumer

func NewManagedConsumer(cp *ManagedClientPool, cfg ManagedConsumerConfig) *ManagedConsumer

NewManagedConsumer returns an initialized ManagedConsumer. It will create and recreate a Consumer for the given discovery address and topic on a background goroutine.

func (*ManagedConsumer) Ack

func (m *ManagedConsumer) Ack(ctx context.Context, msg Message) error

Ack acquires a consumer and sends an ACK message for the given message.

func (*ManagedConsumer) Close added in v0.1.1

func (m *ManagedConsumer) Close(ctx context.Context) error

Close closes the consumer.

func (*ManagedConsumer) Monitor added in v0.1.1

func (m *ManagedConsumer) Monitor() func()

Monitor provides a scoped, deferrable lock.
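
A common Go idiom matching the `Monitor() func()` signature is to lock on call and return the unlock function, so a caller can write `defer x.Monitor()()`. The sketch below shows that idiom on a stand-in `counter` type; it is an interpretation of "scoped deferrable lock", not a copy of this package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// counter is a stand-in type guarded by a mutex.
type counter struct {
	mu sync.Mutex
	n  int
}

// Monitor locks the counter and returns the function that unlocks it.
func (c *counter) Monitor() func() {
	c.mu.Lock()
	return c.mu.Unlock
}

// incr holds the lock for the whole function body: Monitor() runs now, and
// the returned unlock function runs when incr returns.
func (c *counter) incr() int {
	defer c.Monitor()()
	c.n++
	return c.n
}

func main() {
	c := &counter{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.incr()
		}()
	}
	wg.Wait()
	fmt.Println("final count:", c.incr())
}
```

Note the double parentheses in `defer c.Monitor()()`: omitting the second pair would defer the locking itself and never unlock.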

func (*ManagedConsumer) Receive

func (m *ManagedConsumer) Receive(ctx context.Context) (Message, error)

Receive returns a single Message, if available. A reasonable context should be provided that will be used to wait for an incoming message if none are available.

func (*ManagedConsumer) ReceiveAsync

func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Message) error

ReceiveAsync blocks until the context is done. It continuously reads messages from the consumer and sends them to the provided channel. It manages flow control internally based on the queue size.

func (*ManagedConsumer) RedeliverOverflow added in v0.1.1

func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)

RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because of a full message buffer. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. See https://github.com/apache/incubator-pulsar/issues/2003

func (*ManagedConsumer) RedeliverUnacknowledged added in v0.1.1

func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error

RedeliverUnacknowledged sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that have not been acked.

func (*ManagedConsumer) Unsubscribe added in v0.1.1

func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error

Unsubscribe the consumer from its topic.

type ManagedConsumerConfig

type ManagedConsumerConfig struct {
	ManagedClientConfig

	Topic     string
	Name      string // subscription name
	Exclusive bool   // if false, subscription is shared
	Earliest  bool   // if true, subscription cursor set to beginning
	QueueSize int    // number of messages to buffer before dropping messages

	NewConsumerTimeout    time.Duration // maximum duration to create Consumer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Consumer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Consumer
}

ManagedConsumerConfig is used to configure a ManagedConsumer.

type ManagedProducer

type ManagedProducer struct {
	// contains filtered or unexported fields
}

ManagedProducer wraps a Producer with re-connect logic.

func NewManagedProducer

func NewManagedProducer(cp *ManagedClientPool, cfg ManagedProducerConfig) *ManagedProducer

NewManagedProducer returns an initialized ManagedProducer. It will create and re-create a Producer for the given discovery address and topic on a background goroutine.

func (*ManagedProducer) Close added in v0.1.1

func (m *ManagedProducer) Close(ctx context.Context) error

Close closes the producer.

func (*ManagedProducer) Monitor added in v0.1.1

func (m *ManagedProducer) Monitor() func()

Monitor provides a scoped, deferrable lock.

func (*ManagedProducer) Send

func (m *ManagedProducer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)

Send attempts to use the Producer's Send method if available. If not available, an error is returned.

type ManagedProducerConfig

type ManagedProducerConfig struct {
	ManagedClientConfig

	Topic string
	Name  string

	NewProducerTimeout    time.Duration // maximum duration to create Producer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Producer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Producer
}

ManagedProducerConfig is used to configure a ManagedProducer.

type Message

type Message struct {
	Topic string

	Msg     *api.CommandMessage
	Meta    *api.MessageMetadata
	Payload []byte
	// contains filtered or unexported fields
}

Message represents a received MESSAGE from the Pulsar server.

func (*Message) Equal

func (m *Message) Equal(other *Message) bool

Equal returns true if the provided other Message is equal to the receiver Message.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is responsible for creating a subscription producer and managing its state.

func (*Producer) Close

func (p *Producer) Close(ctx context.Context) error

Close closes the producer. When receiving a CloseProducer command, the broker will stop accepting any more messages for the producer, wait until all pending messages are persisted and then reply Success to the client. https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#command-closeproducer

func (*Producer) Closed

func (p *Producer) Closed() <-chan struct{}

Closed returns a channel that blocks until the producer has been closed, at which point the channel is closed. TODO: Rename to Done

func (*Producer) ConnClosed

func (p *Producer) ConnClosed() <-chan struct{}

ConnClosed unblocks when the producer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the producer.

func (*Producer) Send

func (p *Producer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)

Send sends a message and waits for a SendReceipt.

Directories

Path Synopsis
Package api provides the protocol buffer messages that Pulsar uses for the client/broker wire protocol.
This program offers a simple CLI utility for interacting with a Pulsar server using the `pulsar` package.
Package frame provides the ability to encode and decode to and from Pulsar's custom binary protocol.
fuzz
