kafka

Version: v0.0.0-...-1fdb7e3

This package is not in the latest version of its module.

Published: Jul 22, 2022 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package kafka provides a Kafka abstraction through github.com/Shopify/sarama.

WARNING: Subscriber does not acknowledge messages, so it cannot automatically resume from the last-processed message. Subscriptions can start only from the oldest known offset, the newest offset, or a manually specified numeric offset.

Both bps.NewPublisher (`kafka` + `kafka+sync` schemes) and bps.NewSubscriber (`kafka` scheme) support the following query parameters:

client.id
   A user-provided string sent with every request to the brokers for logging, debugging, and auditing
   purposes.
rack.id
   A rack identifier for this client. This can be any string value which indicates where this client
   is physically located. It corresponds with the broker config 'broker.rack'.
net.max.requests
   How many outstanding requests a connection is allowed to have before sending on it blocks (default 5).
net.dial.timeout
   How long to wait for the initial connection (default 30s).
net.read.timeout
   How long to wait for a response (default 30s).
net.write.timeout
   How long to wait for a transmit (default 30s).
net.tls.enable
   Whether or not to use TLS when connecting to the broker (defaults to false).
kafka.version
   The version of the kafka server.
channel.buffer.size
   The number of events to buffer in internal and external channels. This permits the producer to
   continue processing some messages in the background while user code is working, greatly improving
   throughput (default 256).
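
The parameters above are passed as an ordinary URL query string. A minimal sketch of assembling such a URL with the standard library follows; buildKafkaURL is a hypothetical helper that is not part of bps, and the broker addresses are placeholders.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildKafkaURL is a hypothetical helper (not part of bps) that joins broker
// addresses into the host part and encodes params as the query string.
func buildKafkaURL(scheme string, brokers []string, params map[string]string) string {
	q := url.Values{}
	for k, v := range params {
		q.Set(k, v)
	}
	u := url.URL{
		Scheme:   scheme,
		Host:     strings.Join(brokers, ","),
		Path:     "/",
		RawQuery: q.Encode(), // Encode sorts by key, so the result is deterministic
	}
	return u.String()
}

func main() {
	fmt.Println(buildKafkaURL(
		"kafka",
		[]string{"10.0.0.1:9092", "10.0.0.2:9092"},
		map[string]string{
			"client.id":           "my-client",
			"kafka.version":       "2.3.0",
			"channel.buffer.size": "1024",
			"net.dial.timeout":    "10s",
		},
	))
}
```

The resulting string can then be handed to bps.NewPublisher or bps.NewSubscriber in place of a hand-written URL.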

bps.NewPublisher supports `kafka` + `kafka+sync` schemes and the following query parameters:

acks
   The number of acks required before considering a request complete. When acks=0, the producer will
   not wait for any acknowledgment. When acks=1 the leader will write the record to its local log but
   will respond without awaiting full acknowledgement from all followers. When acks=all the leader will
   wait for the full set of in-sync replicas to acknowledge the record.
message.max.bytes
   The maximum permitted size of a message (default 1,000,000). Should be
   set equal to or smaller than the broker's `message.max.bytes`.
compression.type
   The compression type for all data generated by the producer. Valid values are: none, gzip, snappy,
   lz4 (default none).
partitioner
   The partitioner used to partition messages (defaults to hashing the message ID). Valid values are:
   hash, random, roundrobin.
timeout
   The maximum duration the broker will wait for the receipt of the required number of acks (default 10s).
   This is only relevant when acks=all or a number > 1.
flush.bytes
   The best-effort number of bytes needed to trigger a flush.
flush.messages
   The best-effort number of messages needed to trigger a flush.
flush.frequency
   The best-effort frequency of flushes.
retry.max
   The total number of times to retry sending a message (default 3).
retry.backoff
   How long to wait for the cluster to settle between retries (default 100ms).
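
To illustrate the `acks` values described above, here is a minimal sketch of how such a value could be interpreted. parseAcks is a hypothetical helper and not the parser this package actually uses; it follows the Kafka protocol convention in which -1 stands for "all in-sync replicas".

```go
package main

import (
	"fmt"
	"strconv"
)

// parseAcks is a hypothetical helper, not the package's actual parser.
// It maps an `acks` query value to the numeric form used by the Kafka
// protocol: 0 = no acknowledgement, 1 = leader only, -1 = all in-sync replicas.
func parseAcks(s string) (int16, error) {
	if s == "all" {
		return -1, nil
	}
	n, err := strconv.ParseInt(s, 10, 16)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid acks value %q", s)
	}
	return int16(n), nil
}

func main() {
	for _, v := range []string{"0", "1", "all", "bogus"} {
		n, err := parseAcks(v)
		fmt.Printf("acks=%s -> %d (err: %v)\n", v, n, err)
	}
}
```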

bps.NewSubscriber (`kafka` scheme) supports the following query parameter:

offsets.initial
  Offset to start consuming from. Can be "oldest" (the oldest available message),
  "newest" (only new messages, produced after subscribing),
  or a numeric offset value.
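
The three accepted forms of `offsets.initial` could be resolved as in the sketch below. parseInitialOffset is a hypothetical helper and not this package's implementation; the sentinel values match sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2), and treating an empty value as "newest" is an assumption based on the documented NewSubscriber default.

```go
package main

import (
	"fmt"
	"strconv"
)

const (
	offsetNewest int64 = -1 // same value as sarama.OffsetNewest
	offsetOldest int64 = -2 // same value as sarama.OffsetOldest
)

// parseInitialOffset is a hypothetical helper, not the package's parser.
// It accepts "oldest", "newest", or a non-negative numeric offset.
func parseInitialOffset(s string) (int64, error) {
	switch s {
	case "oldest":
		return offsetOldest, nil
	case "newest", "": // assumption: an unset value falls back to newest
		return offsetNewest, nil
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid offsets.initial value %q", s)
	}
	return n, nil
}

func main() {
	for _, v := range []string{"oldest", "newest", "42", "yesterday"} {
		off, err := parseInitialOffset(v)
		fmt.Printf("%q -> %d (err: %v)\n", v, off, err)
	}
}
```

Because Subscriber does not acknowledge messages, a caller that wants to resume after a restart would need to persist the last-processed offset itself and pass it back as the numeric form.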

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher wraps a kafka producer and implements the bps.Publisher interface.

Example
package main

import (
	"context"

	"github.com/bsm/bps"
	_ "github.com/bsm/bps/kafka"
)

func main() {
	ctx := context.TODO()
	pub, err := bps.NewPublisher(ctx, "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/?client.id=my-client&kafka.version=2.3.0&channel.buffer.size=1024")
	if err != nil {
		panic(err.Error())
	}
	defer pub.Close()

	if err := pub.Topic("topic").Publish(ctx, &bps.PubMessage{Data: []byte("message")}); err != nil {
		panic(err.Error())
	}
}

func NewPublisher

func NewPublisher(addrs []string, config *sarama.Config) (*Publisher, error)

NewPublisher inits a new async publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

Close implements the bps.Publisher interface.

func (*Publisher) Producer

func (p *Publisher) Producer() sarama.AsyncProducer

Producer exposes the native producer. Use at your own risk!

func (*Publisher) Topic

func (p *Publisher) Topic(name string) bps.PubTopic

Topic implements the bps.Publisher interface.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber wraps a kafka consumer and implements the bps.Subscriber interface.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bsm/bps"
	_ "github.com/bsm/bps/kafka"
)

func main() {
	subscriber, err := bps.NewSubscriber(context.TODO(), "kafka://10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092/?client.id=my-client&kafka.version=2.3.0&channel.buffer.size=1024")
	if err != nil {
		panic(err.Error())
	}
	defer subscriber.Close()

	subscription, err := subscriber.Topic("topic").Subscribe(
		bps.HandlerFunc(func(msg bps.SubMessage) {
			_, _ = fmt.Printf("%s\n", string(msg.Data()))
		}),
		bps.StartAt(bps.PositionOldest),
	)
	if err != nil {
		panic(err.Error())
	}
	defer subscription.Close()

	time.Sleep(time.Second) // wait to receive some messages
}

func NewSubscriber

func NewSubscriber(addrs []string, config *sarama.Config) (*Subscriber, error)

NewSubscriber inits a new subscriber. By default, it starts at the newest offset, so it handles only messages published after subscribing.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close implements the bps.Subscriber interface.

func (*Subscriber) Topic

func (s *Subscriber) Topic(name string) bps.SubTopic

Topic returns a named topic handle.

type SyncPublisher

type SyncPublisher struct {
	// contains filtered or unexported fields
}

SyncPublisher wraps a synchronous kafka producer and implements the bps.Publisher interface.

func NewSyncPublisher

func NewSyncPublisher(addrs []string, config *sarama.Config) (*SyncPublisher, error)

NewSyncPublisher inits a new sync publisher.

func (*SyncPublisher) Close

func (p *SyncPublisher) Close() error

Close implements the bps.Publisher interface.

func (*SyncPublisher) Producer

func (p *SyncPublisher) Producer() sarama.SyncProducer

Producer exposes the native producer. Use at your own risk!

func (*SyncPublisher) Topic

func (p *SyncPublisher) Topic(name string) bps.PubTopic

Topic implements the bps.Publisher interface.
