sarama

package module
v0.0.0-...-0c9d279 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2015 License: MIT Imports: 22 Imported by: 0

README

sarama

Build Status GoDoc

Sarama is an MIT-licensed Go client library for Apache Kafka 0.8 (and later).

Documentation is available via godoc at http://godoc.org/github.com/Shopify/sarama

There is a google group for Kafka client users and authors at https://groups.google.com/forum/#!forum/kafka-clients

Sarama is compatible with Go 1.2, 1.3, and 1.4.

A word of warning: the API is not 100% stable. It won't change much (in particular the low-level Broker and Request/Response objects could probably be considered frozen) but there may be the occasional parameter added or function renamed. As far as semantic versioning is concerned, we haven't quite hit 1.0.0 yet. It is absolutely stable enough to use, just expect that you might have to tweak things when you update to a newer version.

Other related links:

Documentation

Overview

Package sarama provides client libraries for the Kafka 0.8 protocol. The Client, Producer and Consumer objects are the core of the high-level API. The Broker and Request/Response objects permit more precise control.

The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Index

Examples

Constants

View Source
const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received.

Variables

View Source
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

View Source
var ErrClosedClient = errors.New("kafka: Tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var ErrIncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var ErrInsufficientData = errors.New("kafka: Insufficient data to decode packet, more bytes expected")
View Source
var ErrInvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

View Source
var ErrMessageTooLarge = errors.New("kafka: Message is larger than MaxFetchSize")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured MaxFetchSize

View Source
var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var ErrShuttingDown = errors.New("kafka: Message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

View Source
var MaxRequestSize uint32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

View Source
var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

View Source
var PanicHandler func(interface{})

PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Example
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
	return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

return nil
Output:

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targetting the given host:port address. This does not attempt to actually connect, you have to call Open() for that.

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) Close

func (b *Broker) Close() (err error)

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(clientID string, request *OffsetCommitRequest) (*OffsetCommitResponse, error)

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) Fetch

func (b *Broker) Fetch(clientID string, request *FetchRequest) (*FetchResponse, error)

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(clientID string, request *OffsetFetchRequest) (*OffsetFetchResponse, error)

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(clientID string, request *OffsetRequest) (*OffsetResponse, error)

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(clientID string, request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error)

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) Open

func (b *Broker) Open(conf *BrokerConfig) error

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewBrokerConfig() is used.

func (*Broker) Produce

func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResponse, error)

type BrokerConfig

type BrokerConfig struct {
	MaxOpenRequests int // How many outstanding requests the broker is allowed to have before blocking attempts to send (default 5).

	// All three of the below configurations are similar to the `socket.timeout.ms` setting in JVM kafka.
	DialTimeout  time.Duration // How long to wait for the initial connection to succeed before timing out and returning an error (default 30s).
	ReadTimeout  time.Duration // How long to wait for a response before timing out and returning an error (default 30s).
	WriteTimeout time.Duration // How long to wait for a transmit to succeed before timing out and returning an error (default 30s).
}

BrokerConfig is used to pass multiple configuration options to Broker.Open.

func NewBrokerConfig

func NewBrokerConfig() *BrokerConfig

NewBrokerConfig returns a new broker configuration with sane defaults.

func (*BrokerConfig) Validate

func (config *BrokerConfig) Validate() error

Validate checks a BrokerConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that you can do things like

producer.SendMessage(nil, sarama.ByteEncoder([]byte{0x00}))

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks, it will not be garbage-collected automatically when it passes out of scope. A single client can be safely shared by multiple concurrent Producers and Consumers.

func NewClient

func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)

NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

func (*Client) Close

func (client *Client) Close() error

Close shuts down all broker connections managed by this client. It is required to call this function before a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers using a client before you close the client.

func (*Client) Closed

func (client *Client) Closed() bool

Closed returns true if the client has already had Close called on it

func (*Client) GetOffset

func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)

GetOffset queries the cluster to get the most recent available offset at the given time on the topic/partition combination.

func (*Client) Leader

func (client *Client) Leader(topic string, partitionID int32) (*Broker, error)

Leader returns the broker object that is the leader of the current topic/partition, as determined by querying the cluster metadata.

func (*Client) Partitions

func (client *Client) Partitions(topic string) ([]int32, error)

Partitions returns the sorted list of all partition IDs for the given topic.

func (*Client) RefreshAllMetadata

func (client *Client) RefreshAllMetadata() error

RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.

func (*Client) RefreshTopicMetadata

func (client *Client) RefreshTopicMetadata(topics ...string) error

RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the available metadata for those topics.

func (*Client) Replicas

func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)

Replicas returns the set of all replica IDs for the given partition.

func (*Client) ReplicasInSync

func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error)

ReplicasInSync returns the set of all in-sync replica IDs for the given partition. Note: kafka's metadata here is known to be stale in many cases, and should not generally be trusted. This method should be considered effectively deprecated.

func (*Client) Topics

func (client *Client) Topics() ([]string, error)

Topics returns the set of available topics as retrieved from the cluster metadata.

func (*Client) WritablePartitions

func (client *Client) WritablePartitions(topic string) ([]int32, error)

WritablePartitions returns the sorted list of all writable partition IDs for the given topic, where "writable" means "having a valid leader accepting writes".

type ClientConfig

type ClientConfig struct {
	MetadataRetries            int           // How many times to retry a metadata request when a partition is in the middle of leader election.
	WaitForElection            time.Duration // How long to wait for leader election to finish between retries.
	DefaultBrokerConf          *BrokerConfig // Default configuration for broker connections created by this client.
	BackgroundRefreshFrequency time.Duration // How frequently the client will refresh the cluster metadata in the background. Defaults to 10 minutes. Set to 0 to disable.
}

ClientConfig is used to pass multiple configuration options to NewClient.

func NewClientConfig

func NewClientConfig() *ClientConfig

NewClientConfig creates a new ClientConfig instance with sensible defaults

func (*ClientConfig) Validate

func (config *ClientConfig) Validate() error

Validate checks a ClientConfig instance. This will return a ConfigurationError if the specified values don't make sense.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	CompressionNone   CompressionCodec = 0
	CompressionGZIP   CompressionCodec = 1
	CompressionSnappy CompressionCodec = 2
)

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type ConstantPartitioner

type ConstantPartitioner struct {
	Constant int32
}

ConstantPartitioner implements the Partitioner interface by just returning a constant value.

func (*ConstantPartitioner) Partition

func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)

func (*ConstantPartitioner) RequiresConsistency

func (p *ConstantPartitioner) RequiresConsistency() bool

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer manages PartitionConsumers which process Kafka messages from brokers.

func NewConsumer

func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error)

NewConsumer creates a new consumer attached to the given client.

func (*Consumer) ConsumePartition

func (c *Consumer) ConsumePartition(topic string, partition int32, config *PartitionConsumerConfig) (*PartitionConsumer, error)

ConsumePartition creates a PartitionConsumer on the given topic/partition with the given configuration. It will return an error if this Consumer is already consuming on the given topic/partition.

type ConsumerConfig

type ConsumerConfig struct {
	// The minimum amount of data to fetch in a request - the broker will wait until at least this many bytes are available.
	// The default is 1, as 0 causes the consumer to spin when no messages are available.
	MinFetchSize int32
	// The maximum amount of time the broker will wait for MinFetchSize bytes to become available before it
	// returns fewer than that anyways. The default is 250ms, since 0 causes the consumer to spin when no events are available.
	// 100-500ms is a reasonable range for most cases. Kafka only supports precision up to milliseconds; nanoseconds will be truncated.
	MaxWaitTime time.Duration
}

ConsumerConfig is used to pass multiple configuration options to NewConsumer.

func NewConsumerConfig

func NewConsumerConfig() *ConsumerConfig

NewConsumerConfig creates a ConsumerConfig instance with sane defaults.

func (*ConsumerConfig) Validate

func (config *ConsumerConfig) Validate() error

Validate checks a ConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type ConsumerError

type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

func (ConsumerError) Error

func (ce ConsumerError) Error() string

type ConsumerErrors

type ConsumerErrors []*ConsumerError

ConsumerErrors is a type that wraps a batch of errors and implements the Error interface. It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors when stopping.

func (ConsumerErrors) Error

func (ce ConsumerErrors) Error() string

type ConsumerMessage

type ConsumerMessage struct {
	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	ConsumerGroup string
}

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Err             KError
	CoordinatorID   int32
	CoordinatorHost string
	CoordinatorPort int32
}

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().

type FetchRequest

type FetchRequest struct {
	MaxWaitTime int32
	MinBytes    int32
	// contains filtered or unexported fields
}

