consumer

Version: v1.0.0
Published: Jan 6, 2020 License: MIT Imports: 14 Imported by: 3

README

message-queue-gonsumer

Go implementation of https://github.com/Financial-Times/message-queue-consumer library

Usage

go get github.com/Financial-Times/message-queue-gonsumer

import logger   "github.com/Financial-Times/go-logger/v2"
import consumer "github.com/Financial-Times/message-queue-gonsumer"

The consumer API is used by calling:

consumer.NewConsumer(QueueConfig, func(m Message), *http.Client, *logger.UPPLogger).Start()

Depending on the QueueConfig, it starts consuming messages on one or more streams and calls the supplied function for every message. Make sure the function you pass in is thread safe.

conf := consumer.QueueConfig{
  Addrs: []string{"<addr>"},
  Group: "<group>",
  Topic: "<topic>",
  Queue: "<required in co-co>",
  Offset: "<set to `earliest` otherwise the default `latest` will be considered>",
  BackoffPeriod: <int: period in seconds to back off if an error occurred or the queue is empty>,
  StreamCount: <int: number of goroutines used to consume/process messages. This should be less than or equal to the number of kafka partitions. Defaults to 1.>,
  ConcurrentProcessing: <true|false Whether messages can be processed concurrently or not>,
  NoOfProcessors: <int: number of processors per stream used to process messages when ConcurrentProcessing is enabled. Defaults to 100.>,
  AuthorizationKey: "<required from AWS to UCS>",
  AutoCommitEnable: <true|false Whether messages are smaller/larger. Default value is false.>,
}
// logConf stands for logger options defined elsewhere.
l := logger.NewUPPLogger("annotations-writer-ontotext", "WARN", logConf)
c := consumer.NewConsumer(conf, func(m consumer.Message) { /* process message in a thread safe manner */ }, &http.Client{}, l)
go c.Start() // Start blocks until Stop is called, so run it in its own goroutine
// later, on shutdown:
c.Stop()

Documentation

Index

Constants

This section is empty.

Variables

var ErrNoQueueAddresses = errors.New("no kafka-rest-proxy addresses configured")

Functions

This section is empty.

Types

type AgeingClient

type AgeingClient struct {
	HTTPClient *http.Client
	MaxAge     time.Duration
	Logger     *log.UPPLogger
}

AgeingClient defines an ageing http client for consuming messages

func NewAgeingClient

func NewAgeingClient(client *http.Client, maxAge time.Duration, logger *log.UPPLogger) (*AgeingClient, error)

NewAgeingClient returns a new instance of AgeingClient. It guarantees that all required properties are set

func (AgeingClient) StartAgeingProcess

func (c AgeingClient) StartAgeingProcess()

StartAgeingProcess periodically closes idle connections according to the MaxAge of an AgeingClient.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer provides methods to consume messages from a kafka proxy

func (*Consumer) ConnectivityCheck

func (c *Consumer) ConnectivityCheck() (string, error)

ConnectivityCheck returns the connection status with the kafka proxy

func (*Consumer) Start

func (c *Consumer) Start()

Start triggers the consumption of messages from the queue. Start is a blocking method; it returns only when Stop() is called. If you don't want to block, start it in a different goroutine.

func (*Consumer) Stop

func (c *Consumer) Stop()

Stop is a method to stop the consumer.
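
Because Start blocks until Stop is called, a caller usually needs some lifecycle glue around the two. The sketch below shows one possible shape under stated assumptions: runUntil and fakeConsumer are hypothetical names, the interface is a local mirror of MessageConsumer, and the fake only imitates the documented blocking behaviour rather than the real consumer.

```go
package main

import "sync"

// MessageConsumer mirrors the package's interface of the same name.
type MessageConsumer interface {
	Start()
	Stop()
	ConnectivityCheck() (string, error)
}

// fakeConsumer is a hypothetical stand-in whose Start blocks until Stop
// is called, as documented for Consumer.
type fakeConsumer struct {
	quit chan struct{}
	once sync.Once
}

func newFakeConsumer() *fakeConsumer {
	return &fakeConsumer{quit: make(chan struct{})}
}

func (f *fakeConsumer) Start()                             { <-f.quit }
func (f *fakeConsumer) Stop()                              { f.once.Do(func() { close(f.quit) }) }
func (f *fakeConsumer) ConnectivityCheck() (string, error) { return "ok", nil }

// runUntil starts c in its own goroutine, waits for shutdown to be closed,
// then calls Stop and waits for Start to return.
func runUntil(c MessageConsumer, shutdown <-chan struct{}) {
	done := make(chan struct{})
	go func() {
		defer close(done)
		c.Start()
	}()
	<-shutdown
	c.Stop()
	<-done
}
```

In a service, the shutdown channel would typically be closed from a signal handler, so that the consumer stops before the process exits.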

type Message

type Message struct {
	Headers map[string]string
	Body    string
}

Message defines the consumed messages
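
A handler typically inspects Headers and then parses Body. The following is a sketch only: the event payload, the decodeEvent helper and the header name used in the usage note are assumptions for illustration, since the actual headers and body format depend on the producer.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the package's Message type so the sketch compiles alone.
type Message struct {
	Headers map[string]string
	Body    string
}

// event is a hypothetical payload shape; real bodies depend on the producer.
type event struct {
	UUID string `json:"uuid"`
}

// decodeEvent parses the JSON body of m when the given header has the
// wanted value. The boolean result is false when the message was skipped.
func decodeEvent(m Message, header, want string) (event, bool, error) {
	if m.Headers[header] != want {
		return event{}, false, nil // not addressed to this handler
	}
	var e event
	if err := json.Unmarshal([]byte(m.Body), &e); err != nil {
		return event{}, false, fmt.Errorf("decoding message body: %w", err)
	}
	return e, true, nil
}
```

For example, decodeEvent(m, "Message-Type", "example") would skip every message whose Message-Type header is not "example"; both strings are made up for this sketch.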

type MessageConsumer

type MessageConsumer interface {
	Start()
	Stop()
	ConnectivityCheck() (string, error)
}

MessageConsumer is a high level generic interface for consumers.

Start triggers the consumption of messages.

Stop method stops the consumption of messages.

ConnectivityCheck implements the logic to check the current connectivity to the queue. The method should return a message about the status of the connection and an error in case of connectivity failure.

func NewAgeingConsumer

func NewAgeingConsumer(config QueueConfig, handler func(m Message), client *AgeingClient) MessageConsumer

NewAgeingConsumer returns a new instance of a Consumer with an AgeingClient

func NewBatchedConsumer

func NewBatchedConsumer(config QueueConfig, handler func(m []Message), client *http.Client, logger *log.UPPLogger) MessageConsumer

NewBatchedConsumer returns a Consumer to manage batches of messages
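
The batched variant differs from NewConsumer only in the handler shape, func(m []Message). As a sketch of such a handler, assuming a hypothetical newBatchHandler helper and a caller-supplied flush function, the example below forwards the bodies of each batch in a single call. Whether the consumer ever delivers an empty batch is not stated in the source, so the sketch simply ignores one.

```go
package main

// Message mirrors the package's Message type so the sketch compiles alone.
type Message struct {
	Headers map[string]string
	Body    string
}

// newBatchHandler returns a handler with the func([]Message) shape that
// passes the bodies of each non-empty batch to flush in one call.
func newBatchHandler(flush func(bodies []string)) func([]Message) {
	return func(batch []Message) {
		if len(batch) == 0 {
			return // nothing to do for an empty batch
		}
		bodies := make([]string, 0, len(batch))
		for _, m := range batch {
			bodies = append(bodies, m.Body)
		}
		flush(bodies)
	}
}
```

If the handler can be invoked from several goroutines, flush itself has to be safe for concurrent use.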

func NewConsumer

func NewConsumer(config QueueConfig, handler func(m Message), client *http.Client, logger *log.UPPLogger) MessageConsumer

NewConsumer returns a new instance of a Consumer

type QueueConfig

type QueueConfig struct {
	Addrs                []string `json:"address"` //list of queue addresses.
	Group                string   `json:"group"`
	Topic                string   `json:"topic"`
	Queue                string   `json:"queue"` //The name of the queue.
	Offset               string   `json:"offset"`
	BackoffPeriod        int      `json:"backoffPeriod"`
	StreamCount          int      `json:"streamCount"`
	ConcurrentProcessing bool     `json:"concurrentProcessing"`
	AuthorizationKey     string   `json:"authorizationKey"`
	AutoCommitEnable     bool     `json:"autoCommitEnable"`
	NoOfProcessors       int      `json:"noOfProcessors"`
}

QueueConfig represents the configuration of the queue, consumer group and topic the consumer is interested in.
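
The JSON tags make it possible to load a QueueConfig from a file or environment payload; note that the Addrs field is stored under the key "address". The sketch below declares a local copy of the struct and a hypothetical loadConfig helper. The defaults it fills in are the ones named in the README (latest, 1 and 100); the library may apply them itself, so setting them here is an assumption made to keep the example explicit.

```go
package main

import (
	"encoding/json"
	"errors"
)

// QueueConfig mirrors the fields and JSON tags of the package's type.
type QueueConfig struct {
	Addrs                []string `json:"address"`
	Group                string   `json:"group"`
	Topic                string   `json:"topic"`
	Queue                string   `json:"queue"`
	Offset               string   `json:"offset"`
	BackoffPeriod        int      `json:"backoffPeriod"`
	StreamCount          int      `json:"streamCount"`
	ConcurrentProcessing bool     `json:"concurrentProcessing"`
	AuthorizationKey     string   `json:"authorizationKey"`
	AutoCommitEnable     bool     `json:"autoCommitEnable"`
	NoOfProcessors       int      `json:"noOfProcessors"`
}

// loadConfig decodes raw JSON, rejects a config without addresses and
// fills in the defaults described in the README.
func loadConfig(raw []byte) (QueueConfig, error) {
	var c QueueConfig
	if err := json.Unmarshal(raw, &c); err != nil {
		return QueueConfig{}, err
	}
	if len(c.Addrs) == 0 {
		return QueueConfig{}, errors.New("no queue addresses in config")
	}
	if c.Offset == "" {
		c.Offset = "latest"
	}
	if c.StreamCount == 0 {
		c.StreamCount = 1
	}
	if c.NoOfProcessors == 0 {
		c.NoOfProcessors = 100
	}
	return c, nil
}
```

A payload such as {"address":["http://localhost:8080"],"group":"g","topic":"t"} would decode into a config with one address and the three defaults applied; the address is a placeholder.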
