Documentation ¶
Overview ¶
Package krater provides io.Writer and io.ReaderFrom implementations that produce messages to Kafka. Each Write() call will be written as a separate message.
AckingWriter ¶
AckingWriter's Write and ReadFrom methods write messages to Kafka, blocking until a response is received from the broker. To support this, the sarama producer used to create an AckingWriter must have Producer.Return.Successes = true and Producer.Return.Errors = true in its Config.
Example of AckingWriter usage (error checking and imports omitted for brevity):
pc := sarama.NewConfig()
// these must both be true or the writer will deadlock
pc.Producer.Return.Successes = true
pc.Producer.Return.Errors = true

kp, err := sarama.NewAsyncProducer(opts.Brokers, pc)

// writer for topic "example-topic", allowing at most 10 concurrent writes
aw := NewAckingWriter("example-topic", kp, 10)
aw.Write([]byte("ahoy thar")) // this will block until Kafka responds
UnsafeWriter ¶
UnsafeWriter's Write and ReadFrom methods write messages to Kafka without waiting for responses from the broker. Both methods block only if the producer's Input() channel is full. Errors are ignored. The following example uses Kafka as the output of a logger from the standard library's log package.
pc := sarama.NewConfig()
// these must both be false or the writer will deadlock
pc.Producer.Return.Successes = false
pc.Producer.Return.Errors = false

kp, err := sarama.NewAsyncProducer(opts.Brokers, pc)

uw := NewUnsafeWriter("example-unsafe", kp)

// create a new logger that writes to uw
logger := log.New(uw, "[AHOY] ", log.LstdFlags)
logger.Println("Well this is handy")
Index ¶
Constants ¶
This section is empty.
Variables ¶
var LogOutput io.Writer = ioutil.Discard
LogOutput is the writer used by krater's loggers
var PanicHandler func(interface{})
PanicHandler is called for recovering from panics spawned internally to the library (and thus not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
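A minimal sketch of how such a package-level hook is typically wired up. Only the PanicHandler variable mirrors the one documented above; the guard helper is hypothetical and not part of krater's exported API:

```go
package main

import "fmt"

// PanicHandler mirrors krater's package-level hook: when non-nil, it is
// invoked with the value recovered from panics inside library goroutines.
var PanicHandler func(interface{})

// guard sketches how a library-internal goroutine might use such a hook
// (hypothetical helper for illustration only).
func guard(fn func()) {
	defer func() {
		if v := recover(); v != nil && PanicHandler != nil {
			PanicHandler(v)
		}
	}()
	fn()
}

func main() {
	PanicHandler = func(v interface{}) { fmt.Println("recovered:", v) }
	guard(func() { panic("boom") }) // prints: recovered: boom
}
```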
Functions ¶
func LogTo ¶
LogTo makes krater and sarama loggers output to the given writer by replacing sarama.Logger and LogOutput. It returns a function that sets LogOutput and sarama.Logger to whatever values they had before the call to LogTo.
As an example
defer LogTo(os.Stderr)()
would start logging to stderr immediately and defer restoring the loggers.
Types ¶
type AckingWriter ¶
type AckingWriter struct {
// contains filtered or unexported fields
}
AckingWriter is an io.Writer that writes messages to Kafka. Parallel calls to Write() will cause messages to be queued by the producer, and each Write() call will block until a response is received from the broker.
The AsyncProducer passed to NewAckingWriter must have Producer.Return.Successes = true and Producer.Return.Errors = true set in its Config.
Close() must be called when the writer is no longer needed.
func NewAckingWriter ¶
func NewAckingWriter(topic string, kp sarama.AsyncProducer, maxConcurrent int) *AckingWriter
NewAckingWriter returns an AckingWriter that uses kp to produce messages to Kafka topic 'topic', with a maximum of maxConcurrent concurrent writes.
kp MUST have been configured with Producer.Return.Successes = true or Write will block indefinitely.
func (*AckingWriter) Close ¶
func (aw *AckingWriter) Close() error
Close closes the writer. If the writer has already been closed, Close will return syscall.EINVAL. Thread-safe.
func (*AckingWriter) Closed ¶
func (aw *AckingWriter) Closed() bool
Closed returns true if the AckingWriter has been closed, false otherwise. Thread-safe.
func (*AckingWriter) ReadFrom ¶
func (aw *AckingWriter) ReadFrom(r io.Reader) (n int64, err error)
ReadFrom reads all available bytes from r and writes them to Kafka in a single message. The returned int64 will always be the total number of bytes read from r, or 0 if reading from r returned an error. Trying to ReadFrom using a closed writer will return syscall.EINVAL.
Note that AckingWriter doesn't support "streaming", so r is read in full before it's sent.
Implements io.ReaderFrom.
func (*AckingWriter) SetKeyer ¶ added in v2.1.1
func (aw *AckingWriter) SetKeyer(fn KeyerFn)
SetKeyer sets the keyer function used to derive keys for messages; by default all messages have nil keys. SetKeyer is NOT thread-safe and must not be called while writes are underway.
func (*AckingWriter) SetLogger ¶
func (aw *AckingWriter) SetLogger(l StdLogger)
SetLogger sets the logger used by this AckingWriter. Not thread-safe.
func (*AckingWriter) Write ¶
func (aw *AckingWriter) Write(p []byte) (n int, err error)
Write will queue p as a single message, blocking until a response is received. n will always be len(p) if the message was sent successfully, 0 otherwise. The message's key is determined by the keyer function set with SetKeyer, and defaults to nil. Trying to Write to a closed writer will return syscall.EINVAL.
Thread-safe.
Implements io.Writer.
type GroupReader ¶ added in v2.1.1
type GroupReader struct {
// contains filtered or unexported fields
}
func NewGroupReader ¶ added in v2.1.1
func NewGroupReader(group string, topics []string, zookeeper string, cgConf *kafkaconsumer.Config) (gr *GroupReader, err error)
func (*GroupReader) Close ¶ added in v2.1.1
func (gr *GroupReader) Close() (err error)
func (GroupReader) String ¶ added in v2.1.1
func (gr GroupReader) String() string
type KeyerFn ¶ added in v2.1.1
type KeyerFn func(*sarama.ProducerMessage) sarama.Encoder
KeyerFn represents any function that derives a key for a particular message.
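For illustration, a keyer will often derive a stable key from the message payload so that equal payloads land on the same partition. The sketch below shows only that keying logic, using plain byte slices to stay self-contained; a real KeyerFn receives a *sarama.ProducerMessage and returns a sarama.Encoder (e.g. sarama.StringEncoder):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// keyFor derives a stable hex key from a message payload. In a real
// KeyerFn the returned string would be wrapped in sarama.StringEncoder
// and the payload taken from the *sarama.ProducerMessage.
func keyFor(value []byte) string {
	h := fnv.New32a()
	h.Write(value)
	return fmt.Sprintf("%08x", h.Sum32())
}

func main() {
	// identical payloads map to the same key, so they hash to the
	// same partition under key-based partitioning
	fmt.Println(keyFor([]byte("ahoy thar")) == keyFor([]byte("ahoy thar"))) // prints: true
}
```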
type StdLogger ¶
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
}
StdLogger is the interface for log.Logger-compatible loggers.
type UnsafeWriter ¶
type UnsafeWriter struct {
// contains filtered or unexported fields
}
UnsafeWriter is an io.Writer that writes messages to Kafka, ignoring any error responses sent by the brokers. Parallel calls to Write / ReadFrom are safe.
The AsyncProducer passed to NewUnsafeWriter must have Producer.Return.Successes = false and Producer.Return.Errors = false set in its Config.
Close() must be called when the writer is no longer needed.
func NewUnsafeWriter ¶
func NewUnsafeWriter(topic string, kp sarama.AsyncProducer) *UnsafeWriter
func (*UnsafeWriter) Close ¶
func (uw *UnsafeWriter) Close() (err error)
Close closes the writer. If the writer has already been closed, Close will return syscall.EINVAL. Thread-safe.
func (*UnsafeWriter) Closed ¶
func (uw *UnsafeWriter) Closed() bool
Closed returns true if the UnsafeWriter has been closed, false otherwise. Thread-safe.
func (*UnsafeWriter) ReadFrom ¶
func (uw *UnsafeWriter) ReadFrom(r io.Reader) (int64, error)
ReadFrom reads all available bytes from r and writes them to Kafka without checking for broker error responses. The returned error is either nil or any error returned while reading from r. The returned int64 will always be the total number of bytes read from r, or 0 if reading from r returned an error. Trying to ReadFrom using a closed writer will return syscall.EINVAL.
Note that UnsafeWriter doesn't support "streaming", so r is read in full before it's sent.
Implements io.ReaderFrom.
func (*UnsafeWriter) SetKeyer ¶ added in v2.1.1
func (uw *UnsafeWriter) SetKeyer(fn KeyerFn)
SetKeyer sets the keyer function used to derive keys for messages; by default all messages have nil keys. SetKeyer is NOT thread-safe and must not be called while writes are underway.
func (*UnsafeWriter) SetLogger ¶
func (uw *UnsafeWriter) SetLogger(l StdLogger)
SetLogger sets the logger used by this UnsafeWriter. Not thread-safe.
func (*UnsafeWriter) Write ¶
func (uw *UnsafeWriter) Write(p []byte) (n int, err error)
Write writes byte slices to Kafka without checking for error responses. n will always be len(p) and err will be nil, except that trying to Write to a closed writer will return syscall.EINVAL. Thread-safe.
Write might block if the Input() channel of the underlying sarama.AsyncProducer is full.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
cmd |
cmd/to_kafka | to_kafka reads delimited data from stdin and writes it to Kafka.
kafkaconsumer | Package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer with sarama references replaced with gopkg.in links instead of "raw" Github