kafka

package module
v0.0.0-...-4b1678c
Published: Jan 24, 2017 License: Apache-2.0 Imports: 12 Imported by: 6

README


kafka - Publisher & Consumer for Kafka 0.7.x in Go

Kafka is a distributed publish-subscribe messaging system: http://kafka.apache.org

Go language: http://golang.org/

For Kafka 0.8.x take a look at https://github.com/Shopify/sarama

Changes

May 2015
  • fixed a bug handling a partial message at the end of a fetch response when the payload is < 4 bytes
  • if the kafka log segment being read is cleaned up, attempt resuming the consumer from the earliest available offset
April 2015
  • added support for Snappy compression
  • fixed handling of partial messages at the end of each fetch response
  • added ProduceFromChannel() method in the publisher, mirroring the ConsumeOnChannel() method in the consumer
  • changed the quit channel type to empty struct{}, adding ability to stop the consumer on demand without race conditions
  • reused the connection in BatchPublish(), instead of establishing a brand new connection every time
  • applied gofmt / golint on the code (renamed Id() to ID() for compliance)
  • added comments
  • better distinction between DEBUG and ERROR logs, with info on how to get the consumer unstuck when the max fetch size is too small
April 2013
  • Merged back from the apache repository & outstanding patches from jira applied

Get up and running

Install Go (version 1 or newer). For more info see: http://weekly.golang.org/doc/install.html#install

Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Also set your GOPATH appropriately: http://weekly.golang.org/doc/code.html#tmp_13

Build from source:

  make kafka

Make the tools (publisher & consumer):

  make tools

Start ZooKeeper and the Kafka server. For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html

Tools

Start a consumer:


  $GOPATH/bin/consumer -topic test -consumeforever

  Consuming Messages :
  From: localhost:9092, topic: test, partition: 0
  ----------------------

Now the consumer will just poll until a message is received.

Publish a message:


  $GOPATH/bin/publisher -topic test -message "Hello World"

The consumer should output the message.

API Usage

Publishing


broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewMessage([]byte("testing 1 2 3")))
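
Publish returns an int and an error (see the API reference below), so failures can be checked; a minimal sketch, assuming the standard library log package is imported:

if _, err := broker.Publish(kafka.NewMessage([]byte("testing 1 2 3"))); err != nil {
	log.Printf("publish failed: %v", err)
}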

Publishing Compressed Messages


broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewCompressedMessage([]byte("testing 1 2 3")))
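
NewCompressedMessage uses gzip (the default compression method); to choose a codec explicitly, NewMessageWithCodec accepts any of the packaged codecs. A sketch using Snappy:

broker.Publish(kafka.NewMessageWithCodec([]byte("testing 1 2 3"), new(kafka.SnappyPayloadCodec)))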

Consumer

broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
stop := make(chan struct{})
broker.Consume(func(msg *kafka.Message) { msg.Print() }, stop)

Or the consumer can use a channel based approach:


msgChan := make(chan *kafka.Message)
quitChan := make(chan struct{})
broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
go broker.ConsumeOnChannel(msgChan, 10, quitChan)
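
A sketch of draining the channel, assuming the standard library time package is imported; it also assumes ConsumeOnChannel closes msgChan when it stops (check the source for the exact shutdown contract):

go func() {
	time.Sleep(10 * time.Second) // consume for a while, then ask the consumer to stop
	close(quitChan)
}()
for msg := range msgChan { // ends once the consumer closes msgChan
	msg.Print()
}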

Consuming Offsets

broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
offsets, err := broker.GetOffsets(-1, 1)
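
The time argument accepts the OFFSET_LATEST (-1) and OFFSET_EARLIEST (-2) constants defined by the package; a sketch of fetching the earliest available offset:

offsets, err := broker.GetOffsets(kafka.OFFSET_EARLIEST, 1)
if err != nil || len(offsets) == 0 {
	// handle the error or empty result
}
// offsets are returned in descending order; offsets[0] is the earliest available here
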
Contact

jeffreydamick (at) gmail (dot) com

http://twitter.com/jeffreydamick

Big thank you to NeuStar for sponsoring this work.

Documentation

Index

Constants

const (
	// MAGIC_DEFAULT is the default value for the Kafka wire format.
	// Compression Support uses '1' - https://cwiki.apache.org/confluence/display/KAFKA/Compression
	MAGIC_DEFAULT = 1

	// NO_LEN_HEADER_SIZE is the length of the header after the 4 bytes representing the length
	// magic + compression + chksum
	NO_LEN_HEADER_SIZE = 1 + 1 + 4
)
const (
	NO_COMPRESSION_ID     = 0
	GZIP_COMPRESSION_ID   = 1
	SNAPPY_COMPRESSION_ID = 2
)

compression flags

const (
	REQUEST_PRODUCE      RequestType = 0
	REQUEST_FETCH                    = 1
	REQUEST_MULTIFETCH               = 2
	REQUEST_MULTIPRODUCE             = 3
	REQUEST_OFFSETS                  = 4
	OFFSET_LATEST        int64       = -1
	OFFSET_EARLIEST      int64       = -2
)

Request Types

const (
	// NETWORK constant for supported network protocol
	NETWORK = "tcp"
)

Variables

var (
	ErrMalformedPacket       = fmt.Errorf("kafka message malformed, expecting at least 4 bytes")
	ErrIncompletePacket      = fmt.Errorf("kafka message incomplete, expecting larger packet")
	ErrIncompleteInnerPacket = fmt.Errorf("incomplete kafka message within a compressed message")
	ErrInvalidMagic          = fmt.Errorf("incorrect magic value")
	ErrChecksumMismatch      = fmt.Errorf("checksum mismatch on kafka message")
)

Error constants

var DefaultCodecs = []PayloadCodec{
	new(NoCompressionPayloadCodec),
	new(GzipPayloadCodec),
	new(SnappyPayloadCodec),
}

DefaultCodecs is a list of codecs supported and packaged with the library itself

var DefaultCodecsMap = codecsMap(DefaultCodecs)

DefaultCodecsMap is a map[id]Codec representation of the supported codecs

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker contains the generic kafka parameters for a broker

func (*Broker) EncodeConsumeRequest

func (b *Broker) EncodeConsumeRequest(offset uint64, maxSize uint32) []byte

EncodeConsumeRequest encodes a fetch request into kafka's wire format:
<Request Header><OFFSET: uint64><MAX SIZE: uint32>

func (*Broker) EncodeOffsetRequest

func (b *Broker) EncodeOffsetRequest(time int64, maxNumOffsets uint32) []byte

EncodeOffsetRequest encodes an offset request into kafka's wire format:
<Request Header><TIME: uint64><MAX NUMBER of OFFSETS: uint32>

func (*Broker) EncodePublishRequest

func (b *Broker) EncodePublishRequest(messages ...*Message) []byte

EncodePublishRequest encodes a publish request into kafka's wire format:
<Request Header><MESSAGE SET SIZE: uint32><MESSAGE SETS>

func (*Broker) EncodeRequestHeader

func (b *Broker) EncodeRequestHeader(requestType RequestType) *bytes.Buffer

EncodeRequestHeader marshals a request into kafka's wire format:
Request Header: <REQUEST_SIZE: uint32><REQUEST_TYPE: uint16><TOPIC SIZE: uint16><TOPIC: bytes><PARTITION: uint32>

type BrokerConsumer

type BrokerConsumer struct {
	// contains filtered or unexported fields
}

BrokerConsumer holds a Kafka broker instance and the consumer settings

func NewBrokerConsumer

func NewBrokerConsumer(hostname string, topic string, partition int, offset uint64, maxSize uint32) *BrokerConsumer

NewBrokerConsumer creates a new broker consumer
  * hostname - host and optionally port, delimited by ':'
  * topic to consume
  * partition to consume from
  * offset to start consuming from
  * maxSize (in bytes) of the message to consume (this should be at least as big as the biggest message to be published)

func NewBrokerOffsetConsumer

func NewBrokerOffsetConsumer(hostname string, topic string, partition int) *BrokerConsumer

NewBrokerOffsetConsumer creates a simplified consumer that defaults the offset and maxSize to 0.
  * hostname - host and optionally port, delimited by ':'
  * topic to consume
  * partition to consume from

func (*BrokerConsumer) AddCodecs

func (consumer *BrokerConsumer) AddCodecs(payloadCodecs []PayloadCodec)

AddCodecs is a utility method to add custom payload codecs for consumer decoding. payloadCodecs is an array of PayloadCodec implementations.

func (*BrokerConsumer) Consume

func (consumer *BrokerConsumer) Consume(handlerFunc MessageHandlerFunc, stop <-chan struct{}) (int, error)

Consume makes a single fetch request and sends the messages in the message set to a handler function

func (*BrokerConsumer) ConsumeOnChannel

func (consumer *BrokerConsumer) ConsumeOnChannel(msgChan chan *Message, pollTimeoutMs int64, quit chan struct{}) (int, error)

ConsumeOnChannel fetches messages from kafka and enqueues them in a channel

func (*BrokerConsumer) GetOffset

func (consumer *BrokerConsumer) GetOffset() uint64

GetOffset returns the current offset for a broker.

func (*BrokerConsumer) GetOffsets

func (consumer *BrokerConsumer) GetOffsets(time int64, maxNumOffsets uint32) ([]uint64, error)

GetOffsets returns a list of valid offsets (up to maxNumOffsets) before the given time, where time is in milliseconds (-1 for the latest available offset, -2 for the earliest available offset). The result is a list of offsets, in descending order.

type BrokerPublisher

type BrokerPublisher struct {
	// contains filtered or unexported fields
}

BrokerPublisher holds a Kafka broker instance and the publisher settings

func NewBrokerPublisher

func NewBrokerPublisher(hostname string, topic string, partition int) *BrokerPublisher

NewBrokerPublisher returns a new broker instance for a publisher

func (*BrokerPublisher) BatchPublish

func (b *BrokerPublisher) BatchPublish(messages ...*Message) (int, error)

BatchPublish writes a batch of messages to the kafka broker

func (*BrokerPublisher) ProduceFromChannel

func (b *BrokerPublisher) ProduceFromChannel(msgChan chan *Message, quit chan struct{}) (int, error)

ProduceFromChannel reads messages from a Message channel and publishes them to the Kafka broker
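
A sketch of feeding the publisher from a channel (the close-to-stop convention on the quit channel is an assumption; check the source for the exact contract):

pub := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
msgChan := make(chan *kafka.Message)
quit := make(chan struct{})
go pub.ProduceFromChannel(msgChan, quit)
msgChan <- kafka.NewMessage([]byte("hello"))
close(quit) // once done publishing, request shutdown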

func (*BrokerPublisher) Publish

func (b *BrokerPublisher) Publish(message *Message) (int, error)

Publish writes a message to the kafka broker

type GzipPayloadCodec

type GzipPayloadCodec struct {
}

GzipPayloadCodec - Gzip Codec

func (*GzipPayloadCodec) Decode

func (codec *GzipPayloadCodec) Decode(data []byte) []byte

Decode decodes the message with GZip compression

func (*GzipPayloadCodec) Encode

func (codec *GzipPayloadCodec) Encode(data []byte) []byte

Encode encodes the message with GZip compression

func (*GzipPayloadCodec) ID

func (codec *GzipPayloadCodec) ID() byte

ID returns the 1-byte id of the codec

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message wraps the message headers and the payload

func Decode

func Decode(packet []byte, payloadCodecsMap map[byte]PayloadCodec) (uint32, []Message, error)

Decode scans a packet for messages and decodes them

func DecodeWithDefaultCodecs

func DecodeWithDefaultCodecs(packet []byte) (uint32, []Message, error)

DecodeWithDefaultCodecs decodes the message(s) with the default codecs list
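
A sketch of decoding a raw fetch payload with the packaged codecs (packet is a placeholder for bytes read off the wire; the uint32 result is ignored here):

var packet []byte // raw bytes from a fetch response
_, msgs, err := kafka.DecodeWithDefaultCodecs(packet)
if err != nil {
	// ErrIncompletePacket indicates more bytes are needed; see the error constants above
}
for _, m := range msgs {
	m.Print()
}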

func NewCompressedMessage

func NewCompressedMessage(payload []byte) *Message

NewCompressedMessage creates a Message using the default compression method (gzip)

func NewCompressedMessages

func NewCompressedMessages(messages ...*Message) *Message

NewCompressedMessages encodes a batch of Messages using the default compression method (gzip)

func NewMessage

func NewMessage(payload []byte) *Message

NewMessage creates a message with no compression

func NewMessageWithCodec

func NewMessageWithCodec(payload []byte, codec PayloadCodec) *Message

NewMessageWithCodec creates a new Message instance, with the payload encoded with the given codec

func (*Message) Encode

func (m *Message) Encode() []byte

Encode marshals the Message object into kafka's wire format:
MESSAGE SET: <MESSAGE LENGTH: uint32><MAGIC: 1 byte><COMPRESSION: 1 byte><CHECKSUM: uint32><MESSAGE PAYLOAD: bytes>

func (*Message) Offset

func (m *Message) Offset() uint64

Offset returns the position of the current message in the log

func (*Message) Payload

func (m *Message) Payload() []byte

Payload returns the actual payload of the message, as byte array

func (*Message) PayloadString

func (m *Message) PayloadString() string

PayloadString returns the actual payload of the message, as string

func (*Message) Print

func (m *Message) Print()

Print is a debug method to print the Message object

type MessageHandlerFunc

type MessageHandlerFunc func(msg *Message)

MessageHandlerFunc defines the interface for message handlers accepted by Consume()

type NoCompressionPayloadCodec

type NoCompressionPayloadCodec struct {
}

NoCompressionPayloadCodec - No compression codec, noop

func (*NoCompressionPayloadCodec) Decode

func (codec *NoCompressionPayloadCodec) Decode(data []byte) []byte

Decode decodes the message without any compression (no-op)

func (*NoCompressionPayloadCodec) Encode

func (codec *NoCompressionPayloadCodec) Encode(data []byte) []byte

Encode encodes the message without any compression (no-op)

func (*NoCompressionPayloadCodec) ID

func (codec *NoCompressionPayloadCodec) ID() byte

ID returns the 1-byte id of the codec

type PayloadCodec

type PayloadCodec interface {

	// ID returns the 1-byte id of the codec
	ID() byte

	// Encode is the encoder interface for compression implementation
	Encode(data []byte) []byte

	// Decode is the decoder interface for decompression implementation
	Decode(data []byte) []byte
}

PayloadCodec defines an interface for the Codecs supported by the Kafka library
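
As an illustration, a hypothetical codec implementing this interface (the type name, id byte, and XOR scheme are invented for the example):

// XorPayloadCodec is a hypothetical codec for illustration only.
type XorPayloadCodec struct{ key byte }

// ID returns the codec id; it must not collide with the built-in ids (0, 1, 2).
func (c *XorPayloadCodec) ID() byte { return 42 }

// Encode XORs every payload byte with the key.
func (c *XorPayloadCodec) Encode(data []byte) []byte {
	out := make([]byte, len(data))
	for i, b := range data {
		out[i] = b ^ c.key
	}
	return out
}

// Decode reverses Encode (XOR is its own inverse).
func (c *XorPayloadCodec) Decode(data []byte) []byte { return c.Encode(data) }

Both ends must know the codec: register it on a consumer with consumer.AddCodecs([]kafka.PayloadCodec{&XorPayloadCodec{key: 0x5a}}) before consuming messages encoded with it.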

type RequestType

type RequestType uint16

RequestType is a typedef for the request type flag

type SnappyPayloadCodec

type SnappyPayloadCodec struct {
}

SnappyPayloadCodec - Snappy Codec

func (*SnappyPayloadCodec) Decode

func (codec *SnappyPayloadCodec) Decode(data []byte) []byte

Decode decodes the message with Snappy compression

func (*SnappyPayloadCodec) Encode

func (codec *SnappyPayloadCodec) Encode(data []byte) []byte

Encode encodes the message with Snappy compression

func (*SnappyPayloadCodec) ID

func (codec *SnappyPayloadCodec) ID() byte

ID returns the 1-byte id of the codec

type Timing

type Timing struct {
	// contains filtered or unexported fields
}

Timing wraps a timer

func StartTiming

func StartTiming(label string) *Timing

StartTiming returns a new timer

func (*Timing) Print

func (t *Timing) Print()

Print logs timing information associated with the timer

func (*Timing) Stop

func (t *Timing) Stop()

Stop stops the timer

Directories

Path Synopsis
tools
