tcp

package
v0.0.0-...-2a93a3b Latest
Warning

This package is not in the latest version of its module.

Published: Apr 24, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation


Constants

const (
	// RequestKeyAdd is a request key for consumer's Add function
	RequestKeyAdd = "add"

	// RequestKeyRemove is a request key for consumer's Remove function
	RequestKeyRemove = "remove"

	// RequestKeyAssignments is a request key for consumer's Assignment function
	RequestKeyAssignments = "assignments"

	// RequestKeyOffset is a request key for consumer's Offset function
	RequestKeyOffset = "offset"

	// RequestKeyCommit is a request key for consumer's Commit function
	RequestKeyCommit = "commit"

	// RequestKeySetOffset is a request key for consumer's SetOffset function
	RequestKeySetOffset = "setoffset"

	// RequestKeyLag is a request key for consumer's Lag function
	RequestKeyLag = "lag"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a TCP client that can talk to a Gonzo consumer instance wrapped by a TCP layer.

func NewClient

func NewClient(addr string) (*Client, error)

NewClient creates a new TCP client that talks to a Gonzo consumer instance at the given address. It returns the client, or an error if the TCP connection fails.

func (*Client) Add

func (c *Client) Add(topic string, partition int32) error

Add adds a topic/partition for the associated consumer to consume and starts consuming it immediately. Returns an error if a PartitionConsumer for this topic/partition already exists.

func (*Client) Assignment

func (c *Client) Assignment() (map[string][]int32, error)

Assignment returns a map of the topic/partitions currently being consumed by the associated consumer. The keys are topic names and the values are slices of partitions.
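As a small illustration of working with the map shape that Assignment returns, the sketch below counts partitions and lists topics in a stable order. The data and the helper names `totalPartitions` and `sortedTopics` are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"sort"
)

// totalPartitions counts the partitions across all topics in an assignment
// map shaped like the one Assignment returns (topic name -> partitions).
func totalPartitions(assignment map[string][]int32) int {
	total := 0
	for _, partitions := range assignment {
		total += len(partitions)
	}
	return total
}

// sortedTopics returns the topic names in lexical order so that output is
// stable; Go does not define map iteration order.
func sortedTopics(assignment map[string][]int32) []string {
	topics := make([]string, 0, len(assignment))
	for topic := range assignment {
		topics = append(topics, topic)
	}
	sort.Strings(topics)
	return topics
}

func main() {
	// Hypothetical data standing in for the result of client.Assignment().
	assignment := map[string][]int32{
		"orders":   {0, 1, 2},
		"payments": {0},
	}
	for _, topic := range sortedTopics(assignment) {
		fmt.Printf("%s: %v\n", topic, assignment[topic])
	}
	fmt.Println("total partitions:", totalPartitions(assignment))
}
```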

func (*Client) Commit

func (c *Client) Commit(topic string, partition int32, offset int64) error

Commit commits the given offset for the given topic/partition to Kafka. Returns an error if the commit was unsuccessful.

func (*Client) CustomRequest

func (c *Client) CustomRequest(key string, data interface{}) ([]byte, error)

CustomRequest sends a request with the given key and data, which lets you invoke custom actions on the consumer.

func (*Client) Lag

func (c *Client) Lag(topic string, partition int32) (int64, error)

Lag returns the difference between the latest available offset in the partition and the latest offset fetched by the associated consumer, which shows how far behind the consumer is. Returns an error if the PartitionConsumer for the given topic/partition does not exist.
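The arithmetic behind Lag can be written out as a one-line function. This is only a sketch of the subtraction described above, with hypothetical offsets; it is not the package's implementation.

```go
package main

import "fmt"

// lag mirrors the arithmetic described for Lag: the latest available offset
// in the partition minus the latest offset the consumer has fetched.
func lag(latestAvailable, latestFetched int64) int64 {
	return latestAvailable - latestFetched
}

func main() {
	// Hypothetical offsets chosen for illustration.
	fmt.Println(lag(1500, 1420)) // prints 80
}
```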

func (*Client) Offset

func (c *Client) Offset(topic string, partition int32) (int64, error)

Offset returns the current consuming offset for the given topic/partition. Note that this value is the latest fetched offset, not the latest committed offset. Returns an error if the PartitionConsumer for the given topic/partition does not exist.

func (*Client) Remove

func (c *Client) Remove(topic string, partition int32) error

Remove stops the associated consumer from consuming a topic/partition once it is done with the current batch. The PartitionConsumer stops accepting new batches but gets a chance to finish its current work. Returns an error if the PartitionConsumer for this topic/partition does not exist.

func (*Client) SetOffset

func (c *Client) SetOffset(topic string, partition int32, offset int64) error

SetOffset overrides the current fetch offset for the given topic/partition. This does not commit the offset, but lets you move back and forth within the partition. Returns an error if the PartitionConsumer for this topic/partition does not exist.
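The distinction between the fetch offset (reported by Offset, moved by SetOffset) and the committed offset (moved by Commit) can be modelled in memory. The `fakePartition` type below is a hypothetical model written for illustration; it does not talk to Kafka and is not how the package stores offsets.

```go
package main

import "fmt"

// fakePartition is a hypothetical in-memory model of one topic/partition.
type fakePartition struct {
	fetched   int64 // latest fetched offset: what Offset reports
	committed int64 // latest committed offset: what Commit updates
}

// Offset reports the fetch offset, not the committed one.
func (p *fakePartition) Offset() int64 { return p.fetched }

// SetOffset moves the fetch position only; nothing is committed.
func (p *fakePartition) SetOffset(offset int64) { p.fetched = offset }

// Commit records the committed offset; the fetch position is untouched.
func (p *fakePartition) Commit(offset int64) { p.committed = offset }

func main() {
	p := &fakePartition{fetched: 100, committed: 90}

	p.SetOffset(40) // rewind to re-read older messages
	fmt.Println(p.Offset(), p.committed) // prints 40 90

	p.Commit(40) // only now does the committed offset change
	fmt.Println(p.Offset(), p.committed) // prints 40 40
}
```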

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a TCP layer that can wrap a Gonzo consumer to expose its functionality via TCP.

func NewConsumer

func NewConsumer(addr string, consumer gonzo.Consumer) *Consumer

NewConsumer wraps the given Gonzo consumer to listen for external commands on the given address.

func (*Consumer) AwaitTermination

func (tc *Consumer) AwaitTermination()

AwaitTermination blocks until Stop() is called.

func (*Consumer) Join

func (tc *Consumer) Join()

Join blocks until the consumer has at least one topic/partition to consume, i.e. until len(Assignment()) > 0.

func (*Consumer) RegisterCustomHandler

func (tc *Consumer) RegisterCustomHandler(key string, handler func([]byte) (*Response, error))

RegisterCustomHandler registers a handler for the given key, which lets you expose additional functionality that is not exposed by default.
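To show what a handler with the documented signature `func([]byte) (*Response, error)` might look like, here is a self-contained sketch. The local `Response` type copies the exported fields documented on this page, while `registry`, `dispatch`, and `upper` are hypothetical names; how the real Consumer stores and invokes handlers is not stated in the source.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Response mirrors the exported fields of the Response type on this page.
type Response struct {
	Success bool
	Message string
	Data    interface{}
}

// handler has the signature that RegisterCustomHandler accepts.
type handler func([]byte) (*Response, error)

// registry is a hypothetical stand-in for a key-to-handler table.
type registry map[string]handler

// dispatch looks up the handler for key and runs it on the request body.
func (r registry) dispatch(key string, body []byte) (*Response, error) {
	h, ok := r[key]
	if !ok {
		return nil, errors.New("no handler registered for key " + key)
	}
	return h(body)
}

// upper is an example custom handler: it upper-cases the request body.
func upper(body []byte) (*Response, error) {
	return &Response{
		Success: true,
		Message: "ok",
		Data:    strings.ToUpper(string(body)),
	}, nil
}

func main() {
	r := registry{"upper": upper}
	resp, err := r.dispatch("upper", []byte("hello"))
	fmt.Println(resp.Success, resp.Data, err) // prints: true HELLO <nil>
}
```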

func (*Consumer) Start

func (tc *Consumer) Start() error

Start starts listening on the given TCP address. Returns an error if anything goes wrong while listening on TCP.

func (*Consumer) Stop

func (tc *Consumer) Stop()

Stop stops both this TCP wrapper and the underlying consumer.
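The relationship between AwaitTermination and Stop (one blocks until the other is called) is a common Go pattern. The sketch below shows one way to build it with a channel and `sync.Once`; the `lifecycle` type is hypothetical, and the source does not say how Consumer implements this internally.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lifecycle is a hypothetical stand-in showing how a blocking
// AwaitTermination can be released by Stop.
type lifecycle struct {
	done chan struct{}
	once sync.Once
}

func newLifecycle() *lifecycle {
	return &lifecycle{done: make(chan struct{})}
}

// Stop closes the done channel; sync.Once makes repeated calls safe.
func (l *lifecycle) Stop() {
	l.once.Do(func() { close(l.done) })
}

// AwaitTermination blocks until Stop has been called.
func (l *lifecycle) AwaitTermination() {
	<-l.done
}

func main() {
	l := newLifecycle()
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate work before shutdown
		l.Stop()
	}()
	l.AwaitTermination()
	fmt.Println("terminated")
}
```

Closing a channel releases every waiter at once, so any number of goroutines can block in AwaitTermination in this sketch.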

type Response

type Response struct {
	// Success reports whether the request associated with this response succeeded.
	Success bool

	// Message is any description or error message associated with this response.
	Message string

	// Data is any additional payload associated with this response.
	Data interface{}
}

Response defines the response message format to exchange data.

func NewResponse

func NewResponse(success bool, message string, data interface{}) *Response

NewResponse creates a new Response.
