glocbus

Version: v1.0.0
Published: Feb 8, 2024 | License: MIT | Imports: 11 | Imported by: 0

README

glocbus

Glocbus is an event bus and a framework for publishing events within a Go application. Based on CloudEvents and OpenTelemetry, Glocbus aims to be fully observable and to provide end-to-end traceability for propagated events.

Run the demo

An example of an application that uses Glocbus to publish and consume heartbeat events can be found under the example folder.

The easiest way to run the demo and see end-to-end event traceability in action is to use the docker-compose file. It starts two containers: one that runs the example and another that runs an all-in-one Jaeger instance (monitoring).

docker-compose up

Once the containers have started, open the Jaeger UI at http://localhost:16686 in a browser to inspect the received traces.

Getting started

Here are the high-level steps to use Glocbus:

  1. Implement an event source like the one from the example.
  2. Register the event source to the event bus and start it.
  3. Use the event bus to subscribe to the event source.

Please take a look at the example application, which shows how to implement and use all the components involved. The example uses the uber.fx framework to manage the application lifecycle and dependency injection (see providers).
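The three steps above can be sketched with standard-library types only. This is a simplified illustration, not the Glocbus API: Event, heartbeatSource and miniBus are hypothetical stand-ins (the real package uses the CloudEvents event.Event type, EventSourceInterface and a separate propagator), and the source is started after the subscription only to keep the output deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for the CloudEvents event.Event type.
type Event struct{ Type, Data string }

// heartbeatSource is a toy event source (step 1).
type heartbeatSource struct{ out chan Event }

// GetChannel returns the publication channel of the source.
func (s *heartbeatSource) GetChannel() chan Event { return s.out }

// Start publishes n heartbeats in the background, then closes the channel.
func (s *heartbeatSource) Start(n int) {
	go func() {
		defer close(s.out)
		for i := 1; i <= n; i++ {
			s.out <- Event{Type: "heartbeat", Data: fmt.Sprint(i)}
		}
	}()
}

// miniBus is a toy bus. Its relay goroutine plays the role of a propagator.
type miniBus struct {
	mu   sync.Mutex
	subs map[string][]chan Event // subscriber channels per source ID
}

// Register binds a source to the bus and starts relaying its events (step 2).
func (b *miniBus) Register(id string, src *heartbeatSource) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, exists := b.subs[id]; exists {
		return errors.New("event source ID is not unique")
	}
	b.subs[id] = nil
	go func() {
		for ev := range src.GetChannel() {
			b.mu.Lock()
			for _, sub := range b.subs[id] {
				sub <- ev // blocking write: a slow subscriber delays the others
			}
			b.mu.Unlock()
		}
		// The source closed its channel: close every subscriber channel.
		b.mu.Lock()
		for _, sub := range b.subs[id] {
			close(sub)
		}
		b.mu.Unlock()
	}()
	return nil
}

// Subscribe attaches a subscriber channel to a registered source (step 3).
func (b *miniBus) Subscribe(id string, sub chan Event) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, exists := b.subs[id]; !exists {
		return errors.New("event source does not exist")
	}
	b.subs[id] = append(b.subs[id], sub)
	return nil
}

// runDemo wires the three steps together and collects the received payloads.
func runDemo() ([]string, error) {
	bus := &miniBus{subs: map[string][]chan Event{}}
	src := &heartbeatSource{out: make(chan Event)}
	if err := bus.Register("heartbeat", src); err != nil {
		return nil, err
	}
	sub := make(chan Event)
	if err := bus.Subscribe("heartbeat", sub); err != nil {
		return nil, err
	}
	src.Start(3)
	var got []string
	for ev := range sub {
		got = append(got, ev.Data)
	}
	return got, nil
}

func main() {
	got, err := runDemo()
	fmt.Println(got, err) // [1 2 3] <nil>
}
```

The toy relay uses blocking writes while holding a lock, which is the simplest choice but lets one slow subscriber stall the others. In Glocbus that trade-off belongs to the propagator implementation.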

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type EventBusInterface

type EventBusInterface interface {
	// # Description
	//
	// Register a new source of events, bind it to the provided propagator and start the
	// propagator. Optionally, the event source will be started after the propagator.
	//
	// # Inputs
	//
	//	- ctx: Context used for tracing purposes.
	//	- id: Unique ID for the event source.
	//	- source: Event source to register. The source can be already started when provided.
	//	- propagator: Propagator to use to propagate events to subscribers. Must not be started.
	//    A separate propagator is expected to be used for each source.
	//	- startSource: Indicates whether the source should be started after the propagator.
	//	- description: Optional user-defined struct used to provide additional information about
	//    the event source. Can be nil.
	//
	// # Return
	//
	// An error if the event source could not be registered. Possible causes are:
	//	- The provided ID is not unique
	//	- The provided propagator is already started.
	//	- In case startSource is true, the provided source fails to start.
	RegisterEventSource(
		ctx context.Context,
		id string,
		source EventSourceInterface,
		propagator propagators.EventPropagatorInterface,
		startSource bool,
		description EventSourceDescription,
	) error
	// # Description
	//
	// Subscribe to a source of events identified by the provided ID. The provided channel will be
	// used by the propagator to publish events from the event source.
	//
	// # Implementation requirements & hints
	//
	//   - The channel used to push new subscribers to the propagator can be closed by the
	//     propagator when it is stopped or when the event source is stopped. The event bus must
	//     handle the case when it writes to a closed channel. In this case, the event source must
	//     be removed from the list of event sources and an error must be returned to the
	//     subscriber.
	//
	//   - The choice to use blocking or non-blocking writes to propagate events is left to the
	//     propagator. The same applies to scaling: it is up to the propagator implementation
	//     to make clear statements about how it scales, handles congestion, and so on.
	//
	// # Inputs
	//
	//   - ctx: Context used for tracing purposes.
	//   - id: ID of the event source
	//   - name: Name defined by the subscriber to identify itself. It is not required to be unique.
	//   - subscriber: Channel provided by the subscriber to receive events from the source.
	//
	// # Return
	//
	// An error when subscription failed. Possible causes are:
	//   - The event source does not exist
	//   - The event source has been closed.
	//   - The provided context has expired before subscription is complete.
	//
	// In the latter two cases, the method will close the provided channel.
	SubscribeEventSource(
		ctx context.Context,
		id string,
		name string,
		subscriber chan event.Event,
	) error
	// # Description
	//
	// List all currently available event sources.
	//
	// # Return
	//
	// The list of currently available event sources.
	ListEventSources() []EventSourceInformation
}

Interface for an event bus that coordinates event sources, event propagators and subscribers.
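The requirement that the bus must survive a write to a closed channel matters because, in Go, sending on a closed channel panics. The following is a minimal sketch of one way to handle it, combined with a non-blocking write; trySend, errClosed and errFull are hypothetical names and are not part of Glocbus.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errClosed = errors.New("channel is closed")
	errFull   = errors.New("channel is full")
)

// trySend attempts a non-blocking send on ch. A send on a closed channel
// panics, so the deferred recover converts that panic into errClosed.
// Note that recover catches any panic raised inside this function.
func trySend[T any](ch chan T, v T) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errClosed
		}
	}()
	select {
	case ch <- v:
		return nil
	default:
		// No receiver is ready and the buffer, if any, is full.
		return errFull
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "a")) // <nil>
	fmt.Println(trySend(ch, "b")) // channel is full
	close(ch)
	fmt.Println(trySend(ch, "c")) // channel is closed
}
```

A bus built along these lines could treat errClosed as the signal to remove the event source from its list and return an error to the subscriber, as the requirement describes.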

type EventSourceDescription

type EventSourceDescription any

Can be used by users to provide more information about an event source.

type EventSourceInformation

type EventSourceInformation struct {
	// ID for the source of events
	Id string
	// Source of events
	Source EventSourceInterface
	// Propagator used to propagate events from the source to subscribers
	Propagator propagators.EventPropagatorInterface
	// Optional, user-defined additional information about the event source.
	Description EventSourceDescription
	// contains filtered or unexported fields
}

Structure that holds all data and components used by the event bus to manage an event source.

type EventSourceInterface

type EventSourceInterface interface {
	// # Description
	//
	// Provide the channel used by the event source to publish its events.
	//
	// # Implementation requirements & hints
	//
	//	- The method must return the same channel instance to all callers.
	//	- The source must provide its publication channel even if it has not been started yet.
	//	- The source must continue providing the same channel even if the source has been stopped.
	//	- To achieve end-to-end event traceability, events are expected to embed tracing data as
	//    described in the following documentation:
	//    https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/extensions/distributed-tracing.md
	//
	// # Return
	//
	// The channel used by this source to publish its events.
	GetChannel() chan event.Event
	// # Description
	//
	// Start the event source, which then begins publishing events on its publication channel.
	//
	// # Implementation requirements & hints
	//
	//	- The method must return an error if the source has already been started.
	//	- The method must return an error if the provided context has been canceled or has expired.
	//	- The method must return an error if the source has been stopped (no restart).
	//	- The method must close the publication channel.
	//
	// # Inputs
	//
	//	- ctx: Context used for tracing purposes.
	//
	// # Return
	//
	// An error if the event source could not be started.
	Start(ctx context.Context) error
	// # Description
	//
	// Stop the event source and close the publication channel.
	//
	// # Implementation requirements & hints
	//
	//	- The method must return an error if the source has not been started.
	//	- The method must return an error if the source has already been stopped.
	//	- The method must close the publication channel even if the underlying event source could
	//    not be properly closed.
	//	- The closed publication channel must be kept: source must not be restarted (stale source).
	//
	// # Inputs
	//
	//	- ctx: Context used for tracing purposes.
	//
	// # Return
	//
	// An error if the event source could not be closed.
	Stop(ctx context.Context) error
}

Interface for an event source.
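The lifecycle rules listed for GetChannel, Start and Stop can be read as a small state machine. The sketch below shows one possible shape using only the standard library. It assumes a hypothetical tickSource type and a stand-in Event struct instead of the CloudEvents event.Event type, and it closes the publication channel in Stop only.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Event is a hypothetical stand-in for the CloudEvents event.Event type.
type Event struct{ ID string }

type phase int

const (
	created phase = iota
	started
	stopped
)

// tickSource is a hypothetical source that publishes numbered events.
type tickSource struct {
	mu    sync.Mutex
	phase phase
	out   chan Event    // publication channel, the same instance for all callers
	done  chan struct{} // closed by Stop to end the publishing goroutine
	wg    sync.WaitGroup
}

func newTickSource() *tickSource {
	return &tickSource{out: make(chan Event), done: make(chan struct{})}
}

// GetChannel returns the same channel before Start, while running and after Stop.
func (s *tickSource) GetChannel() chan Event { return s.out }

// Start fails on an expired context, a running source or a stopped source.
func (s *tickSource) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := ctx.Err(); err != nil {
		return err
	}
	switch s.phase {
	case started:
		return errors.New("source already started")
	case stopped:
		return errors.New("source stopped: restart is not allowed")
	}
	s.phase = started
	s.wg.Add(1)
	go s.publish()
	return nil
}

func (s *tickSource) publish() {
	defer s.wg.Done()
	for i := 0; ; i++ {
		select {
		case s.out <- Event{ID: fmt.Sprint(i)}:
		case <-s.done:
			return
		}
	}
}

// Stop fails unless the source is running, then closes the publication channel.
func (s *tickSource) Stop(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.phase != started {
		return errors.New("source not started or already stopped")
	}
	s.phase = stopped
	close(s.done)
	s.wg.Wait() // the publisher has exited, so closing out cannot race with a send
	close(s.out)
	return nil
}

// lifecycle reports, step by step, whether each call failed.
func lifecycle() []bool {
	ctx := context.Background()
	s := newTickSource()
	ch := s.GetChannel()
	stopEarly := s.Stop(ctx) != nil   // not started yet
	firstStart := s.Start(ctx) != nil // succeeds
	secondStart := s.Start(ctx) != nil
	<-ch // receive one event
	stop := s.Stop(ctx) != nil
	restart := s.Start(ctx) != nil
	_, open := <-ch // the channel is closed after Stop
	return []bool{stopEarly, firstStart, secondStart, stop, restart, open, ch == s.GetChannel()}
}

func main() {
	fmt.Println(lifecycle()) // [true false true false true false true]
}
```

Waiting for the publishing goroutine before closing the channel avoids a send on a closed channel inside the source itself.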

type EventSourceMock

type EventSourceMock struct {
	mock.Mock
}

Mock for an event source.

func (*EventSourceMock) GetChannel

func (mock *EventSourceMock) GetChannel() chan event.Event

Description

Provide the channel used by the event source to publish its events.

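Implementation requirements & hints

  • The method must return the same channel instance to all callers.
  • The source must provide its publication channel even if it has not been started yet.
  • The source must continue providing the same channel even if the source has been stopped.
  • To achieve end-to-end event traceability, events are expected to embed tracing data as described in the following documentation: https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/extensions/distributed-tracing.md
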
Return

The channel used by this source to publish its events.

func (*EventSourceMock) Start

func (mock *EventSourceMock) Start(ctx context.Context) error

Description

Start the event source, which then begins publishing events on its publication channel.

Implementation requirements & hints

  • The method must return an error if the source has already been started.
  • The method must return an error if the provided context has been canceled or has expired.
  • The method must return an error if the source has been stopped (no restart).
  • The method must close the publication channel.

Inputs

  • ctx: Context used for tracing purposes.

Return

An error if the event source could not be started.

func (*EventSourceMock) Stop

func (mock *EventSourceMock) Stop(ctx context.Context) error

Description

Stop the event source and close the publication channel.

Implementation requirements & hints

  • The method must return an error if the source has not been started.
  • The method must return an error if the source has already been stopped.
  • The method must close the publication channel even if the underlying event source could not be properly closed.
  • The closed publication channel must be kept: source must not be restarted (stale source).

Inputs

  • ctx: Context used for tracing purposes.

Return

An error if the event source could not be closed.

type Glocbus

type Glocbus struct {
	// contains filtered or unexported fields
}

Event bus implementation that manages event sources and propagators to relay events in a publish-subscribe fashion.

func NewGlocbus

func NewGlocbus(tracerProvider trace.TracerProvider) *Glocbus

Description

Build and return a new event bus without any event source set.

Inputs

  • tracerProvider: Tracer provider to use to get the tracer used to instrument code. If nil, the global tracer provider will be used.

Return

A new Glocbus without any source.

func (*Glocbus) ListEventSources

func (bus *Glocbus) ListEventSources() []EventSourceInformation

Description

List all currently available event sources.

Return

The list of currently available event sources.

func (*Glocbus) RegisterEventSource

func (bus *Glocbus) RegisterEventSource(
	ctx context.Context,
	id string,
	source EventSourceInterface,
	propagator propagators.EventPropagatorInterface,
	startSource bool,
	description EventSourceDescription,
) error

Description

Register a new source of events, bind it to the provided propagator and start the propagator. Optionally, the event source will be started after the propagator.

Inputs

  • ctx: Context used for tracing purposes.
  • id: Unique ID for the event source.
  • source: Event source to register. The source can be already started when provided.
  • propagator: Propagator to use to propagate events to subscribers. Must not be started. A separate propagator is expected to be used for each source.
  • startSource: Indicates whether the source should be started after starting the propagator.
  • description: Optional user-defined struct used to provide additional information about the event source. Can be nil.

Return

An error if the event source could not be registered. Possible causes are:

  • The provided ID is not unique
  • The provided propagator is already started.
  • In case startSource is true, the provided source fails to start.
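The order of operations and the three error causes listed above can be illustrated with a small registry. This is only a sketch under stated assumptions: component, registry and the error variables are hypothetical, and what the real bus does with an already started propagator when the source fails to start is not documented here, so the sketch simply leaves it running.

```go
package main

import (
	"errors"
	"fmt"
)

// component is a hypothetical stand-in for both an event source and a propagator.
type component struct {
	started   bool
	failStart bool // forces Start to fail, for demonstration only
}

func (c *component) Start() error {
	if c.failStart {
		return errors.New("start failed")
	}
	c.started = true
	return nil
}

var (
	errDuplicateID       = errors.New("event source ID is not unique")
	errPropagatorStarted = errors.New("propagator is already started")
)

type registry struct{ sources map[string]*component }

// register checks the ID, starts the propagator, then optionally starts the source.
func (r *registry) register(id string, source, propagator *component, startSource bool) error {
	if _, exists := r.sources[id]; exists {
		return errDuplicateID
	}
	if propagator.started {
		return errPropagatorStarted
	}
	if err := propagator.Start(); err != nil {
		return fmt.Errorf("start propagator: %w", err)
	}
	if startSource {
		// Assumption: the propagator is left running if the source fails to start.
		if err := source.Start(); err != nil {
			return fmt.Errorf("start source %q: %w", id, err)
		}
	}
	r.sources[id] = source
	return nil
}

func main() {
	r := &registry{sources: map[string]*component{}}
	src, prop := &component{}, &component{}
	fmt.Println(r.register("heartbeat", src, prop, true))                    // <nil>
	fmt.Println(r.register("heartbeat", &component{}, &component{}, false)) // event source ID is not unique
	fmt.Println(r.register("other", &component{}, prop, false))             // propagator is already started
}
```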

func (*Glocbus) SubscribeEventSource

func (bus *Glocbus) SubscribeEventSource(
	ctx context.Context,
	id string,
	name string,
	subscriber chan event.Event,
) error

Description

Subscribe to a source of events identified by the provided ID. The provided channel will be used by the propagator to publish events from the event source.

Implementation requirements & hints

  • The channel used to push new subscribers to the propagator can be closed by the propagator when it is stopped or when the event source is stopped. The event bus must handle the case when it writes to a closed channel. In this case, the event source must be removed from the list of event sources and an error must be returned to the subscriber.

  • The choice to use blocking or non-blocking writes to propagate events is left to the propagator. The same applies to scaling: it is up to the propagator implementation to make clear statements about how it scales, handles congestion, and so on.

Inputs

  • ctx: Context used for tracing purposes.
  • id: ID of the event source
  • name: Name defined by the subscriber to identify itself. It is not required to be unique.
  • subscriber: Channel provided by the subscriber to receive events from the source.

Return

An error when subscription failed. Possible causes are:

  • The event source does not exist
  • The event source has been closed.
  • The provided context has expired before subscription is complete.

In the latter two cases, the method will close the provided channel.
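Because the subscriber channel can be closed by the bus or the propagator, a subscriber has to detect the close when reading. The sketch below shows one possible consumer loop that stops on either a closed channel or a finished context. consume, errSourceClosed and the Event struct are hypothetical stand-ins, not part of Glocbus.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for the CloudEvents event.Event type.
type Event struct{ Data string }

var errSourceClosed = errors.New("event source closed")

// consume handles events until the channel is closed or the context is done.
func consume(ctx context.Context, sub <-chan Event, handle func(Event)) error {
	for {
		select {
		case ev, ok := <-sub:
			if !ok {
				// The channel was closed by the producing side.
				return errSourceClosed
			}
			handle(ev)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// demoConsume feeds three events to a consumer, then closes the channel.
func demoConsume() (int, error) {
	sub := make(chan Event)
	go func() {
		defer close(sub)
		for i := 0; i < 3; i++ {
			sub <- Event{Data: fmt.Sprint(i)}
		}
	}()
	count := 0
	err := consume(context.Background(), sub, func(Event) { count++ })
	return count, err
}

// demoCancelled runs a consumer with an already cancelled context.
func demoCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return consume(ctx, make(chan Event), func(Event) {})
}

func main() {
	fmt.Println(demoConsume())   // 3 event source closed
	fmt.Println(demoCancelled()) // context canceled
}
```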