func (*FetchRequest) AddBlock

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32)

type FetchResponse

type FetchResponse struct {
	Blocks map[string]map[int32]*FetchResponseBlock
}

func (*FetchResponse) AddError

func (fr *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage

func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) GetBlock

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

type FetchResponseBlock

type FetchResponseBlock struct {
	Err                 KError
	HighWaterMarkOffset int64
	MsgSet              MessageSet
}

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes is used modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.

func (*HashPartitioner) Partition

func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)

func (*HashPartitioner) RequiresConsistency

func (p *HashPartitioner) RequiresConsistency() bool

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrNoError                         KError = 0
	ErrUnknown                         KError = -1
	ErrOffsetOutOfRange                KError = 1
	ErrInvalidMessage                  KError = 2
	ErrUnknownTopicOrPartition         KError = 3
	ErrInvalidMessageSize              KError = 4
	ErrLeaderNotAvailable              KError = 5
	ErrNotLeaderForPartition           KError = 6
	ErrRequestTimedOut                 KError = 7
	ErrBrokerNotAvailable              KError = 8
	ErrReplicaNotAvailable             KError = 9
	ErrMessageSizeTooLarge             KError = 10
	ErrStaleControllerEpochCode        KError = 11
	ErrOffsetMetadataTooLarge          KError = 12
	ErrOffsetsLoadInProgress           KError = 14
	ErrConsumerCoordinatorNotAvailable KError = 15
	ErrNotCoordinatorForConsumer       KError = 16
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type Message

type Message struct {
	Codec CompressionCodec // codec used to compress the message contents
	Key   []byte           // the message key, may be nil
	Value []byte           // the message contents
	Set   *MessageSet      // the message set a message might wrap
	// contains filtered or unexported fields
}

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages convenience helper which returns either all the messages that are wrapped in this block

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	Messages               []*MessageBlock
}

type MetadataRequest

type MetadataRequest struct {
	Topics []string
}

type MetadataResponse

type MetadataResponse struct {
	Brokers []*Broker
	Topics  []*TopicMetadata
}

func (*MetadataResponse) AddBroker

func (m *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopicPartition

func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Errors map[string]map[int32]KError
}

type OffsetFetchRequest

type OffsetFetchRequest struct {
	ConsumerGroup string
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Blocks map[string]map[int32]*OffsetFetchResponseBlock
}

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset   int64
	Metadata string
	Err      KError
}

type OffsetMethod

type OffsetMethod int

OffsetMethod is passed in ConsumerConfig to tell the consumer how to determine the starting offset.

const (
	// OffsetMethodNewest causes the consumer to start at the most recent available offset, as
	// determined by querying the broker.
	OffsetMethodNewest OffsetMethod = iota
	// OffsetMethodOldest causes the consumer to start at the oldest available offset, as
	// determined by querying the broker.
	OffsetMethodOldest
	// OffsetMethodManual causes the consumer to interpret the OffsetValue in the ConsumerConfig as the
	// offset at which to start, allowing the user to manually specify their desired starting offset.
	OffsetMethodManual
)

type OffsetRequest

type OffsetRequest struct {
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time OffsetTime, maxOffsets int32)

type OffsetResponse

type OffsetResponse struct {
	Blocks map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err     KError
	Offsets []int64
}

type OffsetTime

type OffsetTime int64

OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64 value will be interpreted as milliseconds, or use the special constants defined here.

const (
	// LatestOffsets askes for the latest offsets.
	LatestOffsets OffsetTime = -1
	// EarliestOffset askes for the earliest available offset. Note that because offsets are pulled in descending order,
	// asking for the earliest offset will always return you a single element.
	EarliestOffset OffsetTime = -2
)

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type PartitionConsumer

type PartitionConsumer struct {
	// contains filtered or unexported fields
}

PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close() on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary). You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.

func (*PartitionConsumer) AsyncClose

func (child *PartitionConsumer) AsyncClose()

AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you should wait until the 'messages' and 'errors' channel are drained. It is required to call this function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.

func (*PartitionConsumer) Close

func (child *PartitionConsumer) Close() error

Close stops the PartitionConsumer from fetching messages. It is required to call this function, or AsyncCose before a consumer object passes out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.

func (*PartitionConsumer) Errors

func (child *PartitionConsumer) Errors() <-chan *ConsumerError

Errors returns the read channel for any errors that occurred while consuming the partition. You have to read this channel to prevent the consumer from deadlock. Under no circumstances, the partition consumer will shut down by itself. It will just wait until it is able to continue consuming messages. If you want to shut down your consumer, you will have trigger it yourself by consuming this channel and calling Close or AsyncClose when appropriate.

func (*PartitionConsumer) Messages

func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage

Messages returns the read channel for the messages that are returned by the broker

type PartitionConsumerConfig

type PartitionConsumerConfig struct {
	// The default (maximum) amount of data to fetch from the broker in each request. The default is 32768 bytes.
	DefaultFetchSize int32
	// The maximum permittable message size - messages larger than this will return ErrMessageTooLarge. The default of 0 is
	// treated as no limit.
	MaxMessageSize int32
	// The method used to determine at which offset to begin consuming messages. The default is to start at the most recent message.
	OffsetMethod OffsetMethod
	// Interpreted differently according to the value of OffsetMethod.
	OffsetValue int64
	// The number of events to buffer in the Messages and Errors channel. Having this non-zero permits the
	// consumer to continue fetching messages in the background while client code consumes events,
	// greatly improving throughput. The default is 64.
	ChannelBufferSize int
}

PartitionConsumerConfig is used to pass multiple configuration options to AddPartition

func NewPartitionConsumerConfig

func NewPartitionConsumerConfig() *PartitionConsumerConfig

NewPartitionConsumerConfig creates a PartitionConsumerConfig with sane defaults.

func (*PartitionConsumerConfig) Validate

func (config *PartitionConsumerConfig) Validate() error

Validate checks a PartitionConsumerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type PartitionMetadata

type PartitionMetadata struct {
	Err      KError
	ID       int32
	Leader   int32
	Replicas []int32
	Isr      []int32
}

type Partitioner

type Partitioner interface {
	Partition(key Encoder, numPartitions int32) (int32, error) // Partition takes the key and partition count and chooses a partition

	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}

Partitioner is anything that, given a Kafka message key and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

func NewHashPartitioner

func NewHashPartitioner() Partitioner

func NewRandomPartitioner

func NewRandomPartitioner() Partitioner

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner() Partitioner

type PartitionerConstructor

type PartitionerConstructor func() Partitioner

PartitionerConstructor is the type for a function capable of constructing new Partitioners.

type ProduceRequest

type ProduceRequest struct {
	RequiredAcks RequiredAcks
	Timeout      int32
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddMessage

func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks map[string]map[int32]*ProduceResponseBlock
}

func (*ProduceResponse) AddTopicPartition

func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err    KError
	Offset int64
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer publishes Kafka messages. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks: it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		fmt.Println("> message queued")
	case err := <-producer.Errors():
		panic(err.Err)
	}
}
Output:

func NewProducer

func NewProducer(client *Client, config *ProducerConfig) (*Producer, error)

NewProducer creates a new Producer using the given client.

func (*Producer) AsyncClose

func (p *Producer) AsyncClose()

AsyncClose triggers a shutdown of the producer, flushing any messages it may have buffered. The shutdown has completed when both the Errors and Successes channels have been closed. When calling AsyncClose, you *must* continue to read from those channels in order to drain the results of any messages in flight.

func (*Producer) Close

func (p *Producer) Close() error

Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*Producer) Errors

func (p *Producer) Errors() <-chan *ProducerError

Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock. It is suggested that you send messages and read errors together in a single select statement.

func (*Producer) Input

func (p *Producer) Input() chan<- *ProducerMessage

Input is the input channel for the user to write messages to that they wish to send.

func (*Producer) Successes

func (p *Producer) Successes() <-chan *ProducerMessage

Successes is the success output channel back to the user when AckSuccesses is configured. If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock. It is suggested that you send and read messages together in a single select statement.

type ProducerConfig

type ProducerConfig struct {
	Partitioner       PartitionerConstructor // Generates partitioners for choosing the partition to send messages to (defaults to hash). Similar to the `partitioner.class` setting for the JVM producer.
	RequiredAcks      RequiredAcks           // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal). Equivalent to the `request.required.acks` setting of the JVM producer.
	Timeout           time.Duration          // The maximum duration the broker will wait the receipt of the number of RequiredAcks (defaults to 10 seconds). This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated. Equivalent to the JVM producer's `request.timeout.ms` setting.
	Compression       CompressionCodec       // The type of compression to use on messages (defaults to no compression). Similar to `compression.codec` setting of the JVM producer.
	FlushMsgCount     int                    // The number of messages needed to trigger a flush. This is a best effort; the number of messages may be more or less. Use `MaxMessagesPerReq` to set a hard upper limit.
	FlushFrequency    time.Duration          // If this amount of time elapses without a flush, one will be queued. The frequency is a best effort, and the actual frequency can be more or less. Equivalent to `queue.buffering.max.ms` setting of JVM producer.
	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered. This is a best effort; the number of bytes may be more or less. Use the gloabl `sarama.MaxRequestSize` to set a hard upper limit.
	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 256).
	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 100ms). Similar to the retry.backoff.ms setting of the JVM producer.
	MaxRetries        int                    // The total number of times to retry sending a message (defaults to 3). Similar to the message.send.max.retries setting of the JVM producer.
}

ProducerConfig is used to pass multiple configuration options to NewProducer.

Some of these configuration settings match settings with the JVM producer, but some of these are implementation specific and have no equivalent in the JVM producer.

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

NewProducerConfig creates a new ProducerConfig instance with sensible defaults.

func (*ProducerConfig) Validate

func (config *ProducerConfig) Validate() error

Validate checks a ProducerConfig instance. It will return a ConfigurationError if the specified value doesn't make sense.

type ProducerError

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

func (ProducerError) Error

func (pe ProducerError) Error() string

type ProducerErrors

type ProducerErrors []*ProducerError

ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

func (ProducerErrors) Error

func (pe ProducerErrors) Error() string

type ProducerMessage

type ProducerMessage struct {
	Topic    string      // The Kafka topic for this message.
	Key      Encoder     // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Value    Encoder     // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels.  Sarama completely ignores this field and is only to be used for pass-through data.
	// contains filtered or unexported fields
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

func (*ProducerMessage) Offset

func (m *ProducerMessage) Offset() int64

Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.

func (*ProducerMessage) Partition

func (m *ProducerMessage) Partition() int32

Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.

type RandomPartitioner

type RandomPartitioner struct {
	// contains filtered or unexported fields
}

RandomPartitioner implements the Partitioner interface by choosing a random partition each time.

func (*RandomPartitioner) Partition

func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)

func (*RandomPartitioner) RequiresConsistency

func (p *RandomPartitioner) RequiresConsistency() bool

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all replicas to commit before responding.
	WaitForAll RequiredAcks = -1
)

type RoundRobinPartitioner

type RoundRobinPartitioner struct {
	// contains filtered or unexported fields
}

RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.

func (*RoundRobinPartitioner) Partition

func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error)

func (*RoundRobinPartitioner) RequiresConsistency

func (p *RoundRobinPartitioner) RequiresConsistency() bool

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)

Logger is the instance of a StdLogger interface that Sarama writes connection management events to. By default it is set to discard all log messages via ioutil.Discard, but you can set it to redirect wherever you want.

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that you can do things like

producer.SendMessage(nil, sarama.StringEncoder("hello world"))

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int

type SyncProducer

type SyncProducer struct {
	// contains filtered or unexported fields
}

SyncProducer publishes Kafka messages. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).

Example
client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
if err != nil {
	panic(err)
} else {
	fmt.Println("> connected")
}
defer client.Close()

producer, err := NewSyncProducer(client, nil)
if err != nil {
	panic(err)
}
defer producer.Close()

for {
	partition, offset, err := producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
	if err != nil {
		panic(err)
	} else {
		fmt.Printf("> message sent to partition %d at offset %d\n", partition, offset)
	}
}
Output:

func NewSyncProducer

func NewSyncProducer(client *Client, config *ProducerConfig) (*SyncProducer, error)

NewSyncProducer creates a new SyncProducer using the given client and configuration.

func (*SyncProducer) Close

func (sp *SyncProducer) Close() error

Close shuts down the producer and flushes any messages it may have buffered. You must call this function before a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close on the underlying client.

func (*SyncProducer) SendMessage

func (sp *SyncProducer) SendMessage(topic string, key, value Encoder) (partition int32, offset int64, err error)

SendMessage produces a message to the given topic with the given key and value. To send strings as either key or value, see the StringEncoder type. It returns the partition and offset of the successfully-produced message, or the error (if any).

type TopicMetadata

type TopicMetadata struct {
	Err        KError
	Name       string
	Partitions []*PartitionMetadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL