goeventbus

v0.7.0
Published: May 8, 2024 License: MIT Imports: 3 Imported by: 0

README

EventBus for Golang

Description

This is a simple implementation of an event bus in Go. It currently supports:

  • publish/subscribe messaging.

Get Started

To start using eventbus in your project, run the following command:

go get github.com/stanipetrosyan/go-eventbus

Then import the package:

import (
	goeventbus "github.com/stanipetrosyan/go-eventbus"
)

Publish/Subscribe

A simple example of the publish/subscribe pattern.


var eventbus = goeventbus.NewEventBus()

address := "topic"
options := goeventbus.NewMessageOptions().SetHeaders(goeventbus.NewHeaders().Add("header", "value"))
message := goeventbus.CreateMessage().SetBody("Hi Topic").SetOptions(options)

eventbus.Channel(address).Subscriber().Listen(func(dc goeventbus.Context) {
	fmt.Printf("Message %s\n", dc.Result().Data)
})

eventbus.Channel(address).Publisher().Publish(message)
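
To make the pattern itself concrete without depending on the library, here is a minimal sketch that uses only the standard library. The names miniBus, Subscribe and Publish are hypothetical and do not reflect how goeventbus is implemented; the sketch only shows the idea of routing a message to every handler registered on an address.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBus is a hypothetical stand-in for an event bus; it is not goeventbus code.
type miniBus struct {
	mu       sync.RWMutex
	handlers map[string][]func(body string)
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[string][]func(body string))}
}

// Subscribe registers a handler for every message published on address.
func (b *miniBus) Subscribe(address string, handler func(body string)) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.handlers[address] = append(b.handlers[address], handler)
}

// Publish calls each handler registered on address and returns how many ran.
func (b *miniBus) Publish(address, body string) int {
	// Copy the handler list so handlers run without holding the lock.
	b.mu.RLock()
	handlers := append([]func(body string){}, b.handlers[address]...)
	b.mu.RUnlock()

	for _, h := range handlers {
		h(body)
	}
	return len(handlers)
}

func main() {
	bus := newMiniBus()
	bus.Subscribe("topic", func(body string) {
		fmt.Printf("Message %s\n", body)
	})
	delivered := bus.Publish("topic", "Hi Topic")
	fmt.Println("delivered to", delivered, "subscriber(s)")
}
```

In this sketch delivery is synchronous, so the handler has run by the time Publish returns. Whether the library delivers synchronously is not stated in this README.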

Message

To publish, first create a Message with CreateMessage and set its body:

message := goeventbus.CreateMessage().SetBody("Hi Topic")

Each message can also carry options, such as headers:


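options := goeventbus.NewMessageOptions().SetHeaders(goeventbus.NewHeaders().Add("header", "value"))
message := goeventbus.CreateMessage()

// SetOptions returns the updated Message, so keep the result.
message = message.SetOptions(options)

eventbus.Channel("address").Publisher().Publish(message)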
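
SetBody and SetOptions are documented with value receivers and a Message return value, so the returned Message is the one that carries the change. The sketch below illustrates that Go behavior with a local stand-in type. The lowercase message type is hypothetical and mirrors only the shape of the documented signature, not the library's code.

```go
package main

import "fmt"

// message is a local stand-in shaped like the documented Message type.
type message struct {
	Data interface{}
}

// SetBody has a value receiver: it modifies a copy and returns that copy.
func (m message) SetBody(data interface{}) message {
	m.Data = data
	return m
}

func main() {
	m := message{}

	m.SetBody("dropped") // result discarded, so m is unchanged
	fmt.Println(m.Data)  // prints <nil>

	m = m.SetBody("Hi Topic") // result kept
	fmt.Println(m.Data)       // prints Hi Topic
}
```

Chaining the calls, as in CreateMessage().SetBody(...).SetOptions(...), avoids the problem because each call operates on the value returned by the previous one.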

Processor

A processor works like middleware: it forwards a message only if the predicate is satisfied. The Processor method accepts a function that takes a Message and returns a boolean.


eventbus.Channel("topic1").Processor(func(message goeventbus.Message) bool {
	return message.Options.Headers().Contains("header")
})
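
The filtering idea can be shown on its own with the standard library. This is a minimal sketch under stated assumptions: msg, process and hasHeader are hypothetical names, and a slice stands in for the stream of messages. It is not the library's processor implementation.

```go
package main

import "fmt"

// msg is a hypothetical, simplified message: a body plus string headers.
type msg struct {
	Body    string
	Headers map[string]string
}

// process returns only the messages for which predicate reports true.
func process(in []msg, predicate func(m msg) bool) []msg {
	var out []msg
	for _, m := range in {
		if predicate(m) {
			out = append(out, m)
		}
	}
	return out
}

// hasHeader builds a predicate that checks whether a header key is present.
func hasHeader(key string) func(m msg) bool {
	return func(m msg) bool {
		_, ok := m.Headers[key]
		return ok
	}
}

func main() {
	in := []msg{
		{Body: "kept", Headers: map[string]string{"header": "value"}},
		{Body: "dropped"}, // no headers, so the predicate rejects it
	}
	for _, m := range process(in, hasHeader("header")) {
		fmt.Println(m.Body) // prints kept
	}
}
```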

Network Bus

A network bus creates a TCP connection between different services.

NetworkBus is a wrapper around the local eventbus.

A simple server/client example is in the examples/networkbus directory.
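
The documented Request type pairs a channel name with a Message and carries json struct tags, which suggests that messages may travel between services as JSON. The sketch below is an assumption-laden illustration of such an envelope. The envelope and payload types and the encode and decode helpers are hypothetical, and the actual wire format used by NetworkBus is not confirmed here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload and envelope are local stand-ins shaped like the documented
// Message and Request types; they are not the library's types.
type payload struct {
	Data interface{}
}

type envelope struct {
	Channel string  `json:"channel"`
	Message payload `json:"message"`
}

// encode serializes an envelope as it might be written to a connection.
func encode(channel string, data interface{}) ([]byte, error) {
	return json.Marshal(envelope{Channel: channel, Message: payload{Data: data}})
}

// decode parses bytes read from a connection back into an envelope.
func decode(raw []byte) (envelope, error) {
	var e envelope
	err := json.Unmarshal(raw, &e)
	return e, err
}

func main() {
	raw, err := encode("topic", "Hi Topic")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	e, err := decode(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(e.Channel, e.Message.Data)
}
```

The receiving side would then publish the decoded message on the local channel named by the envelope, which matches the description of NetworkBus as a wrapper around the local eventbus.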

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel added in v0.6.0

type Channel interface {
	Publisher() Publisher
	Subscriber() Subscriber
	Processor(predicate func(message Message) bool) Channel
}

func NewChannel added in v0.6.0

func NewChannel(address string) Channel

type Client added in v0.7.0

type Client interface {
	Connect()
}

func NewClient added in v0.7.0

func NewClient(address string, eventbus EventBus) Client

type Context

type Context interface {
	Result() Message
}

func NewConsumerContextWithMessage added in v0.6.0

func NewConsumerContextWithMessage(message Message) Context

type DefaultEventBus

type DefaultEventBus struct {
	// contains filtered or unexported fields
}

func (*DefaultEventBus) Channel added in v0.6.0

func (e *DefaultEventBus) Channel(address string) Channel

type EventBus

type EventBus interface {
	Channel(adress string) Channel
}

func NewEventBus

func NewEventBus() EventBus

type Headers added in v0.7.0

type Headers struct {
	// contains filtered or unexported fields
}

func NewHeaders added in v0.7.0

func NewHeaders() Headers

func (Headers) Add added in v0.7.0

func (h Headers) Add(key string, value string) Headers

func (Headers) Contains added in v0.7.0

func (h Headers) Contains(key string) bool

func (Headers) Header added in v0.7.0

func (h Headers) Header(key string) string

type Message

type Message struct {
	Data    interface{}
	Options MessageOptions
}

func CreateMessage

func CreateMessage() Message

func (Message) SetBody

func (m Message) SetBody(data interface{}) Message

func (Message) SetOptions

func (m Message) SetOptions(options MessageOptions) Message

type MessageOptions

type MessageOptions struct {
	// contains filtered or unexported fields
}

func NewMessageOptions

func NewMessageOptions() MessageOptions

func (MessageOptions) Headers added in v0.7.0

func (op MessageOptions) Headers() Headers

func (MessageOptions) SetHeaders added in v0.7.0

func (op MessageOptions) SetHeaders(headers Headers) MessageOptions

type NetworkBus added in v0.7.0

type NetworkBus interface {
	Server() Server
	Client() Client
}

func NewNetworkBus added in v0.7.0

func NewNetworkBus(bus EventBus, address, path string) NetworkBus

type Processor added in v0.7.0

type Processor interface {
	// contains filtered or unexported methods
}

func NewProcessor added in v0.7.0

func NewProcessor() Processor

func NewProcessorWithPredicate added in v0.7.0

func NewProcessorWithPredicate(predicate func(message Message) bool) Processor

type Publisher

type Publisher interface {
	Publish(message Message)
}

func NewPublisher added in v0.6.0

func NewPublisher(ch chan Message) Publisher

type Request added in v0.7.0

type Request struct {
	Channel string  `json:"channel"`
	Message Message `json:"message"`
}

type Server added in v0.7.0

type Server interface {
	Listen() (Server, error)
	Publish(channel string, message Message)
}

func NewServer added in v0.7.0

func NewServer(address string) Server

type Subscriber added in v0.6.0

type Subscriber interface {
	Listen(consumer func(context Context))
}

func NewSubscriber added in v0.6.0

func NewSubscriber(ch chan Message) Subscriber

Directories

Path Synopsis
examples
