producer

package
Version: v0.0.0-...-95d107b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2016 License: Apache-2.0, Apache-2.0 Imports: 15 Imported by: 0

README

siesta-producer

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ByteSerializer

func ByteSerializer(value interface{}) ([]byte, error)

func Critical

func Critical(tag interface{}, message interface{})

Critical writes a given message with a given tag to log with level Critical.

func Criticalf

func Criticalf(tag interface{}, message interface{}, params ...interface{})

Criticalf formats a given message according to given params with a given tag to log with level Critical.

func Debug

func Debug(tag interface{}, message interface{})

Debug writes a given message with a given tag to log with level Debug.

func Debugf

func Debugf(tag interface{}, message interface{}, params ...interface{})

Debugf formats a given message according to given params with a given tag to log with level Debug.

func Error

func Error(tag interface{}, message interface{})

Error writes a given message with a given tag to log with level Error.

func Errorf

func Errorf(tag interface{}, message interface{}, params ...interface{})

Errorf formats a given message according to given params with a given tag to log with level Error.

func Info

func Info(tag interface{}, message interface{})

Info writes a given message with a given tag to log with level Info.

func Infof

func Infof(tag interface{}, message interface{}, params ...interface{})

Infof formats a given message according to given params with a given tag to log with level Info.

func StringSerializer

func StringSerializer(value interface{}) ([]byte, error)

func Trace

func Trace(tag interface{}, message interface{})

Trace writes a given message with a given tag to log with level Trace.

func Tracef

func Tracef(tag interface{}, message interface{}, params ...interface{})

Tracef formats a given message according to given params with a given tag to log with level Trace.

func Warn

func Warn(tag interface{}, message interface{})

Warn writes a given message with a given tag to log with level Warn.

func Warnf

func Warnf(tag interface{}, message interface{}, params ...interface{})

Warnf formats a given message according to given params with a given tag to log with level Warn.

Types

type ConnectionRequest

type ConnectionRequest struct {
	// contains filtered or unexported fields
}

type DefaultLogger

type DefaultLogger struct {
	// contains filtered or unexported fields
}

DefaultLogger is a default implementation of KafkaLogger interface used in this client.

func NewDefaultLogger

func NewDefaultLogger(Level LogLevel) *DefaultLogger

NewDefaultLogger creates a new DefaultLogger that is configured to write messages to console with minimum log level Level.

func (*DefaultLogger) Critical

func (dl *DefaultLogger) Critical(message string, params ...interface{})

Critical formats a given message according to given params to log with level Critical.

func (*DefaultLogger) Debug

func (dl *DefaultLogger) Debug(message string, params ...interface{})

Debug formats a given message according to given params to log with level Debug.

func (*DefaultLogger) Error

func (dl *DefaultLogger) Error(message string, params ...interface{})

Error formats a given message according to given params to log with level Error.

func (*DefaultLogger) Info

func (dl *DefaultLogger) Info(message string, params ...interface{})

Info formats a given message according to given params to log with level Info.

func (*DefaultLogger) Trace

func (dl *DefaultLogger) Trace(message string, params ...interface{})

Trace formats a given message according to given params to log with level Trace.

func (*DefaultLogger) Warn

func (dl *DefaultLogger) Warn(message string, params ...interface{})

Warn formats a given message according to given params to log with level Warn.

type HashPartitioner

type HashPartitioner struct {
	// contains filtered or unexported fields
}

func NewHashPartitioner

func NewHashPartitioner() *HashPartitioner

func (*HashPartitioner) Partition

