kafka

package module
v0.0.0-...-8c522c3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2026 License: Apache-2.0 Imports: 9 Imported by: 0

README

xevent/kafka

xevent/kafka provides Kafka adapters for the transport-agnostic xevent core. It bridges xevent.Event and xevent.Dispatcher onto the xkafka producer and consumer primitives.

Installation

go get github.com/codesjoy/pkg/basic/xevent/kafka

This module depends on:

  • github.com/codesjoy/pkg/basic/xevent
  • github.com/codesjoy/pkg/basic/xkafka

What This Module Provides

  • Publisher: maps an xevent.Event onto a Kafka message
  • Publisher.Send: maps an xevent.Outbound onto a Kafka message
  • Subscriber: consumes Kafka messages and dispatches them through a bound xevent.Dispatcher

Publish Example

package main

import (
	"context"
	"encoding/json"

	xeventkafka "github.com/codesjoy/pkg/basic/xevent/kafka"
)

type OrderCreated struct {
	ID      string `json:"id"`
	OrderID string `json:"order_id"`
}

func (*OrderCreated) EventType() string {
	return "order.created"
}

func (e *OrderCreated) EventID() string {
	return e.ID
}

func (e *OrderCreated) PartitionKey() string {
	return e.OrderID
}

func (e *OrderCreated) MarshalPayload() ([]byte, error) {
	return json.Marshal(e)
}

func (e *OrderCreated) UnmarshalPayload(data []byte) error {
	return json.Unmarshal(data, e)
}

func main() {
	producer := mustNewProducer() // returns *xkafka.Producer

	publisher, err := xeventkafka.NewPublisher(xeventkafka.PublisherConfig{
		Producer: producer,
		Topic:    "orders",
	})
	if err != nil {
		panic(err)
	}

	if err := publisher.Publish(context.Background(), &OrderCreated{
		ID:      "evt_1",
		OrderID: "order-1",
	}); err != nil {
		panic(err)
	}
}

Subscribe Example

package main

import (
	"context"
	"encoding/json"
	"fmt"

	xevent "github.com/codesjoy/pkg/basic/xevent"
	xeventkafka "github.com/codesjoy/pkg/basic/xevent/kafka"
)

type OrderCreated struct {
	ID      string `json:"id"`
	OrderID string `json:"order_id"`
}

func (*OrderCreated) EventType() string {
	return "order.created"
}

func (e *OrderCreated) EventID() string {
	return e.ID
}

func (e *OrderCreated) PartitionKey() string {
	return e.OrderID
}

func (e *OrderCreated) MarshalPayload() ([]byte, error) {
	return json.Marshal(e)
}

func (e *OrderCreated) UnmarshalPayload(data []byte) error {
	return json.Unmarshal(data, e)
}

func main() {
	dispatcher := xevent.NewDispatcher()

	if err := xevent.On[*OrderCreated](
		dispatcher,
		func(_ context.Context, event *OrderCreated) error {
			fmt.Println(event.ID, event.OrderID)
			return nil
		},
	); err != nil {
		panic(err)
	}

	consumer := mustNewGroupConsumer() // returns *xkafka.GroupConsumer

	subscriber, err := xeventkafka.NewSubscriber(xeventkafka.SubscriberConfig{
		Consumer:   consumer,
		Dispatcher: dispatcher,
	})
	if err != nil {
		panic(err)
	}
	defer subscriber.Close()

	if err := subscriber.Subscribe(context.Background()); err != nil {
		panic(err)
	}
}

API Overview

PublisherConfig
  • Producer: required *xkafka.Producer
  • Topic: required Kafka topic name
  • EventTypeHeader: optional override for the event type header name
  • EventIDHeader: optional override for the event ID header name
SubscriberConfig
  • Consumer: required *xkafka.GroupConsumer
  • Dispatcher: required *xevent.Dispatcher
  • EventTypeHeader: optional override for the event type header name

Behavior Notes

  • Default Kafka headers:
    • event type: x-event-type
    • event ID: x-event-id
  • Subscriber.Subscribe(ctx) is one-shot for a subscriber instance.
  • Subscriber.Close() is idempotent.
  • NewSubscriber requires a dispatcher up front; typed handlers are registered on that dispatcher before consumption starts.

Use As Outbox Sender

Publisher also implements xevent.Sender, so it can be used directly by xevent/outbox:

relay, err := outbox.NewRelay(outbox.RelayConfig{
	Store:  store,
	Sender: publisher,
})
if err != nil {
	panic(err)
}
_ = relay

Relationship To xevent

Use xevent to define events, bind typed handlers, and work with transport-neutral dispatch. Use xevent/kafka when you need to publish those events to Kafka or consume Kafka messages into an xevent.Dispatcher.

Documentation

Overview

Package kafka provides Kafka adapters for xevent using the xkafka package.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNilProducer indicates the Kafka producer dependency is nil.
	ErrNilProducer = errors.New("xevent kafka producer is nil")
	// ErrNilConsumer indicates the Kafka consumer dependency is nil.
	ErrNilConsumer = errors.New("xevent kafka consumer is nil")
	// ErrNilDispatcher indicates the Kafka dispatcher dependency is nil.
	ErrNilDispatcher = errors.New("xevent kafka dispatcher is nil")
	// ErrTopicRequired indicates the Kafka topic is empty.
	ErrTopicRequired = errors.New("xevent kafka topic is required")
	// ErrEventTypeHeaderRequired indicates the Kafka event type header is missing.
	ErrEventTypeHeaderRequired = errors.New("xevent kafka event type header is required")
)

Functions

This section is empty.

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher adapts xevent.Publisher onto xkafka.Producer.

func NewPublisher

func NewPublisher(cfg PublisherConfig) (*Publisher, error)

NewPublisher creates a Kafka-backed xevent publisher.

func (*Publisher) BatchSend

func (p *Publisher) BatchSend(ctx context.Context, outbounds []*xevent.Outbound) []error

BatchSend sends multiple xevent.Outbound payloads to Kafka in one batch. It returns a slice of errors, one per outbound; nil means success. An empty input returns nil.

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, event xevent.Event) error

Publish publishes one xevent.Event to Kafka.

func (*Publisher) Send

func (p *Publisher) Send(ctx context.Context, outbound *xevent.Outbound) error

Send publishes one xevent.Outbound to Kafka.

type PublisherConfig

type PublisherConfig struct {
	Producer        *xkafka.Producer
	Topic           string
	EventTypeHeader string
	EventIDHeader   string
}

PublisherConfig configures the Kafka-backed xevent publisher.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber adapts xkafka.GroupConsumer onto xevent.Subscriber.

func NewSubscriber

func NewSubscriber(cfg SubscriberConfig) (*Subscriber, error)

NewSubscriber creates a Kafka-backed xevent subscriber.

func (*Subscriber) Close

func (s *Subscriber) Close() error

Close releases the wrapped Kafka consumer.

func (*Subscriber) Subscribe

func (s *Subscriber) Subscribe(ctx context.Context) error

Subscribe starts consuming Kafka messages and dispatching them to typed handlers.

type SubscriberConfig

type SubscriberConfig struct {
	Consumer        *xkafka.GroupConsumer
	Dispatcher      *xevent.Dispatcher
	EventTypeHeader string
}

SubscriberConfig configures the Kafka-backed xevent subscriber.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL