event

v1.0.3
Published: Jul 28, 2022 License: Apache-2.0 Imports: 5 Imported by: 1

README

Event is a concurrent Event and Error Stream Implementation for Go

The event package provides a concurrent Event and Error Stream Implementation for Go. It is built on established concurrency patterns so that it behaves correctly in highly parallel environments as well as sequential ones.

The implementation provides a Publisher which is used to publish events as well as ReadEvents and ReadErrors to allow subscribers to access read-only channels of those events and errors.

NOTE: Publisher only publishes once ReadEvents or ReadErrors has been called, so that no cycles are wasted waiting for subscribers. To that end, the methods for publishing events and errors return an error if ReadEvents or ReadErrors has not been called.

Use

go get -u go.devnw.com/event
Create a Publisher
import "go.devnw.com/event"
...

publisher := event.NewPublisher(ctx)
defer func() {
    // t is a *testing.T here; outside of tests, log or return the error.
    err := publisher.Close()
    if err != nil {
        t.Errorf("Publisher.Close() failed: %v", err)
    }
}()
Create A Handler for both the Error and Event Streams

func main() {
    ...
    publisher := event.NewPublisher(ctx)
    ...

    // Create a subscriber to read events and errors from the publisher.
    // See `Example Subscriber` below for more details on possible
    // subscriber implementations.
    Subscribe(
        ctx,

        // The caller supplies the buffer size used when reading
        // events/errors. It is only applied on the first call, because
        // that call creates the underlying channel.
        publisher.ReadEvents(buffer).Interface(),
        publisher.ReadErrors(buffer).Interface(),
    )

    ...
}
Example Subscriber
// This is an example of a subscriber that will handle both events and errors
// from the incoming channels. Each stream is read in its own goroutine until
// the stream is closed or the context is canceled.
func Subscribe(ctx context.Context, streams ...<-chan interface{}) {
    for _, stream := range streams {
        go func(stream <-chan interface{}) {
            for {
                select {
                case <-ctx.Done():
                    return
                case e, ok := <-stream:
                    if !ok {
                        return
                    }

                    // The error case comes first so that a Combo value
                    // is handled as an error.
                    switch e.(type) {
                    case error:
                        // Do something with the error
                    case event.Event:
                        // Do something with the event
                    }
                }
            }
        }(stream)
    }
}

Documentation

Overview

Package event provides a publisher for events and errors, that is, values which implement the Event interface or the `error` interface. This package is intended to be used by highly concurrent implementations which require streams of events and errors rather than one-off events and errors.

Events and errors can be created in several different ways. The most common use case will likely be supplying an existing EventStream or ErrorStream to the `Publisher.Events` or `Publisher.Errors` method, which the publisher then reads from and forwards.

Alternatively, the `Publisher.EventFunc/ErrorFunc` methods provide support for deferred creation and rendering of event and error data for publishing. This allows for a reduction in memory allocations in the event that there are no subscribers for events or errors respectively.

Once a publisher is instantiated using the `NewPublisher` function, a user must create subscriptions to the publisher's events or errors using the `ReadEvents` or `ReadErrors` methods prior to publishing. Otherwise the events and errors will be lost and the publish methods will error.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Combo

type Combo interface {
	error
	Event
}

Combo is an interface description for types which implement both the Event interface and the error interface. Combo is treated as an error type.
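
Because a Combo value satisfies both interfaces, the order of the cases in a type switch decides how it is handled. The following is a minimal sketch of that idea, not code from the package: it redeclares the Event and Combo interfaces locally so that it compiles on its own, and `timeout` and `classify` are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// Event and Combo mirror the interfaces documented on this page. They are
// redeclared here only so the sketch is self-contained.
type Event interface {
	Event() string
}

type Combo interface {
	error
	Event
}

// timeout is a hypothetical type that implements both error and Event.
type timeout struct{ op string }

func (t timeout) Error() string { return t.op + " timed out" }
func (t timeout) Event() string { return "timeout during " + t.op }

// Compile-time check that timeout satisfies Combo.
var _ Combo = timeout{}

// classify lists the error case first, so a Combo value is reported as an
// error rather than as an event.
func classify(v interface{}) string {
	switch x := v.(type) {
	case error:
		return "error: " + x.Error()
	case Event:
		return "event: " + x.Event()
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(classify(timeout{op: "dial"})) // prints "error: dial timed out"
}
```

Swapping the two cases would route the same value to the event branch, which is why a subscriber that wants Combo values treated as errors should keep the error case first.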

type ErrorFunc

type ErrorFunc func() error

ErrorFunc provides a function signature for error publishing which allows for lower memory allocation overhead. This happens by allowing the Publisher to only execute the ErrorFunc in the event there is actually a listener for errors, rendering errors only at that time. This is a performance optimization.
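
The deferred-rendering idea can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: `lazyPublisher`, its `publish` method, and `expensive` are hypothetical, and a nil channel is used here to stand for "nobody has subscribed".

```go
package main

import (
	"errors"
	"fmt"
)

// ErrorFunc mirrors the signature documented on this page.
type ErrorFunc func() error

// lazyPublisher is a hypothetical stand-in for a publisher. A nil errs
// channel means that no subscriber has asked for errors yet.
type lazyPublisher struct {
	errs chan error
}

// publish calls fn only when a subscriber channel exists, so the error is
// never constructed when nobody is listening. It reports whether fn ran.
func (p *lazyPublisher) publish(fn ErrorFunc) bool {
	if p.errs == nil {
		return false
	}
	p.errs <- fn()
	return true
}

// expensive stands in for an error that is costly to build.
func expensive() error {
	return errors.New("expensive to render")
}

func main() {
	p := &lazyPublisher{}
	fmt.Println(p.publish(expensive)) // prints "false": no subscriber, fn skipped

	p.errs = make(chan error, 1) // buffered so the send does not block
	fmt.Println(p.publish(expensive)) // prints "true"
	fmt.Println(<-p.errs)             // prints "expensive to render"
}
```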

type ErrorStream

type ErrorStream <-chan error

ErrorStream is a read-only stream of errors.

func (ErrorStream) Interface

func (e ErrorStream) Interface() <-chan interface{}

Interface returns a read-only stream of errors as empty interface types instead of errors.
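
One way such a conversion could work is to forward each value onto a new channel in a goroutine. The sketch below assumes that approach; `toInterface` and `strErr` are hypothetical names, and the package's actual implementation may differ.

```go
package main

import "fmt"

// strErr is a small hypothetical error type used to keep the sketch
// free of extra imports.
type strErr string

func (e strErr) Error() string { return string(e) }

// toInterface is a hypothetical helper. It forwards every error from in
// onto a new channel of interface{} values and closes that channel once
// in has been closed and drained.
func toInterface(in <-chan error) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for err := range in {
			out <- err
		}
	}()
	return out
}

func main() {
	in := make(chan error, 2)
	in <- strErr("first")
	in <- strErr("second")
	close(in)

	// Values arrive in the order they were sent on the single input.
	for v := range toInterface(in) {
		fmt.Println(v)
	}
}
```

The converted channel lets one subscriber loop, such as the `Subscribe` example in the README, read events and errors through the same element type.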

type ErrorWriter

type ErrorWriter chan<- error

ErrorWriter is a write-only stream of errors.

type Event

type Event interface {
	Event() string
}

Event defines an interface which can be implemented by any type which is to be published as an event. This interface is similar to how the error interface is set up, in that the event supplies an Event function which returns a string.

type EventFunc

type EventFunc func() Event

EventFunc provides a function signature for event publishing which allows for lower memory allocation overhead. This happens by allowing the Publisher to only execute the EventFunc in the event there is actually a listener for events, rendering events only at that time. This is a performance optimization.

type EventStream

type EventStream <-chan Event

EventStream is a read-only stream of events.

func (EventStream) Interface

func (e EventStream) Interface() <-chan interface{}

Interface returns a read-only stream of events as empty interface types instead of Event types.

type EventWriter

type EventWriter chan<- Event

EventWriter is a write-only stream of events.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher provides the ability to publish events and errors in a thread-safe concurrent manner using best practices.

func NewPublisher

func NewPublisher(ctx context.Context) *Publisher

NewPublisher creates a new publisher for serving Event and Error streams.

func (*Publisher) Close

func (p *Publisher) Close() (err error)

Close closes the event and error streams.

func (*Publisher) ErrorFunc

func (p *Publisher) ErrorFunc(ctx context.Context, fn ErrorFunc)

ErrorFunc accepts an ErrorFunc type as a parameter and executes it only if there are subscribers to the underlying error channel, allowing for delayed data rendering of an error.

func (*Publisher) Errors

func (p *Publisher) Errors(ctx context.Context, errs ...ErrorStream) error

Errors accepts a number of error streams and forwards them to the publisher. (Fan-In)
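
The fan-in pattern named here can be sketched with one forwarding goroutine per input stream and a WaitGroup that closes the merged output. This is a generic illustration, not the package's code: `fanIn`, `countAll`, and `strErr` are hypothetical names, and plain `<-chan error` is used in place of ErrorStream.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// strErr is a small hypothetical error type for the demonstration.
type strErr string

func (e strErr) Error() string { return string(e) }

// fanIn is a hypothetical helper that merges several error streams into
// one. It stops early if ctx is canceled and closes the output once every
// input has been closed and drained.
func fanIn(ctx context.Context, streams ...<-chan error) <-chan error {
	out := make(chan error)

	var wg sync.WaitGroup
	wg.Add(len(streams))

	for _, s := range streams {
		go func(s <-chan error) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case err, ok := <-s:
					if !ok {
						return
					}
					// Guard the send so cancellation cannot leave
					// this goroutine blocked forever.
					select {
					case out <- err:
					case <-ctx.Done():
						return
					}
				}
			}
		}(s)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// countAll drains the merged stream and returns how many errors arrived.
func countAll(streams ...<-chan error) int {
	n := 0
	for range fanIn(context.Background(), streams...) {
		n++
	}
	return n
}

func main() {
	a := make(chan error, 1)
	b := make(chan error, 1)
	a <- strErr("from a")
	b <- strErr("from b")
	close(a)
	close(b)

	// Only the count is printed because arrival order across inputs is
	// not deterministic.
	fmt.Println(countAll(a, b))
}
```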

func (*Publisher) EventFunc

func (p *Publisher) EventFunc(ctx context.Context, fn EventFunc)

EventFunc accepts an EventFunc type as a parameter and executes it only if there are subscribers to the underlying event channel, allowing for delayed data rendering of an event.

func (*Publisher) Events

func (p *Publisher) Events(ctx context.Context, events ...EventStream) error

Events accepts a number of event streams and forwards them to the event writer. (Fan-In)

func (*Publisher) ReadErrors

func (p *Publisher) ReadErrors(buffer int) ErrorStream

ReadErrors returns a stream of published errors.

func (*Publisher) ReadEvents

func (p *Publisher) ReadEvents(buffer int) EventStream

ReadEvents returns a stream of published events.

func (*Publisher) Split

func (p *Publisher) Split(ctx context.Context, in <-chan interface{}) error

Split accepts a channel of interface types and splits them into event and error streams. (Fan-Out)
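
The routing step behind such a fan-out can be sketched with a type switch over the incoming values. The code below is a simplified illustration, not the package's implementation: `split`, `note`, and `strErr` are hypothetical, context handling is omitted for brevity, and dropping values that are neither errors nor events is an assumption made for this sketch.

```go
package main

import "fmt"

// Event mirrors the interface documented on this page.
type Event interface {
	Event() string
}

// note and strErr are hypothetical types used for the demonstration.
type note string

func (n note) Event() string { return string(n) }

type strErr string

func (e strErr) Error() string { return string(e) }

// split is a hypothetical helper that reads from in until it is closed
// and routes each value to the matching output. The error case comes
// first, so a value implementing both interfaces goes to errs. Values of
// any other type are dropped in this sketch.
func split(in <-chan interface{}, events chan<- Event, errs chan<- error) {
	for v := range in {
		switch x := v.(type) {
		case error:
			errs <- x
		case Event:
			events <- x
		}
	}
}

func main() {
	in := make(chan interface{}, 3)
	in <- note("started")
	in <- strErr("failed")
	in <- 42 // neither an event nor an error
	close(in)

	// Buffered outputs let split run to completion without readers.
	events := make(chan Event, 3)
	errs := make(chan error, 3)
	split(in, events, errs)

	fmt.Println((<-events).Event()) // prints "started"
	fmt.Println(<-errs)             // prints "failed"
}
```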
