integration

package
v1.0.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 29, 2020 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package integration provides a custom Kafka wrapper to work with the outbox pattern.

Send an event to a Kafka topic:

profile := profile{
	ID:       "123",
	Name:     "Paul",
	Lastname: "Mark",
}

// newClientCreateEvent returns a *profileCreateEvent, which implements the ExportedEvent interface.
profileCreateEvent := newClientCreateEvent(profile)
event := integration.BuildEventMessage(profileCreateEvent)

kafka := integration.NewKafka([]string{"localhost:9091", "localhost:9092", "localhost:9093"}, "profile")
kafka.Debug(true)
writer := kafka.NewWriter("CLIENT")

defer writer.Close()

err := kafka.SendEvent(context.Background(), writer, event)
if err != nil {
	// ...
}

Consume events from a Kafka topic:

kafka := integration.NewKafka([]string{"localhost:9091", "localhost:9092", "localhost:9093"}, "profile")
kafka.Debug(true)
reader := kafka.NewReader("ClientSubscribeTest", "CLIENT")

defer reader.Close()

ctx := context.Background()

fnCallback := func(event integration.EventMessage, err error) {
	// ...
}

kafka.SubscribeEvent(ctx, reader, fnCallback)
// ...


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventMessage

type EventMessage struct {
	// ID unique id of each message.
	ID string `json:"id"`

	// AggregateType the type of the aggregate root to which a given event is related.
	AggregateType string `json:"aggregateType"`

	// AggregateID the id of the aggregate root that is affected by a given event.
	AggregateID string `json:"aggregateId"`

	// Type of event, e.g. "Order Created" or "Order Line Canceled".
	Type string `json:"eventType"`

	// Payload of the event.
	Payload []byte `json:"payload"`
}

EventMessage represents a message that contains the event to be sent or received (outbox pattern).
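The struct tags above determine the JSON field names. As a minimal sketch, the program below uses a local copy of the struct (so that it compiles without the package) to show what encoding/json produces for it. Whether the package itself serializes messages with encoding/json is an assumption; encode and decode are hypothetical helpers, not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventMessage is a local copy of the documented struct, reproduced here so
// the example is self-contained.
type EventMessage struct {
	ID            string `json:"id"`
	AggregateType string `json:"aggregateType"`
	AggregateID   string `json:"aggregateId"`
	Type          string `json:"eventType"`
	Payload       []byte `json:"payload"`
}

// encode marshals a message with encoding/json. Assumption: the package may
// serialize differently; this only shows what the struct tags imply.
func encode(m EventMessage) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

// decode is the inverse of encode.
func decode(s string) (EventMessage, error) {
	var m EventMessage
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	msg := EventMessage{
		ID:            "evt-1",
		AggregateType: "CLIENT",
		AggregateID:   "123",
		Type:          "ClientCreated",
		Payload:       []byte(`{"name":"Paul"}`),
	}
	out, err := encode(msg)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// encoding/json renders a []byte field as a base64 string, so the
	// payload does not appear as nested JSON in the output.
	fmt.Println(out)
}
```

Because Payload is a []byte, a consumer written in another language would need to base64-decode the payload field before parsing it, if the package does encode messages this way.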

func BuildEventMessage

func BuildEventMessage(event ExportedEvent) EventMessage

BuildEventMessage takes ExportedEvent as a parameter and returns an EventMessage.
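The conversion is presumably a field-for-field copy plus a freshly generated ID. The sketch below illustrates that idea with local copies of the two types. The function buildEventMessage, the helper newID, and the random hex ID format are all assumptions made for illustration; the documentation does not say how the package generates IDs.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// ExportedEvent and EventMessage are local copies of the documented types.
type ExportedEvent interface {
	AggregateID() string
	AggregateType() string
	Type() string
	Payload() []byte
}

type EventMessage struct {
	ID            string
	AggregateType string
	AggregateID   string
	Type          string
	Payload       []byte
}

// newID returns 16 random bytes as a 32-character hex string.
// Hypothetical: the real package may use a different ID scheme.
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// buildEventMessage copies the event's accessors into a message and assigns
// it a unique ID. Unlike the documented function it also returns an error,
// because this sketch's ID generator can fail.
func buildEventMessage(e ExportedEvent) (EventMessage, error) {
	id, err := newID()
	if err != nil {
		return EventMessage{}, err
	}
	return EventMessage{
		ID:            id,
		AggregateType: e.AggregateType(),
		AggregateID:   e.AggregateID(),
		Type:          e.Type(),
		Payload:       e.Payload(),
	}, nil
}

// orderCreated is a hypothetical event used only to exercise the sketch.
type orderCreated struct{ orderID string }

func (o orderCreated) AggregateID() string   { return o.orderID }
func (o orderCreated) AggregateType() string { return "order" }
func (o orderCreated) Type() string          { return "Order Created" }
func (o orderCreated) Payload() []byte       { return []byte(`{"id":"` + o.orderID + `"}`) }

func main() {
	msg, err := buildEventMessage(orderCreated{orderID: "42"})
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	fmt.Println(msg.AggregateType, msg.AggregateID, msg.Type, string(msg.Payload))
}
```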

func (EventMessage) String

func (e EventMessage) String() string

type ExportedEvent

type ExportedEvent interface {
	// AggregateID the id of the aggregate root that is affected by a given event; this could for instance be the id
	// of a purchase order or a customer id; Similar to the aggregate type, events pertaining to a sub-entity contained
	// within an aggregate should use the id of the containing aggregate root, e.g. the purchase order id for an order
	// line cancelation event. This id will be used as the key for Kafka messages later on. That way, all events
	// pertaining to one aggregate root or any of its contained sub-entities will go into the same partition of that
	// Kafka topic, which ensures that consumers of that topic will consume all the events related to one and the same
	// aggregate in the exact order as they were produced.
	AggregateID() string

	// AggregateType the type of the aggregate root to which a given event is related; the idea being, leaning on the
	// same concept of domain-driven design, that exported events should refer to an aggregate ("a cluster of domain
	// objects that can be treated as a single unit"), where the aggregate root provides the sole entry point for
	// accessing any of the entities within the aggregate. This could for instance be "purchase order" or "customer".
	//
	// This value will be used to route events to corresponding topics in Kafka, so there’d be a topic for all events
	// related to purchase orders, one topic for all customer-related events etc. Note that also events pertaining to a
	// child entity contained within one such aggregate should use that same type. So e.g. an event representing the
	// cancelation of an individual order line (which is part of the purchase order aggregate) should also use the type
	// of its aggregate root, "order", ensuring that also this event will go into the "order" Kafka topic.
	AggregateType() string

	// Type the type of event, e.g. "Order Created" or "Order Line Canceled". Allows consumers to trigger suitable
	// event handlers.
	Type() string

	// Payload a JSON structure with the actual event contents, e.g. containing a purchase order, information about
	// the purchaser, contained order lines, their price etc.
	Payload() []byte
}

The ExportedEvent interface must be implemented by structs that represent events (outbox pattern).
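The overview mentions a profileCreateEvent returned by newClientCreateEvent without showing it. One possible implementation is sketched below, with a local copy of the interface so that it compiles on its own. The struct layout, the JSON tags, the "CLIENT" aggregate type, and the "ClientCreated" event type are assumptions chosen to match the overview example, not the author's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExportedEvent is a local copy of the documented interface.
type ExportedEvent interface {
	AggregateID() string
	AggregateType() string
	Type() string
	Payload() []byte
}

// profile mirrors the struct used in the overview; the JSON tags are assumed.
type profile struct {
	ID       string `json:"id"`
	Name     string `json:"name"`
	Lastname string `json:"lastname"`
}

// profileCreateEvent is a hypothetical event wrapping a newly created profile.
type profileCreateEvent struct {
	p profile
}

func newClientCreateEvent(p profile) *profileCreateEvent {
	return &profileCreateEvent{p: p}
}

// AggregateID is the profile ID; per the interface docs it becomes the Kafka
// message key, so all events for one profile land in the same partition.
func (e *profileCreateEvent) AggregateID() string { return e.p.ID }

// AggregateType names the aggregate root, which is used for topic routing.
func (e *profileCreateEvent) AggregateType() string { return "CLIENT" }

// Type lets consumers pick a handler for this kind of event.
func (e *profileCreateEvent) Type() string { return "ClientCreated" }

// Payload is the JSON-encoded profile, or nil if encoding fails.
func (e *profileCreateEvent) Payload() []byte {
	b, err := json.Marshal(e.p)
	if err != nil {
		return nil
	}
	return b
}

// Compile-time check that the pointer type satisfies the interface.
var _ ExportedEvent = (*profileCreateEvent)(nil)

func main() {
	var e ExportedEvent = newClientCreateEvent(profile{ID: "123", Name: "Paul", Lastname: "Mark"})
	fmt.Println(e.AggregateType(), e.AggregateID(), e.Type(), string(e.Payload()))
}
```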

type Kafka

type Kafka struct {
	// contains filtered or unexported fields
}

Kafka is a custom Kafka wrapper to work with the outbox pattern.

func NewKafka

func NewKafka(kafkaBrokerUrls []string, clientId string) *Kafka

NewKafka creates a new Kafka instance from a slice of Kafka broker URLs and a client ID.

func (*Kafka) Debug

func (k *Kafka) Debug(debug bool)

Debug turns logging of incoming messages on or off.

func (*Kafka) NewReader

func (k *Kafka) NewReader(groupID, topic string) *kafka.Reader

NewReader creates a new kafka.Reader to read messages for a topic.

func (*Kafka) NewWriter

func (k *Kafka) NewWriter(topic string) *kafka.Writer

NewWriter creates a new kafka.Writer to write messages on a topic.

func (*Kafka) SendEvent

func (k *Kafka) SendEvent(ctx context.Context, writer *kafka.Writer, event EventMessage) error

SendEvent produces an event message and sends it to a Kafka topic. The context passed as the first argument may also be used to cancel the operation asynchronously.
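A common way to use the context argument is to bound how long a send may take. The sketch below shows the pattern with a stand-in sendFunc in place of the real call, since the package and a broker are not available here. It assumes that a cancelled send returns the context's error, which the documentation does not state explicitly; sendWithTimeout, slowSend, and sendTimesOut are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendFunc stands in for a call such as kafka.SendEvent(ctx, writer, event).
type sendFunc func(ctx context.Context) error

// sendWithTimeout derives a context that expires after d and passes it to send.
func sendWithTimeout(parent context.Context, d time.Duration, send sendFunc) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel() // release the timer even when send returns early
	return send(ctx)
}

// slowSend simulates a broker that needs `delay` to acknowledge a message and
// gives up with the context's error if the context ends first.
func slowSend(delay time.Duration) sendFunc {
	return func(ctx context.Context) error {
		select {
		case <-time.After(delay):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// sendTimesOut reports whether a simulated send with the given timeout and
// broker delay (both in milliseconds) fails with context.DeadlineExceeded.
func sendTimesOut(timeoutMs, delayMs int) bool {
	err := sendWithTimeout(
		context.Background(),
		time.Duration(timeoutMs)*time.Millisecond,
		slowSend(time.Duration(delayMs)*time.Millisecond),
	)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("slow broker timed out:", sendTimesOut(50, 1000))
	fmt.Println("fast broker timed out:", sendTimesOut(1000, 1))
}
```

With the real package, the closure passed to sendWithTimeout would wrap the SendEvent call and forward the derived context.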

func (*Kafka) SubscribeEvent

func (k *Kafka) SubscribeEvent(ctx context.Context, reader *kafka.Reader, callback func(event EventMessage, err error))

SubscribeEvent consumes the events of a topic and passes each one to the supplied callback function. The method call blocks until an error occurs. The caller may also use the context to cancel the blocking operation asynchronously.
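Because the call blocks, a program that needs to do other work would typically run it in a goroutine and stop it through the context. The sketch below illustrates that pattern, together with a callback that dispatches on the event type. The subscribe function is a stand-in that reads from a channel instead of Kafka, and its behaviour on cancellation (one final callback carrying the context error) is an assumption, as are the event type names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// EventMessage is a trimmed local copy of the documented struct.
type EventMessage struct {
	AggregateID string
	Type        string
}

// subscribe is a stand-in for kafka.SubscribeEvent: it blocks, handing each
// message to callback, until ctx is cancelled.
func subscribe(ctx context.Context, in <-chan EventMessage, callback func(EventMessage, error)) {
	for {
		select {
		case <-ctx.Done():
			callback(EventMessage{}, ctx.Err())
			return
		case m := <-in:
			callback(m, nil)
		}
	}
}

// consume runs subscribe in a goroutine, feeds it msgs, cancels it, and
// returns a record of how each message was handled.
func consume(msgs []EventMessage) []string {
	ctx, cancel := context.WithCancel(context.Background())
	in := make(chan EventMessage)
	var handled []string // written only by the subscriber goroutine

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		subscribe(ctx, in, func(m EventMessage, err error) {
			if err != nil {
				return // cancelled or failed: nothing to dispatch
			}
			// Dispatch on the event type, as the Type field is meant to allow.
			switch m.Type {
			case "ClientCreated":
				handled = append(handled, "created:"+m.AggregateID)
			default:
				handled = append(handled, "ignored:"+m.Type)
			}
		})
	}()

	for _, m := range msgs {
		in <- m // unbuffered: returns once the subscriber has taken the message
	}
	cancel()  // ask the blocking call to stop
	wg.Wait() // wait for it to return before reading handled
	return handled
}

func main() {
	fmt.Println(consume([]EventMessage{
		{AggregateID: "123", Type: "ClientCreated"},
		{AggregateID: "123", Type: "ClientRenamed"},
	}))
}
```

Waiting on the WaitGroup after cancel matters here: it guarantees the subscriber goroutine has returned before the results are read, and the same approach gives a real consumer a chance to finish before the reader is closed.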
