Documentation
¶
Index ¶
- Constants
- func MarshalProperties(properties map[string]string) []*api.KeyValue
- func UnmarshalProperties(kvs []*api.KeyValue) map[string]string
- type BinaryLookupService
- func (bls *BinaryLookupService) GetPartitions(ctx context.Context, topic string) (uint32, error)
- func (bls *BinaryLookupService) GetTopicsOfNamespace(ctx context.Context, namespace string) ([]string, error)
- func (bls *BinaryLookupService) LookupTopic(ctx context.Context, topic string) (*LookupData, error)
- type Client
- type ClientConfig
- type CompressionType
- type Conn
- type ConnWrapper
- func (cw *ConnWrapper) Ack(ctx context.Context, consumerID uint64, msgID MessageID) error
- func (cw *ConnWrapper) Close() error
- func (cw *ConnWrapper) CloseConsumer(ctx context.Context, consumerID uint64) error
- func (cw *ConnWrapper) Flow(ctx context.Context, consumerID uint64, permits uint32) error
- func (cw *ConnWrapper) IsClose() bool
- func (cw *ConnWrapper) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
- func (cw *ConnWrapper) PartitionedMetadata(ctx context.Context, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
- func (cw *ConnWrapper) Ping(ctx context.Context) error
- func (cw *ConnWrapper) Publish(ctx context.Context, f *frame.Frame) error
- func (cw *ConnWrapper) RedeliverUnacked(ctx context.Context, consumerID uint64) error
- func (cw *ConnWrapper) RegisterProducer(ctx context.Context, topic, producerName string, producerID uint64, ...) error
- func (cw *ConnWrapper) Seek(ctx context.Context, consumerID uint64, msgID MessageID) error
- func (cw *ConnWrapper) Start(timeout time.Duration, authMethod auth.Authentication) error
- func (cw *ConnWrapper) Subscribe(ctx context.Context, topic, subName, consumerName string, ...) error
- func (cw *ConnWrapper) TopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)
- func (cw *ConnWrapper) UnregisterProducer(ctx context.Context, producerID uint64) error
- func (cw *ConnWrapper) Unubscribe(ctx context.Context, consumerID uint64) error
- type ConnectionPool
- type ConsumeHandler
- type Consumer
- type ConsumerConfig
- type ConsumerImpl
- func (c *ConsumerImpl) Ack(msg *Message) error
- func (c *ConsumerImpl) AckID(msgID MessageID) error
- func (c *ConsumerImpl) Close() error
- func (c *ConsumerImpl) HandleClose(f frame.Frame) error
- func (c *ConsumerImpl) HandleConnected(f frame.Frame) error
- func (c *ConsumerImpl) HandleEndOfTopic(f frame.Frame) error
- func (c *ConsumerImpl) HandleMessage(f frame.Frame) error
- func (c *ConsumerImpl) Receive(ctx context.Context) (*Message, error)
- func (c *ConsumerImpl) RedeliverUnackedMessages() error
- func (c *ConsumerImpl) Seek(msgID MessageID) error
- func (c *ConsumerImpl) Subscription() string
- func (c *ConsumerImpl) Topic() string
- func (c *ConsumerImpl) Unsubscribe() error
- type HTTPLookupService
- func (hls *HTTPLookupService) GetPartitions(ctx context.Context, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
- func (hls *HTTPLookupService) GetTopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)
- func (hls *HTTPLookupService) LookupTopic(ctx context.Context, topic string) (*api.CommandLookupTopicResponse, error)
- type HashingScheme
- type InitialPosition
- type LookupData
- type LookupService
- type Message
- type MessageID
- type MessageRouter
- type MessageRoutingMode
- type MonotonicID
- type PartitionedConsumerImpl
- func (pc *PartitionedConsumerImpl) Ack(msg *Message) error
- func (pc *PartitionedConsumerImpl) AckID(msgID MessageID) error
- func (pc *PartitionedConsumerImpl) Close() error
- func (pc *PartitionedConsumerImpl) Receive(ctx context.Context) (*Message, error)
- func (pc *PartitionedConsumerImpl) RedeliverUnackedMessages() error
- func (pc *PartitionedConsumerImpl) Seek(msgID MessageID) error
- func (pc *PartitionedConsumerImpl) Subscription() string
- func (pc *PartitionedConsumerImpl) Topic() string
- func (pc *PartitionedConsumerImpl) Unsubscribe() error
- type PartitionedProducerImpl
- func (p *PartitionedProducerImpl) Close() error
- func (p *PartitionedProducerImpl) LastSequenceID() uint64
- func (p *PartitionedProducerImpl) Name() string
- func (p *PartitionedProducerImpl) Send(msg ProducerMessage) error
- func (p *PartitionedProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))
- func (p *PartitionedProducerImpl) Topic() string
- type ProduceHandler
- type Producer
- type ProducerConfig
- type ProducerImpl
- func (p *ProducerImpl) Close() error
- func (p *ProducerImpl) HandleClose(frame.Frame) error
- func (p *ProducerImpl) HandleConnected(f frame.Frame) error
- func (p *ProducerImpl) HandleError(f frame.Frame) error
- func (p *ProducerImpl) HandleReceipt(f frame.Frame) error
- func (p *ProducerImpl) LastSequenceID() uint64
- func (p *ProducerImpl) Name() string
- func (p *ProducerImpl) Send(msg ProducerMessage) error
- func (p *ProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))
- func (p *ProducerImpl) Topic() string
- type ProducerMessage
- type Result
- type RoundRobinRouter
- type SinglePartitionRouter
- type SubscriptionType
- type TopicObject
Constants ¶
const ( // PartitionSuffix partitioned-topic name will consist of the following parts: // TopicObject.String() + PartitionSuffix + index PartitionSuffix = "-partition-" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BinaryLookupService ¶
type BinaryLookupService struct {
URL string
UseTLS bool
ConnPool *ConnectionPool
}
BinaryLookupService
func (*BinaryLookupService) GetPartitions ¶
func (*BinaryLookupService) GetTopicsOfNamespace ¶
func (*BinaryLookupService) LookupTopic ¶
func (bls *BinaryLookupService) LookupTopic(ctx context.Context, topic string) (*LookupData, error)
type Client ¶
type Client interface {
// CreateProducer create the producer instance
// This method will block until the producer is created successfully
CreateProducer(ProducerConfig) (Producer, error)
// Subscribe create a `Consumer` by subscribing to a topic.
//
// If the subscription does not exist, a new subscription will be created and all messages published after the
// creation will be retained until acknowledged, even if the consumer is not connected
Subscribe(ConsumerConfig) (Consumer, error)
// Close the Client and free associated resources
Close() error
}
Client opaque interface that provides the ability to interact with pulsar.
type ClientConfig ¶
type ClientConfig struct {
// URL the lookup URL for the Pulsar service.
// It can be either a binary address(pulsar:// or pulsar+ssl://)
// or an HTTP address(http:// or https://).
// This parameter is required
URL string
// TLSConfig tls configuration.
TLSConfig *tls.Config
// OperationTimeout (default: 5 seconds)
// Timeout for dialing to server and creating producer, consumer, reader.
OperationTimeout time.Duration
// Authentication configure the authentication provider. (default: no authentication)
// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
AuthMethod auth.Authentication
}
ClientConfig pulsar client configuration options.
func (*ClientConfig) SetDefault ¶
func (cfg *ClientConfig) SetDefault() error
SetDefault sets default values for any options that are not set.
type CompressionType ¶
type CompressionType int
const ( NoCompression CompressionType = iota LZ4 ZLib ZSTD )
type Conn ¶
type Conn struct {
NetConn net.Conn
Wmu sync.Mutex // protects w to ensure frames aren't interleaved
Cmu sync.Mutex // protects following
IsClosed bool
Closedc chan struct{}
}
Conn is responsible for writing and reading Frames to and from the underlying connection (r and w).
func NewConn ¶
func NewConn(addr string, timeout time.Duration, tlsCfg *tls.Config, authMethod auth.Authentication) (*Conn, error)
NewConn creates a connection to given pulsar server.
func (*Conn) Close ¶
Close closes the underlying connection. This will cause read() to unblock and return an error. It will also cause the closed channel to unblock.
func (*Conn) Closed ¶
func (c *Conn) Closed() <-chan struct{}
Closed returns a channel that will unblock when the connection has been closed and is no longer usable.
func (*Conn) Read ¶
Read blocks while it reads from r until an error occurs. It passes all frames to the provided handler, sequentially and from the same goroutine that called Read. Any error encountered will close the connection. Also if close() is called, read() will unblock. Once read returns, the connection should be considered unusable.
type ConnWrapper ¶
type ConnWrapper struct {
LogicalAddr, PhysicalAddr string
C *Conn
RequestID *MonotonicID
Dispatcher *frame.Dispatcher
ConsumeHandlers map[uint64]ConsumeHandler
ProduceHandlers map[uint64]ProduceHandler
// contains filtered or unexported fields
}
ConnWrapper is a wrapper of Conn which pack and send commands to pulsar server.
func (*ConnWrapper) Close ¶
func (cw *ConnWrapper) Close() error
func (*ConnWrapper) CloseConsumer ¶
func (cw *ConnWrapper) CloseConsumer(ctx context.Context, consumerID uint64) error
func (*ConnWrapper) IsClose ¶
func (cw *ConnWrapper) IsClose() bool
func (*ConnWrapper) LookupTopic ¶
func (cw *ConnWrapper) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
func (*ConnWrapper) PartitionedMetadata ¶
func (cw *ConnWrapper) PartitionedMetadata(ctx context.Context, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
func (*ConnWrapper) RedeliverUnacked ¶
func (cw *ConnWrapper) RedeliverUnacked(ctx context.Context, consumerID uint64) error
func (*ConnWrapper) RegisterProducer ¶
func (cw *ConnWrapper) RegisterProducer(ctx context.Context, topic, producerName string, producerID uint64, handler ProduceHandler) error
func (*ConnWrapper) Start ¶
func (cw *ConnWrapper) Start(timeout time.Duration, authMethod auth.Authentication) error
Start handshake with pulsar server.
func (*ConnWrapper) Subscribe ¶
func (cw *ConnWrapper) Subscribe(ctx context.Context, topic, subName, consumerName string, subType SubscriptionType, consumerID uint64, initialPosition InitialPosition, handler ConsumeHandler) error
func (*ConnWrapper) TopicsOfNamespace ¶
func (cw *ConnWrapper) TopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)
func (*ConnWrapper) UnregisterProducer ¶
func (cw *ConnWrapper) UnregisterProducer(ctx context.Context, producerID uint64) error
func (*ConnWrapper) Unubscribe ¶
func (cw *ConnWrapper) Unubscribe(ctx context.Context, consumerID uint64) error
type ConnectionPool ¶
type ConnectionPool struct {
// Timeout is the timeout for creating a new connection
Timeout time.Duration
// AuthMethod auth interface
AuthMethod auth.Authentication
// contains filtered or unexported fields
}
ConnectionPool provides management abilities of *ConnWrapper which can be shared by different clients.
func (*ConnectionPool) GetConnection ¶
func (cp *ConnectionPool) GetConnection(logicalAddress, physicalAddress string) (*ConnWrapper, error)
GetConnection returns a *ConnWrapper by addr. A new one will be created if none exists.
type ConsumeHandler ¶
type Consumer ¶
type Consumer interface {
// Get the topic for the consumer
Topic() string
// Get a subscription for the consumer
Subscription() string
// Unsubscribe the consumer
Unsubscribe() error
// Receives a single message.
// This call blocks until a message is available.
Receive(context.Context) (*Message, error)
//Ack the consumption of a single message
Ack(*Message) error
// Ack the consumption of a single message, identified by its MessageID
AckID(MessageID) error
// Close the consumer and stop the broker from pushing more messages
Close() error
// Reset the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
// seek() on the individual partitions instead.
Seek(msgID MessageID) error
// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
RedeliverUnackedMessages() error
}
An interface that abstracts behavior of Pulsar's consumer
type ConsumerConfig ¶
type ConsumerConfig struct {
// Specify the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string
// Specify the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
// Set the timeout for acking messages.
// default (30 seconds)
AckTimeout time.Duration
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
// InitialPosition at which the cursor will be set when subscribing
// Default is `Latest`
SubscriptionInitPos InitialPosition
// Sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize uint32
}
ConsumerConfig is used to configure and create instances of Consumer
func (*ConsumerConfig) SetDefault ¶
func (cfg *ConsumerConfig) SetDefault() error
type ConsumerImpl ¶
type ConsumerImpl struct {
Ctx context.Context
TopicName string
SubscriptionName string
PartitionIndex int32
ID uint64
Name string
AckTimeout time.Duration
Conn *ConnWrapper
Queue chan *Message
QueueSize uint32
Mu sync.Mutex // protects following
IsClosed bool
Closedc chan struct{}
IsEndOfTopic bool
EndOfTopicc chan struct{}
// contains filtered or unexported fields
}
func (*ConsumerImpl) Ack ¶
func (c *ConsumerImpl) Ack(msg *Message) error
Ack the consumption of a single message
func (*ConsumerImpl) AckID ¶
func (c *ConsumerImpl) AckID(msgID MessageID) error
Ack the consumption of a single message, identified by its MessageID
func (*ConsumerImpl) Close ¶
func (c *ConsumerImpl) Close() error
Close the consumer and stop the broker from pushing more messages
func (*ConsumerImpl) HandleClose ¶
func (c *ConsumerImpl) HandleClose(f frame.Frame) error
func (*ConsumerImpl) HandleConnected ¶
func (c *ConsumerImpl) HandleConnected(f frame.Frame) error
func (*ConsumerImpl) HandleEndOfTopic ¶
func (c *ConsumerImpl) HandleEndOfTopic(f frame.Frame) error
func (*ConsumerImpl) HandleMessage ¶
func (c *ConsumerImpl) HandleMessage(f frame.Frame) error
func (*ConsumerImpl) Receive ¶
func (c *ConsumerImpl) Receive(ctx context.Context) (*Message, error)
Receives a single message. This call blocks until a message is available.
func (*ConsumerImpl) RedeliverUnackedMessages ¶
func (c *ConsumerImpl) RedeliverUnackedMessages() error
RedeliverUnackedMessages redelivers all the unacknowledged messages.
func (*ConsumerImpl) Seek ¶
func (c *ConsumerImpl) Seek(msgID MessageID) error
Seek resets the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.
func (*ConsumerImpl) Subscription ¶
func (c *ConsumerImpl) Subscription() string
Get a subscription for the consumer
func (*ConsumerImpl) Topic ¶
func (c *ConsumerImpl) Topic() string
func (*ConsumerImpl) Unsubscribe ¶
func (c *ConsumerImpl) Unsubscribe() error
Unsubscribe the consumer
type HTTPLookupService ¶
type HTTPLookupService struct {
}
HTTPLookupService
func (*HTTPLookupService) GetPartitions ¶
func (hls *HTTPLookupService) GetPartitions(ctx context.Context, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
func (*HTTPLookupService) GetTopicsOfNamespace ¶
func (hls *HTTPLookupService) GetTopicsOfNamespace(ctx context.Context, namespace string) (*api.CommandGetTopicsOfNamespaceResponse, error)
func (*HTTPLookupService) LookupTopic ¶
func (hls *HTTPLookupService) LookupTopic(ctx context.Context, topic string) (*api.CommandLookupTopicResponse, error)
type HashingScheme ¶
type HashingScheme int
const ( JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent Murmur3_32Hash // Use Murmur3 hashing function BoostHash // C++ based boost::hash )
type InitialPosition ¶
type InitialPosition int
const ( // Latest position which means the start consuming position will be the last message Latest InitialPosition = iota // Earliest position which means the start consuming position will be the first message Earliest )
type LookupData ¶
type LookupService ¶
type LookupService interface {
LookupTopic(ctx context.Context, topic string) (*LookupData, error)
GetPartitions(ctx context.Context, topic string) (uint32, error)
GetTopicsOfNamespace(ctx context.Context, namespace string) ([]string, error)
}
LookupService opaque interface that provides the ability to look up topic and namespace information.
func NewLookupService ¶
func NewLookupService(ctx context.Context, url string, tlsCfg *tls.Config, connPool *ConnectionPool) LookupService
type MessageRouter ¶
MessageRouterFunc
type MessageRoutingMode ¶
type MessageRoutingMode int
MessageRoutingMode
const ( // RoundRobinDistribution publish messages across all partitions in round-robin. RoundRobinDistribution MessageRoutingMode = iota // UseSinglePartition the producer will choose one single partition and publish all the messages into that partition UseSinglePartition // CustomPartition use custom message router implementation that will be called to determine the partition for a particular message. CustomPartition )
type PartitionedConsumerImpl ¶
type PartitionedConsumerImpl struct {
Ctx context.Context
SubscriptionName string
TopicObject *TopicObject
Partitions uint32
Consumers []Consumer
Queue chan *Message
Mu sync.Mutex // protects following
IsClosed bool
Closedc chan struct{}
IsEndOfTopic bool
EndOfTopicc chan struct{}
}
func (*PartitionedConsumerImpl) Ack ¶
func (pc *PartitionedConsumerImpl) Ack(msg *Message) error
Ack the consumption of a single message
func (*PartitionedConsumerImpl) AckID ¶
func (pc *PartitionedConsumerImpl) AckID(msgID MessageID) error
Ack the consumption of a single message, identified by its MessageID
func (*PartitionedConsumerImpl) Close ¶
func (pc *PartitionedConsumerImpl) Close() error
Close the consumer and stop the broker from pushing more messages
func (*PartitionedConsumerImpl) Receive ¶
func (pc *PartitionedConsumerImpl) Receive(ctx context.Context) (*Message, error)
Receive a single message. This call blocks until a message is available.
func (*PartitionedConsumerImpl) RedeliverUnackedMessages ¶
func (pc *PartitionedConsumerImpl) RedeliverUnackedMessages() error
RedeliverUnackedMessages redelivers all the unacknowledged messages.
func (*PartitionedConsumerImpl) Seek ¶
func (pc *PartitionedConsumerImpl) Seek(msgID MessageID) error
Seek resets the subscription associated with this consumer to a specific message id. The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For partitioned topics, perform the
seek() on the individual partitions instead.
func (*PartitionedConsumerImpl) Subscription ¶
func (pc *PartitionedConsumerImpl) Subscription() string
Subscription get a subscription for the consumer
func (*PartitionedConsumerImpl) Topic ¶
func (pc *PartitionedConsumerImpl) Topic() string
Topic get a topic for the consumer
func (*PartitionedConsumerImpl) Unsubscribe ¶
func (pc *PartitionedConsumerImpl) Unsubscribe() error
Unsubscribe the consumer
type PartitionedProducerImpl ¶
type PartitionedProducerImpl struct {
TopicName string
TopicObject *TopicObject
Partitions uint32
Producers []Producer
Router MessageRouter
SequenceID *MonotonicID
}
PartitionedProducerImpl implementation of partitioned-topic producer.
func (*PartitionedProducerImpl) Close ¶
func (p *PartitionedProducerImpl) Close() error
Close closes this producer.
func (*PartitionedProducerImpl) LastSequenceID ¶
func (p *PartitionedProducerImpl) LastSequenceID() uint64
LastSequenceID returns the last sequence id that was published by this producer.
func (*PartitionedProducerImpl) Name ¶
func (p *PartitionedProducerImpl) Name() string
Name returns ProducerName
func (*PartitionedProducerImpl) Send ¶
func (p *PartitionedProducerImpl) Send(msg ProducerMessage) error
Send sends a ProducerMessage synchronously.
func (*PartitionedProducerImpl) SendAsync ¶
func (p *PartitionedProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))
SendAsync sends a ProducerMessage in asynchronous mode.
func (*PartitionedProducerImpl) Topic ¶
func (p *PartitionedProducerImpl) Topic() string
Topic returns TopicName
type ProduceHandler ¶
type Producer ¶
type Producer interface {
// return the topic to which producer is publishing to
Topic() string
// return the producer name which could have been assigned by the system or specified by the client
Name() string
// Send a ProducerMessage
// This call will block until the message is successfully acknowledged by the Pulsar broker.
// Example:
// producer.Send(pulsar.ProducerMessage{ Payload: myPayload })
Send(ProducerMessage) error
// Send a ProducerMessage in asynchronous mode
// The callback will report back the ProducerMessage being published and
// the eventual error in publishing
SendAsync(ProducerMessage, func(ProducerMessage, error))
// Get the last sequence id that was published by this producer.
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
// was published and acknowledged by the broker.
// After recreating a producer with the same producer name, this will return the last ProducerMessage that was
// published in the previous producer session, or -1 if no ProducerMessage was ever published.
// return the last sequence id published by this producer.
LastSequenceID() uint64
// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Close() error
}
Producer the producer is used to publish messages on a topic
type ProducerConfig ¶
type ProducerConfig struct {
// Specify the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string
// Specify a name for the producer
// If not assigned, the system will generate a globally unique name which can be accessed with
// Producer.Name().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
// across all Pulsar's clusters. Brokers will enforce that only a single producer with a given name can be publishing on
// a topic.
Name string
// Attach a set of application defined properties to the producer
// These properties will be visible in the topic stats
Properties map[string]string
// Set the send timeout (default: 30 seconds)
// If a ProducerMessage is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's ProducerMessage
// deduplication feature.
SendTimeout time.Duration
// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
// When the queue is full, by default, all calls to Producer.send() and Producer.SendAsync() will fail
// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
// (default: 1024)
MaxPendingMessages int
// Set the number of max pending messages across all the partitions
// This setting will be used to lower the max pending messages for each partition
// `MaxPendingMessages(int)`, if the total exceeds the configured value.
// (default: 256)
MaxPendingMessagesAcrossPartitions int
// Set whether the `Producer.Send()` and `Producer.SendAsync()` operations should block when the outgoing
// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
// `ProducerQueueIsFullError` when there is no space left in pending queue.
BlockIfQueueFull bool
// Set the ProducerMessage routing mode for the partitioned producer.
// Default routing mode is round-robin routing.
//
// This logic is applied when the application is not setting a key ProducerMessage#setKey(String) on a
// particular ProducerMessage.
RoutingMode MessageRoutingMode
// Set the compression type for the producer.
// By default, ProducerMessage payloads are not compressed. Supported compression types are:
// - LZ4
// - ZLIB
// - ZSTD
//
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with ZSTD.
Compression CompressionType
// Set a custom ProducerMessage routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular ProducerMessage and the topic metadata, returns the
// partition index where the ProducerMessage should be routed to
CostomRouter MessageRouter
}
ProducerConfig contains options of creating a producer.
func (*ProducerConfig) SetDefault ¶
func (cfg *ProducerConfig) SetDefault() error
SetDefault checks and sets default values for the config.
type ProducerImpl ¶
type ProducerImpl struct {
Ctx context.Context
TopicName string
ID uint64
ProducerName string
SendTimeout time.Duration
PendingSize int
BlockIfQueueFull bool
Conn *ConnWrapper
SequenceID *MonotonicID
Compression CompressionType
IsClosed bool
Closedc chan struct{}
// contains filtered or unexported fields
}
ProducerImpl implementation of non-partitioned-topic producer.
func (*ProducerImpl) HandleClose ¶
func (p *ProducerImpl) HandleClose(frame.Frame) error
HandleClose implementation of ProduceHandler
func (*ProducerImpl) HandleConnected ¶
func (p *ProducerImpl) HandleConnected(f frame.Frame) error
HandleConnected implementation of ProduceHandler
func (*ProducerImpl) HandleError ¶
func (p *ProducerImpl) HandleError(f frame.Frame) error
HandleError implementation of ProduceHandler
func (*ProducerImpl) HandleReceipt ¶
func (p *ProducerImpl) HandleReceipt(f frame.Frame) error
HandleReceipt implementation of ProduceHandler
func (*ProducerImpl) LastSequenceID ¶
func (p *ProducerImpl) LastSequenceID() uint64
LastSequenceID returns the last sequence id that was published by this producer.
func (*ProducerImpl) Send ¶
func (p *ProducerImpl) Send(msg ProducerMessage) error
Send sends a ProducerMessage synchronously.
func (*ProducerImpl) SendAsync ¶
func (p *ProducerImpl) SendAsync(msg ProducerMessage, callback func(ProducerMessage, error))
SendAsync sends a ProducerMessage in asynchronous mode.
type ProducerMessage ¶
type ProducerMessage struct {
// ProducerID which should be auto filled by producer while sending
ProducerID uint64
// ProducerName which should be auto filled by producer while sending
ProducerName string
// SequenceID which should be auto filled by producer while sending
SequenceID uint64
// Payload for the message
Payload []byte
// Sets the key of the message for routing policy
Key string
// Attach application defined properties on the message
Properties map[string]string
// Set the event time for a given message
EventTime time.Time
Compression CompressionType
}
func (*ProducerMessage) Frame ¶
func (msg *ProducerMessage) Frame() *frame.Frame
type Result ¶
type Result int
const ( UnknownError Result = 1 // Unknown error happened on broker InvalidConfiguration Result = 2 // Invalid configuration TimeoutError Result = 3 // Operation timed out LookupError Result = 4 // Broker lookup failed ConnectError Result = 5 // Failed to connect to broker ReadError Result = 6 // Failed to read from socket AuthenticationError Result = 7 // Authentication failed on broker AuthorizationError Result = 8 // Client is not authorized to create producer/consumer ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data BrokerMetadataError Result = 10 // Broker failed in updating metadata BrokerPersistenceError Result = 11 // Broker failed to persist entry ChecksumError Result = 12 // Corrupt message checksum failure ConsumerBusy Result = 13 // Exclusive consumer is already connected NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation InvalidMessage Result = 16 // Error in publishing an already used message ConsumerNotInitialized Result = 17 // Consumer is not initialized ProducerNotInitialized Result = 18 // Producer is not initialized TooManyLookupRequestException Result = 19 // Too Many concurrent LookupRequest InvalidTopicName Result = 20 // Invalid topic name InvalidUrl Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor) ServiceUnitNotReady Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created OperationNotSupported Result = 23 ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception ProducerQueueIsFull Result = 26 // Producer queue is full MessageTooBig Result = 27 // Trying to send a messages exceeding the max size TopicNotFound Result = 28 // Topic not found SubscriptionNotFound Result = 29 // Subscription not 
found ConsumerNotFound Result = 30 // Consumer not found UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature TopicTerminated Result = 32 // Topic was already terminated CryptoError Result = 33 // Error when crypto operation fails )
type RoundRobinRouter ¶
type RoundRobinRouter struct {
Partition uint32
// contains filtered or unexported fields
}
func (*RoundRobinRouter) GetPartition ¶
func (router *RoundRobinRouter) GetPartition(key string) uint32
type SinglePartitionRouter ¶
func (*SinglePartitionRouter) GetPartition ¶
func (router *SinglePartitionRouter) GetPartition(key string) uint32
type SubscriptionType ¶
type SubscriptionType int
Types of subscription supported by Pulsar
const ( // There can be only 1 consumer on the same topic with the same subscription name Exclusive SubscriptionType = iota // a round-robin rotation between the connected consumers Shared // Multiple consumers will be able to use the same subscription name but only 1 consumer will receive the messages. // If that consumer disconnects, one of the other connected consumers will start receiving messages. Failover )
type TopicObject ¶
type TopicObject struct {
FullName string
Domain string
Property string
Cluster string
NamespacePortion string
LocalName string
IsV2Topic bool
}
TopicObject standardized topic
func ParseTopicName ¶
func ParseTopicName(topic string) (*TopicObject, error)
ParseTopicName parse topic string to a standardized TopicObject. Returns an error if the topic was invalid.
func (*TopicObject) GetPartitionName ¶
func (t *TopicObject) GetPartitionName(partition uint32) string
GetPartitionName returns a partitioned-topic string by index.
func (*TopicObject) String ¶
func (t *TopicObject) String() string