Documentation
¶
Index ¶
- Constants
- func ReadElements(r io.Reader, elements ...interface{}) error
- func WriteElements(w io.Writer, elements ...interface{}) error
- type Authentication
- type AvroCodec
- type AvroSchema
- type BinaryFreeList
- func (b BinaryFreeList) Borrow() (buf []byte)
- func (b BinaryFreeList) Float32(buf []byte) (float32, error)
- func (b BinaryFreeList) Float64(buf []byte) (float64, error)
- func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
- func (b BinaryFreeList) PutUint16(w io.Writer, byteOrder binary.ByteOrder, val uint16) error
- func (b BinaryFreeList) PutUint32(w io.Writer, byteOrder binary.ByteOrder, val uint32) error
- func (b BinaryFreeList) PutUint64(w io.Writer, byteOrder binary.ByteOrder, val uint64) error
- func (b BinaryFreeList) PutUint8(w io.Writer, val uint8) error
- func (b BinaryFreeList) Return(buf []byte)
- func (b BinaryFreeList) Uint16(r io.Reader, byteOrder binary.ByteOrder) (uint16, error)
- func (b BinaryFreeList) Uint32(r io.Reader, byteOrder binary.ByteOrder) (uint32, error)
- func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64, error)
- func (b BinaryFreeList) Uint8(r io.Reader) (uint8, error)
- type BytesSchema
- type Client
- type ClientOptions
- type CompressionType
- type Consumer
- type ConsumerMessage
- type ConsumerOptions
- type DoubleSchema
- type Error
- type FloatSchema
- type HashingScheme
- type InitialPosition
- type Int16Schema
- type Int32Schema
- type Int64Schema
- type Int8Schema
- type JsonSchema
- type Message
- type MessageID
- type MessageRoutingMode
- type Producer
- type ProducerMessage
- type ProducerOptions
- type ProtoSchema
- type Reader
- type ReaderMessage
- type ReaderOptions
- type Result
- type Schema
- type SchemaInfo
- type SchemaType
- type StringSchema
- type SubscriptionType
- type TopicMetadata
Constants ¶
const (
IoMaxSize = 1024
)
Variables ¶
This section is empty.
Functions ¶
func ReadElements ¶
func WriteElements ¶
Types ¶
type Authentication ¶
type Authentication interface{}
Opaque interface that represents the authentication credentials
func NewAuthenticationAthenz ¶
func NewAuthenticationAthenz(authParams string) Authentication
Create new Athenz Authentication provider with configuration in JSON form
func NewAuthenticationTLS ¶
func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authentication
Create new Authentication provider with specified TLS certificate and private key
func NewAuthenticationToken ¶
func NewAuthenticationToken(token string) Authentication
Create new Authentication provider with specified auth token
func NewAuthenticationTokenSupplier ¶
func NewAuthenticationTokenSupplier(tokenSupplier func() string) Authentication
Create new Authentication provider with specified auth token supplier
type AvroCodec ¶
func NewSchemaDefinition ¶
type AvroSchema ¶
type AvroSchema struct {
AvroCodec
SchemaInfo
}
func NewAvroSchema ¶
func NewAvroSchema(avroSchemaDef string, properties map[string]string) *AvroSchema
func (*AvroSchema) Decode ¶
func (as *AvroSchema) Decode(data []byte, v interface{}) error
func (*AvroSchema) Encode ¶
func (as *AvroSchema) Encode(data interface{}) ([]byte, error)
func (*AvroSchema) GetSchemaInfo ¶
func (as *AvroSchema) GetSchemaInfo() *SchemaInfo
func (*AvroSchema) Validate ¶
func (as *AvroSchema) Validate(message []byte) error
type BinaryFreeList ¶
type BinaryFreeList chan []byte
var BinarySerializer BinaryFreeList = make(chan []byte, IoMaxSize)
func (BinaryFreeList) Borrow ¶
func (b BinaryFreeList) Borrow() (buf []byte)
func (BinaryFreeList) PutDouble ¶
func (b BinaryFreeList) PutDouble(datum interface{}) ([]byte, error)
func (BinaryFreeList) PutFloat ¶
func (b BinaryFreeList) PutFloat(datum interface{}) ([]byte, error)
func (BinaryFreeList) Return ¶
func (b BinaryFreeList) Return(buf []byte)
type BytesSchema ¶
type BytesSchema struct {
SchemaInfo
}
func NewBytesSchema ¶
func NewBytesSchema(properties map[string]string) *BytesSchema
func (*BytesSchema) Decode ¶
func (bs *BytesSchema) Decode(data []byte, v interface{}) error
func (*BytesSchema) Encode ¶
func (bs *BytesSchema) Encode(data interface{}) ([]byte, error)
func (*BytesSchema) GetSchemaInfo ¶
func (bs *BytesSchema) GetSchemaInfo() *SchemaInfo
func (*BytesSchema) Validate ¶
func (bs *BytesSchema) Validate(message []byte) error
type Client ¶
type Client interface {
// Create the producer instance
// This method will block until the producer is created successfully
CreateProducer(ProducerOptions) (Producer, error)
CreateProducerWithSchema(ProducerOptions, Schema) (Producer, error)
// Create a `Consumer` by subscribing to a topic.
//
// If the subscription does not exist, a new subscription will be created and all messages published after the
// creation will be retained until acknowledged, even if the consumer is not connected
Subscribe(ConsumerOptions) (Consumer, error)
SubscribeWithSchema(ConsumerOptions, Schema) (Consumer, error)
// Create a Reader instance.
// This method will block until the reader is created successfully.
CreateReader(ReaderOptions) (Reader, error)
CreateReaderWithSchema(ReaderOptions, Schema) (Reader, error)
// Fetch the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
// If the topic is not partitioned, the returned list will contain the topic
// name itself.
//
// This can be used to discover the partitions and create {@link Reader},
// {@link Consumer} or {@link Producer} instances directly on a particular partition.
TopicPartitions(topic string) ([]string, error)
// Close the Client and free associated resources
Close() error
}
func NewClient ¶
func NewClient(options ClientOptions) (Client, error)
type ClientOptions ¶
type ClientOptions struct {
// Configure the service URL for the Pulsar service.
// This parameter is required
URL string
// Number of threads to be used for handling connections to brokers (default: 1 thread)
IOThreads int
// Set the operation timeout (default: 30 seconds)
// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
// operation will be marked as failed
OperationTimeoutSeconds time.Duration
// Set the number of threads to be used for message listeners (default: 1 thread)
MessageListenerThreads int
// Number of concurrent lookup-requests allowed to send on each broker-connection to prevent overload on broker.
// (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe
// on thousands of topic using created Pulsar Client
ConcurrentLookupRequests int
// Provide a custom logger implementation where all Pulsar library info/warn/error messages will be routed
// By default, log messages will be printed on standard output. By passing a logger function, application
// can determine how to print logs. This function will be called each time the Pulsar client library wants
// to write any logs.
Logger func(level log.LoggerLevel, file string, line int, message string)
// Set the path to the trusted TLS certificate file
TLSTrustCertsFilePath string
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
TLSAllowInsecureConnection bool
// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
TLSValidateHostname bool
// Configure the authentication provider. (default: no authentication)
// Example: `Authentication: NewAuthenticationTLS("my-cert.pem", "my-key.pem")`
Authentication
// Set the interval between each stat info (default: 60 seconds). Stats will be activated with positive
// statsIntervalSeconds It should be set to at least 1 second
StatsIntervalInSeconds int
}
Builder interface that is used to construct a Pulsar Client instance.
type CompressionType ¶
type CompressionType int
const ( NoCompression CompressionType = iota LZ4 ZLib ZSTD SNAPPY )
type Consumer ¶
type Consumer interface {
// Get the topic for the consumer
Topic() string
// Get a subscription for the consumer
Subscription() string
// Unsubscribe the consumer
Unsubscribe() error
// Receives a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
//Ack the consumption of a single message
Ack(Message) error
// Ack the consumption of a single message, identified by its MessageID
AckID(MessageID) error
// Ack the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
//
// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
//
// It's equivalent to calling asyncAcknowledgeCumulative(Message) and waiting for the callback to be triggered.
AckCumulative(Message) error
// Ack the reception of all the messages in the stream up to (and including) the provided message.
// This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
// re-delivered to this consumer.
//
// Cumulative acknowledge cannot be used when the consumer type is set to ConsumerShared.
//
// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
AckCumulativeID(MessageID) error
// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NAckRedeliveryDelay .
//
// This call is not blocking.
Nack(Message) error
// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NackRedeliveryDelay .
//
// This call is not blocking.
NackID(MessageID) error
// Close the consumer and stop the broker to push more messages
Close() error
// Reset the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(msgID MessageID) error
// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
// breaks, the messages are redelivered after reconnect.
RedeliverUnackedMessages()
Schema() Schema
}
An interface that abstracts behavior of Pulsar's consumer
type ConsumerMessage ¶
Pair of a Consumer and Message
type ConsumerOptions ¶
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string
// Specify a list of topics this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topics []string
// Specify a regular expression to subscribe to multiple topics under the same namespace.
// Either a topic, a list of topics or a topics pattern are required when subscribing
TopicsPattern string
// Specify the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
// Attach a set of application defined properties to the consumer
// This properties will be visible in the topic stats
Properties map[string]string
// Set the timeout for unacked messages
// Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer
// Default is 0, which means message are not being replayed based on ack time
AckTimeout time.Duration
// The delay after which to redeliver the messages that failed to be
// processed. Default is 1min. (See `Consumer.Nack()`)
NackRedeliveryDelay *time.Duration
// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
// InitialPosition at which the cursor will be set when subscribe
// Default is `Latest`
SubscriptionInitPos InitialPosition
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
// Sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
// Set to -1 to disable prefetching in consumer
ReceiverQueueSize int
// Set the max total receiver queue size across partitions.
// This setting will be used to reduce the receiver queue size for individual partitions
// ReceiverQueueSize(int) if the total exceeds this value (default: 50000).
MaxTotalReceiverQueueSizeAcrossPartitions int
// Set the consumer name.
Name string
// If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
// of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
// point, the messages will be sent as normal.
//
// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool
Schema
}
ConsumerBuilder is used to configure and create instances of Consumer
type DoubleSchema ¶
type DoubleSchema struct {
SchemaInfo
}
func NewDoubleSchema ¶
func NewDoubleSchema(properties map[string]string) *DoubleSchema
func (*DoubleSchema) Decode ¶
func (ds *DoubleSchema) Decode(data []byte, v interface{}) error
func (*DoubleSchema) Encode ¶
func (ds *DoubleSchema) Encode(value interface{}) ([]byte, error)
func (*DoubleSchema) GetSchemaInfo ¶
func (ds *DoubleSchema) GetSchemaInfo() *SchemaInfo
func (*DoubleSchema) Validate ¶
func (ds *DoubleSchema) Validate(message []byte) error
type FloatSchema ¶
type FloatSchema struct {
SchemaInfo
}
func NewFloatSchema ¶
func NewFloatSchema(properties map[string]string) *FloatSchema
func (*FloatSchema) Decode ¶
func (fs *FloatSchema) Decode(data []byte, v interface{}) error
func (*FloatSchema) Encode ¶
func (fs *FloatSchema) Encode(value interface{}) ([]byte, error)
func (*FloatSchema) GetSchemaInfo ¶
func (fs *FloatSchema) GetSchemaInfo() *SchemaInfo
func (*FloatSchema) Validate ¶
func (fs *FloatSchema) Validate(message []byte) error
type HashingScheme ¶
type HashingScheme int
const ( JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent Murmur3_32Hash // Use Murmur3 hashing function BoostHash // C++ based boost::hash )
type InitialPosition ¶
type InitialPosition int
const ( // Latest position which means the start consuming position will be the last message Latest InitialPosition = iota // Earliest position which means the start consuming position will be the first message Earliest )
type Int16Schema ¶
type Int16Schema struct {
SchemaInfo
}
func NewInt16Schema ¶
func NewInt16Schema(properties map[string]string) *Int16Schema
func (*Int16Schema) Decode ¶
func (is16 *Int16Schema) Decode(data []byte, v interface{}) error
func (*Int16Schema) Encode ¶
func (is16 *Int16Schema) Encode(value interface{}) ([]byte, error)
func (*Int16Schema) GetSchemaInfo ¶
func (is16 *Int16Schema) GetSchemaInfo() *SchemaInfo
func (*Int16Schema) Validate ¶
func (is16 *Int16Schema) Validate(message []byte) error
type Int32Schema ¶
type Int32Schema struct {
SchemaInfo
}
func NewInt32Schema ¶
func NewInt32Schema(properties map[string]string) *Int32Schema
func (*Int32Schema) Decode ¶
func (is32 *Int32Schema) Decode(data []byte, v interface{}) error
func (*Int32Schema) Encode ¶
func (is32 *Int32Schema) Encode(value interface{}) ([]byte, error)
func (*Int32Schema) GetSchemaInfo ¶
func (is32 *Int32Schema) GetSchemaInfo() *SchemaInfo
func (*Int32Schema) Validate ¶
func (is32 *Int32Schema) Validate(message []byte) error
type Int64Schema ¶
type Int64Schema struct {
SchemaInfo
}
func NewInt64Schema ¶
func NewInt64Schema(properties map[string]string) *Int64Schema
func (*Int64Schema) Decode ¶
func (is64 *Int64Schema) Decode(data []byte, v interface{}) error
func (*Int64Schema) Encode ¶
func (is64 *Int64Schema) Encode(value interface{}) ([]byte, error)
func (*Int64Schema) GetSchemaInfo ¶
func (is64 *Int64Schema) GetSchemaInfo() *SchemaInfo
func (*Int64Schema) Validate ¶
func (is64 *Int64Schema) Validate(message []byte) error
type Int8Schema ¶
type Int8Schema struct {
SchemaInfo
}
func NewInt8Schema ¶
func NewInt8Schema(properties map[string]string) *Int8Schema
func (*Int8Schema) Decode ¶
func (is8 *Int8Schema) Decode(data []byte, v interface{}) error
func (*Int8Schema) Encode ¶
func (is8 *Int8Schema) Encode(value interface{}) ([]byte, error)
func (*Int8Schema) GetSchemaInfo ¶
func (is8 *Int8Schema) GetSchemaInfo() *SchemaInfo
func (*Int8Schema) Validate ¶
func (is8 *Int8Schema) Validate(message []byte) error
type JsonSchema ¶
type JsonSchema struct {
AvroCodec
SchemaInfo
}
func NewJsonSchema ¶
func NewJsonSchema(jsonAvroSchemaDef string, properties map[string]string) *JsonSchema
func (*JsonSchema) Decode ¶
func (js *JsonSchema) Decode(data []byte, v interface{}) error
func (*JsonSchema) Encode ¶
func (js *JsonSchema) Encode(data interface{}) ([]byte, error)
func (*JsonSchema) GetSchemaInfo ¶
func (js *JsonSchema) GetSchemaInfo() *SchemaInfo
func (*JsonSchema) Validate ¶
func (js *JsonSchema) Validate(message []byte) error
type Message ¶
type Message interface {
// Get the topic from which this message originated from
Topic() string
// Return the properties attached to the message.
// Properties are application defined key/value pairs that will be attached to the message
Properties() map[string]string
// Get the payload of the message
Payload() []byte
// Get the unique message ID associated with this message.
// The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
ID() MessageID
// Get the publish time of this message. The publish time is the timestamp that a client publish the message.
PublishTime() time.Time
// Get the event time associated with this message. It is typically set by the applications via
// `ProducerMessage.EventTime`.
// If there isn't any event time associated with this event, it will be nil.
EventTime() *time.Time
// Get the key of the message, if any
Key() string
//Get the de-serialized value of the message, according the configured
GetValue(v interface{}) error
}
type MessageID ¶
type MessageID interface {
// Serialize the message id into a sequence of bytes that can be stored somewhere else
Serialize() []byte
}
Identifier for a particular message
func DeserializeMessageID ¶
Reconstruct a MessageID object from its serialized representation
type MessageRoutingMode ¶
type MessageRoutingMode int
const ( // Publish messages across all partitions in round-robin. RoundRobinDistribution MessageRoutingMode = iota // The producer will chose one single partition and publish all the messages into that partition UseSinglePartition // Use custom message router implementation that will be called to determine the partition for a particular message. CustomPartition )
type Producer ¶
type Producer interface {
// return the topic to which producer is publishing to
Topic() string
// return the producer name which could have been assigned by the system or specified by the client
Name() string
// Send a message
// This call will be blocking until is successfully acknowledged by the Pulsar broker.
// Example:
// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
Send(context.Context, ProducerMessage) error
// Send a message in asynchronous mode
// The callback will report back the message being published and
// the eventual error in publishing
SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
// Get the last sequence id that was published by this producer.
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
// was published and acknowledged by the broker.
// After recreating a producer with the same producer name, this will return the last message that was
// published in the previous producer session, or -1 if there no message was ever published.
// return the last sequence id published by this producer.
LastSequenceID() int64
// Flush all the messages buffered in the client and wait until all messages have been successfully
// persisted.
Flush() error
// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Close() error
Schema() Schema
}
The producer is used to publish messages on a topic
type ProducerMessage ¶
type ProducerMessage struct {
// Payload for the message
Payload []byte
//Value and payload is mutually exclusive, `Value interface{}` for schema message.
Value interface{}
// Sets the key of the message for routing policy
Key string
// Attach application defined properties on the message
Properties map[string]string
// Set the event time for a given message
EventTime time.Time
// Override the replication clusters for this message.
ReplicationClusters []string
// Set the sequence id to assign to the current message
SequenceID int64
}
type ProducerOptions ¶
type ProducerOptions struct {
// Specify the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string
// Specify a name for the producer
// If not assigned, the system will generate a globally unique name which can be access with
// Producer.ProducerName().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
// a topic.
Name string
// Attach a set of application defined properties to the producer
// This properties will be visible in the topic stats
Properties map[string]string
// Set the send timeout (default: 30 seconds)
// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
// deduplication feature.
SendTimeout time.Duration
// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
MaxPendingMessages int
// Set the number of max pending messages across all the partitions
// This setting will be used to lower the max pending messages for each partition
// `MaxPendingMessages(int)`, if the total exceeds the configured value.
MaxPendingMessagesAcrossPartitions int
// Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
// `ProducerQueueIsFullError` when there is no space left in pending queue.
BlockIfQueueFull bool
// Set the message routing mode for the partitioned producer.
// Default routing mode is round-robin routing.
//
// This logic is applied when the application is not setting a key ProducerMessage#setKey(String) on a
// particular message.
MessageRoutingMode
// Change the `HashingScheme` used to chose the partition on where to publish a particular message.
// Standard hashing functions available are:
//
// - `JavaStringHash` : Java String.hashCode() equivalent
// - `Murmur3_32Hash` : Use Murmur3 hashing function.
// https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
// - `BoostHash` : C++ based boost::hash
//
// Default is `JavaStringHash`.
HashingScheme
// Set the compression type for the producer.
// By default, message payloads are not compressed. Supported compression types are:
// - LZ4
// - ZLIB
// - ZSTD
// - SNAPPY
//
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with ZSTD.
//
// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with SNAPPY.
CompressionType
// Set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
MessageRouter func(Message, TopicMetadata) int
// Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
//
// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
// contents.
//
// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
Batching bool
// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
// enabled. If set to a non zero value, messages will be queued until this time interval or until
BatchingMaxPublishDelay time.Duration
// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
// messages will be queued until this threshold is reached or batch interval has elapsed
BatchingMaxMessages uint
}
type ProtoSchema ¶
type ProtoSchema struct {
AvroCodec
SchemaInfo
}
func NewProtoSchema ¶
func NewProtoSchema(protoAvroSchemaDef string, properties map[string]string) *ProtoSchema
func (*ProtoSchema) Decode ¶
func (ps *ProtoSchema) Decode(data []byte, v interface{}) error
func (*ProtoSchema) Encode ¶
func (ps *ProtoSchema) Encode(data interface{}) ([]byte, error)
func (*ProtoSchema) GetSchemaInfo ¶
func (ps *ProtoSchema) GetSchemaInfo() *SchemaInfo
func (*ProtoSchema) Validate ¶
func (ps *ProtoSchema) Validate(message []byte) error
type Reader ¶
type Reader interface {
// The topic from which this reader is reading from
Topic() string
// Read the next message in the topic, blocking until a message is available
Next(context.Context) (Message, error)
// Check if there is any message available to read from the current position
HasNext() (bool, error)
// Close the reader and stop the broker to push more messages
Close() error
Schema() Schema
}
A Reader can be used to scan through all the messages currently available in a topic.
type ReaderMessage ¶
type ReaderOptions ¶
type ReaderOptions struct {
// Specify the topic this consumer will subscribe on.
// This argument is required when constructing the reader.
Topic string
// Set the reader name.
Name string
// The initial reader positioning is done by specifying a message id. The options are:
// * `pulsar.EarliestMessage` : Start reading from the earliest message available in the topic
// * `pulsar.LatestMessage` : Start reading from the end topic, only getting messages published after the
// reader was created
// * `MessageID` : Start reading from a particular message id, the reader will position itself on that
// specific position. The first message to be read will be the message next to the specified
// messageID
StartMessageID MessageID
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ReaderMessage
// Sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the Reader before the
// application calls Reader.readNext(). Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
//
// Default value is {@code 1000} messages and should be good for most use cases.
ReceiverQueueSize int
// Set the subscription role prefix. The default prefix is "reader".
SubscriptionRolePrefix string
// If enabled, the reader will read messages from the compacted topic rather than reading the full message backlog
// of the topic. This means that, if the topic has been compacted, the reader will only see the latest value for
// each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
// point, the messages will be sent as normal.
//
// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
// topics will lead to the reader create call throwing a PulsarClientException.
ReadCompacted bool
}
type Result ¶
type Result int
const ( UnknownError Result = 1 // Unknown error happened on broker InvalidConfiguration Result = 2 // Invalid configuration TimeoutError Result = 3 // Operation timed out LookupError Result = 4 // Broker lookup failed ConnectError Result = 5 // Failed to connect to broker ReadError Result = 6 // Failed to read from socket AuthenticationError Result = 7 // Authentication failed on broker AuthorizationError Result = 8 // Client is not authorized to create producer/consumer ErrorGettingAuthenticationData Result = 9 // Client cannot find authorization data BrokerMetadataError Result = 10 // Broker failed in updating metadata BrokerPersistenceError Result = 11 // Broker failed to persist entry ChecksumError Result = 12 // Corrupt message checksum failure ConsumerBusy Result = 13 // Exclusive consumer is already connected NotConnectedError Result = 14 // Producer/Consumer is not currently connected to broker AlreadyClosedError Result = 15 // Producer/Consumer is already closed and not accepting any operation InvalidMessage Result = 16 // Error in publishing an already used message ConsumerNotInitialized Result = 17 // Consumer is not initialized ProducerNotInitialized Result = 18 // Producer is not initialized TooManyLookupRequestException Result = 19 // Too Many concurrent LookupRequest InvalidTopicName Result = 20 // Invalid topic name InvalidUrl Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor) ServiceUnitNotReady Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created OperationNotSupported Result = 23 ProducerBlockedQuotaExceededError Result = 24 // Producer is blocked ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception ProducerQueueIsFull Result = 26 // Producer queue is full MessageTooBig Result = 27 // Trying to send a messages exceeding the max size TopicNotFound Result = 28 // Topic not found SubscriptionNotFound Result = 29 // Subscription not found ConsumerNotFound Result = 30 // Consumer not found UnsupportedVersionError Result = 31 // Error when an older client/version doesn't support a required feature TopicTerminated Result = 32 // Topic was already terminated CryptoError Result = 33 // Error when crypto operation fails )
type SchemaInfo ¶
type SchemaInfo struct {
Name string
Schema string
Type SchemaType
Properties map[string]string
}
Encapsulates data around the schema definition
type SchemaType ¶
type SchemaType int
const ( NONE SchemaType = iota //No schema defined STRING //Simple String encoding with UTF-8 JSON //JSON object encoding and validation PROTOBUF //Protobuf message encoding and decoding AVRO //Serialize and deserialize via Avro BOOLEAN // INT8 //A 8-byte integer. INT16 //A 16-byte integer. INT32 //A 32-byte integer. INT64 //A 64-byte integer. FLOAT //A float number. DOUBLE //A double number KEY_VALUE //A Schema that contains Key Schema and Value Schema. BYTES = -1 //A bytes array. AUTO = -2 // AUTO_CONSUME = -3 //Auto Consume Type. AUTO_PUBLISH = -4 // Auto Publish Type. )
type StringSchema ¶
type StringSchema struct {
SchemaInfo
}
func NewStringSchema ¶
func NewStringSchema(properties map[string]string) *StringSchema
func (*StringSchema) Decode ¶
func (ss *StringSchema) Decode(data []byte, v interface{}) error
func (*StringSchema) Encode ¶
func (ss *StringSchema) Encode(v interface{}) ([]byte, error)
func (*StringSchema) GetSchemaInfo ¶
func (ss *StringSchema) GetSchemaInfo() *SchemaInfo
func (*StringSchema) Validate ¶
func (ss *StringSchema) Validate(message []byte) error
type SubscriptionType ¶
type SubscriptionType int
Types of subscription supported by Pulsar
const ( // There can be only 1 consumer on the same topic with the same subscription name Exclusive SubscriptionType = iota // a round-robin rotation between the connected consumers Shared // Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages. // If that consumer disconnects, one of the other connected consumers will start receiving messages. Failover // will be dispatched to only one consumer KeyShared )
type TopicMetadata ¶
type TopicMetadata interface {
// Get the number of partitions for the specific topic
NumPartitions() int
}