sarama

v0.0.0-...-0a9f80b
Published: Nov 13, 2014 License: MIT Imports: 19 Imported by: 0

README

Sarama is an MIT-licensed Go client library for Apache Kafka 0.8 (and later).

Documentation is available via godoc at http://godoc.org/github.com/Shopify/sarama

There is a google group for Kafka client users and authors at https://groups.google.com/forum/#!forum/kafka-clients

Sarama is compatible with Go 1.1, 1.2, and 1.3 (which means go vet on 1.2 or 1.3 may return some suggestions that we are ignoring for the sake of compatibility with 1.1).

A word of warning: the API is not 100% stable. It won't change much (in particular the low-level Broker and Request/Response objects could probably be considered frozen) but there may be the occasional parameter added or function renamed. As far as semantic versioning is concerned, we haven't quite hit 1.0.0 yet. It is absolutely stable enough to use, just expect that you might have to tweak things when you update to a newer version.

Documentation

Overview

Package sarama provides client libraries for the Kafka 0.8 protocol. The Client, Producer and Consumer objects are the core of the high-level API. The Broker and Request/Response objects permit more precise control.

The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Constants

const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received.

Variables

var AlreadyConnected = errors.New("kafka: broker: already connected")

AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.

var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")

ClosedClient is the error returned when a method is called on a client that has been closed.

var EncodingError = errors.New("kafka: Error while encoding packet.")

EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
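The length limit follows from how the Kafka protocol frames a string: a signed 16-bit big-endian length, then the raw bytes. Below is a minimal sketch of that framing. It is not Sarama's actual encoder; `encodeString` and `errTooLong` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// errTooLong is a hypothetical stand-in for an encoding failure.
var errTooLong = errors.New("string too long for a 16-bit length prefix")

// encodeString frames s as a big-endian int16 length followed by the raw bytes.
func encodeString(s string) ([]byte, error) {
	if len(s) > math.MaxInt16 {
		return nil, errTooLong
	}
	buf := make([]byte, 2+len(s))
	binary.BigEndian.PutUint16(buf, uint16(len(s)))
	copy(buf[2:], s)
	return buf, nil
}

func main() {
	b, err := encodeString("ab")
	fmt.Println(b, err)

	// A string of 2^15 bytes no longer fits in the signed 16-bit length field.
	_, err = encodeString(string(make([]byte, math.MaxInt16+1)))
	fmt.Println(err)
}
```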

var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")

IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

var InsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected.")

InsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
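To see why a truncated tail is expected rather than fatal, consider a deliberately simplified framing in which every message is a 32-bit big-endian size followed by that many payload bytes. The real Kafka message set carries more fields per entry, and `decodeAll` is a hypothetical helper, not part of Sarama. The sketch only shows how a decoder can keep the complete messages and flag the partial one.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeAll reads [int32 size][payload] records from buf. It returns the
// complete payloads and reports whether a truncated record was left at the end.
func decodeAll(buf []byte) (msgs [][]byte, partial bool) {
	for len(buf) > 0 {
		if len(buf) < 4 {
			return msgs, true // not even a full size field
		}
		size := int(binary.BigEndian.Uint32(buf))
		if len(buf)-4 < size {
			return msgs, true // payload cut short
		}
		msgs = append(msgs, buf[4:4+size])
		buf = buf[4+size:]
	}
	return msgs, false
}

func main() {
	// One complete 2-byte message, then a 5-byte message with only 1 byte present.
	buf := []byte{0, 0, 0, 2, 'h', 'i', 0, 0, 0, 5, 'x'}
	msgs, partial := decodeAll(buf)
	fmt.Println(len(msgs), partial)
}
```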

var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

InvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

var MaxRequestSize uint32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an EncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

var MessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")

MessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize.

var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")

NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.

var NotConnected = errors.New("kafka: broker: not connected")

NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")

OutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
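A panic in a goroutine started by a library cannot be caught by a `recover` in the caller's goroutine, which is why a package-level hook is useful. The sketch below shows the general pattern with the standard library only. `panicHandler` and `withRecover` are hypothetical names for illustration, and how Sarama wires its own handler internally is not shown here.

```go
package main

import "fmt"

// panicHandler plays the role of a package-level hook; nil means "do not recover".
var panicHandler func(interface{})

// withRecover runs fn and, if a handler is installed, passes any panic value to it.
// With no handler installed the panic propagates as usual.
func withRecover(fn func()) {
	defer func() {
		if panicHandler == nil {
			return
		}
		if r := recover(); r != nil {
			panicHandler(r)
		}
	}()
	fn()
}

func main() {
	panicHandler = func(v interface{}) { fmt.Println("recovered:", v) }

	done := make(chan struct{})
	go func() {
		defer close(done)
		withRecover(func() { panic("boom") })
	}()
	<-done
}
```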

var ShuttingDown = errors.New("kafka: Message received by producer in process of shutting down.")

ShuttingDown is returned when a producer receives a message during shutdown.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Example
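// Create a Broker for the given address; no connection is attempted yet.
broker := NewBroker("localhost:9092")
// Passing nil makes Open use the result of NewBrokerConfig().
err := broker.Open(nil)
if err != nil {
	return err
}
defer broker.Close()

// Ask the broker for metadata about a single topic.
request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

return nil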

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) Close

func (b *Broker) Close() (err error)

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) Fetch

func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) Open

func (b *Broker) Open(conf *BrokerConfig) error

Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which connects and releases the lock. This means any subsequent operations on the broker will block waiting for the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.
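The locking behaviour described above can be illustrated with a small stand-alone sketch. It does not use Sarama itself: `conn`, `open`, and `isConnected` are hypothetical names, and `dial` stands in for the real network connection attempt.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// conn is a hypothetical stand-in for a broker connection.
type conn struct {
	lock      sync.Mutex
	connected bool
	err       error
}

// open takes the lock synchronously, then connects in the background and
// releases the lock when the attempt has finished.
func (c *conn) open(dial func() error) {
	c.lock.Lock()
	go func() {
		defer c.lock.Unlock()
		c.err = dial()
		c.connected = c.err == nil
	}()
}

// isConnected blocks until any in-flight open has released the lock.
func (c *conn) isConnected() (bool, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.connected, c.err
}

func main() {
	ok := &conn{}
	ok.open(func() error { time.Sleep(10 * time.Millisecond); return nil })
	fmt.Println(ok.isConnected()) // waits for the background dial to finish

	bad := &conn{}
	bad.open(func() error { return errors.New("connection refused") })
	fmt.Println(bad.isConnected())
}
```

Calling the status check right after `open` gives the effect of a synchronous open, because the check cannot acquire the lock until the background attempt is done.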

func (*Broker) Produce

func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)

type BrokerConfig

type BrokerConfig struct {
	MaxOpenRequests int           // How many outstanding requests the broker is allowed to have before blocking attempts to send.
	DialTimeout     time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error.
	ReadTimeout     time.Duration // How long to wait for a response before timing out and returning an error.
	WriteTimeout    time.Duration // How long to wait for a transmit to succeed before timing out and returning an error.
}

BrokerConfig is used to pass multiple configuration options to Broker.Open.

func NewBrokerConfig

func NewBrokerConfig() *BrokerConfig

NewBrokerConfig returns a new broker configuration with sane defaults.

func (*BrokerConfig) Validate

func (config *BrokerConfig) Validate() error

Validate checks a BrokerConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like

producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks: it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.

func NewClient

func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)

NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

func (*Client) Close

func (client *Client) Close() error

Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.
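Go runs deferred calls in last-in, first-out order, so deferring the client's Close first and the producer's or consumer's Close afterwards yields the required shutdown order. The sketch below demonstrates only that ordering. `closer` and `run` are hypothetical and no Sarama types are involved.

```go
package main

import "fmt"

// closer records the order in which resources are closed.
type closer struct {
	name string
	log  *[]string
}

// Close appends the resource name to the shared log.
func (c closer) Close() error {
	*c.log = append(*c.log, c.name)
	return nil
}

// run opens a "client" and then a "producer" and relies on defer to close them.
func run() []string {
	var log []string
	func() {
		client := closer{"client", &log}
		defer client.Close()

		producer := closer{"producer", &log}
		defer producer.Close()
	}()
	return log
}

func main() {
	fmt.Println(run()) // prints [producer client]
}
```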

func (*Client) Closed

func (client *Client) Closed() bool

func (*Client) GetOffset

func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)

GetOffset queries the cluster to get the most recent available offset at the given time on the topic/partition combination.

func (*Client) Leader

func (client *Client) Leader(topic string, partitionID int32) (*Broker, error)

Leader returns the broker object that is the leader of the given topic/partition, as determined by querying the cluster metadata.

func (*Client) Partitions

func (client *Client) Partitions(topic string) ([]int32, error)

Partitions returns the sorted list of available partition IDs for the given topic.

func (*Client) RefreshAllMetadata

func (client *Client) RefreshAllMetadata() error

RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.

func (*Client) RefreshTopicMetadata

func (client *Client) RefreshTopicMetadata(topics ...string) error

RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Topics returns the set of available topics as retrieved from the cluster metadata.

type ClientConfig

type ClientConfig struct {
	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
}

ClientConfig is used to pass multiple configuration options to NewClient.
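MetadataRetries and WaitForElection together describe a bounded retry loop with a pause between attempts. A minimal sketch of such a loop follows. `withRetries` and `errLeaderElection` are hypothetical, and whether Sarama counts the first attempt as a retry is an assumption made here for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errLeaderElection is a hypothetical transient error.
var errLeaderElection = errors.New("leader election in progress")

// withRetries calls fetch once and then up to retries more times,
// sleeping for wait before each retry. It returns the last error seen.
func withRetries(retries int, wait time.Duration, fetch func() error) error {
	err := fetch()
	for i := 0; i < retries && err != nil; i++ {
		time.Sleep(wait)
		err = fetch()
	}
	return err
}

func main() {
	calls := 0
	err := withRetries(3, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errLeaderElection
		}
		return nil
	})
	fmt.Println(calls, err)
}
```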

func NewClientConfig

func NewClientConfig() *ClientConfig

NewClientConfig creates a new ClientConfig instance with sensible defaults.

func (*ClientConfig) Validate

func (config *ClientConfig) Validate() error

Validate checks a ClientConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string
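Because ConfigurationError is a string type that implements the error interface, a caller can tell configuration problems apart from other failures with a type assertion. The sketch below redeclares the type locally. The message format, the `brokerConfig` struct, and the specific checks in `validate` are assumptions for illustration, not Sarama's actual rules.

```go
package main

import (
	"fmt"
	"time"
)

// ConfigurationError mirrors the string-based error type shown above.
type ConfigurationError string

// Error returns the message; the prefix is an assumption, not Sarama's wording.
func (err ConfigurationError) Error() string {
	return "invalid configuration: " + string(err)
}

// brokerConfig is a hypothetical subset of BrokerConfig.
type brokerConfig struct {
	MaxOpenRequests int
	DialTimeout     time.Duration
}

// validate sketches the kind of checks a Validate method might perform.
func (c *brokerConfig) validate() error {
	if c.MaxOpenRequests < 0 {
		return ConfigurationError("MaxOpenRequests must not be negative")
	}
	if c.DialTimeout <= 0 {
		return ConfigurationError("DialTimeout must be positive")
	}
	return nil
}

func main() {
	err := (&brokerConfig{MaxOpenRequests: 4}).validate()
	if cfgErr, ok := err.(ConfigurationError); ok {
		fmt.Println("bad config:", cfgErr)
	}
}
```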

type ConstantPartitioner

type ConstantPartitioner struct {
	Constant int32
}

ConstantPartitioner implements the Partitioner interface by just returning a constant value.

func (*ConstantPartitioner) Partition

func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) int32

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer processes Kafka messages from a given topic and partition. You MUST call Close() on a consumer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
}
fmt.Println("> connected")
defer client.Close()

consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
if err != nil {
	panic(err)
}
fmt.Println("> consumer ready")
// Deferred calls run in reverse order, so the consumer is closed before the client.
defer consumer.Close()

msgCount := 0
consumerLoop:
for {
	select {
	case event := <-consumer.Events():
		// An event is either an error or a message; see ConsumerEvent.
		if event.Err != nil {
			panic(event.Err)
		}
		msgCount++
	case <-time.After(5 * time.Second):
		// Stop once no event has arrived for five seconds.
		fmt.Println("> timed out")
		break consumerLoop
	}
}
fmt.Println("Got", msgCount, "messages.")

func NewConsumer

func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error)

NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as part of the named consumer group.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer from fetching messages. It is required to call this function before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Consumer) Events

func (c *Consumer) Events() <-chan *ConsumerEvent

Events returns the read channel for any events (messages or errors) that might be returned by the broker.

type ConsumerConfig

type ConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
	DefaultFetchSize int32
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default is 1, as 0 causes the consumer to spin when no messages are available.
	MinFetchSize int32
	// The maximum permitted message size - messages larger than this will return MessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyway. The default is 250ms, since 0 causes the consumer to spin when no events are available.
	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
	MaxWaitTime time.Duration

	// The method used to determine at which offset to begin consuming messages.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64

	// The number of events to buffer in the Events channel. Having this non-zero permits the
	// consumer to continue fetching messages in the background while client code consumes events,
	// greatly improving throughput. The default is 16.
	EventBufferSize int
}

ConsumerConfig is used to pass multiple configuration options to NewConsumer.
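The note on MaxWaitTime says that anything finer than a millisecond is truncated. The sketch below shows what integer division of a time.Duration by time.Millisecond does to such a value. `toMillis` is a hypothetical helper, not a Sarama function.

```go
package main

import (
	"fmt"
	"time"
)

// toMillis converts a duration to whole milliseconds, discarding any finer precision.
func toMillis(d time.Duration) int32 {
	return int32(d / time.Millisecond)
}

func main() {
	fmt.Println(toMillis(250 * time.Millisecond))                      // prints 250
	fmt.Println(toMillis(250*time.Millisecond + 999*time.Microsecond)) // prints 250
}
```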

func NewConsumerConfig

func NewConsumerConfig() *ConsumerConfig

NewConsumerConfig creates a ConsumerConfig instance with sane defaults.

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() error

Validate checks a ConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type ConsumerEvent

type ConsumerEvent struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
	Err        error
}

ConsumerEvent is what is provided to the user when an event occurs. It is either an error (in which case Err is non-nil) or a message (in which case Err is nil and Offset, Key, and Value are set). Topic and Partition are always set.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Err             KError
	CoordinatorId   int32
	CoordinatorHost string
	CoordinatorPort int32
}

type DecodingError

type DecodingError struct {
	Info string
}

DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (DecodingError) Error

func (err DecodingError) Error() string

type DroppedMessagesError

type DroppedMessagesError struct {
	DroppedMessages int
	Err             error
}

DroppedMessagesError is returned from a producer when messages weren't able to be successfully delivered to a broker.

func (DroppedMessagesError) Error

func (err DroppedMessagesError) Error() string

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
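As an illustration of the contract, here is a sketch of a custom Encoder whose Length() is known without encoding anything. The interface is redeclared locally so the example is self-contained, and `uint32Encoder` is a hypothetical type that is not part of Sarama.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Encoder is copied from the interface definition above.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// uint32Encoder is a hypothetical Encoder that sends a number as 4 big-endian bytes.
type uint32Encoder uint32

// Encode returns the 4-byte big-endian representation.
func (e uint32Encoder) Encode() ([]byte, error) {
	buf := make([]byte, 4)
	binary.BigEndian.PutUint32(buf, uint32(e))
	return buf, nil
}

// Length is constant, so callers can size buffers without calling Encode.
func (e uint32Encoder) Length() int { return 4 }

func main() {
	var enc Encoder = uint32Encoder(258)
	b, _ := enc.Encode()
	fmt.Println(b, len(b) == enc.Length()) // prints [0 0 1 2] true
}
```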

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	// contains filtered or unexported fields
}

func (*FetchRequest) AddBlock

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

type FetchResponse

type FetchResponse struct {
	Blocks map[string]map[int32]*FetchResponseBlock
}

func (*FetchResponse) AddMessage

func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

type FetchResponseBlock

type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is taken modulo the number of partitions. This ensures that messages with the same key always end up on the same partition.
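The hashing step can be sketched with the standard library's hash/fnv package. `partitionFor` is a hypothetical function; it omits the random fallback for nil keys, and reducing the unsigned 32-bit hash is an assumption, so the partition numbers it produces are not guaranteed to match Sarama's.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes key with 32-bit FNV-1a and reduces it modulo numPartitions.
// numPartitions must be positive.
func partitionFor(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	// The same key always maps to the same partition.
	fmt.Println(partitionFor([]byte("user-42"), 8) == partitionFor([]byte("user-42"), 8))
	fmt.Println(partitionFor([]byte("user-42"), 8))
}
```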

func (*HashPartitioner) Partition

func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	NoError                         KError = 0
	Unknown                         KError = -1
	OffsetOutOfRange                KError = 1
	InvalidMessage                  KError = 2
	UnknownTopicOrPartition         KError = 3
	InvalidMessageSize              KError = 4
	LeaderNotAvailable              KError = 5
	NotLeaderForPartition           KError = 6
	RequestTimedOut                 KError = 7
	BrokerNotAvailable              KError = 8
	ReplicaNotAvailable             KError = 9
	MessageSizeTooLarge             KError = 10
	StaleControllerEpochCode        KError = 11
	OffsetMetadataTooLarge          KError = 12
	OffsetsLoadInProgress           KError = 14
	ConsumerCoordinatorNotAvailable KError = 15
	NotCoordinatorForConsumer       KError = 16
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string
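Since KError implements the error interface, a code taken from a response can be returned directly as a Go error. One pattern, sketched below with a locally redeclared type, is to map the zero code to nil so that callers can use the usual `err != nil` check. The message strings and the `check` helper are assumptions for illustration and not Sarama's own.

```go
package main

import "fmt"

// KError mirrors the int16-based error type above.
type KError int16

// Two of the codes listed above.
const (
	NoError                 KError = 0
	UnknownTopicOrPartition KError = 3
)

// Error returns illustrative text, not Sarama's actual wording.
func (err KError) Error() string {
	switch err {
	case NoError:
		return "kafka server: no error"
	case UnknownTopicOrPartition:
		return "kafka server: unknown topic or partition"
	}
	return fmt.Sprintf("kafka server: unknown error code %d", int16(err))
}

// check converts the zero code to a nil error and returns every other code as an error.
func check(code KError) error {
	if code == NoError {
		return nil
	}
	return code
}

func main() {
	fmt.Println(check(NoError))
	fmt.Println(check(UnknownTopicOrPartition))
}
```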

type Message

type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper which returns all the messages that are wrapped in this block.

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}

type MessageToSend

type MessageToSend struct {
	Topic      string
	Key, Value Encoder
	// contains filtered or unexported fields
}

MessageToSend is the collection of elements passed to the Producer in order to send a message.

func (*MessageToSend) Offset

func (m *MessageToSend) Offset() int64

Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.

func (*MessageToSend) Partition

func (m *MessageToSend) Partition() int32

Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}

func (*MetadataResponse) AddBroker

func (m *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopicPartition

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32)

type MockBroker

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that accepts a single connection. It reads Kafka requests from that connection and returns each response from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response the server sleeps for 250ms instead of reading a request).

When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix message length or correlation ID to your response bytes; the server does that automatically as a convenience.

func NewMockBroker

func NewMockBroker(t TestState, brokerID int32) *MockBroker

NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the test framework and a channel of responses to use. If an error occurs it is simply logged to the TestState and the broker exits.

func (*MockBroker) Addr

func (b *MockBroker) Addr() string

func (*MockBroker) BrokerID

func (b *MockBroker) BrokerID() int32

func (*MockBroker) Close

func (b *MockBroker) Close()

func (*MockBroker) Port

func (b *MockBroker) Port() int32

func (*MockBroker) Returns

func (b *MockBroker) Returns(e encoder)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]KError
}

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      KError
}

type OffsetMethod

type OffsetMethod int

OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.

const (
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual OffsetMethod = iota
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
)

type OffsetRequest

type OffsetRequest struct {
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32)

type OffsetResponse

type OffsetResponse struct {
	Blocks map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err     KError
	Offsets []int64
}

type OffsetTime

type OffsetTime int64

OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64 value will be interpreted as milliseconds, or use the special constants defined here.

const (
	// LatestOffsets asks for the latest offsets.
	LatestOffsets OffsetTime = -1
	// EarliestOffset asks for the earliest available offset. Note that because offsets are pulled in descending order,
	// asking for the earliest offset will always return you a single element.
	EarliestOffset OffsetTime = -2
)

type PartitionMetadata

type PartitionMetadata struct {
	Err      KError
	ID       int32
	Leader   int32
	Replicas []int32
	Isr      []int32
}

type Partitioner

type Partitioner interface {
	Partition(key Encoder, numPartitions int32) int32
}

Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.
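A custom Partitioner only needs the single Partition method. The sketch below redeclares the two interfaces locally and implements a simple round-robin strategy. `roundRobin` is a hypothetical type, is not safe for concurrent use, and is not claimed to match the RoundRobinPartitioner provided by the library.

```go
package main

import "fmt"

// Encoder and Partitioner are copied from the interface definitions in this package.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

type Partitioner interface {
	Partition(key Encoder, numPartitions int32) int32
}

// roundRobin ignores the key and cycles through the partitions in order.
type roundRobin struct {
	next int32
}

// Partition returns the next index in [0...numPartitions-1], wrapping around.
func (p *roundRobin) Partition(key Encoder, numPartitions int32) int32 {
	if p.next >= numPartitions {
		p.next = 0
	}
	ret := p.next
	p.next++
	return ret
}

func main() {
	var p Partitioner = &roundRobin{}
	for i := 0; i < 4; i++ {
		fmt.Print(p.Partition(nil, 3), " ") // prints 0 1 2 0
	}
	fmt.Println()
}
```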

func NewHashPartitioner

func NewHashPartitioner() Partitioner

func NewRandomPartitioner

func NewRandomPartitioner() Partitioner

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner() Partitioner

type PartitionerConstructor

type PartitionerConstructor func() Partitioner

PartitionerConstructor is the type for a function capable of constructing new Partitioners.

type ProduceError

type ProduceError struct {
	Msg *MessageToSend
	Err error
}

ProduceError is the type of error generated when the producer fails to deliver a message. It contains the original MessageToSend as well as the actual error value. If the AckSuccesses configuration value is set to true then every message sent generates a ProduceError, but successes will have a nil Err field.

type ProduceErrors

type ProduceErrors []*ProduceError

ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

func (ProduceErrors) Error

func (pe ProduceErrors) Error() string
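A slice type that implements the error interface lets a single return value carry a whole batch of failures, which the caller can unpack with a type assertion. The sketch below uses local, hypothetical types (`produceError`, `produceErrors`, `closeProducer`) and an invented message format. Note the explicit `return nil`: returning an empty slice as an error would produce a non-nil error value.

```go
package main

import (
	"errors"
	"fmt"
)

// produceError is a hypothetical, simplified failure record.
type produceError struct {
	Topic string
	Err   error
}

// produceErrors wraps a batch of failures and implements the error interface.
type produceErrors []*produceError

// Error summarizes the batch; the wording is an assumption.
func (pe produceErrors) Error() string {
	return fmt.Sprintf("failed to deliver %d messages", len(pe))
}

// closeProducer is a placeholder for a Close method that reports undelivered messages.
func closeProducer(pending produceErrors) error {
	if len(pending) > 0 {
		return pending
	}
	return nil
}

func main() {
	err := closeProducer(produceErrors{{Topic: "my_topic", Err: errors.New("boom")}})
	if pe, ok := err.(produceErrors); ok {
		for _, e := range pe {
			fmt.Println(e.Topic, e.Err)
		}
	}
}
```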

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddMessage

func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseBlock
}

func (*ProduceResponse) AddTopicPartition

func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err    KError
	Offset int64
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer publishes Kafka messages. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
}
fmt.Println("> connected")
defer client.Close()

producer, err := NewProducer(client, nil)
if err != nil {
	panic(err)
}
// Deferred calls run in reverse order, so the producer is closed before the client.
defer producer.Close()

// Queue messages and read errors in the same select so that neither side blocks the other.
for {
	select {
	case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		fmt.Println("> message queued")
	case err := <-producer.Errors():
		panic(err.Err)
	}
}

func NewProducer

func NewProducer(client *Client, config *ProducerConfig) (*Producer, error)

NewProducer creates a new Producer using the given client.

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Producer) Errors

func (p *Producer) Errors() <-chan *ProduceError

Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock. It is suggested that you send messages and read errors together in a single select statement.
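The reason for combining the send and the error read in one select can be shown without Sarama. In the sketch below, `worker` is a hypothetical stand-in for the producer's internals that reports failures on an unbuffered channel. If `sendAll` sent on the input channel outside the select, it would block forever as soon as the worker was itself blocked trying to report an error.

```go
package main

import "fmt"

// worker stands in for a producer's internals: it reports every odd-numbered
// message as failed on an unbuffered error channel.
func worker(input <-chan int, errs chan<- error) {
	for m := range input {
		if m%2 == 1 {
			errs <- fmt.Errorf("message %d failed", m)
		}
	}
	close(errs)
}

// sendAll queues n messages while draining errors in the same select, and
// returns how many failures were reported.
func sendAll(n int) (failed int) {
	input := make(chan int)
	errs := make(chan error)
	go worker(input, errs)

	in := input // set to nil once everything is queued, which disables the send case
	next := 0
	for {
		if in != nil && next == n {
			close(input)
			in = nil
		}
		select {
		case in <- next:
			next++
		case _, ok := <-errs:
			if !ok {
				return failed // worker has finished and closed the channel
			}
			failed++
		}
	}
}

func main() {
	fmt.Println(sendAll(4)) // prints 2
}
```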

func (*Producer) Input

func (p *Producer) Input() chan<- *MessageToSend

Input is the input channel for the user to write messages to that they wish to send.

func (*Producer) Successes

func (p *Producer) Successes() <-chan *MessageToSend

Successes is the success output channel back to the user when AckSuccesses is configured. If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.

type ProducerConfig

type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash).
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
	Timeout           time.Duration          // The maximum duration the broker will wait for the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution; nanoseconds will be truncated.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression).
	FlushMsgCount     int                    // The number of messages needed to trigger a flush.
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000).
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
}

ProducerConfig is used to pass multiple configuration options to NewProducer.
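The note on Timeout says that only millisecond resolution is supported. As a minimal sketch of what that truncation means in practice, assuming the duration is reduced by integer division (the helper name toMillis is hypothetical and not part of the package):

```go
package main

import (
	"fmt"
	"time"
)

// toMillis is a hypothetical helper that reduces a duration to whole
// milliseconds, discarding any sub-millisecond remainder.
func toMillis(d time.Duration) int32 {
	return int32(d / time.Millisecond)
}

func main() {
	fmt.Println(toMillis(1500 * time.Microsecond)) // the extra 500µs is dropped
	fmt.Println(toMillis(2 * time.Second))
}
```

A Timeout below one millisecond would therefore reduce to zero under this assumption, so it is safest to configure it in whole milliseconds.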

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

NewProducerConfig creates a new ProducerConfig instance with sensible defaults.

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() error

Validate checks a ProducerConfig instance. It will return a ConfigurationError if any of the specified values doesn't make sense.

type RandomPartitioner

type RandomPartitioner struct {
	// contains filtered or unexported fields
}

RandomPartitioner implements the Partitioner interface by choosing a random partition each time.

func (*RandomPartitioner) Partition

func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32
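For readers writing their own partitioner, here is a minimal sketch of a random strategy with the same Partition signature. It is not the package's implementation: the randomPartitioner type and its constructor are hypothetical, and the Encoder interface is redeclared locally with the two methods that StringEncoder exposes on this page.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Encoder is redeclared here so the sketch compiles on its own; the
// method set is assumed from StringEncoder's documented methods.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// randomPartitioner is a hypothetical partitioner that ignores the key.
type randomPartitioner struct {
	generator *rand.Rand
}

func newRandomPartitioner() *randomPartitioner {
	source := rand.NewSource(time.Now().UnixNano())
	return &randomPartitioner{generator: rand.New(source)}
}

// Partition returns a value in [0, numPartitions).
// Int31n panics if numPartitions is not positive.
func (p *randomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
	return p.generator.Int31n(numPartitions)
}

func main() {
	p := newRandomPartitioner()
	for i := 0; i < 3; i++ {
		fmt.Println(p.Partition(nil, 4))
	}
}
```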

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Valid values are any positive int16 or one of the constants defined here.

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)
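To make the value space concrete, the following sketch classifies a RequiredAcks value according to the description above. The type and constants are redeclared locally so the block compiles on its own, and the describe function is hypothetical; it restates the documented meaning and is not taken from the package.

```go
package main

import "fmt"

// RequiredAcks and its constants are redeclared here as stand-ins.
type RequiredAcks int16

const (
	NoResponse   RequiredAcks = 0
	WaitForLocal RequiredAcks = 1
	WaitForAll   RequiredAcks = -1
)

// describe is a hypothetical helper that explains a RequiredAcks value.
func describe(acks RequiredAcks) string {
	switch {
	case acks == NoResponse:
		return "no response from the broker"
	case acks == WaitForLocal:
		return "wait for the local commit only"
	case acks == WaitForAll:
		return "wait for all replicas"
	case acks > 1:
		return fmt.Sprintf("wait for %d replica acknowledgements", acks)
	default:
		return "not a documented value"
	}
}

func main() {
	for _, acks := range []RequiredAcks{NoResponse, WaitForLocal, WaitForAll, 3, -5} {
		fmt.Printf("%d: %s\n", acks, describe(acks))
	}
}
```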

type RoundRobinPartitioner

type RoundRobinPartitioner struct {
	// contains filtered or unexported fields
}

RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.

func (*RoundRobinPartitioner) Partition

func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32
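Walking through the partitions one at a time can be sketched as a counter that wraps around. This is an illustration under that assumption, not the package's code: the roundRobinPartitioner type is hypothetical, and the Encoder interface is redeclared locally with the method set assumed from StringEncoder.

```go
package main

import "fmt"

// Encoder is redeclared here so the sketch compiles on its own.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// roundRobinPartitioner is a hypothetical partitioner that remembers
// which partition to hand out next.
type roundRobinPartitioner struct {
	next int32
}

// Partition returns partitions 0, 1, ..., numPartitions-1 in turn and
// then starts over. The key is ignored.
func (p *roundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
	if p.next >= numPartitions {
		p.next = 0
	}
	chosen := p.next
	p.next++
	return chosen
}

func main() {
	p := &roundRobinPartitioner{}
	for i := 0; i < 5; i++ {
		fmt.Print(p.Partition(nil, 3), " ")
	}
	fmt.Println()
}
```

The wrap-around check runs before the partition is chosen, so the counter also resets cleanly if the number of partitions shrinks between calls.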

type SimpleProducer

type SimpleProducer struct {
	// contains filtered or unexported fields
}

SimpleProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks: it may not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
}
fmt.Println("> connected")
defer client.Close()

producer, err := NewSimpleProducer(client, "my_topic", nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	err = producer.SendMessage(nil, StringEncoder("testing 123"))
	if err != nil {
		panic(err)
	}
	fmt.Println("> message sent")
}

func NewSimpleProducer

func NewSimpleProducer(client *Client, topic string, partitioner PartitionerConstructor) (*SimpleProducer, error)

NewSimpleProducer creates a new SimpleProducer using the given client, topic and partitioner. If the partitioner is nil, messages are partitioned by the hash of the key (or randomly if there is no key).
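The default behaviour described above (hash the key, or pick randomly when there is no key) can be sketched as follows. The hash function is an assumption: this page does not say which one is used, so the sketch uses FNV-1a from the standard library. The hashPartitioner type is hypothetical and takes the key as raw bytes for brevity.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
	"time"
)

// hashPartitioner is a hypothetical partitioner: keyed messages map to a
// stable partition, unkeyed messages are spread randomly.
type hashPartitioner struct {
	random *rand.Rand
}

func newHashPartitioner() *hashPartitioner {
	source := rand.NewSource(time.Now().UnixNano())
	return &hashPartitioner{random: rand.New(source)}
}

// Partition returns a value in [0, numPartitions).
// FNV-1a is an assumed choice of hash, not confirmed by the documentation.
func (p *hashPartitioner) Partition(key []byte, numPartitions int32) int32 {
	if key == nil {
		return p.random.Int31n(numPartitions)
	}
	hasher := fnv.New32a()
	hasher.Write(key) // Write on a hash.Hash never returns an error
	return int32(hasher.Sum32() % uint32(numPartitions))
}

func main() {
	p := newHashPartitioner()
	first := p.Partition([]byte("user-42"), 8)
	second := p.Partition([]byte("user-42"), 8)
	fmt.Println(first == second) // the same key always maps to the same partition
}
```

Taking the remainder in unsigned arithmetic keeps the result non-negative without a separate absolute-value step.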

func (*SimpleProducer) Close

func (sp *SimpleProducer) Close() error

Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*SimpleProducer) SendMessage

func (sp *SimpleProducer) SendMessage(key, value Encoder) error

SendMessage produces a message with the given key and value. To send strings as either key or value, see the StringEncoder type.

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.
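Redirecting the logger is a matter of assigning any value that satisfies StdLogger, which *log.Logger does. The sketch below redeclares StdLogger and Logger locally so that it compiles without the package; redirect and logLine are hypothetical helpers, and the flags are set to 0 only to keep the output free of timestamps.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"log"
)

// StdLogger and Logger are local stand-ins mirroring the declarations above.
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

// redirect points the logger at any writer, for example os.Stderr.
func redirect(w io.Writer) {
	Logger = log.New(w, "[Sarama] ", 0)
}

// logLine captures what a single Println call writes after redirection.
func logLine(msg string) string {
	var buf bytes.Buffer
	redirect(&buf)
	Logger.Println(msg)
	return buf.String()
}

func main() {
	Logger.Println("discarded by default")
	fmt.Print(logLine("connected to broker"))
}
```

With the real package the equivalent assignment would be made to sarama.Logger, for instance with a *log.Logger that writes to os.Stderr.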

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that you can do things like:

producer.SendMessage(nil, sarama.StringEncoder("hello world"))

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int
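Other key or value types can be sent by implementing the same two methods. The following sketch assumes that the Encoder interface consists of exactly Encode and Length, as StringEncoder's methods suggest; the interface is redeclared locally, and uint64Encoder is a hypothetical type that is not part of the package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// Encoder is redeclared here; its method set is assumed from StringEncoder.
type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// uint64Encoder is a hypothetical encoder that writes a number as
// eight big-endian bytes.
type uint64Encoder uint64

func (u uint64Encoder) Encode() ([]byte, error) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(u))
	return buf, nil
}

// Length reports the encoded size, which is always eight bytes here.
func (u uint64Encoder) Length() int {
	return 8
}

// Compile-time check that uint64Encoder satisfies the local Encoder.
var _ Encoder = uint64Encoder(0)

func main() {
	var key Encoder = uint64Encoder(258)
	encoded, err := key.Encode()
	if err != nil {
		panic(err)
	}
	fmt.Println(encoded, key.Length())
}
```

Keeping Length consistent with the number of bytes that Encode returns is the important invariant in this sketch.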

type TestState

type TestState interface {
	Error(args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}

TestState is a generic interface for a test state, implemented for example by *testing.T.

type TopicMetadata

type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}
