GoEventBus

package module
v0.1.28 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2025 License: MIT Imports: 7 Imported by: 0

README

EventBus – A Lightweight Event Sourcing Library for Go

Go Report Card

EventBus is a simple and efficient event-driven system for Go applications.
It allows you to publish and subscribe to events seamlessly.


📦 Installation

To install the library, run:

go get github.com/Raezil/GoEventBus

🚀 Quick Start

1️⃣ Initialize a New Project
mkdir eventbus-demo
cd eventbus-demo
go mod init eventbus-demo
2️⃣ Create main.go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	gbus "github.com/Raezil/GoEventBus"
	"github.com/gorilla/mux"
)

// HouseWasSold is the event type for a house sale. It carries no fields of
// its own: the value HouseWasSold{} serves as the Dispatcher key and as the
// projection passed to gbus.NewEvent, while the sale details travel in the
// event's args map.
type HouseWasSold struct{}

// NewDispatcher builds the demo dispatcher, registering a single handler
// under the HouseWasSold{} key.
func NewDispatcher() *gbus.Dispatcher {
	// handleSale reads the integer "price" argument, logs the sale, and
	// returns the same text in the Result. A missing or non-int price is
	// reported as an error.
	handleSale := func(data map[string]any) (gbus.Result, error) {
		amount, isInt := data["price"].(int)
		if !isInt {
			return gbus.Result{}, fmt.Errorf("invalid or missing 'price'")
		}

		text := fmt.Sprintf("House sold for %d!", amount)
		log.Println(text)
		return gbus.Result{Message: text}, nil
	}

	handlers := gbus.Dispatcher{HouseWasSold{}: handleSale}
	return &handlers
}

