GoEventBus

package module
v0.1.22 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2024 License: MIT Imports: 6 Imported by: 0

README

EventBus lib in Golang!

> Simple event source system

This project lets you publish and subscribe events easily.

To download:

go get github.com/Raezil/GoEventBus

Quick Start

Let's make a pub/sub application:

  1. Create a project
mkdir demo
cd demo
go mod init demo
  1. Add main.go

Example 1.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	gbus "github.com/Raezil/GoEventBus"

	"github.com/gorilla/mux"
	_ "github.com/lib/pq"
)

// HouseWasSold represents an event for when a house has been sold
type HouseWasSold struct{}

// NewDispatcher initializes the dispatcher with event handlers
func NewDispatcher() *gbus.Dispatcher {
	return &gbus.Dispatcher{
		HouseWasSold{}: func(m map[string]interface{}) (gbus.Result, error) {
			price, ok := m["price"].(int) // Match the correct key "price"
			if !ok {
				return gbus.Result{}, fmt.Errorf("price not provided or invalid")
			}
			result := fmt.Sprintf("House was sold for %d", price)
			log.Println(result)
			return gbus.Result{
				Message: result,
			}, nil
		},
	}
}

func main() {
	// Initialize dispatcher and event store
	dispatcher := NewDispatcher()
	eventstore := gbus.NewEventStore(dispatcher)

	router := mux.NewRouter()
	router.HandleFunc("/house-sold", func(w http.ResponseWriter, r *http.Request) {
		// Publish the event with the correct key "price"
		eventstore.Publish(gbus.NewEvent(
			HouseWasSold{},
			map[string]interface{}{
				"price": 100,
			},
		))

		// Broadcast the event after publishing, wait for completion
		if err := eventstore.Broadcast(); err != nil {
			log.Printf("Error broadcasting event: %v", err)
			http.Error(w, "Failed to process event", http.StatusInternalServerError)
			return
		}

		// Send response back to client
		w.Header().Set("Content-Type", "application/json")
		response := map[string]string{"status": "House sold event published"}
		json.NewEncoder(w).Encode(response)
	})

	serverAddress := ":8080"
	log.Printf("Server is listening on %s", serverAddress)
	if err := http.ListenAndServe(serverAddress, router); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

Example 2.

package main

import (
	"fmt"
	"log"

	gbus "github.com/Raezil/GoEventBus"

	_ "github.com/lib/pq"
)

// HouseWasSold represents an event for when a house has been sold
type HouseWasSold struct{}

// NewDispatcher initializes the dispatcher with event handlers
func NewDispatcher() *gbus.Dispatcher {
	return &gbus.Dispatcher{
		HouseWasSold{}: func(m map[string]interface{}) (gbus.Result, error) {
			price, ok := m["price"].(int)
			if !ok {
				return gbus.Result{}, fmt.Errorf("price not provided or invalid")
			}
			result := fmt.Sprintf("House was sold for %d", price)
			log.Println(result)
			return gbus.Result{
				Message: result,
			}, nil
		},
	}
}

func main() {
	// Initialize dispatcher and event store
	dispatcher := NewDispatcher()
	eventstore := gbus.NewEventStore(dispatcher)
	eventstore.Publish(gbus.NewEvent(
		HouseWasSold{},
		map[string]interface{}{
			"price": 100,
		},
	))

	eventstore.Publish(gbus.NewEvent(
		HouseWasSold{},
		map[string]interface{}{
			"price": 100,
		},
	))
	// Broadcast the event
	eventstore.Broadcast()

}
  1. Get the dependency
go get github.com/Raezil/GoEventBus
  1. Run the project
go run ./

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher map[any]func(map[string]interface{}) (Result, error)

Dispatcher maps event projections to their respective handler functions

type Event

type Event struct {
	Id         string
	Projection any
	Args       map[string]interface{}
}

func NewEvent

func NewEvent(projection any, args map[string]any) *Event

type EventStore

type EventStore struct {
	Mutex      sync.Mutex
	Dispatcher *Dispatcher
	Events     *sync.Pool
	RabbitMQ   *amqp.Connection
	Channel    *amqp.Channel
}

EventStore handles publishing and dispatching events

func NewEventStore

func NewEventStore(dispatcher *Dispatcher) *EventStore

NewEventStore initializes an EventStore with a dispatcher and an event pool

func (*EventStore) Broadcast

func (eventstore *EventStore) Broadcast() error

Broadcast locks the store and processes each event in the pool

func (*EventStore) BroadcastWithRabbitMQ added in v0.1.14

func (eventstore *EventStore) BroadcastWithRabbitMQ(queueName string) error

func (*EventStore) CloseRabbitMQ added in v0.1.13

func (eventstore *EventStore) CloseRabbitMQ()

func (*EventStore) Commit added in v0.1.11

func (eventstore *EventStore) Commit() error

Commit retrieves and processes an event from the pool

func (*EventStore) InitRabbitMQ added in v0.1.20

func (eventstore *EventStore) InitRabbitMQ() error

func (*EventStore) Publish

func (eventstore *EventStore) Publish(event *Event)

Publish adds an event to the event pool

func (*EventStore) PublishWithRabbitMQ added in v0.1.20

func (eventstore *EventStore) PublishWithRabbitMQ(event *Event)

type Result added in v0.1.11

type Result struct {
	Message string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL