pulsar

Version: v0.0.0-...-80a7cab (Latest)
Published: May 25, 2019 License: Apache-2.0 Imports: 18 Imported by: 0

README

vipgoclient

This is a repo for temporary use. Please DO NOT use it.

Documentation

Index

Constants

const (
	// PartitionSuffix is the separator in partitioned-topic names, which consist of the following parts:
	// TopicObject.String() + PartitionSuffix + index
	PartitionSuffix = "-partition-"
)
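The naming rule in the comment above can be sketched as follows. This is a minimal, self-contained illustration and not the package's own code: `partitionName` is a hypothetical helper, and the topic string is an example value rather than the output of `TopicObject.String()`.

```go
package main

import (
	"fmt"
	"strconv"
)

// partitionSuffix mirrors the documented PartitionSuffix constant.
const partitionSuffix = "-partition-"

// partitionName is a hypothetical helper that joins the topic name,
// the suffix and the partition index, as the constant's comment describes.
func partitionName(topic string, index uint32) string {
	return topic + partitionSuffix + strconv.FormatUint(uint64(index), 10)
}

func main() {
	// prints "persistent://public/default/my-topic-partition-2"
	fmt.Println(partitionName("persistent://public/default/my-topic", 2))
}
```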

Variables

This section is empty.

Functions

func MarshalProperties

func MarshalProperties(properties map[string]string) []*api.KeyValue

func UnmarshalProperties

func UnmarshalProperties(kvs []*api.KeyValue) map[string]string
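Judging by the signatures alone, these two functions convert between a Go map and a list of key/value pairs. The sketch below shows one way such a round trip could look. The `keyValue` type is a stand-in for `api.KeyValue`, whose real definition is not shown on this page, and the sorted output order is an assumption made only to keep the example deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

// keyValue is a hypothetical stand-in for api.KeyValue.
type keyValue struct {
	Key   string
	Value string
}

// marshalProperties turns a map into a slice of key/value pairs.
// Keys are sorted so the result is deterministic (an assumption of this sketch).
func marshalProperties(props map[string]string) []*keyValue {
	keys := make([]string, 0, len(props))
	for k := range props {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	kvs := make([]*keyValue, 0, len(keys))
	for _, k := range keys {
		kvs = append(kvs, &keyValue{Key: k, Value: props[k]})
	}
	return kvs
}

// unmarshalProperties rebuilds the map, skipping nil entries.
func unmarshalProperties(kvs []*keyValue) map[string]string {
	props := make(map[string]string, len(kvs))
	for _, kv := range kvs {
		if kv == nil {
			continue
		}
		props[kv.Key] = kv.Value
	}
	return props
}

func main() {
	in := map[string]string{"region": "eu", "app": "billing"}
	out := unmarshalProperties(marshalProperties(in))
	fmt.Println(len(out), out["region"], out["app"])
}
```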

Types

type BinaryLookupService

type BinaryLookupService struct {
	URL      string
	UseTLS   bool
	ConnPool *ConnectionPool
}

BinaryLookupService

func (*BinaryLookupService) GetPartitions

func (bls *BinaryLookupService) GetPartitions(ctx context.Context, topic string) (uint32, error)

func (*BinaryLookupService) GetTopicsOfNamespace

func (bls *BinaryLookupService) GetTopicsOfNamespace(ctx context.Context, namespace string) ([]string, error)

func (*BinaryLookupService) LookupTopic

func (bls *BinaryLookupService) LookupTopic(ctx context.Context, topic string) (*LookupData, error)

type Client

type Client interface {
	// CreateProducer creates the producer instance.
	// This method blocks until the producer is created successfully.
	CreateProducer(ProducerConfig) (Producer, error)

	// Subscribe creates a `Consumer` by subscribing to a topic.
	//
	// If the subscription does not exist, a new subscription will be created and all messages published after the
	// creation will be retained until acknowledged, even if the consumer is not connected
	Subscribe(ConsumerConfig) (Consumer, error)

	// Close the Client and free associated resources
	Close() error
}

Client is an opaque interface that provides the ability to interact with Pulsar.

func NewClient

func NewClient(ctx context.Context, cfg ClientConfig) (Client, error)

NewClient returns a Pulsar client for the given configuration options

type ClientConfig

type ClientConfig struct {
	// URL the lookup URL for the Pulsar service.
	// It can be either a binary address(pulsar:// or pulsar+ssl://)
	// or an HTTP address(http:// or https://).
	// This parameter is required
	URL string

	// TLSConfig tls configuration.
	TLSConfig *tls.Config

	// OperationTimeout (default: 5 seconds)
	// Timeout for dialing to server and creating producer, consumer, reader.
	OperationTimeout time.Duration

	// AuthMethod configures the authentication provider. (default: no authentication)
	// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
	AuthMethod auth.Authentication
}

ClientConfig pulsar client configuration options.

func (*ClientConfig) SetDefault

func (cfg *ClientConfig) SetDefault() error

SetDefault sets default values for any options that are not set.

type CompressionType

type CompressionType int
const (
	NoCompression CompressionType = iota
	LZ4
	ZLib
	ZSTD
)

type Conn

type Conn struct {
	NetConn net.Conn
	Wmu     sync.Mutex // protects w to ensure frames aren't interleaved

	Cmu      sync.Mutex // protects following
	IsClosed bool
	Closedc  chan struct{}
}

Conn is responsible for writing and reading Frames to and from the underlying connection (r and w).

func NewConn

func NewConn(addr string, timeout time.Duration, tlsCfg *tls.Config, authMethod auth.Authentication) (*Conn, error)

NewConn creates a connection to given pulsar server.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the underlying connection. This will cause Read() to unblock and return an error. It will also cause the Closed() channel to unblock.

func (*Conn) Closed

func (c *Conn) Closed() <-chan struct{}

Closed returns a channel that will unblock when the connection has been closed and is no longer usable.
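The `Cmu`, `IsClosed` and `Closedc` fields suggest the common close-once pattern, in which a mutex-guarded flag ensures the notification channel is closed exactly once. The sketch below illustrates that pattern in isolation. The `closer` type is hypothetical and says nothing about how `Conn` handles the network connection itself.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical type showing the close-once pattern.
type closer struct {
	mu       sync.Mutex // protects isClosed
	isClosed bool
	closedc  chan struct{}
}

func newCloser() *closer {
	return &closer{closedc: make(chan struct{})}
}

// Close marks the closer as closed and unblocks everyone waiting on Closed().
// Calling it more than once is safe.
func (c *closer) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.isClosed {
		return
	}
	c.isClosed = true
	close(c.closedc)
}

// Closed returns a channel that unblocks once Close has been called.
func (c *closer) Closed() <-chan struct{} {
	return c.closedc
}

func main() {
	c := newCloser()
	done := make(chan struct{})
	go func() {
		<-c.Closed() // blocks until Close is called
		close(done)
	}()
	c.Close()
	c.Close() // second call is a no-op
	<-done
	fmt.Println("closed")
}
```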

func (*Conn) Read

func (c *Conn) Read(frameHandler func(f frame.Frame)) error

Read blocks while it reads from r until an error occurs. It passes all frames to the provided handler, sequentially and from the same goroutine it was called from. Any error encountered will close the connection. If Close() is called, Read() will also unblock. Once Read returns, the Conn should be considered unusable.

func (*Conn) Write

func (c *Conn) Write(ctx context.Context, f *frame.Frame) error

Write writes a frame. It is safe to use concurrently.

type ConnWrapper

type ConnWrapper struct {
	LogicalAddr, PhysicalAddr string
	C                         *Conn
	RequestID                 *MonotonicID
	Dispatcher                *frame.Dispatcher
	ConsumeHandlers           map[uint64]ConsumeHandler
	ProduceHandlers           map[uint64]ProduceHandler
	// contains filtered or unexported fields
}

ConnWrapper is a wrapper of Conn which packs and sends commands to the Pulsar server.

func (*ConnWrapper) Ack

func (cw *ConnWrapper) Ack(ctx context.Context, consumerID uint64, msgID MessageID) error

func (*ConnWrapper) Close

func (cw *ConnWrapper) Close() error

func (*ConnWrapper) CloseConsumer

func (cw *ConnWrapper) CloseConsumer(ctx context.Context, consumerID uint64) error

func (*ConnWrapper) Flow

func (cw *ConnWrapper) Flow(ctx context.Context, consumerID uint64, permits uint32) error

func (*ConnWrapper) IsClose

func (cw *ConnWrapper) IsClose() bool

func (*ConnWrapper) LookupTopic

func (cw *ConnWrapper) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)

func (*ConnWrapper) PartitionedMetadata

func (cw *ConnWrapper) PartitionedMetadata(ctx context.Context, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)

func (*ConnWrapper) Ping

func (cw *ConnWrapper) Ping(ctx context.Context) error

func (*ConnWrapper) Publish

func (cw *ConnWrapper) Publish(ctx context.Context, f *frame.Frame) error

func (*ConnWrapper) RedeliverUnacked

func (cw *ConnWrapper) RedeliverUnacked(ctx context.Context, consumerID uint64) error

func (*ConnWrapper) RegisterProducer

func (cw *ConnWrapper) RegisterProducer(ctx context.Context, topic, producerName string, producerID uint64, handler ProduceHandler) error

func (*ConnWrapper) Seek

func (cw *ConnWrapper) Seek(ctx context.Context, consumerID uint64, msgID MessageID) error

func (*ConnWrapper) Start

func (cw *ConnWrapper) Start(timeout time.Duration, authMethod auth.Authentication) error

Start performs the handshake with the Pulsar server.

func (*ConnWrapper) Subscribe

func (cw *ConnWrapper) Subscribe(ctx context.Context, topic, subName, consumerName string, subType SubscriptionType,
	consumerID uint64, initialPosition InitialPosition, handler ConsumeHandler) error

func (*ConnWrapper) TopicsOfNamespace

func (cw *ConnWrapper) TopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)

func (*ConnWrapper) UnregisterProducer

func (cw *ConnWrapper) UnregisterProducer(ctx context.Context, producerID uint64) error

func (*ConnWrapper) Unubscribe

func (cw *ConnWrapper) Unubscribe(ctx context.Context, consumerID uint64) error

type ConnectionPool

type ConnectionPool struct {
	// Timeout is the timeout for creating a new connection
	Timeout time.Duration
	// AuthMethod auth interface
	AuthMethod auth.Authentication
	// contains filtered or unexported fields
}

ConnectionPool provides management abilities of *ConnWrapper which can be shared by different clients.

func (*ConnectionPool) Close

func (cp *ConnectionPool) Close() error

Close closes all connections.

func (*ConnectionPool) GetConnection

func (cp *ConnectionPool) GetConnection(logicalAddress, physicalAddress string) (*ConnWrapper, error)

GetConnection returns a *ConnWrapper by address. A new one will be created if none exists.

type ConsumeHandler

type ConsumeHandler interface {
	HandleConnected(frame.Frame) error
	HandleMessage(frame.Frame) error
	HandleEndOfTopic(frame.Frame) error
	HandleClose(frame.Frame) error
}

type Consumer

type Consumer interface {
	// Get the topic for the consumer
	Topic() string

	// Get the subscription name of the consumer
	Subscription() string

	// Unsubscribe the consumer
	Unsubscribe() error

	// Receives a single message.
	// This call blocks until a message is available.
	Receive(context.Context) (*Message, error)

	// Ack the consumption of a single message
	Ack(*Message) error

	// Ack the consumption of a single message, identified by its MessageID
	AckID(MessageID) error

	// Close the consumer and stop the broker from pushing more messages
	Close() error

	// Reset the subscription associated with this consumer to a specific message id.
	// The message id can either be a specific message or represent the first or last messages in the topic.
	//
	// Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
	//       seek on the individual partitions instead.
	Seek(msgID MessageID) error

	// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
	// active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across all
	// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
	// breaks, the messages are redelivered after reconnect.
	RedeliverUnackedMessages() error
}

An interface that abstracts behavior of Pulsar's consumer

type ConsumerConfig

type ConsumerConfig struct {
	// Specify the topic this consumer will subscribe on.
	// Either a topic, a list of topics or a topics pattern are required when subscribing
	Topic string

	// Specify the subscription name for this consumer
	// This argument is required when subscribing
	SubscriptionName string

	// Set the timeout for acking messages.
	// default (30 seconds)
	AckTimeout time.Duration

	// Select the subscription type to be used when subscribing to the topic.
	// Default is `Exclusive`
	Type SubscriptionType

	// InitialPosition at which the cursor will be set when subscribing
	// Default is `Latest`
	SubscriptionInitPos InitialPosition

	// Sets the size of the consumer receive queue.
	// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
	// application calls `Consumer.Receive()`. Using a higher value could potentially increase the consumer
	// throughput at the expense of bigger memory utilization.
	// Default value is `1000` messages and should be good for most use cases.
	ReceiverQueueSize uint32
}

ConsumerConfig is used to configure and create instances of Consumer.

func (*ConsumerConfig) SetDefault

func (cfg *ConsumerConfig) SetDefault() error

type ConsumerImpl

type ConsumerImpl struct {
	Ctx              context.Context
	TopicName        string
	SubscriptionName string
	PartitionIndex   int32
	ID               uint64
	Name             string
	AckTimeout       time.Duration
	Conn             *ConnWrapper

	Queue     chan *Message
	QueueSize uint32

	Mu           sync.Mutex // protects following
	IsClosed     bool
	Closedc      chan struct{}
	IsEndOfTopic bool
	EndOfTopicc  chan struct{}
	// contains filtered or unexported fields
}

func (*ConsumerImpl) Ack

func (c *ConsumerImpl) Ack(msg *Message) error

Ack the consumption of a single message

func (*ConsumerImpl) AckID

func (c *ConsumerImpl) AckID(msgID MessageID) error

Ack the consumption of a single message, identified by its MessageID

func (*ConsumerImpl) Close

func (c *ConsumerImpl) Close() error

Close the consumer and stop the broker from pushing more messages

func (*ConsumerImpl) HandleClose

func (c *ConsumerImpl) HandleClose(f frame.Frame) error

func (*ConsumerImpl) HandleConnected

func (c *ConsumerImpl) HandleConnected(f frame.Frame) error

func (*ConsumerImpl) HandleEndOfTopic

func (c *ConsumerImpl) HandleEndOfTopic(f frame.Frame) error

func (*ConsumerImpl) HandleMessage

func (c *ConsumerImpl) HandleMessage(f frame.Frame) error

func (*ConsumerImpl) Receive

func (c *ConsumerImpl) Receive(ctx context.Context) (*Message, error)

Receives a single message. This call blocks until a message is available.

func (*ConsumerImpl) RedeliverUnackedMessages

func (c *ConsumerImpl) RedeliverUnackedMessages() error

RedeliverUnackedMessages redelivers all the unacknowledged messages.

func (*ConsumerImpl) Seek

func (c *ConsumerImpl) Seek(msgID MessageID) error

Seek resets the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last message in the topic.

Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the seek on the individual partitions instead.

func (*ConsumerImpl) Subscription

func (c *ConsumerImpl) Subscription() string

Subscription returns the subscription name of the consumer

func (*ConsumerImpl) Topic

func (c *ConsumerImpl) Topic() string

func (*ConsumerImpl) Unsubscribe

func (c *ConsumerImpl) Unsubscribe() error

Unsubscribe the consumer

type HTTPLookupService

type HTTPLookupService struct {
}

HTTPLookupService

func (*HTTPLookupService) GetPartitions

func (*HTTPLookupService) GetTopicsOfNamespace

func (hls *HTTPLookupService) GetTopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)

func (*HTTPLookupService) LookupTopic

func (hls *HTTPLookupService) LookupTopic(ctx context.Context, topic string) (*api.CommandLookupTopicResponse, error)

type HashingScheme

type HashingScheme int
const (
	JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
	Murmur3_32Hash                      // Use Murmur3 hashing function
	BoostHash                           // C++ based boost::hash
)
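For reference, the `JavaStringHash` scheme refers to Java's `String.hashCode()`, which folds the UTF-16 code units of the string with `h = 31*h + c` in 32-bit signed arithmetic. The sketch below reproduces that formula in Go. The `signSafeMod` helper, which maps the hash to a partition index, is an assumption of this example, since the page does not show how the package uses the hash.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// javaStringHash mirrors Java's String.hashCode(): h = 31*h + c over the
// UTF-16 code units, with int32 wrap-around on overflow.
func javaStringHash(s string) int32 {
	var h int32
	for _, u := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(u)
	}
	return h
}

// signSafeMod is a hypothetical helper that maps a possibly negative hash
// to an index in [0, n). n must be greater than zero.
func signSafeMod(h int32, n uint32) uint32 {
	m := int64(h) % int64(n)
	if m < 0 {
		m += int64(n)
	}
	return uint32(m)
}

func main() {
	h := javaStringHash("hello")
	fmt.Println(h)                 // prints 99162322
	fmt.Println(signSafeMod(h, 4)) // prints 2
}
```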

type InitialPosition

type InitialPosition int
const (
	// Latest position which means the start consuming position will be the last message
	Latest InitialPosition = iota

	// Earliest position which means the start consuming position will be the first message
	Earliest
)

type LookupData

type LookupData struct {
	BrokerServiceUrl       string
	BrokerServiceUrlTls    string
	ProxyThroughServiceUrl bool
}

type LookupService

type LookupService interface {
	LookupTopic(ctx context.Context, topic string) (*LookupData, error)
	GetPartitions(ctx context.Context, topic string) (uint32, error)
	GetTopicsOfNamespace(ctx context.Context, namespace string) ([]string, error)
}

LookupService is an opaque interface that provides the ability to look up topic and namespace information.

func NewLookupService

func NewLookupService(ctx context.Context, url string, tlsCfg *tls.Config, connPool *ConnectionPool) LookupService

type Message

type Message struct {
	ID          MessageID
	Topic       string
	Properties  map[string]string
	Payload     []byte
	PublishTime time.Time
	EventTime   time.Time
	Key         string
}

type MessageID

type MessageID struct {
	LedgerId   uint64
	EntryId    uint64
	Partition  int32
	BatchIndex int32
}

Identifier for a particular message

type MessageRouter

type MessageRouter interface {
	GetPartition(key string) uint32
}

MessageRouterFunc

type MessageRoutingMode

type MessageRoutingMode int

MessageRoutingMode

const (
	// RoundRobinDistribution publish messages across all partitions in round-robin.
	RoundRobinDistribution MessageRoutingMode = iota

	// UseSinglePartition the producer will choose one single partition and publish all the messages into that partition
	UseSinglePartition

	// CustomPartition use custom message router implementation that will be called to determine the partition for a particular message.
	CustomPartition
)

type MonotonicID

type MonotonicID struct {
	ID uint64
}

MonotonicID handles unique id generation

func (*MonotonicID) Last

func (r *MonotonicID) Last() uint64

Last returns the last ID

func (*MonotonicID) Next

func (r *MonotonicID) Next() uint64

Next returns the next ID
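A generator like this is typically built on an atomic counter so that concurrent callers never receive the same value. The sketch below is one such construction and is not taken from the package. The `monotonicID` type is hypothetical, and having `next` return the value after incrementing, so that the first ID is 1, is an assumption.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// monotonicID is a hypothetical counter that is safe for concurrent use.
type monotonicID struct {
	id uint64
}

// next atomically increments the counter and returns the new value.
func (m *monotonicID) next() uint64 {
	return atomic.AddUint64(&m.id, 1)
}

// last returns the most recently issued value without changing it.
func (m *monotonicID) last() uint64 {
	return atomic.LoadUint64(&m.id)
}

func main() {
	var ids monotonicID
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ids.next()
		}()
	}
	wg.Wait()
	fmt.Println(ids.last()) // prints 100
}
```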

type PartitionedConsumerImpl

type PartitionedConsumerImpl struct {
	Ctx              context.Context
	SubscriptionName string
	TopicObject      *TopicObject
	Partitions       uint32

	Consumers []Consumer
	Queue     chan *Message

	Mu           sync.Mutex // protects following
	IsClosed     bool
	Closedc      chan struct{}
	IsEndOfTopic bool
	EndOfTopicc  chan struct{}
}

func (*PartitionedConsumerImpl) Ack

func (pc *PartitionedConsumerImpl) Ack(msg *Message) error

Ack the consumption of a single message

func (*PartitionedConsumerImpl) AckID

func (pc *PartitionedConsumerImpl) AckID(msgID MessageID) error

Ack the consumption of a single message, identified by its MessageID

func (*PartitionedConsumerImpl) Close

func (pc *PartitionedConsumerImpl) Close() error

Close the consumer and stop the broker from pushing more messages

func (*PartitionedConsumerImpl) Receive

func (pc *PartitionedConsumerImpl) Receive(ctx context.Context) (*Message, error)

Receive a single message. This call blocks until a message is available.

func (*PartitionedConsumerImpl) RedeliverUnackedMessages

func (pc *PartitionedConsumerImpl) RedeliverUnackedMessages() error

RedeliverUnackedMessages redelivers all the unacknowledged messages.

func (*PartitionedConsumerImpl) Seek

func (pc *PartitionedConsumerImpl) Seek(msgID MessageID) error

Seek resets the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last message in the topic.

Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the seek on the individual partitions instead.

func (*PartitionedConsumerImpl) Subscription

func (pc *PartitionedConsumerImpl) Subscription() string

Subscription returns the subscription name of the consumer

func (*PartitionedConsumerImpl) Topic

func (pc *PartitionedConsumerImpl) Topic() string

Topic returns the topic of the consumer

func (*PartitionedConsumerImpl) Unsubscribe

func (pc *PartitionedConsumerImpl) Unsubscribe() error

Unsubscribe the consumer

type PartitionedProducerImpl

type PartitionedProducerImpl struct {
	TopicName   string
	TopicObject *TopicObject
	Partitions  uint32
	Producers   []Producer
	Router      MessageRouter
	SequenceID  *MonotonicID
}

PartitionedProducerImpl is the implementation of a partitioned-topic producer.

func (*PartitionedProducerImpl) Close

func (p *PartitionedProducerImpl) Close() error

Close closes this producer.

func (*PartitionedProducerImpl) LastSequenceID

func (p *PartitionedProducerImpl) LastSequenceID() uint64

LastSequenceID returns the last sequence id that was published by this producer.

func (*PartitionedProducerImpl) Name

func (p *PartitionedProducerImpl) Name() string

Name returns ProducerName

func (*PartitionedProducerImpl) Send

Send sends a ProducerMessage synchronously.

func (*PartitionedProducerImpl) SendAsync

func (p *PartitionedProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))

SendAsync sends a ProducerMessage in asynchronous mode.

func (*PartitionedProducerImpl) Topic

func (p *PartitionedProducerImpl) Topic() string

Topic returns TopicName

type ProduceHandler

type ProduceHandler interface {
	HandleConnected(frame.Frame) error
	HandleReceipt(frame.Frame) error
	HandleError(frame.Frame) error
	HandleClose(frame.Frame) error
}

type Producer

type Producer interface {
	// return the topic to which producer is publishing to
	Topic() string

	// return the producer name which could have been assigned by the system or specified by the client
	Name() string

	// Send a ProducerMessage
	// This call blocks until the message is successfully acknowledged by the Pulsar broker.
	// Example:
	// producer.Send(pulsar.ProducerMessage{ Payload: myPayload })
	Send(ProducerMessage) error

	// Send a ProducerMessage in asynchronous mode
	// The callback will report back the ProducerMessage being published and
	// the eventual error in publishing
	SendAsync(ProducerMessage, func(ProducerMessage, error))

	// Get the last sequence id that was published by this producer.
	// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
	// was published and acknowledged by the broker.
	// After recreating a producer with the same producer name, this will return the sequence id of the last
	// ProducerMessage that was published in the previous producer session, or -1 if no ProducerMessage was ever published.
	// return the last sequence id published by this producer.
	LastSequenceID() uint64

	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
	// of errors, pending writes will not be retried.
	Close() error
}

Producer the producer is used to publish messages on a topic

type ProducerConfig

type ProducerConfig struct {
	// Specify the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Specify a name for the producer
	// If not assigned, the system will generate a globally unique name which can be accessed with
	// Producer.Name().
	// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
	// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be publishing on
	// a topic.
	Name string

	// Attach a set of application defined properties to the producer
	// These properties will be visible in the topic stats
	Properties map[string]string

	// Set the send timeout (default: 30 seconds)
	// If a ProducerMessage is not acknowledged by the server before the sendTimeout expires, an error will be reported.
	// Setting the timeout to -1 sets the timeout to infinity, which can be useful when using Pulsar's ProducerMessage
	// deduplication feature.
	SendTimeout time.Duration

	// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
	// When the queue is full, by default, all calls to Producer.Send() and Producer.SendAsync() will fail
	// unless `BlockIfQueueFull` is set to true. Set `BlockIfQueueFull` to change the blocking behavior.
	// (default: 1024)
	MaxPendingMessages int

	// Set the number of max pending messages across all the partitions
	// This setting will be used to lower the max pending messages for each partition
	// (`MaxPendingMessages`), if the total exceeds the configured value.
	// (default: 256)
	MaxPendingMessagesAcrossPartitions int

	// Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
	// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
	// `ProducerQueueIsFullError` when there is no space left in pending queue.
	BlockIfQueueFull bool

	// Set the ProducerMessage routing mode for the partitioned producer.
	// Default routing mode is round-robin routing.
	//
	// This logic is applied when the application does not set a Key on a
	// particular ProducerMessage.
	RoutingMode MessageRoutingMode

	// Set the compression type for the producer.
	// By default, ProducerMessage payloads are not compressed. Supported compression types are:
	//  - LZ4
	//  - ZLib
	//  - ZSTD
	//
	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
	// release in order to be able to receive messages compressed with ZSTD.
	Compression CompressionType

	// Set a custom ProducerMessage routing policy by passing an implementation of MessageRouter
	// The router is a function that given a particular ProducerMessage and the topic metadata, returns the
	// partition index where the ProducerMessage should be routed to
	CostomRouter MessageRouter
}

ProducerConfig contains options of creating a producer.
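The interaction between `MaxPendingMessages` and `BlockIfQueueFull` can be illustrated with a bounded channel used as a slot counter: a send either waits for a free slot or fails immediately. The sketch below is a simplified model under that assumption. `pendingQueue` and `errQueueFull` are hypothetical names, and the package's actual pending-queue logic is not shown on this page.

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error for a full pending queue.
var errQueueFull = errors.New("producer queue is full")

// pendingQueue is a hypothetical bounded queue of in-flight messages.
type pendingQueue struct {
	slots chan struct{}
	block bool
}

// newPendingQueue creates a queue with max slots; blockIfFull selects
// between waiting for a slot and failing fast.
func newPendingQueue(max int, blockIfFull bool) *pendingQueue {
	return &pendingQueue{slots: make(chan struct{}, max), block: blockIfFull}
}

// acquire reserves a slot before a message is sent.
func (q *pendingQueue) acquire() error {
	if q.block {
		q.slots <- struct{}{} // waits until a slot is free
		return nil
	}
	select {
	case q.slots <- struct{}{}:
		return nil
	default:
		return errQueueFull // fail immediately when full
	}
}

// release frees a slot once the broker has acknowledged a message.
func (q *pendingQueue) release() {
	<-q.slots
}

func main() {
	q := newPendingQueue(1, false)
	fmt.Println(q.acquire()) // prints "<nil>"
	fmt.Println(q.acquire()) // prints "producer queue is full"
	q.release()
	fmt.Println(q.acquire()) // prints "<nil>"
}
```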

func (*ProducerConfig) SetDefault

func (cfg *ProducerConfig) SetDefault() error

SetDefault checks the config and sets default values.

type ProducerImpl

type ProducerImpl struct {
	Ctx              context.Context
	TopicName        string
	ID               uint64
	ProducerName     string
	SendTimeout      time.Duration
	PendingSize      int
	BlockIfQueueFull bool
	Conn             *ConnWrapper
	SequenceID       *MonotonicID
	Compression      CompressionType
	IsClosed         bool
	Closedc          chan struct{}
	// contains filtered or unexported fields
}

ProducerImpl is the implementation of a non-partitioned-topic producer.

func (*ProducerImpl) Close

func (p *ProducerImpl) Close() error

Close closes this producer.

func (*ProducerImpl) HandleClose

func (p *ProducerImpl) HandleClose(frame.Frame) error

HandleClose implementation of ProduceHandler

func (*ProducerImpl) HandleConnected

func (p *ProducerImpl) HandleConnected(f frame.Frame) error

HandleConnected implementation of ProduceHandler

func (*ProducerImpl) HandleError

func (p *ProducerImpl) HandleError(f frame.Frame) error

HandleError implementation of ProduceHandler

func (*ProducerImpl) HandleReceipt

func (p *ProducerImpl) HandleReceipt(f frame.Frame) error

HandleReceipt implementation of ProduceHandler

func (*ProducerImpl) LastSequenceID

func (p *ProducerImpl) LastSequenceID() uint64

LastSequenceID returns the last sequence id that was published by this producer.

func (*ProducerImpl) Name

func (p *ProducerImpl) Name() string

Name returns ProducerName

func (*ProducerImpl) Send

func (p *ProducerImpl) Send(msg ProducerMessage) error

Send sends a ProducerMessage synchronously.

func (*ProducerImpl) SendAsync

func (p *ProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))

SendAsync sends a ProducerMessage in asynchronous mode.

func (*ProducerImpl) Topic

func (p *ProducerImpl) Topic() string

Topic returns TopicName

type ProducerMessage

type ProducerMessage struct {
	// ProducerID which should be auto filled by producer while sending
	ProducerID uint64
	// ProducerName which should be auto filled by producer while sending
	ProducerName string
	// SequenceID which should be auto filled by producer while sending
	SequenceID uint64
	// Payload for the message
	Payload []byte
	// Sets the key of the message for routing policy
	Key string
	// Attach application defined properties on the message
	Properties map[string]string
	// Set the event time for a given message
	EventTime   time.Time
	Compression CompressionType
}

func (*ProducerMessage) Frame

func (msg *ProducerMessage) Frame() *frame.Frame

type Result

type Result int
const (
	UnknownError                          Result = 1  // Unknown error happened on broker
	InvalidConfiguration                  Result = 2  // Invalid configuration
	TimeoutError                          Result = 3  // Operation timed out
	LookupError                           Result = 4  // Broker lookup failed
	ConnectError                          Result = 5  // Failed to connect to broker
	ReadError                             Result = 6  // Failed to read from socket
	AuthenticationError                   Result = 7  // Authentication failed on broker
	AuthorizationError                    Result = 8  // Client is not authorized to create producer/consumer
	ErrorGettingAuthenticationData        Result = 9  // Client cannot find authorization data
	BrokerMetadataError                   Result = 10 // Broker failed in updating metadata
	BrokerPersistenceError                Result = 11 // Broker failed to persist entry
	ChecksumError                         Result = 12 // Corrupt message checksum failure
	ConsumerBusy                          Result = 13 // Exclusive consumer is already connected
	NotConnectedError                     Result = 14 // Producer/Consumer is not currently connected to broker
	AlreadyClosedError                    Result = 15 // Producer/Consumer is already closed and not accepting any operation
	InvalidMessage                        Result = 16 // Error in publishing an already used message
	ConsumerNotInitialized                Result = 17 // Consumer is not initialized
	ProducerNotInitialized                Result = 18 // Producer is not initialized
	TooManyLookupRequestException         Result = 19 // Too Many concurrent LookupRequest
	InvalidTopicName                      Result = 20 // Invalid topic name
	InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
	ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
	OperationNotSupported                 Result = 23
	ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
	ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
	ProducerQueueIsFull                   Result = 26 // Producer queue is full
	MessageTooBig                         Result = 27 // Trying to send a message exceeding the max size
	TopicNotFound                         Result = 28 // Topic not found
	SubscriptionNotFound                  Result = 29 // Subscription not found
	ConsumerNotFound                      Result = 30 // Consumer not found
	UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
	TopicTerminated                       Result = 32 // Topic was already terminated
	CryptoError                           Result = 33 // Error when crypto operation fails
)

type RoundRobinRouter

type RoundRobinRouter struct {
	Partition uint32
	// contains filtered or unexported fields
}

func (*RoundRobinRouter) GetPartition

func (router *RoundRobinRouter) GetPartition(key string) uint32

type SinglePartitionRouter

type SinglePartitionRouter struct {
	Partition       uint32
	SinglePartition uint32
}

func (*SinglePartitionRouter) GetPartition

func (router *SinglePartitionRouter) GetPartition(key string) uint32

type SubscriptionType

type SubscriptionType int

Types of subscription supported by Pulsar

const (
	// There can be only 1 consumer on the same topic with the same subscription name
	Exclusive SubscriptionType = iota

	// Multiple consumers will be able to use the same subscription name and the messages will be dispatched according to
	// a round-robin rotation between the connected consumers
	Shared

	// Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the messages.
	// If that consumer disconnects, one of the other connected consumers will start receiving messages.
	Failover

	KeyShare
)

type TopicObject

type TopicObject struct {
	FullName         string
	Domain           string
	Property         string
	Cluster          string
	NamespacePortion string
	LocalName        string
	IsV2Topic        bool
}

TopicObject is a standardized topic.

func ParseTopicName

func ParseTopicName(topic string) (*TopicObject, error)

ParseTopicName parses a topic string into a standardized TopicObject. It returns an error if the topic is invalid.

func (*TopicObject) GetPartitionName

func (t *TopicObject) GetPartitionName(partition uint32) string

GetPartitionName returns a partitioned-topic string by index.

func (*TopicObject) String

func (t *TopicObject) String() string

Directories

Path Synopsis
examples
consumer command
producer command
Package frame provides the ability to encode and decode to and from Pulsar's custom binary protocol.
pkg
api