// main wires the dispatcher into an event store and serves one HTTP endpoint,
// /house-sold, that publishes a HouseWasSold event and processes it before
// replying.
func main() {
	dispatcher := NewDispatcher()
	eventStore := gbus.NewEventStore(dispatcher)

	router := mux.NewRouter()
	router.HandleFunc("/house-sold", func(w http.ResponseWriter, r *http.Request) {
		// Add the event to the pool, then let Broadcast run the handlers so
		// any handler error can be reported to the client.
		eventStore.Publish(gbus.NewEvent(HouseWasSold{}, map[string]interface{}{"price": 100}))

		if err := eventStore.Broadcast(); err != nil {
			log.Printf("Error broadcasting event: %v", err)
			http.Error(w, "Event processing failed", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		// Once encoding starts the response is already under way, so a
		// failure here can only be logged, not turned into an error status.
		if err := json.NewEncoder(w).Encode(map[string]string{"status": "House sold event published"}); err != nil {
			log.Printf("Error writing response: %v", err)
		}
	})

	log.Println("Server running on :8080")
	log.Fatal(http.ListenAndServe(":8080", router))
}
3️⃣ Run Your Application
go run main.go

Now, visiting http://localhost:8080/house-sold will trigger the event and process it.


🐇 RabbitMQ Integration

1️⃣ Start RabbitMQ
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management
2️⃣ Implement Event Publishing in main.go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	. "github.com/Raezil/GoEventBus"
)

// NewDispatcher returns the demo RabbitDispatcher with one handler registered
// under the "HouseWasSold" projection name.
func NewDispatcher() *RabbitDispatcher {
	// houseWasSold expects "price" as a float64 and reports it with two
	// decimals; any other type (or a missing key) is an error.
	houseWasSold := func(data map[string]any) (Result, error) {
		amount, isFloat := data["price"].(float64)
		if !isFloat {
			return Result{}, fmt.Errorf("invalid or missing 'price'")
		}

		text := fmt.Sprintf("House sold for %.2f", amount)
		return Result{Message: text}, nil
	}

	return &RabbitDispatcher{"HouseWasSold": houseWasSold}
}

// main connects to a local RabbitMQ broker, publishes two HouseWasSold events
// to "events_queue", and consumes from the queue until the process receives
// an interrupt signal.
func main() {
	dispatcher := NewDispatcher()
	rabbitStore, err := NewRabbitEventStore(dispatcher, "amqp://guest:guest@localhost:5672/", "events_queue")
	if err != nil {
		log.Fatalf("Failed to initialize RabbitEventStore: %v", err)
	}

	// Cancel the context on Ctrl-C so the program has a defined way to exit
	// instead of blocking forever.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	rabbitStore.Publish(&Event{Id: "12345", Projection: "HouseWasSold", Args: map[string]interface{}{"price": 100.0}})
	rabbitStore.Publish(&Event{Id: "123456", Projection: "HouseWasSold", Args: map[string]interface{}{"price": 200.0}})

	// Broadcast is declared as Broadcast(ctx context.Context), so it must be
	// given a context. NOTE(review): presumably it stops consuming when ctx
	// is cancelled — confirm against the implementation.
	go rabbitStore.Broadcast(ctx)

	<-ctx.Done()
}
3️⃣ Run Your Application
go run main.go

📜 Contributing

Want to improve GoEventBus? 🚀

  1. Fork the repo
  2. Create a feature branch (git checkout -b feature-new)
  3. Commit your changes (git commit -m "Added feature")
  4. Push to your branch (git push origin feature-new)
  5. Submit a PR!

📖 References

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher map[any]func(map[string]interface{}) (Result, error)

Dispatcher maps event projections to their respective handler functions

type Event

// Event is a single occurrence to be routed to a handler.
type Event struct {
	// Id is a string identifier for the event.
	Id         string
	// Projection is the key used to look up the handler in a dispatcher.
	Projection any
	// Args is the data map for the event; presumably it is what the handler
	// receives as its argument — confirm against Commit.
	Args       map[string]interface{}
}

func NewEvent

func NewEvent(projection any, args map[string]any) *Event

type EventStore

// EventStore holds published events and the dispatcher used to process them.
type EventStore struct {
	// Mutex is presumably the lock Broadcast takes while processing events.
	Mutex      sync.Mutex
	// Dispatcher maps event projections to their handler functions.
	Dispatcher *Dispatcher
	// Events is the pool that Publish adds events to and that Commit and
	// Broadcast take events from.
	Events     *sync.Pool
}

EventStore handles publishing and dispatching events

func NewEventStore

func NewEventStore(dispatcher *Dispatcher) *EventStore

NewEventStore initializes an EventStore with a dispatcher and an event pool

func (*EventStore) Broadcast

func (eventstore *EventStore) Broadcast() error

Broadcast locks the store and processes each event in the pool

func (*EventStore) Commit added in v0.1.11

func (eventstore *EventStore) Commit() error

Commit retrieves and processes an event from the pool

func (*EventStore) Publish

func (eventstore *EventStore) Publish(event *Event)

Publish adds an event to the event pool

type RabbitDispatcher added in v0.1.25

type RabbitDispatcher map[string]func(map[string]interface{}) (Result, error)

RabbitDispatcher maps string representations of projections to handler functions

type RabbitEventStore added in v0.1.25

// RabbitEventStore publishes events to, and consumes events from, a RabbitMQ
// queue.
type RabbitEventStore struct {
	// Mutex guards the store. NOTE(review): which methods take it is not
	// visible here — confirm.
	Mutex         sync.Mutex
	// Dispatcher maps projection names (strings) to handler functions.
	Dispatcher    *RabbitDispatcher
	// RabbitConn is the AMQP connection; presumably opened by
	// NewRabbitEventStore from its rabbitURL argument.
	RabbitConn    *amqp.Connection
	// RabbitChannel is the AMQP channel; presumably opened on RabbitConn.
	RabbitChannel *amqp.Channel
	// QueueName is presumably the queueName passed to NewRabbitEventStore.
	QueueName     string
}

RabbitEventStore handles publishing and dispatching events via RabbitMQ

func NewRabbitEventStore added in v0.1.25

func NewRabbitEventStore(dispatcher *RabbitDispatcher, rabbitURL, queueName string) (*RabbitEventStore, error)

NewRabbitEventStore initializes a RabbitEventStore

func (*RabbitEventStore) Broadcast added in v0.1.25

func (store *RabbitEventStore) Broadcast(ctx context.Context)

Broadcast starts consuming events from RabbitMQ

func (*RabbitEventStore) Commit added in v0.1.25

func (store *RabbitEventStore) Commit(event *Event) error

Commit processes a single event

func (*RabbitEventStore) Publish added in v0.1.25

func (store *RabbitEventStore) Publish(event *Event)

Publish sends an event to the RabbitMQ queue

type Result added in v0.1.11

// Result is the value a handler function returns alongside its error.
type Result struct {
	// Message is free-form text set by the handler.
	Message string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL