Documentation ¶
Index ¶
- Constants
- type Client
- func (c *Client) Add(topic string, partition int32) error
- func (c *Client) Assignment() (map[string][]int32, error)
- func (c *Client) Commit(topic string, partition int32, offset int64) error
- func (c *Client) CustomRequest(key string, data interface{}) ([]byte, error)
- func (c *Client) Lag(topic string, partition int32) (int64, error)
- func (c *Client) Offset(topic string, partition int32) (int64, error)
- func (c *Client) Remove(topic string, partition int32) error
- func (c *Client) SetOffset(topic string, partition int32, offset int64) error
- type Consumer
- type Response
Constants ¶
const (
	// RequestKeyAdd is a request key for consumer's Add function
	RequestKeyAdd = "add"
	// RequestKeyRemove is a request key for consumer's Remove function
	RequestKeyRemove = "remove"
	// RequestKeyAssignments is a request key for consumer's Assignment function
	RequestKeyAssignments = "assignments"
	// RequestKeyOffset is a request key for consumer's Offset function
	RequestKeyOffset = "offset"
	// RequestKeyCommit is a request key for consumer's Commit function
	RequestKeyCommit = "commit"
	// RequestKeySetOffset is a request key for consumer's SetOffset function
	RequestKeySetOffset = "setoffset"
	// RequestKeyLag is a request key for consumer's Lag function
	RequestKeyLag = "lag"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a TCP client that can talk to a Gonzo consumer instance wrapped by a TCP layer.
func NewClient ¶
NewClient creates a new TCP client for talking to a Gonzo consumer instance at the given address. It returns the client, or an error if the TCP connection fails.
func (*Client) Add ¶
Add adds a topic/partition for the associated consumer to consume and starts consuming it immediately. Returns an error if a PartitionConsumer for this topic/partition already exists.
func (*Client) Assignment ¶
Assignment returns a map of the topic/partitions currently being consumed by the associated consumer. The keys are topic names and the values are slices of partitions.
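A minimal sketch of how Add and Assignment fit together. A real Client needs a running consumer to connect to, so this example codes against a local interface that mirrors the two documented signatures and uses an in-memory stand-in instead of the TCP client. The names `partitionManager`, `fakeClient`, and `newFakeClient`, the topic name, and the error text are all assumptions made for illustration; only the method signatures and the duplicate-Add error behavior come from the documentation above.

```go
package main

import "fmt"

// partitionManager mirrors the documented signatures of Client.Add and
// Client.Assignment. The interface itself is hypothetical.
type partitionManager interface {
	Add(topic string, partition int32) error
	Assignment() (map[string][]int32, error)
}

// fakeClient is an in-memory stand-in for a real Client (illustration only).
type fakeClient struct {
	assigned map[string][]int32
}

func newFakeClient() *fakeClient {
	return &fakeClient{assigned: make(map[string][]int32)}
}

// Add records the topic/partition and rejects duplicates, matching the
// documented error case. The error text is invented.
func (f *fakeClient) Add(topic string, partition int32) error {
	for _, p := range f.assigned[topic] {
		if p == partition {
			return fmt.Errorf("partition consumer for %s/%d already exists", topic, partition)
		}
	}
	f.assigned[topic] = append(f.assigned[topic], partition)
	return nil
}

// Assignment returns a copy of the topic -> partitions map.
func (f *fakeClient) Assignment() (map[string][]int32, error) {
	out := make(map[string][]int32, len(f.assigned))
	for topic, parts := range f.assigned {
		out[topic] = append([]int32(nil), parts...)
	}
	return out, nil
}

func main() {
	var pm partitionManager = newFakeClient()
	for _, p := range []int32{0, 1} {
		if err := pm.Add("events", p); err != nil {
			fmt.Println("add failed:", err)
		}
	}
	// A second Add for the same topic/partition is rejected.
	if err := pm.Add("events", 1); err != nil {
		fmt.Println("add failed:", err)
	}
	assignment, _ := pm.Assignment()
	fmt.Println(assignment["events"])
}
```

Coding against a small interface like this also makes it straightforward to swap the real Client for a fake in tests.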
func (*Client) Commit ¶
Commit commits the given offset for a given topic/partition to Kafka. Returns an error if the commit was unsuccessful.
func (*Client) CustomRequest ¶
CustomRequest lets callers invoke custom actions on the consumer.
func (*Client) Lag ¶
Lag returns the difference between the latest available offset in the partition and the latest offset fetched by the associated consumer, which shows how far behind the consumer is. Returns the lag for the given topic/partition, or an error if the PartitionConsumer for that topic/partition does not exist.
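The definition above comes down to a single subtraction. The sketch below only illustrates that arithmetic; how the consumer obtains the two offsets internally is not shown in this documentation, and the function name `lag` and the sample values are made up.

```go
package main

import "fmt"

// lag illustrates the documented definition: the latest available offset in
// the partition minus the latest offset fetched by the consumer.
func lag(latestAvailable, latestFetched int64) int64 {
	return latestAvailable - latestFetched
}

func main() {
	// Sample values chosen for illustration only.
	fmt.Println(lag(1500, 1420)) // prints 80
}
```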
func (*Client) Offset ¶
Offset returns the current consuming offset for a given topic/partition. Note that this value is the latest fetched offset, not the latest committed offset. Returns an error if the PartitionConsumer for the given topic/partition does not exist.
func (*Client) Remove ¶
Remove stops the associated consumer from consuming a topic/partition once it is done with the current batch. The PartitionConsumer stops accepting new batches but gets a chance to finish its current work. Returns an error if a PartitionConsumer for this topic/partition does not exist.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a TCP layer that can wrap a Gonzo consumer to expose its functionality via TCP.
func NewConsumer ¶
NewConsumer wraps the given Gonzo consumer to listen for external commands on the given address.
func (*Consumer) AwaitTermination ¶
func (tc *Consumer) AwaitTermination()
AwaitTermination blocks until Stop() is called.
func (*Consumer) Join ¶
func (tc *Consumer) Join()
Join blocks until the consumer has at least one topic/partition to consume, i.e. until len(Assignment()) > 0.
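One way to get the blocking behavior described for Join is a condition variable that is signalled whenever a topic/partition is added. This is a sketch of that general technique, not the library's implementation, which is not shown here. The `assignments` type and its `add` and `join` methods are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// assignments is a hypothetical stand-in for a consumer's assignment state.
type assignments struct {
	mu    sync.Mutex
	cond  *sync.Cond
	parts map[string][]int32
}

func newAssignments() *assignments {
	a := &assignments{parts: make(map[string][]int32)}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// add records a topic/partition and wakes any goroutine blocked in join.
func (a *assignments) add(topic string, partition int32) {
	a.mu.Lock()
	a.parts[topic] = append(a.parts[topic], partition)
	a.mu.Unlock()
	a.cond.Broadcast()
}

// join blocks until at least one topic/partition has been added.
func (a *assignments) join() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for len(a.parts) == 0 {
		a.cond.Wait()
	}
}

func main() {
	a := newAssignments()
	done := make(chan struct{})
	go func() {
		a.join() // blocks while the assignment is empty
		close(done)
	}()
	a.add("events", 0)
	<-done
	fmt.Println("joined")
}
```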
func (*Consumer) RegisterCustomHandler ¶
RegisterCustomHandler exposes additional functionality that is not exposed by default.
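The signature of RegisterCustomHandler is not shown in this documentation, so the following is only a sketch of the general pattern that a key-based custom handler mechanism suggests: a registry that maps request keys to handler functions and dispatches incoming requests by key. The `handler` type, the `registry` type, its methods, and the `"ping"` key are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical handler signature; the real one is not shown in
// this documentation.
type handler func(data []byte) ([]byte, error)

// errUnknownKey is returned when no handler is registered for a key.
var errUnknownKey = errors.New("unknown request key")

// registry maps request keys to handlers.
type registry struct {
	handlers map[string]handler
}

func newRegistry() *registry {
	return &registry{handlers: make(map[string]handler)}
}

// register associates key with h and rejects duplicate keys, so that one
// handler cannot silently replace another.
func (r *registry) register(key string, h handler) error {
	if _, ok := r.handlers[key]; ok {
		return fmt.Errorf("handler for key %q already registered", key)
	}
	r.handlers[key] = h
	return nil
}

// dispatch runs the handler registered for key, if any.
func (r *registry) dispatch(key string, data []byte) ([]byte, error) {
	h, ok := r.handlers[key]
	if !ok {
		return nil, errUnknownKey
	}
	return h(data)
}

func main() {
	r := newRegistry()
	_ = r.register("ping", func(data []byte) ([]byte, error) {
		return append([]byte("pong:"), data...), nil
	})
	out, err := r.dispatch("ping", []byte("hi"))
	fmt.Println(string(out), err) // prints "pong:hi <nil>"
}
```

Rejecting duplicate keys is a design choice of this sketch. It would, for example, keep a custom handler from shadowing a built-in key such as "add" or "lag".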
type Response ¶
type Response struct {
	// Success is a flag whether the request associated with this response succeeded.
	Success bool
	// Message is any description or error message associated with this response.
	Message string
	// Data is any additional payload associated with this response.
	Data interface{}
}
Response defines the response message format to exchange data.
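The documentation does not say how a Response is encoded on the wire. Purely as an illustration, the sketch below assumes JSON via the standard `encoding/json` package and round-trips a locally redeclared copy of the struct. The `encode` and `decode` helpers are hypothetical. If JSON were the encoding, one consequence is that a numeric `Data` value decodes into `float64` when the target is `interface{}`, so callers expecting an `int64` would need to convert.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Response is a local copy of the documented struct, redeclared so the
// example is self-contained.
type Response struct {
	Success bool
	Message string
	Data    interface{}
}

// encode serializes a Response as JSON (an assumed encoding).
func encode(r Response) ([]byte, error) {
	return json.Marshal(r)
}

// decode parses a JSON-encoded Response.
func decode(b []byte) (Response, error) {
	var r Response
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	b, err := encode(Response{Success: true, Message: "ok", Data: 42})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(b)) // prints {"Success":true,"Message":"ok","Data":42}

	r, err := decode(b)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	// JSON numbers decode to float64 when the target is interface{}.
	fmt.Printf("%T\n", r.Data) // prints float64
}
```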
func NewResponse ¶
NewResponse creates a new Response.