kafkaclient

package module
v1.0.2
Published: Jan 4, 2021 License: MIT Imports: 18 Imported by: 0

README

kafkaclient

An overly-opinionated library to simplify interactions with existing Go Kafka libraries.

Supported base libraries

  • Shopify/sarama
  • segmentio/kafka-go

Use

import (
    "context"
    "errors"
    "log"
    // plus kafkaclient itself and your own domain package
    // (import paths omitted here)
)

const (
    TestTopic       = "test_topic"
    TestTopicRetry1 = "test_topic_retry"
    TestTopicDLQ    = "test_topic_dlq"
)

func main() {
    ctx := context.Background()

    topics := []kafkaclient.TopicConfig{
        {
            Name:                  TestTopic,
            DoConsume:             true,
            MessageFormat:         kafkaclient.MessageFormatAvro,
            DelayProcessingMins:   0,
            FailedProcessingTopic: TestTopicDLQ,
            MessageProcessor:      processTestTopic,
        },
        {
            Name:                  TestTopicRetry1,
            DoConsume:             true,
            DoProduce:             true,
            MessageFormat:         kafkaclient.MessageFormatAvro,
            DelayProcessingMins:   15,
            FailedProcessingTopic: TestTopicDLQ,
            MessageProcessor:      processTestTopic,
        },
        {
            Name:                  TestTopicDLQ,
            DoProduce:             true,
            MessageFormat:         kafkaclient.MessageFormatAvro,
        },
    }

    // you can (optionally) configure a struct to inject important
    // data or data structures into your message processors, e.g. in
    // order to access service-layer functionality
    pd := processorDependencies{
        service: mydomain.NewService(),
    }

    config, e := kafkaclient.NewConfig(ctx,
        "2.5.0",                       // kafka version
        []string{"127.0.0.1"},         // broker addresses
        topics,                        // topic configs (above)
        pd,                            // processor dependencies
        "http://localhost:8081",       // schema registry URL (needed for avro formats; address illustrative)
        kafkaclient.ConsumerTypeGroup, // consumer type
        "test_consumer",               // consumer group ID
        kafkaclient.ProducerTypeSync,  // producer type
        true,                          // read from oldest offset
        nil,                           // TLS config
        true)                          // debug logging

    if e != nil {
        log.Println(e)
        return
    }

    kc, e := kafkaclient.New(kafkaclient.BaseSarama, config)
    if e != nil {
        log.Println(e)
        return
    }

    e = kc.StartConsume(ctx)
    if e != nil {
        log.Println(e)
        return
    }

    e = kc.ProduceMessage(ctx, TestTopic,
        "message_key_238to2efgb", testTopicAvroMessage{ID: 1, Name: "foofoo"})

    if e != nil {
        log.Println(e)
    }
}

type processorDependencies struct {
    service mydomain.Service
}

func processTestTopic(ctx context.Context, 
    dependencies kafkaclient.ProcessorDependencies, 
    msg kafkaclient.ConsumerMessage) (e error) {

    d, ok := dependencies.(processorDependencies)
    if !ok {
        e = errors.New("kafka processor incorrectly configured")
        log.Println(e)
        return
    } 

    data := testTopicAvroMessage{}
    e = msg.Unmarshall(ctx, &data)
    if e != nil {
        log.Println(e)
        return
    }

    e = d.service.Save(ctx, data.ID, data.Name)
    if e != nil {
        log.Println(e)
    }

    return
}

type testTopicAvroMessage struct {
    ID   int64  `avro:"id"`
    Name string `avro:"name"`
}

Notes

  • topic consumption starts in its own goroutine, so calling StartConsume is non-blocking (see the shutdown sketch below)
  • kafkaclient automatically adds messages whose processing fails to a retry/dead-letter topic, if one is configured for the source topic (i.e. TopicConfig.FailedProcessingTopic). If you do not want this to happen, simply leave that attribute unset in the topic config
  • if there are problems initializing the client (e.g. kafka being unreachable), kafkaclient will keep retrying the init at intervals in a separate goroutine, so that the parent application can continue executing, e.g. in order to start a web server
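
Because StartConsume returns immediately, the parent application decides how long to keep consuming. A minimal shutdown sketch using the documented StartConsume/CancelConsume calls (the signal handling around them is our own addition; kc is the client from the example above):

if e := kc.StartConsume(ctx); e != nil {
    log.Println(e)
    return
}

// block until the process receives SIGINT/SIGTERM
// (requires the os, os/signal and syscall imports)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs

// stop consuming before exiting
if e := kc.CancelConsume(); e != nil {
    log.Println(e)
}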

Avro

Often, topic messages are stored in avro format. kafkaclient uses linkedin/go-avro to help encode and decode them using their avro schemas. Schemas are fetched from, or can be added to, a Confluent schema registry.

kafkaclient augments the capabilities of the go-avro library by enabling the conversion of binary avro messages to golang structs and vice versa. For this to work, struct fields need tags referencing the corresponding schema record field names. See below:

Avro schema:

{
    "type": "record",
    "name": "Thing",
    "fields": [
        {
            "name": "ID",
            "type": [
                "null",
                "long"
            ],
            "default": null
        },
        {
            "name": "NAME",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}

Matching struct:

type Thing struct {
    ID    int64  `avro:"ID"`
    Name  string `avro:"NAME"`
}
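
With the tags in place, encoding and decoding are handled by the client. A short sketch (the topic name "things" is illustrative; kc is a KafkaClient and msg a ConsumerMessage received in a processor):

// producing: Thing is encoded against the topic's schema
if e := kc.ProduceMessage(ctx, "things", "some-key", Thing{ID: 42, Name: "foo"}); e != nil {
    log.Println(e)
}

// consuming: decode a received message back into the struct
t := Thing{}
if e := msg.Unmarshall(ctx, &t); e != nil {
    log.Println(e)
}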
Nested (complex) messages

For encoding/decoding messages that contain nested messages (as might be the case with retry topics, whose messages can include an error description as well as the binary of the original message whose processing failed), an additional tag is needed to find the schema used to decode/encode the nested message. See below:

Avro schema:

{
    "type": "record",
    "name": "ThingRetry",
    "fields": [
        {
            "name": "ERROR_MESSAGE",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "ORIGINAL_MESSAGE",
            "type": [
                "null",
                "bytes"
            ],
            "default": null
        }
    ]
}

Matching struct:

type ThingRetry struct {
    ErrorMessage     string `avro:"ERROR_MESSAGE"`
    OriginalMessage  Thing  `avro:"ORIGINAL_MESSAGE" topic:"thing_retry_1"`
}
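
With the topic tag in place, one Unmarshall call should decode both layers: the outer record via the retry topic's schema, and the ORIGINAL_MESSAGE bytes into Thing via the schema of the topic named in the tag. A sketch, inside a processor for the retry topic:

retry := ThingRetry{}
if e := msg.Unmarshall(ctx, &retry); e != nil {
    log.Println(e)
    return e
}

// retry.OriginalMessage is now a decoded Thing
log.Printf("retrying thing %d after error: %s",
    retry.OriginalMessage.ID, retry.ErrorMessage)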

Documentation

Index

Constants

const (

	// ConsumerTypeGroup configures the consumer as part of a consumer group
	ConsumerTypeGroup consumerType = "CONSUMER_GROUP"

	// MessageFormatAvro specifies that messages in a topic are stored in avro format
	MessageFormatAvro messageFormat = "MESSAGE_AVRO"
	// MessageFormatJSON specifies that messages in a topic are stored in JSON format
	MessageFormatJSON messageFormat = "MESSAGE_JSON"
	// MessageFormatString specifies that messages in a topic are stored in string format
	MessageFormatString messageFormat = "MESSAGE_STRING"

	// ProducerTypeAsync configures a producer with an asynchronous response mechanism
	ProducerTypeAsync producerType = "PRODUCER_ASYNC"
	// ProducerTypeSync configures a producer with synchronous feedback
	ProducerTypeSync producerType = "PRODUCER_SYNC"
)

const (
	// BaseSarama can be used in kafkaclient.New to specify that
	// the underlying library used will be Shopify's sarama (https://github.com/Shopify/sarama/)
	BaseSarama baseLibrary = "SARAMA"

	// BaseKafkaGO can be used in kafkaclient.New to specify that
	// the underlying library used will be kafkago (https://github.com/segmentio/kafka-go)
	BaseKafkaGO baseLibrary = "KAFKAGO"
)

Variables

This section is empty.

Functions

func DefaultProcessor

func DefaultProcessor(ctx context.Context,
	dependencies ProcessorDependencies, msg ConsumerMessage) error

Types

type Config

type Config struct {
	KafkaVersion     string
	Brokers          []string
	Topics           []TopicConfig
	SchemaRegURL     string
	ConsumerType     consumerType
	ConsumerGroupID  string
	ProcDependencies ProcessorDependencies // injectable dependencies for message processors
	ProducerType     producerType
	ReadFromOldest   bool
	TLS              *tls.Config
	Debug            bool
}

Config holds specifics used to configure different parts of the kafka client

func NewConfig

func NewConfig(
	ctx context.Context, version string, brokers []string,
	topics []TopicConfig, procDependencies ProcessorDependencies,
	schemaRegURL string, consType consumerType, groupID string,
	prodType producerType, readFromOldest bool,
	tls *tls.Config, debug bool) (c Config, e error)

NewConfig constructs and returns a Config struct

func (Config) ReadTopicNames

func (c Config) ReadTopicNames() (n []string)

ReadTopicNames constructs and returns a slice of the names of all topics configured for consumption (DoConsume)

func (Config) TopicMap

func (c Config) TopicMap() (m map[string]TopicConfig)

TopicMap constructs and returns a map of topic configuration, using each topic name as the map key

func (Config) WriteTopicNames

func (c Config) WriteTopicNames() (n []string)

WriteTopicNames constructs and returns a slice of the names of all topics configured for production (DoProduce)
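
A quick sketch of these helpers together (assuming, per the names, that they filter on DoConsume and DoProduce; the topic name is from the README example):

tc := config.TopicMap()["test_topic"] // one topic's config, keyed by name
log.Println(tc.Name, config.ReadTopicNames(), config.WriteTopicNames())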

type ConsumerMessage

type ConsumerMessage interface {
	Unmarshall(ctx context.Context, native interface{}) (e error)
	Topic() string
	Key() string
	Offset() int64
	Partition() int32
	Value() []byte
}

ConsumerMessage is an interface implemented by kafka consumer message types

type EncoderDecoder

type EncoderDecoder interface {
	// Encode encodes native golang as binary.
	//
	// topic: name of topic the message will be sent to
	// native: the golang data structure to be encoded
	Encode(ctx context.Context, topic string, native interface{}) (b []byte, e error)
	// Decode decodes binary into native golang.
	//
	// topic: name of topic the message was received from
	// b: the binary to be decoded,
	// target: pointer to data structure the binary data will be decoded into
	Decode(ctx context.Context, topic string, b []byte, target interface{}) error

	// GetSchemaID returns the topic schema ID, if applicable
	GetSchemaID(ctx context.Context, topic string) (int, error)
}

EncoderDecoder interface

type KafkaClient

type KafkaClient interface {
	// StartConsume starts the consumption of messages from the configured Kafka topics
	StartConsume(ctx context.Context) error

	// CancelConsume cancels the consumption of messages from configured topics
	CancelConsume() error
	// ProduceMessage adds messages to a specified topic
	ProduceMessage(ctx context.Context, topic string, key string, msg interface{}) error
	// contains filtered or unexported methods
}

KafkaClient is an interface describing the primary uses of this library

func New

func New(base baseLibrary, config Config) (KafkaClient, error)

New constructs and returns a new KafkaClient implementation
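
For example, the same Config can back either supported base library:

// a sarama-backed client
kc, e := kafkaclient.New(kafkaclient.BaseSarama, config)
if e != nil {
    log.Println(e)
}

// or, with the same config, a kafka-go-backed client:
// kc, e := kafkaclient.New(kafkaclient.BaseKafkaGO, config)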

type KafkaGoClient

type KafkaGoClient struct {
	// contains filtered or unexported fields
}

KafkaGoClient implements the KafkaClient interface

func (*KafkaGoClient) CancelConsume

func (c *KafkaGoClient) CancelConsume() (e error)

CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption

func (*KafkaGoClient) ProduceMessage

func (c *KafkaGoClient) ProduceMessage(
	ctx context.Context, topic string, key string, msg interface{}) (e error)

ProduceMessage creates/encodes a message and sends it to the specified topic

func (*KafkaGoClient) StartConsume

func (c *KafkaGoClient) StartConsume(ctx context.Context) (e error)

StartConsume starts consuming configured kafka topic messages

type KafkaGoMessage

type KafkaGoMessage struct {
	// contains filtered or unexported fields
}

KafkaGoMessage holds kafka-go message contents as well as an EncoderDecoder used to unmarshall message data

func (KafkaGoMessage) InfoEvent

func (m KafkaGoMessage) InfoEvent(event string) string

InfoEvent constructs and returns a loggable event relating to the message

func (KafkaGoMessage) Key

func (m KafkaGoMessage) Key() string

Key returns the message key

func (KafkaGoMessage) Offset

func (m KafkaGoMessage) Offset() int64

Offset returns the message offset

func (KafkaGoMessage) Partition

func (m KafkaGoMessage) Partition() int32

Partition returns the message partition

func (KafkaGoMessage) Topic

func (m KafkaGoMessage) Topic() string

Topic returns the message topic

func (KafkaGoMessage) Unmarshall

func (m KafkaGoMessage) Unmarshall(ctx context.Context, native interface{}) (e error)

Unmarshall unmarshalls the message contents into the provided struct

func (KafkaGoMessage) Value

func (m KafkaGoMessage) Value() []byte

Value returns the message byte value

type ProcessorDependencies added in v1.0.2

type ProcessorDependencies interface{}

type RetryTopicMessage

type RetryTopicMessage struct {
	OriginalTopic     string `json:"original_topic" avro:"original_topic"`
	OriginalPartition int32  `json:"original_partition" avro:"original_partition"`
	OriginalOffset    int64  `json:"original_offset" avro:"original_offset"`
	OriginalMessage   []byte `json:"original_message" avro:"original_message"`
	Error             string `json:"error" avro:"error"`
}

RetryTopicMessage is a native go representation of a message on a retry topic

func NewRetryTopicMessage

func NewRetryTopicMessage(
	origTopic string, origPart int32,
	origOffset int64, origMsg []byte, e error) RetryTopicMessage

NewRetryTopicMessage constructs and returns a new RetryTopicMessage to be added to a retry topic
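
kafkaclient constructs these itself when a topic's FailedProcessingTopic is set; building one by hand in a custom processor might look like this (msg is the failed ConsumerMessage, procErr the processing error, and the retry topic name is from the README example):

retryMsg := kafkaclient.NewRetryTopicMessage(
    msg.Topic(), msg.Partition(), msg.Offset(), msg.Value(), procErr)

if e := kc.ProduceMessage(ctx, "test_topic_retry", msg.Key(), retryMsg); e != nil {
    log.Println(e)
}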

type SaramaClient

type SaramaClient struct {
	// contains filtered or unexported fields
}

SaramaClient implements the KafkaClient interface

func (*SaramaClient) CancelConsume

func (c *SaramaClient) CancelConsume() (e error)

CancelConsume calls the context's context.CancelFunc in order to stop the process of message consumption

func (*SaramaClient) ProduceMessage

func (c *SaramaClient) ProduceMessage(
	ctx context.Context, topic string, key string, msg interface{}) (e error)

ProduceMessage creates/encodes a message and sends it to the specified topic

func (*SaramaClient) StartConsume

func (c *SaramaClient) StartConsume(ctx context.Context) (e error)

StartConsume starts consuming configured kafka topic messages

type SaramaMessage

type SaramaMessage struct {
	// contains filtered or unexported fields
}

SaramaMessage holds sarama message contents as well as an EncoderDecoder used to unmarshall message data

func (SaramaMessage) Key

func (m SaramaMessage) Key() string

Key returns the message key

func (SaramaMessage) Offset

func (m SaramaMessage) Offset() int64

Offset returns the message offset

func (SaramaMessage) Partition

func (m SaramaMessage) Partition() int32

Partition returns the message partition

func (SaramaMessage) Topic

func (m SaramaMessage) Topic() string

Topic returns the message topic

func (SaramaMessage) Unmarshall

func (m SaramaMessage) Unmarshall(ctx context.Context, native interface{}) (e error)

Unmarshall unmarshalls the message contents into the provided struct

func (SaramaMessage) Value

func (m SaramaMessage) Value() []byte

Value returns the message byte value

type TopicConfig

type TopicConfig struct {
	Name          string
	MessageFormat messageFormat

	// Set DoConsume to true if this topic should be consumed from
	DoConsume bool
	// Set DoProduce to true if you will need to produce messages to this topic
	DoProduce           bool
	DelayProcessingMins time.Duration
	// FailedProcessingTopic is the retry topic to which a message
	// should be handed off in the case of a failure to process the message
	FailedProcessingTopic string
	// Schema is an optional string representation of the topic schema
	Schema           string
	SchemaVersion    int
	MessageProcessor func(context.Context, ProcessorDependencies, ConsumerMessage) error
	// contains filtered or unexported fields
}

TopicConfig is a struct that holds data regarding an existing Kafka topic that can be consumed from or written to
