bee

v0.0.18
Published: Jul 7, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

bee - eventsourcing on nats.io

Bee is a minimal Go library for implementing CQRS & Event-Sourcing using NATS JetStream as the transport and persistence layer. It offers clear abstractions with minimal dependencies and overhead.

Why Bee?

  • Minimal Infrastructure: Leverages your existing NATS JetStream.
  • Type-Safe: Commands and events are protobuf-based.
  • Fast Replay: Efficiently rebuild aggregate states from event streams.
  • Small Footprint: Less than 1500 lines of simple, maintainable Go.

Typical Use Cases

Bee is great for small, event-centric scenarios:

  • Task-Oriented Microservices: Independent scaling of read/write sides.
  • Audit Trails & Ledgers: Immutable event history for compliance.
  • Sagas & Workflows: Event-driven state transitions.
  • Edge/IoT Deployments: Compact deployment on resource-limited devices.
  • Real-Time Game States: Fast catch-up of player states.
  • SaaS On-Prem Plugins: Easy local deployment without infrastructure complexity.
  • Ad-Hoc Analytics: Quickly spin up event projections.

Getting Started

Installing

To start using bee, install Go and run go get:

go get github.com/blinkinglight/bee

This will retrieve the library and update your go.mod and go.sum files.

Settings
bee.EventsPrefix = "events"
bee.CommandsPrefix = "cmds"
bee.QueryPrefix = "query"
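
The prefixes above are the first token of the NATS subjects the library uses. The following is a minimal sketch of how the documented default subjects ("cmds.aggregate", "events.aggregate.aggregateID.>" and "query.aggregate.get") could be composed from them. The helper names are hypothetical and are not part of bee; the variables are local stand-ins for the package-level settings.

```go
package main

import "fmt"

// Local stand-ins for bee.CommandsPrefix, bee.EventsPrefix and bee.QueryPrefix.
var (
	commandsPrefix = "cmds"
	eventsPrefix   = "events"
	queryPrefix    = "query"
)

// commandSubject mirrors the documented default "cmds.<aggregate>".
func commandSubject(aggregate string) string {
	return commandsPrefix + "." + aggregate
}

// eventSubject mirrors the documented default
// "events.<aggregate>.<aggregateID>.>"; the trailing ">" is the NATS
// wildcard that matches one or more remaining tokens.
func eventSubject(aggregate, aggregateID string) string {
	return eventsPrefix + "." + aggregate + "." + aggregateID + ".>"
}

// querySubject mirrors the documented default "query.<aggregate>.get".
func querySubject(aggregate string) string {
	return queryPrefix + "." + aggregate + ".get"
}

func main() {
	fmt.Println(commandSubject("users"))
	fmt.Println(eventSubject("users", "42"))
	fmt.Println(querySubject("users"))
}
```
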
Interfaces

// Command handler
type CommandHandler interface {
	Handle(m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
}

// Projection handler
type EventApplier interface {
	ApplyEvent(event *gen.EventEnvelope) error
}

// Query handler
type Querier interface {
	Query(query *gen.QueryEnvelope) (interface{}, error)
}

// Replay handler
type ReplayHandler interface {
	ApplyEvent(m *gen.EventEnvelope) error
}

// Event process manager handler
type ManagerEventApplier interface {
	Handle(event *gen.EventEnvelope) ([]*gen.CommandEnvelope, error)
}
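
To make the CommandHandler shape concrete, here is a small sketch of a handler that turns one command into one event. The envelope structs are simplified local stand-ins for gen.CommandEnvelope and gen.EventEnvelope: only Aggregate and AggregateId appear in this README, and the remaining field names (CommandType, EventType, Payload) are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// CommandEnvelope is a stand-in for gen.CommandEnvelope (fields partly assumed).
type CommandEnvelope struct {
	Aggregate   string
	AggregateId string
	CommandType string
	Payload     []byte
}

// EventEnvelope is a stand-in for gen.EventEnvelope (fields partly assumed).
type EventEnvelope struct {
	Aggregate   string
	AggregateId string
	EventType   string
	Payload     []byte
}

// UserService has the same method shape as the CommandHandler interface:
// it validates a command and returns the events that result from it.
type UserService struct{}

func (s *UserService) Handle(m *CommandEnvelope) ([]*EventEnvelope, error) {
	switch m.CommandType {
	case "create":
		if m.AggregateId == "" {
			return nil, errors.New("create: missing aggregate id")
		}
		return []*EventEnvelope{{
			Aggregate:   m.Aggregate,
			AggregateId: m.AggregateId,
			EventType:   "created",
			Payload:     m.Payload,
		}}, nil
	default:
		return nil, fmt.Errorf("unknown command type %q", m.CommandType)
	}
}

func main() {
	events, err := (&UserService{}).Handle(&CommandEnvelope{
		Aggregate: "users", AggregateId: "1", CommandType: "create",
	})
	fmt.Println(len(events), err)
}
```
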

Functions
func Command(ctx context.Context, handler CommandHandler, opts ...co.Options)
func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error
func Query(ctx context.Context, fn Querier, opts ...qo.Options) error
func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) error
func ReplayAndSubscribe[T EventApplier](ctx context.Context, agg T, opts ...ro.Options) <-chan T
func Event(ctx context.Context, fn ManagerEventApplier, opts ...eo.Options) error
Options

Command options

func WithSubject(subject string) Options
func WithAggreate(aggregate string) Options

Projection options

func WithSubject(subject string) Options
func WithDurable(name string) Options
func WithAggreate(aggregate string) Options
func WithAggrateID(aggregateID string) Options 
func WithPrefix(prefix string) Options

Query options

func WithSubject(subject string) Options 
func WithAggreate(aggregate string) Options

Replay options

func WithEventType(eventType string) Options
func WithParent(aggreate, id string) Options 
func WithSubject(subject string) Options
func WithAggreate(aggregate string) Options
func WithStartSeq(seq uint64) Options
func WithAggregateID(id string) Options
func WithTimeout(timeout time.Duration) Options

Event options

func WithSubject(subject string) Options
func WithAggreate(aggregate string) Options
func WithAggregateID(aggregateID string) Options
func WithDurableName(durableName string) Options
func WithPrefix(prefix string) Options
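
All of the option sets above follow the functional options pattern. The sketch below shows how such options are typically applied to a configuration and how an explicit subject can override a derived one. It assumes Options is a function over a private config struct, which this README does not confirm; the config type, resolveSubject helper and "*" default are hypothetical.

```go
package main

import "fmt"

// config is a hypothetical settings struct; bee's real one is not shown here.
type config struct {
	aggregate   string
	aggregateID string
	subject     string
}

// Options mutates the config. This is an assumption about the pattern,
// not a copy of bee's definition.
type Options func(*config)

func WithSubject(subject string) Options {
	return func(c *config) { c.subject = subject }
}

// WithAggreate keeps the spelling used by the library's option names.
func WithAggreate(aggregate string) Options {
	return func(c *config) { c.aggregate = aggregate }
}

func WithAggregateID(id string) Options {
	return func(c *config) { c.aggregateID = id }
}

// resolveSubject applies the options in order and derives a subject unless
// one was set explicitly. Defaulting the ID to "*" is an assumption.
func resolveSubject(prefix string, opts ...Options) string {
	cfg := &config{aggregateID: "*"}
	for _, opt := range opts {
		opt(cfg)
	}
	if cfg.subject != "" {
		return cfg.subject
	}
	return prefix + "." + cfg.aggregate + "." + cfg.aggregateID + ".>"
}

func main() {
	fmt.Println(resolveSubject("events", WithAggreate("users"), WithAggregateID("42")))
	fmt.Println(resolveSubject("events", WithSubject("custom.subject")))
}
```
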

Usage:

ctx = bee.WithNats(ctx, nc)
ctx = bee.WithJetStream(ctx, js)
go bee.Command(ctx, NewService(), co.WithAggreate("users"))
go bee.Project(ctx, NewUserProjection(), po.WithAggreate("users"))
go bee.Query(ctx, NewUserProjection(), qo.WithAggreate("users"))
go bee.Event(ctx, NewProcessManager(), eo.WithAggreate("users"))
agg := NewAggregate(m.AggregateId)
bee.Replay(ctx, agg, ro.WithAggreate(m.Aggregate), ro.WithAggregateID(m.AggregateId))

Example

router.Get("/stream/{id}", func(w http.ResponseWriter, r *http.Request) {
    id := chi.URLParam(r, "id")
    // No explicit WriteHeader call: status 200 is implicit on the first write.
    sse := datastar.NewSSE(w, r)

    ctx := bee.WithJetStream(r.Context(), js)
    ctx = bee.WithNats(ctx, nc)

    // Replay stored events into agg and subscribe to new ones.
    agg := &Aggregate{}
    updates := bee.ReplayAndSubscribe(ctx, agg, ro.WithAggreate(users.Aggregate), ro.WithAggregateID(id))
    for {
        select {
        case <-r.Context().Done():
            return
        case update, ok := <-updates:
            if !ok {
                return // updates channel was closed
            }
            sse.MergeFragmentTempl(partials.History(update.History))
        }
    }
})

and the live projection aggregate:

type Aggregate struct {
	History []string
}

func (a *Aggregate) ApplyEvent(e *gen.EventEnvelope) error {
	event, err := bee.UnmarshalEvent(e)
	if err != nil {
		return fmt.Errorf("unmarshal event: %w", err)
	}
	switch event := event.(type) {
	case *users.UserCreated:
		a.History = append(a.History, "User created: "+event.Name+" from "+event.Country)
	case *users.UserUpdated:
		a.History = append(a.History, "User updated: "+event.Name+" from "+event.Country)
	case *users.UserNameChanged:
		a.History = append(a.History, "User name changed to: "+event.Name)
	default:
		log.Printf("unknown event type: %T", event)
		return nil // Ignore other event types
	}
	return nil
}
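
Replaying amounts to folding stored events into the aggregate in order. The sketch below shows that idea without NATS: the envelope type, the event type strings and the JSON field names are simplified stand-ins chosen for illustration, and replay is a hypothetical helper rather than bee.Replay itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EventEnvelope is a simplified stand-in for gen.EventEnvelope.
type EventEnvelope struct {
	EventType string
	Payload   []byte
}

// userEvent is a hypothetical JSON payload shared by the sample events.
type userEvent struct {
	Name    string `json:"name"`
	Country string `json:"country"`
}

type Aggregate struct {
	History []string
}

// ApplyEvent decodes the payload and appends one history line per known event.
func (a *Aggregate) ApplyEvent(e *EventEnvelope) error {
	var ev userEvent
	if err := json.Unmarshal(e.Payload, &ev); err != nil {
		return fmt.Errorf("unmarshal event: %w", err)
	}
	switch e.EventType {
	case "created":
		a.History = append(a.History, "User created: "+ev.Name+" from "+ev.Country)
	case "name_changed":
		a.History = append(a.History, "User name changed to: "+ev.Name)
	}
	return nil // unknown event types are ignored
}

// replay folds the events into the aggregate in order and stops at the first error.
func replay(a *Aggregate, events []*EventEnvelope) error {
	for _, e := range events {
		if err := a.ApplyEvent(e); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	agg := &Aggregate{}
	err := replay(agg, []*EventEnvelope{
		{EventType: "created", Payload: []byte(`{"name":"Ann","country":"LT"}`)},
		{EventType: "name_changed", Payload: []byte(`{"name":"Bob"}`)},
	})
	fmt.Println(agg.History, err)
}
```
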
Prebuilt examples

To run the examples, you first need a NATS server running with JetStream enabled. If you don't have one, start the server from the examples:

go run ./examples/natsserver

Then run the other example apps:

go run ./examples/subscribers

go run ./examples/publishers

go run ./examples/query

Developing

To work on this package you need two tools: buf (https://buf.build/docs/) and protoc-gen-go, which you can install with:

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

Roadmap

Version  Planned Features
v0.3     Snapshots
v1.0     Stable API, full pkg.go.dev docs

Development & Contribution

Pull requests are welcome!

License

Apache-2.0 © 2025 BlinkLight

Documentation

Index

Constants

const DeliverAll = 0

Variables

var CommandsPrefix = "cmds"
var EventsPrefix = "events"
var QueryPrefix = "query"

Functions

func Command

func Command(ctx context.Context, handler CommandHandler, opts ...co.Options)

Command is the main entry point for processing commands. It accepts co.Options to configure the command processor:

  • co.WithSubject - use a custom subject instead of the default "cmds.aggregate"
  • co.WithAggreate - use a custom aggregate name

func Event added in v0.0.14

func Event(ctx context.Context, fn ManagerEventApplier, opts ...eo.Options) error

func GetCommand

func GetCommand(aggreate, command string) any

GetCommand retrieves a command instance based on the aggregate type and command type. It checks if the aggregate and command are registered, and if so, it calls the corresponding function to create a new instance of the command type. If the aggregate or command is not registered, it returns nil. This function is useful for dynamically handling commands in a type-safe manner.

func GetEvent

func GetEvent(aggreate, event string) any

GetEvent retrieves an event instance based on the aggregate type and event type. It checks if the aggregate and event are registered, and if so, it calls the corresponding function to create a new instance of the event type. If the aggregate or event is not registered, it returns nil. This function is useful for dynamically handling events in a type-safe manner.

func JetStream

func JetStream(ctx context.Context) (nats.JetStreamContext, bool)

JetStream retrieves the JetStream context from the context.

func MustUnmarshal added in v0.0.5

func MustUnmarshal[T any](data []byte) T

MustUnmarshal unmarshals JSON data into a struct of type T. It panics if unmarshaling fails, so it should be used when you are sure that the data is valid and will not cause an error. This is useful for tests or when you want to ensure that the data is always valid.
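
A pair of generic helpers with these signatures can be written in a few lines on top of encoding/json. This is a sketch of the described behavior (decode into T, panic in the Must variant), not a copy of the library's source; the user type is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Unmarshal decodes JSON data into a new value of type T.
func Unmarshal[T any](data []byte) (T, error) {
	var v T
	err := json.Unmarshal(data, &v)
	return v, err
}

// MustUnmarshal is like Unmarshal but panics on invalid input,
// which is convenient in tests and for trusted data.
func MustUnmarshal[T any](data []byte) T {
	v, err := Unmarshal[T](data)
	if err != nil {
		panic(err)
	}
	return v
}

// user is a hypothetical payload type used only for this example.
type user struct {
	Name string `json:"name"`
}

func main() {
	u := MustUnmarshal[user]([]byte(`{"name":"Ann"}`))
	fmt.Println(u.Name)

	_, err := Unmarshal[user]([]byte(`{`))
	fmt.Println(err != nil)
}
```
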

func Nats

func Nats(ctx context.Context) (*nats.Conn, bool)

Nats retrieves the NATS connection from the context.

func Project

func Project(ctx context.Context, fn EventApplier, opts ...po.Options) error

Project subscribes to events for a specific aggregate and applies them using the provided EventApplier function. It uses JetStream to manage the event stream and durable subscriptions. The function takes a context, an EventApplier function, and optional configuration options. The configuration options allow customization of the aggregate type, aggregate ID, subject, durable name, and prefix for the subscription:

  • po.WithSubject sets the subject for the subscription
  • po.WithAggreate sets the aggregate type for the subscription
  • po.WithAggrateID sets the aggregate ID for the subscription
  • po.WithPrefix sets a prefix for the durable name
  • po.WithDurable sets the durable name for the subscription

func PublishCommand

func PublishCommand(ctx context.Context, cmd *gen.CommandEnvelope, payload any) error

PublishCommand publishes a command to the JetStream server. It takes a context, a CommandEnvelope, and an optional payload. If the payload is not nil, it marshals the payload into JSON and sets it in the CommandEnvelope. It retrieves the JetStream context from the context and publishes the command to the subject "cmds.<AggregateType>" with the serialized CommandEnvelope as the message body. If the JetStream context is not initialized, it returns an error. The function returns an error if the publish operation fails.
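
The steps described above (check the transport, marshal the optional payload, publish to "cmds.<AggregateType>") can be sketched without a NATS connection. Everything here is illustrative: the publisher interface and memPublisher replace the JetStream context, the envelope is a simplified stand-in for gen.CommandEnvelope, and it is serialized as JSON only to keep the example dependency-free.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// CommandEnvelope is a simplified stand-in for gen.CommandEnvelope.
type CommandEnvelope struct {
	Aggregate string `json:"aggregate"`
	Payload   []byte `json:"payload"`
}

// publisher is a hypothetical stand-in for the JetStream context.
type publisher interface {
	Publish(subject string, data []byte) error
}

// publishCommand follows the documented steps of PublishCommand.
func publishCommand(p publisher, cmd *CommandEnvelope, payload any) error {
	if p == nil {
		return errors.New("publisher not initialized")
	}
	if payload != nil {
		b, err := json.Marshal(payload)
		if err != nil {
			return fmt.Errorf("marshal payload: %w", err)
		}
		cmd.Payload = b
	}
	// Assumption: JSON is used here for the envelope to avoid extra dependencies.
	body, err := json.Marshal(cmd)
	if err != nil {
		return fmt.Errorf("marshal envelope: %w", err)
	}
	return p.Publish("cmds."+cmd.Aggregate, body)
}

// memPublisher records published subjects in memory for inspection.
type memPublisher struct {
	subjects []string
	bodies   [][]byte
}

func (m *memPublisher) Publish(subject string, data []byte) error {
	m.subjects = append(m.subjects, subject)
	m.bodies = append(m.bodies, data)
	return nil
}

func main() {
	p := &memPublisher{}
	err := publishCommand(p, &CommandEnvelope{Aggregate: "users"}, map[string]string{"name": "Ann"})
	fmt.Println(p.subjects, err)
}
```
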

func Query

func Query(ctx context.Context, fn Querier, opts ...qo.Options) error

Query serves queries using the provided Querier, whose Query method takes a QueryEnvelope and returns a result or an error. It accepts qo.Options:

  • qo.WithSubject - use a custom subject instead of the default "query.aggregate.get"
  • qo.WithAggreate - use a custom aggregate

func RegisterCommand

func RegisterCommand[T any](aggreate, command string)

RegisterCommand registers a command type for a specific aggregate. The type T should be a struct that represents the command. It creates a function that returns a new instance of T when called. This allows for dynamic creation of command instances based on the aggregate and command type.

func RegisterEvent

func RegisterEvent[T any](aggreate, event string)

RegisterEvent registers an event type for a specific aggregate. The type T should be a struct that represents the event. It creates a function that returns a new instance of T when called. This allows for dynamic creation of event instances based on the aggregate and event type.
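
The registration mechanism described for RegisterCommand and RegisterEvent (store a factory per aggregate and name, call it on lookup) can be sketched with a nested map and generics. This is an illustration under assumptions, not the library's code: the registry variable is hypothetical, the factory returns a pointer to T, and no locking is shown.

```go
package main

import "fmt"

// registry maps aggregate -> event name -> factory. Hypothetical layout;
// it is not safe for concurrent registration as written.
var registry = map[string]map[string]func() any{}

// RegisterEvent stores a factory that creates a fresh *T on each call.
func RegisterEvent[T any](aggregate, event string) {
	if registry[aggregate] == nil {
		registry[aggregate] = map[string]func() any{}
	}
	registry[aggregate][event] = func() any { return new(T) }
}

// GetEvent returns a new instance for the pair, or nil if it is not registered.
func GetEvent(aggregate, event string) any {
	events, ok := registry[aggregate]
	if !ok {
		return nil
	}
	factory, ok := events[event]
	if !ok {
		return nil
	}
	return factory()
}

// UserCreated is a hypothetical event type used only for this example.
type UserCreated struct {
	Name string
}

func main() {
	RegisterEvent[UserCreated]("users", "created")

	switch ev := GetEvent("users", "created").(type) {
	case *UserCreated:
		ev.Name = "Ann"
		fmt.Println("got UserCreated for", ev.Name)
	default:
		fmt.Println("not registered")
	}
	fmt.Println(GetEvent("users", "deleted") == nil)
}
```

Returning a pointer lets a caller decode a payload directly into the instance and then switch on the concrete type, as the ApplyEvent example in the README does.
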

func Replay

func Replay(ctx context.Context, fn ReplayHandler, opts ...ro.Options) error

Replay replays events for a given aggregate and aggregate ID. It accepts ro.Options to configure the replay behavior:

  • ro.WithAggreate, ro.WithAggregateID - configure the aggregate and aggregate ID
  • ro.WithSubject - use a custom subject instead of the default "events.aggregate.aggregateID.>"
  • ro.WithStartSeq - start from a given event sequence (for example, if you have a snapshot)
  • ro.WithParent - nest subjects under a parent aggregate and ID
  • ro.WithTimeout - time out if the stream delivers no events

func ReplayAndSubscribe

func ReplayAndSubscribe[T EventApplier](ctx context.Context, agg T, opts ...ro.Options) <-chan T

ReplayAndSubscribe replays events for a given aggregate and aggregate ID, and subscribes to new events. It accepts ro.Options to configure the replay behavior:

  • ro.WithAggreate, ro.WithAggregateID - configure the aggregate and aggregate ID
  • ro.WithSubject - use a custom subject instead of the default "events.aggregate.aggregateID.>"
  • ro.WithStartSeq - start from a given event sequence (for example, if you have a snapshot)
  • ro.WithParent - nest subjects under a parent aggregate and ID
  • ro.WithTimeout - time out if the stream delivers no events
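
The replay-then-subscribe idea can be illustrated with plain channels. The sketch below is a hypothetical in-memory version: it assumes the aggregate is sent once after catch-up and again after every live event, which this documentation does not specify, and it takes the history and live feed as arguments instead of reading them from JetStream.

```go
package main

import (
	"context"
	"fmt"
)

// Event is a minimal stand-in for gen.EventEnvelope.
type Event struct {
	Seq uint64
}

// EventApplier mirrors the shape of the library's interface.
type EventApplier interface {
	ApplyEvent(e *Event) error
}

// replayAndSubscribe applies the stored history, emits the aggregate, then
// applies each live event and emits again. The same value is sent every
// time, so a consumer should read it before the next event arrives.
func replayAndSubscribe[T EventApplier](ctx context.Context, agg T, history []*Event, live <-chan *Event) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for _, e := range history {
			if err := agg.ApplyEvent(e); err != nil {
				return
			}
		}
		select {
		case out <- agg:
		case <-ctx.Done():
			return
		}
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-live:
				if !ok {
					return
				}
				if err := agg.ApplyEvent(e); err != nil {
					return
				}
				select {
				case out <- agg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return out
}

// Counter is a toy aggregate that counts applied events.
type Counter struct {
	Count int
}

func (c *Counter) ApplyEvent(e *Event) error {
	c.Count++
	return nil
}

// demo replays two stored events and then delivers one live event.
func demo() (afterReplay, afterLive int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	live := make(chan *Event)
	updates := replayAndSubscribe(ctx, &Counter{}, []*Event{{Seq: 1}, {Seq: 2}}, live)

	afterReplay = (<-updates).Count
	live <- &Event{Seq: 3}
	afterLive = (<-updates).Count
	return afterReplay, afterLive
}

func main() {
	fmt.Println(demo())
}
```
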

func Unmarshal

func Unmarshal[T any](data []byte) (T, error)

func UnmarshalCommand

func UnmarshalCommand(c *gen.CommandEnvelope) (any, error)

UnmarshalCommand unmarshals a JSON payload into a command instance. It takes a CommandEnvelope, checks if it has valid command type and aggregate type, and retrieves the corresponding command instance using GetCommand. If the command type or aggregate type is not registered, it returns nil. If the command instance is found, it unmarshals the JSON payload into that instance. If unmarshaling fails, it returns an error. This function is useful for converting raw command data into structured command types.

func UnmarshalEvent

func UnmarshalEvent(e *gen.EventEnvelope) (any, error)

UnmarshalEvent unmarshals a JSON payload into an event instance. It takes an EventEnvelope, checks if it has valid event type and aggregate type, and retrieves the corresponding event instance using GetEvent. If the event type or aggregate type is not registered, it returns nil. If the event instance is found, it unmarshals the JSON payload into that instance. If unmarshaling fails, it returns an error. This function is useful for converting raw event data into structured event types.

func WithJetStream

func WithJetStream(ctx context.Context, js nats.JetStreamContext) context.Context

WithJetStream adds a JetStream context to the context.

func WithNats

func WithNats(ctx context.Context, nc *nats.Conn) context.Context

WithNats adds a NATS connection to the context.
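
WithNats/Nats and WithJetStream/JetStream follow the usual context-value pattern: a setter that wraps the context and a getter that returns the value plus a boolean. The sketch below shows that pattern with a hypothetical conn type in place of *nats.Conn and an unexported key type, which is a common convention and an assumption about the library's internals.

```go
package main

import (
	"context"
	"fmt"
)

// conn is a hypothetical stand-in for *nats.Conn.
type conn struct {
	name string
}

// natsKey is an unexported key type, so other packages cannot collide with it.
type natsKey struct{}

// withNats returns a child context that carries the connection.
func withNats(ctx context.Context, nc *conn) context.Context {
	return context.WithValue(ctx, natsKey{}, nc)
}

// natsFrom retrieves the connection; ok is false when none was stored.
func natsFrom(ctx context.Context) (*conn, bool) {
	nc, ok := ctx.Value(natsKey{}).(*conn)
	return nc, ok
}

// lookup builds a context, optionally stores a connection in it,
// and reports what the getter finds.
func lookup(stored bool) (string, bool) {
	ctx := context.Background()
	if stored {
		ctx = withNats(ctx, &conn{name: "local"})
	}
	nc, ok := natsFrom(ctx)
	if !ok {
		return "", false
	}
	return nc.name, true
}

func main() {
	fmt.Println(lookup(true))
	fmt.Println(lookup(false))
}
```
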

Types

type CommandHandler

type CommandHandler interface {
	Handle(m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
}

type CommandProcessor

type CommandProcessor struct {
	// contains filtered or unexported fields
}

type EventApplier

type EventApplier interface {
	ApplyEvent(event *gen.EventEnvelope) error
}

type ManagerEventApplier added in v0.0.14

type ManagerEventApplier interface {
	Handle(event *gen.EventEnvelope) ([]*gen.CommandEnvelope, error)
}

type Projector

type Projector interface {
	EventApplier
	Querier
}

type Querier

type Querier interface {
	Query(query *gen.QueryEnvelope) (interface{}, error)
}

type ReplayHandler

type ReplayHandler interface {
	ApplyEvent(m *gen.EventEnvelope) error
}

Directories

Path Synopsis
examples
natserver command
publishers command
query command
subscribers command
gen
