sarama

package
Version: v0.0.0-...-0f5cdac Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2015 License: MIT, MIT Imports: 19 Imported by: 0

README

sarama

Build Status GoDoc

Sarama is an MIT-licensed Go client library for Apache Kafka 0.8 (and later).

Documentation is available via godoc at http://godoc.org/github.com/Shopify/sarama

There is a google group for discussion

It is compatible with Go 1.1, 1.2, and 1.3 (which means go vet on 1.2 or 1.3 may return some suggestions that we are ignoring for the sake of compatibility with 1.1).

A word of warning: the API is not 100% stable yet. It won't change much (in particular the low-level Broker and Request/Response objects could probably be considered frozen) but there may be the occasional parameter added or function renamed. As far as semantic versioning is concerned, we haven't quite hit 1.0.0 yet. It is absolutely stable enough to use, just expect that you might have to tweak things when you update to a newer version.

Other related links:

Documentation

Overview

Package sarama provides client libraries for the Kafka 0.8 protocol. The Client, Producer and Consumer objects are the core of the high-level API. The Broker and Request/Response objects permit more precise control.

The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Index

Examples

Constants

View Source
const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received.

Variables

View Source
var AlreadyConnected = errors.New("kafka: broker: already connected")

AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.

View Source
var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")

ClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var EncodingError = errors.New("kafka: Error while encoding packet.")

EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

View Source
var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")

IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected.")

InsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

InvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

View Source
var Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of golang's log.Logger that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.

View Source
var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")

MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize

View Source
var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")

NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.

View Source
var NotConnected = errors.New("kafka: broker: not connected")

NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")

OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.

Functions

func SnappyDecode

func SnappyDecode(src []byte) ([]byte, error)

SnappyDecode decodes snappy data

func SnappyEncode

func SnappyEncode(src []byte) ([]byte, error)

SnappyEncode encodes binary data

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Example
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

return nil
Output:

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targetting the given host:port address. This does not attempt to actually connect, you have to call Open() for that.

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) Close

func (b *Broker) Close() (err error)

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) Fetch

func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) Open

func (b *Broker) Open(conf *BrokerConfig) error

Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which connects and releases the lock. This means any subsequent operations on the broker will block waiting for the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.

func (*Broker) Produce

func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)

type BrokerConfig

type BrokerConfig struct {
	MaxOpenRequests int           // How many outstanding requests the broker is allowed to have before blocking attempts to send.
	ReadTimeout     time.Duration // How long to wait for a response before timing out and returning an error.
	WriteTimeout    time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
}

BrokerConfig is used to pass multiple configuration options to Broker.Open.

func NewBrokerConfig

func NewBrokerConfig() *BrokerConfig

NewBrokerConfig returns a new broker configuration with sane defaults.

func (*BrokerConfig) Validate

func (config *BrokerConfig) Validate() error

Validate checks a BrokerConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like

producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.

func NewClient

func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)

NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

func (*Client) Close

func (client *Client) Close() error

Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.

func (*Client) Closed

func (client *Client) Closed() bool

func (*Client) GetOffset

func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)

GetOffset queries the cluster to get the most recent available offset at the given time on the topic/partition combination.

func (*Client) Leader

func (client *Client) Leader(topic string, partitionID int32) (*Broker, error)

Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.

func (*Client) Partitions

func (client *Client) Partitions(topic string) ([]int32, error)

Partitions returns the sorted list of available partition IDs for the given topic.

func (*Client) RefreshAllMetadata

func (client *Client) RefreshAllMetadata() error

RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.

func (*Client) RefreshTopicMetadata

func (client *Client) RefreshTopicMetadata(topics ...string) error

RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Topics returns the set of available topics as retrieved from the cluster metadata.

type ClientConfig

type ClientConfig struct {
	MetadataRetries   int           // How many times to retry a metadata request when a partition is in the middle of leader election.
	WaitForElection   time.Duration // How long to wait for leader election to finish between retries.
	DefaultBrokerConf *BrokerConfig // Default configuration for broker connections created by this client.
}

ClientConfig is used to pass multiple configuration options to NewClient.

func NewClientConfig

func NewClientConfig() *ClientConfig

NewClientConfig creates a new ClientConfig instance with sensible defaults

func (*ClientConfig) Validate

func (config *ClientConfig) Validate() error

Validate checks a ClientConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer processes Kafka messages from a given topic and partition. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> consumer ready")
}
defer consumer.Close()

msgCount := 0
consumerLoop:
for {
	select {
	case event := <-consumer.Events():
		if event.Err != nil {
			panic(event.Err)
		}
		msgCount++
	case <-time.After(5 * time.Second):
		fmt.Println("> timed out")
		break consumerLoop
	}
}
fmt.Println("Got", msgCount, "messages.")
Output:

func NewConsumer

func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error)

NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as part of the named consumer group.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer from fetching messages. It is required to call this function before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Consumer) Events

func (c *Consumer) Events() <-chan *ConsumerEvent

Events returns the read channel for any events (messages or errors) that might be returned by the broker.

type ConsumerConfig

type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default is 1, as 0 causes the consumer to spin when no messages are available.
	MinFetchSize int32
	// The maximum permittable message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
	MaxWaitTime time.Duration

	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64

	// The number of events to buffer in the Events channel. Having this non-zero permits the
	// consumer to continue fetching messages in the background while client code consumes events,
	// greatly improving throughput. The default is 16.
	EventBufferSize int
}

ConsumerConfig is used to pass multiple configuration options to NewConsumer.

func NewConsumerConfig

func NewConsumerConfig() *ConsumerConfig

NewConsumerConfig creates a ConsumerConfig instance with sane defaults.

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() error

Validate checks a ConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type ConsumerEvent

type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Err        error
}

ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Err             KError
	CoordinatorId   int32
	CoordinatorHost string
	CoordinatorPort int32
}

type DecodingError

type DecodingError struct {
	Info string
}

DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (DecodingError) Error

func (err DecodingError) Error() string

type DroppedMessagesError

type DroppedMessagesError struct {
	DroppedMessages int
	Err             error
}

DroppedMessagesError is returned from a producer when messages weren't able to be successfully delivered to a broker.

func (DroppedMessagesError) Error

func (err DroppedMessagesError) Error() string

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message.

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	// contains filtered or unexported fields
}

func (*FetchRequest) AddBlock

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

type FetchResponse

type FetchResponse struct {
	Blocks map[string]map[int32]*FetchResponseBlock
}

func (*FetchResponse) AddMessage

func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

type FetchResponseBlock

type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.

func NewHashPartitioner

func NewHashPartitioner() *HashPartitioner

func (*HashPartitioner) Partition

func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	NoError                         KError = 0
	Unknown                         KError = -1
	OffsetOutOfRange                KError = 1
	InvalidMessage                  KError = 2
	UnknownTopicOrPartition         KError = 3
	InvalidMessageSize              KError = 4
	LeaderNotAvailable              KError = 5
	NotLeaderForPartition           KError = 6
	RequestTimedOut                 KError = 7
	BrokerNotAvailable              KError = 8
	MessageSizeTooLarge             KError = 10
	StaleControllerEpochCode        KError = 11
	OffsetMetadataTooLarge          KError = 12
	OffsetsLoadInProgress           KError = 14
	ConsumerCoordinatorNotAvailable KError = 15
	NotCoordinatorForConsumer       KError = 16
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type Message

type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages convenience helper which returns either all the messages that are wrapped in this block

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}

func (*MetadataResponse) AddBroker

func (m *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopicPartition

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32)

type MockBroker

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that accepts a single connection. It reads Kafka requests from that connection and returns each response from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response the server sleeps for 250ms instead of reading a request).

When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix message length or correlation ID to your response bytes, the server does that automatically as a convenience.

func NewMockBroker

func NewMockBroker(t TestState, brokerID int32) *MockBroker

NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the test framework and a channel of responses to use. If an error occurs it is simply logged to the TestState and the broker exits.

func (*MockBroker) Addr

func (b *MockBroker) Addr() string

func (*MockBroker) BrokerID

func (b *MockBroker) BrokerID() int32

func (*MockBroker) Close

func (b *MockBroker) Close()

func (*MockBroker) Port

func (b *MockBroker) Port() int32

func (*MockBroker) Returns

func (b *MockBroker) Returns(e encoder)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]KError
}

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      KError
}

type OffsetMethod

type OffsetMethod int

OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.

const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)

type OffsetRequest

type OffsetRequest struct {
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32)

type OffsetResponse

type OffsetResponse struct {
	Blocks map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err     KError
	Offsets []int64
}

type OffsetTime

type OffsetTime int64

OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64 value will be interpreted as milliseconds, or use the special constants defined here.

