producer

package module
v0.0.0-...-a374fc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 23, 2016 License: Apache-2.0 Imports: 14 Imported by: 24

README

siesta-producer

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ByteSerializer

func ByteSerializer(value interface{}) ([]byte, error)

func Critical

func Critical(tag interface{}, message interface{})

Critical writes a given message with a given tag to log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Criticalf formats a given message according to given params with a given tag to log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Debug writes a given message with a given tag to log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Debugf formats a given message according to given params with a given tag to log with level Debug.

func Error

func Error(tag interface{}, message interface{})

Error writes a given message with a given tag to log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Errorf formats a given message according to given params with a given tag to log with level Error.

func Info

func Info(tag interface{}, message interface{})

Info writes a given message with a given tag to log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Infof formats a given message according to given params with a given tag to log with level Info.

func StringSerializer

func StringSerializer(value interface{}) ([]byte, error)

func Trace

func Trace(tag interface{}, message interface{})

Trace writes a given message with a given tag to log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Tracef formats a given message according to given params with a given tag to log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Warn writes a given message with a given tag to log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Warnf formats a given message according to given params with a given tag to log with level Warn.

Types

type ConnectionRequest

type ConnectionRequest struct {
	// contains filtered or unexported fields
}

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is a default implementation of KafkaLogger interface used in this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

Critical formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

Debug formats a given message according to given params to log with level Debug.

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

Error formats a given message according to given params to log with level Error.

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

Info formats a given message according to given params to log with level Info.

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

Trace formats a given message according to given params to log with level Trace.

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

Warn formats a given message according to given params to log with level Warn.

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

func NewHashPartitioner

func NewHashPartitioner() *HashPartitioner

func (*HashPartitioner) Partition

func (hp *HashPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}

KafkaLogger is a logger interface. Lets you plug-in your custom logging library instead of using built-in one.

Logger used by this client. Defaults to build-in logger with Info log level.

type KafkaProducer

type KafkaProducer struct {
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer

func (*KafkaProducer) Close

func (kp *KafkaProducer) Close()

func (*KafkaProducer) Send

func (kp *KafkaProducer) Send(record *ProducerRecord) <-chan *RecordMetadata

type LogLevel

type LogLevel string

LogLevel represents a logging level.

const (
	// TraceLevel is used for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	// DebugLevel is used for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	// InfoLevel is used for general information about a running application.
	InfoLevel LogLevel = "info"

	// WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	// ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	// CriticalLevel is used to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)

type ManualPartitioner

type ManualPartitioner struct{}

func NewManualPartitioner

func NewManualPartitioner() *ManualPartitioner

func (*ManualPartitioner) Partition

func (mp *ManualPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type NetworkClient

type NetworkClient struct {
	*NetworkClientConfig
	// contains filtered or unexported fields
}

func NewNetworkClient

func NewNetworkClient(config *NetworkClientConfig, connector siesta.Connector, selector *Selector) *NetworkClient

type NetworkClientConfig

type NetworkClientConfig struct {
	RequiredAcks int
	AckTimeoutMs int32
	Retries      int
	RetryBackoff time.Duration
	Topic        string
	Partition    int32
}

type NetworkRequest

type NetworkRequest struct {
	// contains filtered or unexported fields
}

TODO better struct name

type Partitioner

type Partitioner interface {
	Partition(record *ProducerRecord, partitions []int32) (int32, error)
}

type Producer

type Producer interface {
	// Send the given record asynchronously and return a channel which will eventually contain the response information.
	Send(*ProducerRecord) <-chan *RecordMetadata

	// Tries to close the producer cleanly.
	Close()
}

type ProducerConfig

type ProducerConfig struct {
	Partitioner     Partitioner
	CompressionType string
	BatchSize       int
	Linger          time.Duration
	Retries         int
	RetryBackoff    time.Duration

	ClientID        string
	MaxRequests     int
	SendRoutines    int
	ReceiveRoutines int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	RequiredAcks    int
	AckTimeoutMs    int32
	BrokerList      []string
}

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

func ProducerConfigFromFile

func ProducerConfigFromFile(filename string) (*ProducerConfig, error)

type ProducerRecord

type ProducerRecord struct {
	Topic     string
	Partition int32
	Key       interface{}
	Value     interface{}
	// contains filtered or unexported fields
}

type RandomPartitioner

type RandomPartitioner struct{}

func NewRandomPartitioner

func NewRandomPartitioner() *RandomPartitioner

func (*RandomPartitioner) Partition

func (rp *RandomPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type RecordAccumulator

type RecordAccumulator struct {
	// contains filtered or unexported fields
}

func NewRecordAccumulator

func NewRecordAccumulator(config *RecordAccumulatorConfig, networkClient *NetworkClient) *RecordAccumulator

type RecordAccumulatorConfig

type RecordAccumulatorConfig struct {
	// contains filtered or unexported fields
}

type RecordMetadata

type RecordMetadata struct {
	Record    *ProducerRecord
	Offset    int64
	Topic     string
	Partition int32
	Error     error
}

type Selector

type Selector struct {
	// contains filtered or unexported fields
}

func NewSelector

func NewSelector(config *SelectorConfig) *Selector

func (*Selector) Close

func (s *Selector) Close()

func (*Selector) Send

func (s *Selector) Send(connection *siesta.BrokerConnection, request siesta.Request) <-chan *rawResponseAndError

func (*Selector) Start

func (s *Selector) Start()

type SelectorConfig

type SelectorConfig struct {
	ClientID        string
	MaxRequests     int
	SendRoutines    int
	ReceiveRoutines int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	RequiredAcks    int
}

TODO proper config entry names that match upstream Kafka

func DefaultSelectorConfig

func DefaultSelectorConfig() *SelectorConfig

func NewSelectorConfig

func NewSelectorConfig(producerConfig *ProducerConfig) *SelectorConfig

type Serializer

type Serializer func(interface{}) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL