goeventbus

package module
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 11, 2023 License: MIT Imports: 2 Imported by: 0

README

Go Report Card workflow

EventBus for Golang

Description

This is a simple implementation of an event bus in Go. It currently supports:

  • Publish/subscribe messaging
  • Request/reply messaging

Get Started

To start using eventbus in your project, run the following command:

go get github.com/StaniPetrosyan/go-eventbus

Then import the package:

import (
	goeventbus "github.com/StaniPetrosyan/go-eventbus"
)

Publish/Subscribe


var eventbus = goeventbus.NewEventBus()

address := "topic"
options := goeventbus.NewMessageOptions().AddHeader("header", "value")
message := goeventbus.CreateMessage().SetBody("Hi Topic").SetOptions(options)

eventbus.Subscribe(address, func(dc goeventbus.ConsumerContext) {
	fmt.Printf("Message %s\n", dc.Result().Data)
})

for {
	eventbus.Publish(address, message)
	time.Sleep(time.Second)
}

Request/Reply messaging


var eventbus = goeventbus.NewEventBus()

address := "topic"

eventbus.Subscribe(address, func(dc goeventbus.ConsumerContext) {
	fmt.Printf("Message %s\n", dc.Result().Data)
	dc.Reply("Hi from topic")
})
	
eventbus.Request(address, goeventbus.CreateMessage().SetBody("Hi Topic"), func(dc goeventbus.ConsumerContext) {
	dc.Handle(func(message goeventbus.Message) {
			fmt.Printf("Message %s\n", message.Data)
	})
})

Message

To publish, first create a Message with CreateMessage and set its body with SetBody:

message := goeventbus.CreateMessage().SetBody("Hi Topic")

Each message can have some options:


options := goeventbus.NewMessageOptions().AddHeader("header", "value")
message := goeventbus.CreateMessage()

message.SetOptions(options)

eventbus.Publish("address", message)

In Bound Interceptor


eventbus.AddInBoundInterceptor("topic1", func(context goeventbus.InterceptorContext) {
	if (some logic)
		context.Next()
})

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerContext added in v0.5.0

type ConsumerContext struct {
	Ch chan Message
	// contains filtered or unexported fields
}

func NewConsumerContext added in v0.5.0

func NewConsumerContext(ch chan Message) ConsumerContext

func (*ConsumerContext) Handle added in v0.5.0

func (d *ConsumerContext) Handle(consume func(message Message))

func (*ConsumerContext) Reply added in v0.5.0

func (d *ConsumerContext) Reply(data any)

func (*ConsumerContext) Result added in v0.5.0

func (d *ConsumerContext) Result() Message

func (ConsumerContext) SetData added in v0.5.0

func (d ConsumerContext) SetData(msg Message) ConsumerContext

type ConsumerHandler added in v0.5.0

type ConsumerHandler struct {
	// contains filtered or unexported fields
}

func NewConsumer added in v0.5.0

func NewConsumer(address string, callback func(context ConsumerContext)) ConsumerHandler

func (ConsumerHandler) Chain added in v0.5.0

func (h ConsumerHandler) Chain() chan Message

func (ConsumerHandler) Closed added in v0.5.0

func (h ConsumerHandler) Closed() bool

func (ConsumerHandler) Context added in v0.5.0

func (h ConsumerHandler) Context() ConsumerContext

func (ConsumerHandler) Handle added in v0.5.0

func (h ConsumerHandler) Handle(wg *sync.WaitGroup)

type Context added in v0.5.0

type Context interface {
	Result() Message
}

type DefaultEventBus

type DefaultEventBus struct {
	// contains filtered or unexported fields
}

func (*DefaultEventBus) AddInBoundInterceptor added in v0.5.0

func (e *DefaultEventBus) AddInBoundInterceptor(address string, callback func(context InterceptorContext))

func (*DefaultEventBus) Publish

func (e *DefaultEventBus) Publish(address string, message Message)

func (*DefaultEventBus) Request

func (e *DefaultEventBus) Request(address string, message Message, callback func(context ConsumerContext))

func (*DefaultEventBus) Subscribe

func (e *DefaultEventBus) Subscribe(address string, callback func(context ConsumerContext))

type EventBus

type EventBus interface {
	Subscribe(address string, callback func(context ConsumerContext))
	AddInBoundInterceptor(address string, callback func(context InterceptorContext))
	Publish(address string, message Message)
	Request(address string, message Message, callback func(context ConsumerContext))
}

func NewEventBus

func NewEventBus() EventBus

type Handler

type Handler interface {
	Handle(wg *sync.WaitGroup)
	Chain() chan Message
	Closed() bool
}

type InterceptorContext added in v0.5.0

type InterceptorContext struct {
	// contains filtered or unexported fields
}

func NewInterceptorContext added in v0.5.0

func NewInterceptorContext(topic *Topic) InterceptorContext

func (*InterceptorContext) Next added in v0.5.0

func (d *InterceptorContext) Next()

func (*InterceptorContext) Result added in v0.5.0

func (d *InterceptorContext) Result() Message

func (InterceptorContext) SetData added in v0.5.0

type InterceptorHandler added in v0.5.0

type InterceptorHandler struct {
	// contains filtered or unexported fields
}

func NewInterceptor added in v0.5.0

func NewInterceptor(address string, callback func(context InterceptorContext), context InterceptorContext) InterceptorHandler

func (InterceptorHandler) Chain added in v0.5.0

func (h InterceptorHandler) Chain() chan Message

func (InterceptorHandler) Closed added in v0.5.0

func (h InterceptorHandler) Closed() bool

func (InterceptorHandler) Context added in v0.5.0

func (InterceptorHandler) Handle added in v0.5.0

func (h InterceptorHandler) Handle(wg *sync.WaitGroup)

type Message

type Message struct {
	Data    interface{}
	Options MessageOptions
}

func CreateMessage added in v0.5.0

func CreateMessage() Message

func (Message) SetBody added in v0.5.0

func (m Message) SetBody(data interface{}) Message

func (Message) SetOptions added in v0.5.0

func (m Message) SetOptions(options MessageOptions) Message

func (Message) ToJson

func (m Message) ToJson() ([]byte, error)

type MessageOptions

type MessageOptions struct {
	// contains filtered or unexported fields
}

func NewMessageOptions

func NewMessageOptions() MessageOptions

func (MessageOptions) AddHeader

func (op MessageOptions) AddHeader(key string, value string) MessageOptions

func (MessageOptions) Header added in v0.5.0

func (op MessageOptions) Header(key string) string

func (MessageOptions) SetHeader

func (op MessageOptions) SetHeader(headers map[string]string) MessageOptions

type Publisher added in v0.5.0

type Publisher struct {
}

func (*Publisher) Publish added in v0.5.0

func (p *Publisher) Publish()

type Topic added in v0.5.0

type Topic struct {
	Address      string
	Consumers    []Handler
	Interceptors []Handler
}

func NewTopic added in v0.5.0

func NewTopic(address string) *Topic

func (*Topic) AddConsumer added in v0.5.0

func (t *Topic) AddConsumer(callback func(context ConsumerContext)) ConsumerHandler

func (*Topic) AddInterceptor added in v0.5.0

func (t *Topic) AddInterceptor(callback func(context InterceptorContext)) InterceptorHandler

func (*Topic) GetChannels added in v0.5.0

func (t *Topic) GetChannels() []chan Message

func (*Topic) GetHandlers added in v0.5.0

func (t *Topic) GetHandlers() []Handler

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL