Documentation ¶
Overview ¶
Package integration provides a custom Kafka wrapper to work with the outbox pattern.
Send an event to a Kafka topic:
profile := profile{
	ID:       "123",
	Name:     "Paul",
	Lastname: "Mark",
}

// newClientCreateEvent returns a *profileCreateEvent, which implements the ExportedEvent interface.
profileCreateEvent := newClientCreateEvent(profile)
event := integration.BuildEventMessage(profileCreateEvent)

kafka := integration.NewKafka([]string{"localhost:9091", "localhost:9092", "localhost:9093"}, "profile")
kafka.Debug(true)

writer := kafka.NewWriter("CLIENT")
defer writer.Close()

err := kafka.SendEvent(context.Background(), writer, event)
if err != nil {
	// ...
}
Consume events from a Kafka topic:
kafka := integration.NewKafka([]string{"localhost:9091", "localhost:9092", "localhost:9093"}, "profile")
kafka.Debug(true)

reader := kafka.NewReader("ClientSubscribeTest", "CLIENT")
defer reader.Close()

ctx := context.Background()
fnCallback := func(event EventMessage, err error) {
	// ...
}

kafka.SubscribeEvent(ctx, reader, fnCallback)
// ...
Index ¶
- type EventMessage
- type ExportedEvent
- type Kafka
- func (k *Kafka) Debug(debug bool)
- func (k *Kafka) NewReader(groupID, topic string) *kafka.Reader
- func (k *Kafka) NewWriter(topic string) *kafka.Writer
- func (k *Kafka) SendEvent(ctx context.Context, writer *kafka.Writer, event EventMessage) error
- func (k *Kafka) SubscribeEvent(ctx context.Context, reader *kafka.Reader, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventMessage ¶
type EventMessage struct {
	// ID unique id of each message.
	ID string `json:"id"`

	// AggregateType the type of the aggregate root to which a given event is related.
	AggregateType string `json:"aggregateType"`

	// AggregateID the id of the aggregate root that is affected by a given event.
	AggregateID string `json:"aggregateId"`

	// Type of event, e.g. "Order Created" or "Order Line Canceled".
	Type string `json:"eventType"`

	// Payload of the event.
	Payload []byte `json:"payload"`
}
EventMessage represents a message that contains the event to be sent or received (outbox pattern).
func BuildEventMessage ¶
func BuildEventMessage(event ExportedEvent) EventMessage
BuildEventMessage takes an ExportedEvent as a parameter and returns an EventMessage.
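The page does not show how the EventMessage fields are derived from the event. The following self-contained sketch illustrates one plausible mapping: each field is filled from the corresponding ExportedEvent accessor, plus a fresh unique ID. The interface and struct are reproduced from this page so the sketch compiles on its own; the counter-based ID and the orderCreated event are purely illustrative (the real package may well use a UUID).

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

// EventMessage and ExportedEvent are reproduced from this page so the
// sketch is self-contained; in practice, import the integration package.
type EventMessage struct {
	ID            string `json:"id"`
	AggregateType string `json:"aggregateType"`
	AggregateID   string `json:"aggregateId"`
	Type          string `json:"eventType"`
	Payload       []byte `json:"payload"`
}

type ExportedEvent interface {
	AggregateID() string
	AggregateType() string
	Type() string
	Payload() []byte
}

var counter uint64 // stand-in for whatever unique-ID scheme the package uses

// buildEventMessage sketches the mapping: every EventMessage field comes
// from the matching ExportedEvent accessor, and ID is freshly generated.
func buildEventMessage(event ExportedEvent) EventMessage {
	return EventMessage{
		ID:            strconv.FormatUint(atomic.AddUint64(&counter, 1), 10),
		AggregateType: event.AggregateType(),
		AggregateID:   event.AggregateID(),
		Type:          event.Type(),
		Payload:       event.Payload(),
	}
}

// orderCreated is a hypothetical event used only to exercise the mapping.
type orderCreated struct{ id string }

func (orderCreated) AggregateType() string { return "order" }
func (e orderCreated) AggregateID() string { return e.id }
func (orderCreated) Type() string          { return "Order Created" }
func (orderCreated) Payload() []byte       { return []byte(`{"total":42}`) }

func main() {
	msg := buildEventMessage(orderCreated{id: "po-17"})
	fmt.Println(msg.AggregateType, msg.AggregateID, msg.Type)
}
```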
func (EventMessage) String ¶
func (e EventMessage) String() string
type ExportedEvent ¶
type ExportedEvent interface {
	// AggregateID the id of the aggregate root that is affected by a given event; this could for instance be the id
	// of a purchase order or a customer id. Similar to the aggregate type, events pertaining to a sub-entity contained
	// within an aggregate should use the id of the containing aggregate root, e.g. the purchase order id for an order
	// line cancelation event. This id will be used as the key for Kafka messages later on. That way, all events
	// pertaining to one aggregate root or any of its contained sub-entities will go into the same partition of that
	// Kafka topic, which ensures that consumers of that topic will consume all the events related to one and the same
	// aggregate in the exact order as they were produced.
	AggregateID() string

	// AggregateType the type of the aggregate root to which a given event is related; the idea being, leaning on the
	// same concept of domain-driven design, that exported events should refer to an aggregate ("a cluster of domain
	// objects that can be treated as a single unit"), where the aggregate root provides the sole entry point for
	// accessing any of the entities within the aggregate. This could for instance be "purchase order" or "customer".
	//
	// This value will be used to route events to corresponding topics in Kafka, so there'd be a topic for all events
	// related to purchase orders, one topic for all customer-related events etc. Note that also events pertaining to a
	// child entity contained within one such aggregate should use that same type. So e.g. an event representing the
	// cancelation of an individual order line (which is part of the purchase order aggregate) should also use the type
	// of its aggregate root, "order", ensuring that also this event will go into the "order" Kafka topic.
	AggregateType() string

	// Type the type of event, e.g. "Order Created" or "Order Line Canceled". Allows consumers to trigger suitable
	// event handlers.
	Type() string

	// Payload a JSON structure with the actual event contents, e.g. containing a purchase order, information about
	// the purchaser, contained order lines, their price etc.
	Payload() []byte
}
ExportedEvent interface must be implemented by structures that represent events (outbox pattern).
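The interface comments above prescribe that an event for a sub-entity report the id and type of its containing aggregate root, not its own. A self-contained sketch of that rule for an order-line cancelation (the orderLineCanceled type and its fields are hypothetical; the interface is reproduced from this page):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ExportedEvent is reproduced from this page so the example compiles alone.
type ExportedEvent interface {
	AggregateID() string
	AggregateType() string
	Type() string
	Payload() []byte
}

// orderLineCanceled describes the cancelation of a single order line.
// Per the interface contract it returns the id and type of its aggregate
// root (the purchase order), so the event is routed to the "order" topic
// and keyed into the same partition as the rest of that order's events.
type orderLineCanceled struct {
	OrderID string `json:"orderId"`
	LineNo  int    `json:"lineNo"`
}

func (e orderLineCanceled) AggregateID() string   { return e.OrderID } // key: the containing order, not the line
func (e orderLineCanceled) AggregateType() string { return "order" }   // topic of the aggregate root
func (e orderLineCanceled) Type() string          { return "Order Line Canceled" }
func (e orderLineCanceled) Payload() []byte {
	b, _ := json.Marshal(e)
	return b
}

func main() {
	var ev ExportedEvent = orderLineCanceled{OrderID: "po-17", LineNo: 3}
	fmt.Println(ev.AggregateType(), ev.AggregateID(), ev.Type())
}
```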
type Kafka ¶
type Kafka struct {
// contains filtered or unexported fields
}
Kafka is a custom Kafka wrapper to work with the outbox pattern.
func NewKafka ¶
func NewKafka(brokers []string, profileID string) *Kafka
NewKafka creates a new Kafka instance, taking as parameters a slice of Kafka broker addresses and the profile ID.
func (*Kafka) SendEvent ¶
func (k *Kafka) SendEvent(ctx context.Context, writer *kafka.Writer, event EventMessage) error
SendEvent produces and sends an event message to a Kafka topic. The context passed as the first argument may also be used to asynchronously cancel the operation.
func (*Kafka) SubscribeEvent ¶
func (k *Kafka) SubscribeEvent(ctx context.Context, reader *kafka.Reader, callback func(event EventMessage, err error))
SubscribeEvent consumes the events of a topic and passes each event to the provided callback function. The method call blocks until an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.