func (hp *HashPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type KafkaLogger

type KafkaLogger interface {
	//Formats a given message according to given params to log with level Trace.
	Trace(message string, params ...interface{})

	//Formats a given message according to given params to log with level Debug.
	Debug(message string, params ...interface{})

	//Formats a given message according to given params to log with level Info.
	Info(message string, params ...interface{})

	//Formats a given message according to given params to log with level Warn.
	Warn(message string, params ...interface{})

	//Formats a given message according to given params to log with level Error.
	Error(message string, params ...interface{})

	//Formats a given message according to given params to log with level Critical.
	Critical(message string, params ...interface{})
}

KafkaLogger is a logger interface. Lets you plug-in your custom logging library instead of using built-in one.

Logger used by this client. Defaults to build-in logger with Info log level.

type KafkaProducer

type KafkaProducer struct {
	RecordsMetadata chan *RecordMetadata
	// contains filtered or unexported fields
}

func NewKafkaProducer

func NewKafkaProducer(config *ProducerConfig, keySerializer Serializer, valueSerializer Serializer, connector siesta.Connector) *KafkaProducer

func (*KafkaProducer) Close

func (kp *KafkaProducer) Close(timeout time.Duration)

TODO return channel and remove timeout

func (*KafkaProducer) Flush

func (kp *KafkaProducer) Flush()

func (*KafkaProducer) Metrics

func (kp *KafkaProducer) Metrics() map[string]Metric

func (*KafkaProducer) PartitionsFor

func (kp *KafkaProducer) PartitionsFor(topic string) []PartitionInfo

func (*KafkaProducer) Send

func (kp *KafkaProducer) Send(record *ProducerRecord) <-chan *RecordMetadata

type LogLevel

type LogLevel string

LogLevel represents a logging level.

const (
	// TraceLevel is used for debugging to find problems in functions, variables etc.
	TraceLevel LogLevel = "trace"

	// DebugLevel is used for detailed system reports and diagnostic messages.
	DebugLevel LogLevel = "debug"

	// InfoLevel is used for general information about a running application.
	InfoLevel LogLevel = "info"

	// WarnLevel is used to indicate small errors and failures that should not happen normally but are recovered automatically.
	WarnLevel LogLevel = "warn"

	// ErrorLevel is used to indicate severe errors that affect application workflow and are not handled automatically.
	ErrorLevel LogLevel = "error"

	// CriticalLevel is used to indicate fatal errors that may cause data corruption or loss.
	CriticalLevel LogLevel = "critical"
)

type ManualPartitioner

type ManualPartitioner struct{}

func NewManualPartitioner

func NewManualPartitioner() *ManualPartitioner

func (*ManualPartitioner) Partition

func (mp *ManualPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type Metadata

type Metadata struct {
	// contains filtered or unexported fields
}

func NewMetadata

func NewMetadata(connector siesta.Connector, metadataExpire time.Duration) *Metadata

func (*Metadata) Get

func (tmc *Metadata) Get(topic string) ([]int32, error)

func (*Metadata) Refresh

func (tmc *Metadata) Refresh(topics []string) error

type Metric

type Metric struct{}

type NetworkClient

type NetworkClient struct {
	// contains filtered or unexported fields
}

func NewNetworkClient

func NewNetworkClient(config NetworkClientConfig, connector siesta.Connector, producerConfig *ProducerConfig) *NetworkClient

type NetworkClientConfig

type NetworkClientConfig struct {
}

type NetworkRequest

type NetworkRequest struct {
	// contains filtered or unexported fields
}

TODO better struct name

type PartitionInfo

type PartitionInfo struct{}

type Partitioner

type Partitioner interface {
	Partition(record *ProducerRecord, partitions []int32) (int32, error)
}

type Producer

type Producer interface {
	// Send the given record asynchronously and return a channel which will eventually contain the response information.
	Send(*ProducerRecord) <-chan *RecordMetadata

	// Flush any accumulated records from the producer. Blocks until all sends are complete.
	Flush()

	// Get a list of partitions for the given topic for custom partition assignment. The partition metadata will change
	// over time so this list should not be cached.
	PartitionsFor(topic string) []PartitionInfo

	// Return a map of metrics maintained by the producer
	Metrics() map[string]Metric

	// Tries to close the producer cleanly within the specified timeout. If the close does not complete within the
	// timeout, fail any pending send requests and force close the producer.
	Close(timeout time.Duration)
}

type ProducerConfig

type ProducerConfig struct {
	Partitioner       Partitioner
	MetadataExpire    time.Duration
	CompressionType   string
	BatchSize         int
	Linger            time.Duration
	Retries           int
	RetryBackoff      time.Duration
	BlockOnBufferFull bool

	ClientID        string
	MaxRequests     int
	SendRoutines    int
	ReceiveRoutines int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	RequiredAcks    int
	AckTimeoutMs    int32
	BrokerList      []string
}

func NewProducerConfig

func NewProducerConfig() *ProducerConfig

func ProducerConfigFromFile

func ProducerConfigFromFile(filename string) (*ProducerConfig, error)

type ProducerRecord

type ProducerRecord struct {
	Topic     string
	Partition int32
	Key       interface{}
	Value     interface{}
	// contains filtered or unexported fields
}

type RandomPartitioner

type RandomPartitioner struct{}

func NewRandomPartitioner

func NewRandomPartitioner() *RandomPartitioner

func (*RandomPartitioner) Partition

func (rp *RandomPartitioner) Partition(record *ProducerRecord, partitions []int32) (int32, error)

type RecordAccumulator

type RecordAccumulator struct {
	// contains filtered or unexported fields
}

func NewRecordAccumulator

func NewRecordAccumulator(config *RecordAccumulatorConfig, metadataChan chan *RecordMetadata) *RecordAccumulator

type RecordAccumulatorConfig

type RecordAccumulatorConfig struct {
	// contains filtered or unexported fields
}

type RecordBatch

type RecordBatch struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

type RecordMetadata

type RecordMetadata struct {
	Record    *ProducerRecord
	Offset    int64
	Topic     string
	Partition int32
	Error     error
}

type Selector

type Selector struct {
	// contains filtered or unexported fields
}

func NewSelector

func NewSelector(config *SelectorConfig) *Selector

func (*Selector) Close

func (s *Selector) Close()

func (*Selector) Send

func (s *Selector) Send(link siesta.BrokerLink, request siesta.Request) <-chan *rawResponseAndError

func (*Selector) Start

func (s *Selector) Start()

type SelectorConfig

type SelectorConfig struct {
	ClientID        string
	MaxRequests     int
	SendRoutines    int
	ReceiveRoutines int
	ReadTimeout     time.Duration
	WriteTimeout    time.Duration
	RequiredAcks    int
}

TODO proper config entry names that match upstream Kafka

func DefaultSelectorConfig

func DefaultSelectorConfig() *SelectorConfig

func NewSelectorConfig

func NewSelectorConfig(producerConfig *ProducerConfig) *SelectorConfig

type Serializer

type Serializer func(interface{}) ([]byte, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL