GoEventBus

package module
v0.1.26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2024 License: MIT Imports: 6 Imported by: 0

README

EventBus lib in Golang!

> Simple event source system

This project lets you publish events and subscribe to them easily.

To download:

go get github.com/Raezil/GoEventBus

Quick Start

Let's make a pub/sub application:

  1. Create a project
mkdir demo
cd demo
go mod init demo
  2. Add main.go

Example 1.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	gbus "github.com/Raezil/GoEventBus"

	"github.com/gorilla/mux"
	_ "github.com/lib/pq"
)

// HouseWasSold is the projection for the "house sold" event. It is an empty
// struct: its value is used only as the Dispatcher key and as the projection
// passed to gbus.NewEvent, while the sale data travels in the event's args map.
type HouseWasSold struct{}

// NewDispatcher builds the dispatcher for this example, registering a single
// handler under the HouseWasSold projection.
//
// The handler requires args["price"] to hold an int; a missing key or any
// other dynamic type produces an error. On success it logs the sale and
// returns the same text in Result.Message.
func NewDispatcher() *gbus.Dispatcher {
	onHouseSold := func(args map[string]interface{}) (gbus.Result, error) {
		amount, isInt := args["price"].(int)
		if !isInt {
			return gbus.Result{}, fmt.Errorf("price not provided or invalid")
		}

		msg := fmt.Sprintf("House was sold for %d", amount)
		log.Println(msg)
		return gbus.Result{Message: msg}, nil
	}

	handlers := gbus.Dispatcher{HouseWasSold{}: onHouseSold}
	return &handlers
}

// main wires the dispatcher into an event store and serves a single HTTP
// endpoint, /house-sold, on :8080. Each request publishes one HouseWasSold
// event with a fixed price of 100, broadcasts it, and replies with a small
// JSON status document.
func main() {
	// Initialize dispatcher and event store
	dispatcher := NewDispatcher()
	eventstore := gbus.NewEventStore(dispatcher)

	router := mux.NewRouter()
	router.HandleFunc("/house-sold", func(w http.ResponseWriter, r *http.Request) {
		// Publish the event; the "price" key must match what the handler reads.
		eventstore.Publish(gbus.NewEvent(
			HouseWasSold{},
			map[string]interface{}{
				"price": 100,
			},
		))

		// Broadcast the event after publishing, wait for completion
		if err := eventstore.Broadcast(); err != nil {
			log.Printf("Error broadcasting event: %v", err)
			http.Error(w, "Failed to process event", http.StatusInternalServerError)
			return
		}

		// Send response back to client
		w.Header().Set("Content-Type", "application/json")
		response := map[string]string{"status": "House sold event published"}
		if err := json.NewEncoder(w).Encode(response); err != nil {
			// The status line may already be on the wire, so the failure can
			// only be logged, not turned into an HTTP error response.
			log.Printf("Error writing response: %v", err)
		}
	})

	serverAddress := ":8080"
	log.Printf("Server is listening on %s", serverAddress)
	if err := http.ListenAndServe(serverAddress, router); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

Example 2.

package main

import (
	"fmt"
	"log"

	gbus "github.com/Raezil/GoEventBus"

	_ "github.com/lib/pq"
)

// HouseWasSold is the projection for the "house sold" event. The struct has no
// fields; it is used as the Dispatcher key and as the projection argument to
// gbus.NewEvent, with the sale price supplied separately in the args map.
type HouseWasSold struct{}

// NewDispatcher returns a dispatcher containing one handler, keyed by the
// HouseWasSold projection.
//
// The handler reads args["price"] as an int. When the assertion succeeds it
// logs "House was sold for <price>" and returns that text as the Result
// message; otherwise it returns an empty Result and an error.
func NewDispatcher() *gbus.Dispatcher {
	table := make(gbus.Dispatcher, 1)
	table[HouseWasSold{}] = func(payload map[string]interface{}) (gbus.Result, error) {
		if sold, ok := payload["price"].(int); ok {
			summary := fmt.Sprintf("House was sold for %d", sold)
			log.Println(summary)
			return gbus.Result{Message: summary}, nil
		}
		return gbus.Result{}, fmt.Errorf("price not provided or invalid")
	}
	return &table
}

// main publishes two HouseWasSold events (each with price 100) to an event
// store and then broadcasts them, exiting with a fatal log message if the
// broadcast reports an error.
func main() {
	// Initialize dispatcher and event store
	dispatcher := NewDispatcher()
	eventstore := gbus.NewEventStore(dispatcher)
	eventstore.Publish(gbus.NewEvent(
		HouseWasSold{},
		map[string]interface{}{
			"price": 100,
		},
	))

	eventstore.Publish(gbus.NewEvent(
		HouseWasSold{},
		map[string]interface{}{
			"price": 100,
		},
	))

	// Broadcast returns an error; report it instead of discarding it so a
	// failing handler does not go unnoticed.
	if err := eventstore.Broadcast(); err != nil {
		log.Fatalf("Error broadcasting event: %v", err)
	}
}

Example 3

Run RabbitMQ:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4.0-management

Then start the example with go run main.go:

package main

import (
	"fmt"
	"log"
	. "github.com/Raezil/GoEventBus"
)

// NewDispatcher returns a RabbitDispatcher with one handler registered under
// the projection name "HouseWasSold".
//
// The handler reads args["price"] as a float64 and formats it with two
// decimals into the Result message; a missing or differently typed price
// yields an error.
// NOTE(review): float64 (rather than int) is presumably required because the
// args are serialized on their way through RabbitMQ — confirm.
func NewDispatcher() *RabbitDispatcher {
	handlers := RabbitDispatcher{}
	handlers["HouseWasSold"] = func(args map[string]interface{}) (Result, error) {
		amount, isFloat := args["price"].(float64)
		if isFloat {
			return Result{Message: fmt.Sprintf("House was sold for %.2f", amount)}, nil
		}
		return Result{}, fmt.Errorf("price not provided or invalid")
	}
	return &handlers
}

func main() {
	dispatcher := NewDispatcher()

	rabbitStore, err := NewRabbitEventStore(dispatcher, "amqp://guest:guest@localhost:5672/", "events_queue")
	if err != nil {
		log.Fatalf("Failed to initialize RabbitEventStore: %v", err)
	}

	rabbitStore.Publish(&Event{
		Id:         "12345",
		Projection: "HouseWasSold",
		Args: map[string]interface{}{
			"price": 100.0,
		},
	})
	rabbitStore.Publish(&Event{
		Id:         "123456",
		Projection: "HouseWasSold",
		Args: map[string]interface{}{
			"price": 200.0,
		},
	})

	go rabbitStore.Broadcast()

	select {}
}
  3. Get the dependency
go get github.com/Raezil/GoEventBus
  4. Run the project
go run ./

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

// Dispatcher associates a projection key with the handler for that projection.
// Each handler receives a map[string]interface{} of arguments and returns a
// Result or an error. Because the key type is any, the values used as keys
// must be comparable at run time.
type Dispatcher map[any]func(map[string]interface{}) (Result, error)

Dispatcher maps event projections to their respective handler functions

type Event

// Event is the unit passed to the stores' Publish methods.
type Event struct {
	// Id identifies the event; the RabbitMQ example sets it by hand.
	// NOTE(review): whether NewEvent fills it in is not visible here.
	Id         string
	// Projection names the kind of event. The examples use the same value as
	// the dispatcher key, so it is presumably what selects the handler.
	Projection any
	// Args carries the event data; the example handlers read their inputs
	// (such as "price") from a map of this shape.
	Args       map[string]interface{}
}

func NewEvent

func NewEvent(projection any, args map[string]any) *Event

type EventStore

// EventStore is the in-process store: Publish adds events to a pool and
// Broadcast/Commit process them through the dispatcher.
type EventStore struct {
	// Mutex is the lock Broadcast is documented to take while processing.
	// Its presence means an EventStore must not be copied after first use.
	Mutex      sync.Mutex
	// Dispatcher is the handler table given to NewEventStore.
	Dispatcher *Dispatcher
	// Events is the pool Publish adds events to.
	// NOTE(review): sync.Pool may drop stored items at any time, so a
	// published event is not guaranteed to still be there when Broadcast
	// runs — confirm this is acceptable for the intended use.
	Events     *sync.Pool
}

EventStore handles publishing and dispatching events

func NewEventStore

func NewEventStore(dispatcher *Dispatcher) *EventStore

NewEventStore initializes an EventStore with a dispatcher and an event pool

func (*EventStore) Broadcast

func (eventstore *EventStore) Broadcast() error

Broadcast locks the store and processes each event in the pool

func (*EventStore) Commit added in v0.1.11

func (eventstore *EventStore) Commit() error

Commit retrieves and processes an event from the pool

func (*EventStore) Publish

func (eventstore *EventStore) Publish(event *Event)

Publish adds an event to the event pool

type RabbitDispatcher added in v0.1.25

// RabbitDispatcher associates a projection name (a string) with the handler
// for that projection. Handlers take the event's args map and return a Result
// or an error.
type RabbitDispatcher map[string]func(map[string]interface{}) (Result, error)

RabbitDispatcher maps string representations of projections to handler functions

type RabbitEventStore added in v0.1.25

// RabbitEventStore is the RabbitMQ-backed store: Publish sends events to a
// queue and Broadcast consumes them.
type RabbitEventStore struct {
	// Mutex guards the store; because of it the struct must not be copied.
	// NOTE(review): which methods take this lock is not visible here.
	Mutex         sync.Mutex
	// Dispatcher is the name-to-handler table given to NewRabbitEventStore.
	Dispatcher    *RabbitDispatcher
	// RabbitConn is the AMQP connection, presumably opened by
	// NewRabbitEventStore from its rabbitURL argument — confirm.
	RabbitConn    *amqp.Connection
	// RabbitChannel is the AMQP channel used on that connection.
	// NOTE(review): no Close method is listed for the store, so callers may
	// need to close the connection and channel themselves — confirm.
	RabbitChannel *amqp.Channel
	// QueueName is the queue name supplied to NewRabbitEventStore.
	QueueName     string
}

RabbitEventStore handles publishing and dispatching events via RabbitMQ

func NewRabbitEventStore added in v0.1.25

func NewRabbitEventStore(dispatcher *RabbitDispatcher, rabbitURL, queueName string) (*RabbitEventStore, error)

NewRabbitEventStore initializes a RabbitEventStore

func (*RabbitEventStore) Broadcast added in v0.1.25

func (store *RabbitEventStore) Broadcast()

Broadcast starts consuming events from RabbitMQ

func (*RabbitEventStore) Commit added in v0.1.25

func (store *RabbitEventStore) Commit(event *Event) error

Commit processes a single event

func (*RabbitEventStore) Publish added in v0.1.25

func (store *RabbitEventStore) Publish(event *Event)

Publish sends an event to the RabbitMQ queue

type Result added in v0.1.11

// Result is the value a handler returns alongside its error.
type Result struct {
	// Message is free-form text describing the outcome; the example handlers
	// store a formatted sentence such as "House was sold for 100".
	Message string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL