Documentation
¶
Index ¶
- func MustNewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) queue.MessageQueue
- func NewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) (queue.MessageQueue, error)
- type BatchHandle
- type ConsumeErrorHandler
- type ConsumeHandle
- type ConsumeHandler
- type KqConf
- type PushOption
- func WithAllowAutoTopicCreation() PushOption
- func WithBalancer(balancer kafka.Balancer) PushOption
- func WithChunkSize(chunkSize int) PushOption
- func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption
- func WithFlushInterval(interval time.Duration) PushOption
- func WithMaxAttempts(maxAttempts int) PushOption
- func WithRequiredAcks(acks kafka.RequiredAcks) PushOption
- func WithSyncPush() PushOption
- func WithWriteBackoffMax(writeBackoffMax time.Duration) PushOption
- func WithWriteBackoffMin(writeBackoffMin time.Duration) PushOption
- type Pusher
- type QueueOption
- func WithBatchFlushInterval(flushInterval string) QueueOption
- func WithBatchHandle(batchHandle BatchHandle) QueueOption
- func WithBatchSize(batchSize int) QueueOption
- func WithCommitInterval(interval time.Duration) QueueOption
- func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption
- func WithMaxWait(wait time.Duration) QueueOption
- func WithMetrics(metrics *stat.Metrics) QueueOption
- func WithQueueCapacity(queueCapacity int) QueueOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MustNewQueue ¶
func MustNewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) queue.MessageQueue
func NewQueue ¶
func NewQueue(c KqConf, handler ConsumeHandler, queueOpts ...QueueOption) (queue.MessageQueue, error)
Types ¶
type BatchHandle ¶
type ConsumeErrorHandler ¶
type ConsumeHandler ¶
func WithHandle ¶
func WithHandle(handle ConsumeHandle) ConsumeHandler
type KqConf ¶
type KqConf struct {
	service.ServiceConf
	Brokers    []string
	Group      string
	Topic      string
	CaFile     string `json:",optional"`
	Offset     string `json:",options=first|last,default=last"`
	Conns      int    `json:",default=1"`
	Consumers  int    `json:",default=8"`
	Processors int    `json:",default=8"`
	MinBytes   int    `json:",default=10240"`    // 10K
	MaxBytes   int    `json:",default=10485760"` // 10M
	Username   string `json:",optional"`
	Password   string `json:",optional"`
	//ForceCommit bool `json:",default=true"`
	CommitInOrder bool `json:",default=false"`
}
type PushOption ¶
type PushOption func(options *pushOptions)
func WithAllowAutoTopicCreation ¶
func WithAllowAutoTopicCreation() PushOption
WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
func WithBalancer ¶
func WithBalancer(balancer kafka.Balancer) PushOption
WithBalancer customizes the Pusher with the given balancer.
func WithChunkSize ¶
func WithChunkSize(chunkSize int) PushOption
WithChunkSize customizes the Pusher with the given chunk size.
func WithCompletion ¶
func WithCompletion(completion func(messages []kafka.Message, err error)) PushOption
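WithCompletion customizes the Pusher with the given completion callback.
@Description: @param completion callback that receives the messages and the resulting error @return PushOption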
func WithFlushInterval ¶
func WithFlushInterval(interval time.Duration) PushOption
WithFlushInterval customizes the Pusher with the given flush interval.
func WithMaxAttempts ¶
func WithMaxAttempts(maxAttempts int) PushOption
WithMaxAttempts
@Description: @param maxAttempts @return PushOption
func WithRequiredAcks ¶
func WithRequiredAcks(acks kafka.RequiredAcks) PushOption
WithRequiredAcks
@Description: @param acks
Number of acknowledgements from partition replicas required before receiving a response to a produce request. The following values are supported:
RequireNone (0): fire-and-forget, do not wait for acknowledgements. RequireOne (1): wait for the leader to acknowledge the writes. RequireAll (-1): wait for the full ISR to acknowledge the writes. @return PushOption
func WithSyncPush ¶
func WithSyncPush() PushOption
WithSyncPush enables the Pusher to push messages synchronously.
func WithWriteBackoffMax ¶
func WithWriteBackoffMax(writeBackoffMax time.Duration) PushOption
WithWriteBackoffMax
@Description: WriteBackoffMax optionally sets the maximum amount of time the writer waits before it attempts to write a batch of messages. Default: 1s @param writeBackoffMax @return PushOption
func WithWriteBackoffMin ¶
func WithWriteBackoffMin(writeBackoffMin time.Duration) PushOption
WithWriteBackoffMin
@Description:
WriteBackoffMin optionally sets the smallest amount of time the writer waits before it attempts to write a batch of messages.
Default: 100ms
@param writeBackoffMin @return PushOption
type Pusher ¶
type Pusher struct {
// contains filtered or unexported fields
}
func NewPusher ¶
func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher
NewPusher returns a Pusher with the given Kafka addresses and topic.
func (*Pusher) Name ¶
Name returns the name of the Kafka topic that the Pusher is sending messages to.
type QueueOption ¶
type QueueOption func(*queueOptions)
func WithBatchFlushInterval ¶
func WithBatchFlushInterval(flushInterval string) QueueOption
WithBatchFlushInterval
@Description: flush interval of the batchProcessor window @param flushInterval default 1s @return QueueOption
func WithBatchHandle ¶
func WithBatchHandle(batchHandle BatchHandle) QueueOption
WithBatchHandle
@Description: batchProcessor handle @param batchHandle @return QueueOption
func WithBatchSize ¶
func WithBatchSize(batchSize int) QueueOption
WithBatchSize
@Description: @param batchSize default 1000 @return QueueOption
func WithCommitInterval ¶
func WithCommitInterval(interval time.Duration) QueueOption
WithCommitInterval
@Description: @param interval @return QueueOption
func WithErrorHandler ¶
func WithErrorHandler(errorHandler ConsumeErrorHandler) QueueOption
WithErrorHandler
@Description: when go-queue handles a message and err != nil, this function is called @param errorHandler @return QueueOption
func WithMaxWait ¶
func WithMaxWait(wait time.Duration) QueueOption
func WithMetrics ¶
func WithMetrics(metrics *stat.Metrics) QueueOption
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) QueueOption