events

v0.1.0
Published: Mar 10, 2026 License: MIT Imports: 10 Imported by: 1

README

events is a typed event bus library for local dispatch and distributed pub/sub.

What events is

events is a typed event bus for Go and handles event publication and fan-out. Durable background work such as retries and worker queues belongs in queue.

It lets applications publish and subscribe to events using normal Go types, with delivery handled either in-process or through distributed backends like NATS, Redis, Kafka, or Google Pub/Sub.

Installation

go get github.com/goforj/events

Quick Start

package main

import (
	"context"
	"fmt"

	"github.com/goforj/events"
)

type UserCreated struct {
	ID string `json:"id"`
}

func main() {
	bus, _ := events.NewSync()
	_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
		fmt.Println("received", event.ID, ctx != nil)
		return nil
	})
	_ = bus.Publish(UserCreated{ID: "123"})
}

Topic Override

type UserCreated struct {
	ID string `json:"id"`
}

func (UserCreated) Topic() string { return "users.created" }
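As an illustration of how a topic override could be resolved, the sketch below checks for a `Topic()` method first and falls back to the Go type name otherwise. The fallback rule and the `resolveTopic` helper are assumptions made for this example; the package's actual derivation rule is not described here.

```go
package main

import (
	"fmt"
	"reflect"
)

// topicEvent mirrors the documented TopicEvent interface.
type topicEvent interface {
	Topic() string
}

// resolveTopic prefers an explicit Topic() and otherwise falls back to the
// Go type name. The fallback rule is an assumption made for this sketch.
func resolveTopic(event any) string {
	if te, ok := event.(topicEvent); ok {
		return te.Topic()
	}
	t := reflect.TypeOf(event)
	if t == nil {
		return "" // nil event: no type to derive a topic from
	}
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}

type UserCreated struct{ ID string }

func (UserCreated) Topic() string { return "users.created" }

type OrderPlaced struct{ ID string }

func main() {
	fmt.Println(resolveTopic(UserCreated{ID: "1"})) // users.created
	fmt.Println(resolveTopic(OrderPlaced{ID: "2"})) // OrderPlaced
}
```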

Drivers

Optional distributed backends are separate modules. Keep dependencies lean and install only what you use:

go get github.com/goforj/events/driver/natsevents
go get github.com/goforj/events/driver/redisevents
go get github.com/goforj/events/driver/kafkaevents
go get github.com/goforj/events/driver/natsjetstreamevents
go get github.com/goforj/events/driver/gcppubsubevents
go get github.com/goforj/events/driver/snsevents

Drivers

Driver / Backend | Mode | Fan-out | Durable | Queue Semantics | Notes
Sync | In-process | | x | x | Root-backed synchronous dispatch in the caller path.
Null | Drop-only | x | x | x | Root-backed no-op transport for disabled eventing and tests.
NATS | Distributed pub/sub | | x | x | Subject-based transport with live integration coverage.
NATS JetStream | Distributed stream | | Partial | x | Ephemeral JetStream consumers preserve subscribe/close semantics while adding durable stream storage.
Redis | Distributed pub/sub | | x | x | Redis pub/sub transport; Streams are intentionally deferred.
Kafka | Distributed topic/log | | Partial | x | Current driver validates topic-based fan-out compatibility, not full consumer-group semantics.
SNS | Distributed topic plus queue | | Partial | x | SNS fan-out with per-subscription SQS queues to preserve bus-style delivery semantics.
Google Pub/Sub | Distributed topic/subscription | | Partial | x | Emulator-backed Google Pub/Sub integration with per-subscription fan-out mapping.

SQS (queue target): Planned. Deferred until a separate async capability surface is intentionally introduced.

Driver Constructor Quick Examples

Use root constructors for local backends, and driver-module constructors for distributed backends. Driver backends live in separate modules so applications only import/link the optional dependencies they actually use.

package main

import (
	"context"

	"github.com/goforj/events"
	"github.com/goforj/events/driver/gcppubsubevents"
	"github.com/goforj/events/driver/kafkaevents"
	"github.com/goforj/events/driver/natsjetstreamevents"
	"github.com/goforj/events/driver/natsevents"
	"github.com/goforj/events/driver/redisevents"
	"github.com/goforj/events/driver/snsevents"
)

func main() {
	ctx := context.Background()

	events.NewSync()
	events.NewNull()

	natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})
	natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})
	redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})
	kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})
	gcppubsubevents.New(ctx, gcppubsubevents.Config{
		ProjectID: "events-project",
		URI:       "127.0.0.1:8085",
	})
	snsevents.New(snsevents.Config{
		Region:   "us-east-1",
		Endpoint: "http://127.0.0.1:4566",
	})
}

Benchmarks

To refresh the live benchmark snapshot and regenerate the charts:

sh scripts/refresh-bench-snapshot.sh

These charts compare one publish-plus-delivery round trip for sync and each enabled distributed driver fixture.

Note: sns and gcppubsub run through local emulators in this repo, so read those results as development approximations rather than direct managed-service latency comparisons.

[Charts: events backend latency, throughput, bytes, and allocations]

These benchmarks are meant to catch obvious regressions, not to drive noisy micro-optimization or to serve as hard CI performance gates.

API Index

Group | Functions
Bus | Driver, Ready, ReadyContext
Config | Config, gcppubsubevents.Config, kafkaevents.Config, natsevents.Config, natsjetstreamevents.Config, redisevents.Config, snsevents.Config
Construction | New, NewNull, NewSync
Driver Constructors | gcppubsubevents.New, kafkaevents.New, natsevents.New, natsjetstreamevents.New, redisevents.New, snsevents.New
Lifecycle | Close
Options | Option, WithCodec
Publish | Publish, PublishContext, TopicEvent
Subscribe | Subscribe, SubscribeContext, Subscription
Testing | Fake, Fake.Bus, Fake.Count, Fake.Records, Fake.Reset, NewFake, Record

Bus

Driver

Driver reports the active backend.

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

Ready

Ready reports whether the bus is ready.

bus, _ := events.NewSync()
fmt.Println(bus.Ready() == nil)
// Output: true

ReadyContext

ReadyContext reports whether the bus is ready.

bus, _ := events.NewSync()
fmt.Println(bus.ReadyContext(context.Background()) == nil)
// Output: true

Config

Config

Config configures root bus construction.

Example: define bus construction config

cfg := events.Config{Driver: eventscore.DriverSync}

Example: define bus construction config with all fields

cfg := events.Config{
	Driver:    eventscore.DriverSync, // default: "sync" when empty and no Transport is provided
	Codec:     nil,                   // default: nil uses the built-in JSON codec
	Transport: nil,                   // default: nil keeps dispatch in-process
}

gcppubsubevents.Config

Config configures Google Pub/Sub transport construction.

Example: define Google Pub/Sub driver config

cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
}

Example: define Google Pub/Sub driver config with all fields

cfg := gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085", // default: "" is invalid unless Client is provided
	Client:    nil,              // default: nil creates a client from ProjectID and URI
}

kafkaevents.Config

Config configures Kafka transport construction.

Example: define Kafka driver config

cfg := kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}}

Example: define Kafka driver config with all fields

cfg := kafkaevents.Config{
	Brokers: []string{"127.0.0.1:9092"},
	Dialer:  nil, // default: nil uses a zero-value kafka.Dialer
	Writer:  nil, // default: nil builds a writer with single-message, auto-topic defaults
}

natsevents.Config

Config configures NATS transport construction.

Example: define NATS driver config

cfg := natsevents.Config{URL: "nats://127.0.0.1:4222"}

Example: define NATS driver config with all fields

cfg := natsevents.Config{
	URL:  "nats://127.0.0.1:4222",
	Conn: nil, // default: nil dials URL instead of reusing an existing connection
}

natsjetstreamevents.Config

Config configures NATS JetStream transport construction.

Example: define NATS JetStream driver config

cfg := natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"}

Example: define NATS JetStream driver config with all fields

cfg := natsjetstreamevents.Config{
	URL:               "nats://127.0.0.1:4222",
	Conn:              nil,                     // default: nil dials URL instead of reusing an existing connection
	SubjectPrefix:     "events.",               // default: "events."
	StreamNamePrefix:  "EVENTS_",               // default: "EVENTS_"
	InactiveThreshold: 30 * time.Second,        // default: 30s
	AckWait:           30 * time.Second,        // default: 30s
	FetchMaxWait:      250 * time.Millisecond,  // default: 250ms
	Storage:           jetstream.MemoryStorage, // default: MemoryStorage
}

redisevents.Config

Config configures Redis transport construction.

Example: define Redis driver config

cfg := redisevents.Config{Addr: "127.0.0.1:6379"}

Example: define Redis driver config with all fields

cfg := redisevents.Config{
	Addr:   "127.0.0.1:6379",
	Client: nil, // default: nil constructs a client from Addr
}

snsevents.Config

Config configures SNS transport construction.

Example: define SNS driver config

cfg := snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
}

Example: define SNS driver config with all fields

cfg := snsevents.Config{
	Region:            "us-east-1",
	Endpoint:          "http://127.0.0.1:4566", // default: "" uses normal AWS resolution
	SNSClient:         nil,                      // default: nil creates a client from Region and Endpoint
	SQSClient:         nil,                      // default: nil creates a client from Region and Endpoint
	TopicNamePrefix:   "events-",                // default: ""
	QueueNamePrefix:   "events-",                // default: ""
	WaitTimeSeconds:   1,                        // default: 1
	VisibilityTimeout: 30,                       // default: 30
}

Construction

New

New constructs a root bus for the requested driver.

bus, _ := events.New(events.Config{Driver: "sync"})
fmt.Println(bus.Driver())
// Output: sync

NewNull

NewNull constructs the root null bus.

bus, _ := events.NewNull()
fmt.Println(bus.Driver())
// Output: null

NewSync

NewSync constructs the root sync bus.

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

Driver Constructors

gcppubsubevents.New

New constructs a Google Pub/Sub-backed driver.

driver, _ := gcppubsubevents.New(context.Background(), gcppubsubevents.Config{
	ProjectID: "events-project",
	URI:       "127.0.0.1:8085",
})

kafkaevents.New

New constructs a Kafka-backed driver.

driver, _ := kafkaevents.New(kafkaevents.Config{Brokers: []string{"127.0.0.1:9092"}})

natsevents.New

New connects a NATS-backed driver from config.

driver, _ := natsevents.New(natsevents.Config{URL: "nats://127.0.0.1:4222"})

natsjetstreamevents.New

New connects a NATS JetStream-backed driver from config.

driver, _ := natsjetstreamevents.New(natsjetstreamevents.Config{URL: "nats://127.0.0.1:4222"})

redisevents.New

New constructs a Redis pub/sub-backed driver.

driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})

snsevents.New

New constructs an SNS-backed driver.

driver, _ := snsevents.New(snsevents.Config{
	Region:   "us-east-1",
	Endpoint: "http://127.0.0.1:4566",
})

Lifecycle

Close

Close closes the underlying Pub/Sub client.

driver, _ := redisevents.New(redisevents.Config{Addr: "127.0.0.1:6379"})

Options

Option

Option configures root bus behavior.

WithCodec

WithCodec overrides the default event codec.

bus, _ := events.NewSync(events.WithCodec(nil))
fmt.Println(bus.Driver())
// Output: sync

Publish

Publish

Publish publishes an event using the background context.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(event UserCreated) {
	fmt.Println(event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

PublishContext

PublishContext publishes an event using the configured codec and dispatch flow.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

TopicEvent

TopicEvent overrides the derived topic for an event.

Subscribe

Subscribe

Subscribe registers a handler using the background context.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID)
	return nil
})
defer sub.Close()
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

SubscribeContext

SubscribeContext registers a typed handler.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
defer sub.Close()
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

Subscription

Subscription releases a subscription when closed.

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(event UserCreated) {
	fmt.Println("received", event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
_ = sub.Close()
_ = bus.Publish(UserCreated{ID: "456"})
// Output: received 123
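The close behaviour shown above can be pictured with a small stdlib-only sketch: each subscription holds a detach function that removes its handler from a registry. The `registry` and `subscription` types are hypothetical, and making `Close` safe to call twice is a design choice of this sketch, not a documented guarantee of the package.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical handler store; names here are illustrative only.
type registry struct {
	mu       sync.Mutex
	nextID   int
	handlers map[int]func(string)
}

// subscription detaches one handler; Close is safe to call more than once.
type subscription struct {
	once   sync.Once
	detach func()
}

func (s *subscription) Close() error {
	s.once.Do(s.detach)
	return nil
}

func newRegistry() *registry {
	return &registry{handlers: make(map[int]func(string))}
}

func (r *registry) subscribe(h func(string)) *subscription {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.nextID++
	r.handlers[id] = h
	return &subscription{detach: func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.handlers, id)
	}}
}

// publish copies the current handlers under the lock, then calls them
// outside it so a handler may close its own subscription.
// It returns the number of handlers called.
func (r *registry) publish(msg string) int {
	r.mu.Lock()
	hs := make([]func(string), 0, len(r.handlers))
	for _, h := range r.handlers {
		hs = append(hs, h)
	}
	r.mu.Unlock()
	for _, h := range hs {
		h(msg)
	}
	return len(hs)
}

func main() {
	r := newRegistry()
	sub := r.subscribe(func(msg string) { fmt.Println("received", msg) })
	r.publish("123")
	_ = sub.Close()
	_ = sub.Close() // second close is a no-op
	fmt.Println("delivered after close:", r.publish("456"))
}
```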

Testing

Fake

Fake provides a root-package testing helper that records published events.

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

Fake.Bus

Bus returns the wrapped API to inject into code under test.

fake := events.NewFake()
bus := fake.Bus()
fmt.Println(bus.Ready() == nil)
// Output: true

Fake.Count

Count returns the total number of recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(fake.Count())
// Output: 1

Fake.Records

Records returns a copy of recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(len(fake.Records()))
// Output: 1

Fake.Reset

Reset clears recorded publishes.

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fake.Reset()
fmt.Println(fake.Count())
// Output: 0

NewFake

NewFake creates a new fake event harness backed by the root sync bus.

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

Record

Record captures one published event observed by a Fake bus.

type UserCreated struct {
	ID string `json:"id"`
}

record := events.Record{Event: UserCreated{ID: "123"}}
fmt.Printf("%T\n", record.Event)
// Output: main.UserCreated
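Because a record stores its event as `any`, test assertions usually start by narrowing records to one event type. The sketch below shows a generic helper for that. The local `record` type only mirrors the documented `Record` shape, and `eventsOf` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// record mirrors the documented Record shape: one published event stored as any.
type record struct {
	Event any
}

// eventsOf returns, in order, the recorded events that assert to T.
func eventsOf[T any](records []record) []T {
	var out []T
	for _, r := range records {
		if e, ok := r.Event.(T); ok {
			out = append(out, e)
		}
	}
	return out
}

type UserCreated struct{ ID string }

type OrderPlaced struct{ ID string }

func main() {
	records := []record{
		{Event: UserCreated{ID: "1"}},
		{Event: OrderPlaced{ID: "9"}},
		{Event: UserCreated{ID: "2"}},
	}
	users := eventsOf[UserCreated](records)
	fmt.Println(len(users), users[1].ID) // 2 2
}
```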

Docs Tooling

The repository includes lightweight docs tooling under docs/.

Run the watcher to auto-regenerate docs on file changes:

sh docs/watcher.sh

Documentation

Overview

Package events provides a type-based event bus API for Go applications.

Index

Constants

This section is empty.

Variables

var (
	// ErrInvalidHandler indicates a subscribe handler has an unsupported shape.
	ErrInvalidHandler = errors.New("events: invalid handler")
	// ErrNilEvent indicates a publish call received a nil event.
	ErrNilEvent = errors.New("events: nil event")
	// ErrEmptyTopic indicates topic resolution produced an empty topic.
	ErrEmptyTopic = errors.New("events: empty topic")
)

Functions

This section is empty.

Types

type API

type API interface {
	// Driver reports the active bus backend.
	// @group Core
	//
	// Example: inspect the active backend through the interface
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	fmt.Println(bus.Driver())
	//	// Output: sync
	Driver() eventscore.Driver
	// Ready performs a background-context readiness check.
	// @group Core
	//
	// Example: check readiness through the interface
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	fmt.Println(bus.Ready() == nil)
	//	// Output: true
	Ready() error
	// ReadyContext performs a readiness check with the provided context.
	// @group Core
	//
	// Example: check readiness with a caller context
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	fmt.Println(bus.ReadyContext(context.Background()) == nil)
	//	// Output: true
	ReadyContext(ctx context.Context) error
	// Publish dispatches an event with the background context.
	// @group Core
	//
	// Example: publish a typed event through the interface
	//
	//	type UserCreated struct {
	//		ID string `json:"id"`
	//	}
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	_, _ = bus.Subscribe(func(event UserCreated) {
	//		fmt.Println(event.ID)
	//	})
	//	_ = bus.Publish(UserCreated{ID: "123"})
	//	// Output: 123
	Publish(event any) error
	// PublishContext dispatches an event with the provided context.
	// @group Core
	//
	// Example: publish with a caller context
	//
	//	type UserCreated struct {
	//		ID string `json:"id"`
	//	}
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	//		fmt.Println(event.ID, ctx != nil)
	//		return nil
	//	})
	//	_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
	//	// Output: 123 true
	PublishContext(ctx context.Context, event any) error
	// Subscribe registers a typed handler using the background context.
	// @group Core
	//
	// Example: subscribe through the interface
	//
	//	type UserCreated struct {
	//		ID string `json:"id"`
	//	}
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	//		_ = ctx
	//		_ = event
	//		return nil
	//	})
	//	defer sub.Close()
	Subscribe(handler any) (Subscription, error)
	// SubscribeContext registers a typed handler with the provided context.
	// @group Core
	//
	// Example: subscribe with a caller context through the interface
	//
	//	type UserCreated struct {
	//		ID string `json:"id"`
	//	}
	//
	//	api, _ := events.NewSync()
	//	var bus events.API = api
	//	sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
	//		_ = ctx
	//		_ = event
	//		return nil
	//	})
	//	defer sub.Close()
	SubscribeContext(ctx context.Context, handler any) (Subscription, error)
}

API is the root application-facing bus contract. @group Core

Example: keep an API-typed bus reference

api, _ := events.NewSync()
var bus events.API = api
fmt.Println(bus.Driver())
// Output: sync

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is the root event bus implementation. @group Bus

func New

func New(cfg Config, opts ...Option) (*Bus, error)

New constructs a root bus for the requested driver. @group Construction

Example: construct a bus from config

bus, _ := events.New(events.Config{Driver: "sync"})
fmt.Println(bus.Driver())
// Output: sync

func NewNull

func NewNull(opts ...Option) (*Bus, error)

NewNull constructs the root null bus. @group Construction

Example: construct a null bus

bus, _ := events.NewNull()
fmt.Println(bus.Driver())
// Output: null

func NewSync

func NewSync(opts ...Option) (*Bus, error)

NewSync constructs the root sync bus. @group Construction

Example: construct a sync bus

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

func (*Bus) Driver

func (b *Bus) Driver() eventscore.Driver

Driver reports the active backend. @group Bus

Example: inspect the active backend

bus, _ := events.NewSync()
fmt.Println(bus.Driver())
// Output: sync

func (*Bus) Publish

func (b *Bus) Publish(event any) error

Publish publishes an event using the background context. @group Publish

Example: publish a typed event

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(event UserCreated) {
	fmt.Println(event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

func (*Bus) PublishContext

func (b *Bus) PublishContext(ctx context.Context, event any) error

PublishContext publishes an event using the configured codec and dispatch flow. @group Publish

Example: publish with a caller context

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
_, _ = bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

func (*Bus) Ready

func (b *Bus) Ready() error

Ready reports whether the bus is ready. @group Bus

Example: check readiness

bus, _ := events.NewSync()
fmt.Println(bus.Ready() == nil)
// Output: true

func (*Bus) ReadyContext

func (b *Bus) ReadyContext(ctx context.Context) error

ReadyContext reports whether the bus is ready. @group Bus

Example: check readiness with a caller context

bus, _ := events.NewSync()
fmt.Println(bus.ReadyContext(context.Background()) == nil)
// Output: true

func (*Bus) Subscribe

func (b *Bus) Subscribe(handler any) (Subscription, error)

Subscribe registers a handler using the background context. @group Subscribe

Example: subscribe to a typed event

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID)
	return nil
})
defer sub.Close()
_ = bus.Publish(UserCreated{ID: "123"})
// Output: 123

func (*Bus) SubscribeContext

func (b *Bus) SubscribeContext(ctx context.Context, handler any) (Subscription, error)

SubscribeContext registers a typed handler. @group Subscribe

Example: subscribe with a caller context

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.SubscribeContext(context.Background(), func(ctx context.Context, event UserCreated) error {
	fmt.Println(event.ID, ctx != nil)
	return nil
})
defer sub.Close()
_ = bus.PublishContext(context.Background(), UserCreated{ID: "123"})
// Output: 123 true

type Codec

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

Codec marshals and unmarshals event payloads. @group Options

Example: define a custom codec

var codec events.Codec
fmt.Println(codec == nil)
// Output: true
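For a non-nil codec, any type with the two methods above satisfies the interface. As a sketch, the following stdlib-only example implements them with `encoding/gob` instead of JSON. The local `codec` interface mirrors the documented one so the example compiles on its own, and `gobCodec` is a hypothetical name.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// codec mirrors the documented Codec interface.
type codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// gobCodec encodes payloads with encoding/gob instead of JSON.
type gobCodec struct{}

func (gobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Unmarshal decodes into v, which must be a pointer.
func (gobCodec) Unmarshal(data []byte, v any) error {
	return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
}

type UserCreated struct{ ID string }

func main() {
	var c codec = gobCodec{}
	data, err := c.Marshal(UserCreated{ID: "123"})
	if err != nil {
		panic(err)
	}
	var out UserCreated
	if err := c.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(out.ID) // 123
}
```

Since `WithCodec` and `Config.Codec` both take a `Codec`, a value like `gobCodec{}` should be usable in either place. Note that gob is Go-specific, so it only suits deployments where every publisher and subscriber is a Go program.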

type Config

type Config struct {
	// Driver selects the root bus backend.
	Driver eventscore.Driver
	// Codec overrides the default JSON codec.
	Codec Codec
	// Transport installs a driver-backed transport for distributed delivery.
	Transport eventscore.DriverAPI
}

Config configures root bus construction. @group Config

Example: define bus construction config

cfg := events.Config{Driver: eventscore.DriverSync}
_ = cfg

Example: define bus construction config with all fields

cfg := events.Config{
	Driver:    eventscore.DriverSync, // default: "sync" when empty and no Transport is provided
	Codec:     nil,                   // default: nil uses the built-in JSON codec
	Transport: nil,                   // default: nil keeps dispatch in-process
}
_ = cfg

type Fake

type Fake struct {
	// contains filtered or unexported fields
}

Fake provides a root-package testing helper that records published events. @group Testing

Example: keep a fake for assertions in tests

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

func NewFake

func NewFake() *Fake

NewFake creates a new fake event harness backed by the root sync bus. @group Testing

Example: construct a recording fake

fake := events.NewFake()
fmt.Println(fake.Count())
// Output: 0

func (*Fake) Bus

func (f *Fake) Bus() API

Bus returns the wrapped API to inject into code under test. @group Testing

Example: inject the fake bus into application code

fake := events.NewFake()
bus := fake.Bus()
fmt.Println(bus.Ready() == nil)
// Output: true

func (*Fake) Count

func (f *Fake) Count() int

Count returns the total number of recorded publishes. @group Testing

Example: count recorded publishes

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(fake.Count())
// Output: 1

func (*Fake) Records

func (f *Fake) Records() []Record

Records returns a copy of recorded publishes. @group Testing

Example: inspect recorded publishes

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fmt.Println(len(fake.Records()))
// Output: 1

func (*Fake) Reset

func (f *Fake) Reset()

Reset clears recorded publishes. @group Testing

Example: clear recorded publishes

type UserCreated struct {
	ID string `json:"id"`
}

fake := events.NewFake()
_ = fake.Bus().Publish(UserCreated{ID: "123"})
fake.Reset()
fmt.Println(fake.Count())
// Output: 0

type Option

type Option func(*options)

Option configures root bus behavior. @group Options

Example: keep an option for later bus construction

opt := events.WithCodec(nil)
fmt.Println(opt != nil)
// Output: true

func WithCodec

func WithCodec(codec Codec) Option

WithCodec overrides the default event codec. @group Options

Example: construct a bus with a custom codec

bus, _ := events.NewSync(events.WithCodec(nil))
fmt.Println(bus.Driver())
// Output: sync

type Record

type Record struct {
	Event any
}

Record captures one published event observed by a Fake bus. @group Testing

Example: inspect a recorded event

type UserCreated struct {
	ID string `json:"id"`
}

record := events.Record{Event: UserCreated{ID: "123"}}
fmt.Printf("%T\n", record.Event)
// Output: main.UserCreated

type Subscription

type Subscription = eventscore.Subscription

Subscription releases a subscription when closed. @group Subscribe

Example: unsubscribe from a typed event

type UserCreated struct {
	ID string `json:"id"`
}

bus, _ := events.NewSync()
sub, _ := bus.Subscribe(func(event UserCreated) {
	fmt.Println("received", event.ID)
})
_ = bus.Publish(UserCreated{ID: "123"})
_ = sub.Close()
_ = bus.Publish(UserCreated{ID: "456"})
// Output: received 123

type TopicEvent

type TopicEvent interface {
	Topic() string
}

TopicEvent overrides the derived topic for an event. @group Publish

Directories

Path | Synopsis
driver/kafkaevents | module
driver/natsevents | module
driver/redisevents | module
eventscore | module
eventstest | module
