kafka

package
v0.13.2 (not the latest version of its module)
Published: Apr 4, 2021 | License: MIT | Imports: 10 | Imported by: 0

README

Gnomock Kafka

Gnomock Kafka is a Gnomock preset for running tests against a real Kafka event streaming platform, without mocks.

package kafka_test

import (
	"context"
	"os"
	"testing"
	"time"

	"github.com/orlangure/gnomock"
	"github.com/orlangure/gnomock/preset/kafka"
	kafkaclient "github.com/segmentio/kafka-go"
	"github.com/stretchr/testify/require"
)

// nolint:funlen
func TestPreset(t *testing.T) {
	t.Parallel()

	messages := []kafka.Message{
		{
			Topic: "events",
			Key:   "order",
			Value: "1",
			Time:  time.Now().UnixNano(),
		},
		{
			Topic: "alerts",
			Key:   "CPU",
			Value: "92",
			Time:  time.Now().UnixNano(),
		},
	}

	p := kafka.Preset(
		kafka.WithTopics("topic-1", "topic-2"),
		kafka.WithMessages(messages...),
	)

	container, err := gnomock.Start(
		p,
		gnomock.WithDebugMode(), gnomock.WithLogWriter(os.Stdout),
		gnomock.WithContainerName("kafka"),
	)
	require.NoError(t, err)

	defer func() { require.NoError(t, gnomock.Stop(container)) }()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	alertsReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{
		Brokers: []string{container.Address(kafka.BrokerPort)},
		Topic:   "alerts",
	})

	m, err := alertsReader.ReadMessage(ctx)
	require.NoError(t, err)
	require.NoError(t, alertsReader.Close())

	require.Equal(t, "CPU", string(m.Key))
	require.Equal(t, "92", string(m.Value))

	eventsReader := kafkaclient.NewReader(kafkaclient.ReaderConfig{
		Brokers: []string{container.Address(kafka.BrokerPort)},
		Topic:   "events",
	})

	m, err = eventsReader.ReadMessage(ctx)
	require.NoError(t, err)
	require.NoError(t, eventsReader.Close())

	require.Equal(t, "order", string(m.Key))
	require.Equal(t, "1", string(m.Value))

	c, err := kafkaclient.Dial("tcp", container.Address(kafka.BrokerPort))
	require.NoError(t, err)

	require.NoError(t, c.DeleteTopics("topic-1", "topic-2"))
	require.Error(t, c.DeleteTopics("unknown-topic"))

	require.NoError(t, c.Close())
}

Documentation

Overview

Package kafka provides a Gnomock Preset for Kafka.

Constants

const (
	BrokerPort    = "broker"
	ZooKeeperPort = "zookeeper"
	WebPort       = "web"
)

The constants above name the ports exposed by this preset.

Functions

func Preset

func Preset(opts ...Option) gnomock.Preset

Preset creates a new Gnomock Kafka preset. This preset includes a Kafka-specific healthcheck function and a default Kafka image and ports.

The Kafka preset uses a fixed broker port (49092) instead of allocating a random unoccupied port on every run. Make sure this port is available before using this preset.

By default, this preset uses the `lensesio/fast-data-dev` Docker image with version `2.5.1-L0` (the version can be changed using `WithVersion`).
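Because the broker port is fixed, a test can fail simply because something else already holds port 49092. The following is a minimal sketch of a best-effort pre-flight check using only the standard library. The names `kafkaBrokerPort` and `portAvailable` are hypothetical helpers for illustration and are not part of this package.

```go
package main

import (
	"net"
	"strconv"
)

// kafkaBrokerPort is the fixed host port named in the Preset documentation.
const kafkaBrokerPort = 49092

// portAvailable reports whether a TCP listener can be opened on the given
// port on all local interfaces. It is a best-effort check (hypothetical
// helper): another process may still take the port between this call and
// the container start.
func portAvailable(port int) bool {
	l, err := net.Listen("tcp", net.JoinHostPort("", strconv.Itoa(port)))
	if err != nil {
		return false
	}
	// Release the port right away so the container can bind it.
	_ = l.Close()
	return true
}
```

A test could call `portAvailable(kafkaBrokerPort)` before `gnomock.Start` and skip or fail early with a clear message when it returns false.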

Types

type Message

type Message struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	Value string `json:"value"`
	Time  int64  `json:"time"`
}

Message is a single message sent to Kafka.
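The README example fills `Time` with `time.Now().UnixNano()`, which suggests the field holds Unix nanoseconds; that reading is an inference from the example, not something the documentation states. Under that assumption, the sketch below converts between `time.Time` and the `int64` field. The `message` type is a local mirror of the documented fields, and `newMessage`, `timestamp`, and `exampleMessage` are hypothetical helpers.

```go
package main

import "time"

// message mirrors the documented kafka.Message fields for illustration only.
type message struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	Value string `json:"value"`
	Time  int64  `json:"time"`
}

// newMessage stamps a message with ts expressed as Unix nanoseconds
// (assumed unit, based on the README example).
func newMessage(topic, key, value string, ts time.Time) message {
	return message{Topic: topic, Key: key, Value: value, Time: ts.UnixNano()}
}

// timestamp converts the stored nanoseconds back into a time.Time in UTC.
func (m message) timestamp() time.Time {
	return time.Unix(0, m.Time).UTC()
}

// exampleMessage builds a message with a fixed timestamp so the conversion
// can be checked deterministically.
func exampleMessage() message {
	ts := time.Date(2021, time.April, 4, 0, 0, 0, 0, time.UTC)
	return newMessage("alerts", "CPU", "92", ts)
}
```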

type Option

type Option func(*P)

Option is an optional configuration of this Gnomock preset. Use available Options to configure the container.
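The signature `type Option func(*P)` follows the functional options pattern: each option is a function that mutates the preset before it is used. The sketch below shows the pattern with local stand-in types. The names `preset`, `option`, `withVersion`, `withTopics`, and `newPreset` are hypothetical, and the append behaviour of `withTopics` is an assumption, since the documentation does not say whether the real `WithTopics` appends or replaces.

```go
package main

// preset is a hypothetical stand-in for the package's P type.
type preset struct {
	Version string
	Topics  []string
}

// option mirrors the documented shape `type Option func(*P)`.
type option func(*preset)

// withVersion returns an option that overrides the image version.
func withVersion(v string) option {
	return func(p *preset) { p.Version = v }
}

// withTopics returns an option that adds topics to the preset.
// Appending (rather than replacing) is an assumption of this sketch.
func withTopics(topics ...string) option {
	return func(p *preset) { p.Topics = append(p.Topics, topics...) }
}

// newPreset starts from the documented default version and applies
// each option in the order given.
func newPreset(opts ...option) *preset {
	p := &preset{Version: "2.5.1-L0"}
	for _, opt := range opts {
		opt(p)
	}
	return p
}
```

With this shape, new settings can be added later without changing the constructor's signature, and callers that pass no options get the defaults.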

func WithMessages

func WithMessages(messages ...Message) Option

WithMessages makes sure that these messages can be consumed during the test once the container is ready.

func WithMessagesFile

func WithMessagesFile(files string) Option

WithMessagesFile loads messages to be sent into Kafka from one or multiple files.

func WithTopics

func WithTopics(topics ...string) Option

WithTopics makes sure that the provided topics are available when Kafka is up and running.

func WithVersion

func WithVersion(version string) Option

WithVersion sets the image version.

type P

type P struct {
	Version       string    `json:"version"`
	Topics        []string  `json:"topics"`
	Messages      []Message `json:"messages"`
	MessagesFiles []string  `json:"messages_files"`
}

P is a Gnomock Preset implementation of Kafka.
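The struct tags on `P` and `Message` determine the JSON key names these values use when encoded or decoded. As a sketch of what such a document could look like, the code below decodes JSON into local mirror types. `config`, `configMessage`, and `decodeConfig` are hypothetical names, and whether and where the package itself consumes JSON in this form is not stated in this documentation.

```go
package main

import "encoding/json"

// configMessage mirrors the documented Message fields and JSON tags.
type configMessage struct {
	Topic string `json:"topic"`
	Key   string `json:"key"`
	Value string `json:"value"`
	Time  int64  `json:"time"`
}

// config mirrors the documented P fields and JSON tags.
type config struct {
	Version       string          `json:"version"`
	Topics        []string        `json:"topics"`
	Messages      []configMessage `json:"messages"`
	MessagesFiles []string        `json:"messages_files"`
}

// decodeConfig parses a JSON document whose keys match the struct tags.
func decodeConfig(data []byte) (config, error) {
	var c config
	err := json.Unmarshal(data, &c)
	return c, err
}
```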

func (*P) Image

func (p *P) Image() string

Image returns an image that should be pulled to create this container.

func (*P) Options

func (p *P) Options() []gnomock.Option

Options returns a list of options to configure this container.

func (*P) Ports

func (p *P) Ports() gnomock.NamedPorts

Ports returns ports that should be used to access this container.