const (
	// LatestOffsets askes for the latest offsets.
	LatestOffsets OffsetTime = -1
	// EarliestOffset askes for the earliest available offset. Note that because offsets are pulled in descending order,
	// asking for the earliest offset will always return you a single element.
	EarliestOffset OffsetTime = -2
)

type PartitionMetadata

type PartitionMetadata struct {
	Err      KError
	ID       int32
	Leader   int32
	Replicas []int32
	Isr      []int32
}

type Partitioner

type Partitioner interface {
	Partition(key Encoder, numPartitions int32) int32
}

Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddMessage

func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

type ProduceResponse

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseBlock
}

func (*ProduceResponse) AddTopicPartition

func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err    KError
	Offset int64
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer publishes Kafka messages. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks: it may not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

The default values for MaxBufferedBytes and MaxBufferTime cause sarama to deliver messages immediately, but to buffer subsequent messages while a previous request is in-flight. This is often the correct behaviour.

If synchronous operation is desired, you can use SendMessage. This will cause sarama to block until the broker has returned a value. Normally, you will want to use QueueMessage instead, and read the error back from the Errors() channel. Note that when using QueueMessage, you *must* read the values from the Errors() channel, or sarama will block indefinitely after a few requests.

Example
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
if err != nil {
	panic(err)
} else {
	fmt.Println("> message sent")
}
Output:

func NewProducer

func NewProducer(client *Client, config *ProducerConfig) (*Producer, error)

NewProducer creates a new Producer using the given client.

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Producer) Errors

func (p *Producer) Errors() chan error

Errors provides access to errors generated while parsing ProduceResponses from kafka when operating in asynchronous mode. Should never be called in synchronous mode.

func (*Producer) QueueMessage

func (p *Producer) QueueMessage(topic string, key, value Encoder) error

QueueMessage sends a message with the given key and value to the given topic. The partition to send to is selected by the Producer's Partitioner. To send strings as either key or value, see the StringEncoder type.

QueueMessage uses buffering semantics to reduce the nubmer of requests to the broker. The buffer logic is tunable with config.MaxBufferedBytes and config.MaxBufferTime.

QueueMessage will return an error if it's unable to construct the message (unlikely), but network and response errors must be read from Errors(), since QueueMessage uses asynchronous delivery. Note that you MUST read back from Errors(), otherwise the producer will stall after some number of errors.

If you care about message ordering, you should not call QueueMessage and SendMessage on the same Producer. Either, used alone, preserves ordering, however.

func (*Producer) SendMessage

func (p *Producer) SendMessage(topic string, key, value Encoder) (err error)

SendMessage sends a message with the given key and value to the given topic. The partition to send to is selected by the Producer's Partitioner. To send strings as either key or value, see the StringEncoder type.

Unlike QueueMessage, SendMessage operates synchronously, and will block until the response is received from the broker, returning any error generated in the process. Reading from Errors() may interfere with the operation of SendMessage().

If you care about message ordering, you should not call QueueMessage and SendMessage on the same Producer.

type ProducerConfig

type ProducerConfig struct {
	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
	Timeout          time.Duration    // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
	MaxBufferTime    time.Duration    // The maximum duration to buffer messages before sending to a broker.
}

ProducerConfig is used to pass multiple configuration options to NewProducer.

If MaxBufferTime=MaxBufferedBytes=1, messages will be delivered immediately and constantly, but if multiple messages are received while a roundtrip to kafka is in progress, they will both be combined into the next request. In this mode, errors are not returned from SendMessage, but over the Errors() channel.

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

NewProducerConfig creates a new ProducerConfig instance with sensible defaults.

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() error

Validate checks a ProducerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type RandomPartitioner

type RandomPartitioner struct {
	// contains filtered or unexported fields
}

RandomPartitioner implements the Partitioner interface by choosing a random partition each time.

func NewRandomPartitioner

func NewRandomPartitioner() *RandomPartitioner

func (*RandomPartitioner) Partition

func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any positive int16 value is valid, or the constants defined here.

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)

type RoundRobinPartitioner

type RoundRobinPartitioner struct {
	// contains filtered or unexported fields
}

RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.

func (*RoundRobinPartitioner) Partition

func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that you can do things like

producer.SendMessage(nil, sarama.StringEncoder("hello world"))

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

type TestState

type TestState interface {
	Error(args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}

TestState is a generic interface for a test state, implemented e.g. by testing.T

type TopicMetadata

type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto
y or Y : Canonical URL