notificationcenter

Version: v1.5.0
Published: Oct 3, 2021 License: Apache-2.0 Imports: 9 Imported by: 4

README

Notificationcenter pub/sub library

License Apache 2.0

Publish/subscribe messaging, or pub/sub messaging, is a form of asynchronous service-to-service communication used in serverless and microservices architectures. In a pub/sub model, any message published to a topic is immediately received by all of the subscribers to the topic. Pub/sub messaging can be used to enable event-driven architectures, or to decouple applications in order to increase performance, reliability and scalability.

The library provides basic primitives that hide the underlying queue implementation and simplify writing pub/sub services.

Usage examples

Basic examples of usage.

import (
  "context"
  "log"

  nc "github.com/geniusrabbit/notificationcenter"
  "github.com/geniusrabbit/notificationcenter/nats"
)
Create new publisher processor
// Create new publisher processor
eventStream, err := nats.NewPublisher(nats.WithNatsURL("nats://hostname:4222/group?topics=event"))
if err != nil {
  log.Fatal(err)
}

// Register stream processor
err = nc.Register("events", eventStream)
if err != nil {
  log.Fatal(err)
}
Send event by the notification publisher
// Send by global functions
nc.Publish(context.Background(), "events", message{title: "event 1"})

// Send by the publisher interface
events := nc.PublisherByName("events")
events.Publish(context.Background(), message{title: "event 2"})
Subscribe by the specific notification publisher
import (
  "context"
  "fmt"
  "time"

  nc "github.com/geniusrabbit/notificationcenter"
  "github.com/geniusrabbit/notificationcenter/interval" // import path assumed; not shown in the original
  "github.com/geniusrabbit/notificationcenter/nats"
)

func main() {
  ctx := context.Background()
  events := nats.MustNewSubscriber(nats.WithTopics("events"),
    nats.WithNatsURL("nats://connection"), nats.WithGroupName(`group`))
  nc.Register("events", events)
  nc.Register("refresh", interval.NewSubscriber(time.Minute * 5))

  // Add new receiver to process the stream "events"
  nc.Subscribe(ctx, "events", nc.FuncReceiver(func(msg nc.Message) error {
    fmt.Printf("%s\n", msg.Body())
    return nil
  }))

  // Add new time interval receiver to refresh the data every 5 minutes
  // (db is an application-level object that is not defined in this example)
  nc.Subscribe(ctx, "refresh", nc.FuncReceiver(func(msg nc.Message) error {
    return db.Reload()
  }))

  // Run subscriber listeners
  nc.Listen(ctx)
}

TODO

  • Add support for Amazon SQS queue
  • Add support for Redis queue
  • Add support for RabbitMQ queue
  • Add support for MySQL notifications queue
  • Add support for PostgreSQL notifications queue
  • Remove metrics from the queue (DEPRECATED)
  • Add support for NATS & NATS stream
  • Add support for Kafka queue
  • Add support for native Go channels
  • Add support for native Go time interval

Documentation

Constants

This section is empty.

Variables

var (
	ErrInvalidRegisterParameter     = errors.New(`invalid register parameter`)
	ErrUndefinedPublisherInterface  = errors.New(`undefined publisher interface`)
	ErrUndefinedSubscriberInterface = errors.New(`undefined subscriber interface`)
	ErrInterfaceAlreadySubscribed   = errors.New("[notificationcenter] interface already subscribed")
)

Error list...

var DefaultRegistry = NewRegistry()

DefaultRegistry is the global registry used by the package-level functions

Functions

func Close

func Close() error

Close notification center

func Listen

func Listen(ctx context.Context) error

Listen starts the Listen method of every registered subscriber

func MultiError added in v1.3.0

func MultiError(errors ...error) error

MultiError combines the given errors into a single error value

func OnClose

func OnClose() <-chan bool

OnClose returns a channel that is signaled only after all interfaces have been closed

This makes it convenient for application code to subscribe to the shutdown event

```go

func myDatabaseObserver() {
  <- notificationcenter.OnClose()
  // ... Do something
}

```
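
The close-notification pattern described above can be sketched as follows. This is only an illustration under stated assumptions, not the library's implementation: `closeNotifier`, `namedCloser` and `demo` are hypothetical names, and the sketch assumes the channel is closed (rather than sent to) once every `io.Closer` has been closed.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// closeNotifier is a hypothetical helper: it closes a set of io.Closer
// values and then releases waiters by closing a channel.
type closeNotifier struct {
	once    sync.Once
	done    chan bool
	closers []io.Closer
}

func newCloseNotifier(closers ...io.Closer) *closeNotifier {
	return &closeNotifier{done: make(chan bool), closers: closers}
}

// OnClose returns a channel that unblocks once Close has finished.
func (n *closeNotifier) OnClose() <-chan bool { return n.done }

// Close closes every registered closer, keeps the first error, and then
// releases all goroutines waiting on OnClose. Repeated calls do nothing.
func (n *closeNotifier) Close() error {
	var firstErr error
	n.once.Do(func() {
		for _, c := range n.closers {
			if err := c.Close(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
		close(n.done)
	})
	return firstErr
}

// namedCloser records its own shutdown in a shared log.
type namedCloser struct {
	name string
	log  *[]string
}

func (c namedCloser) Close() error {
	*c.log = append(*c.log, "closed "+c.name)
	return nil
}

// demo starts an observer that waits on OnClose, then closes the notifier.
func demo() []string {
	var log []string
	n := newCloseNotifier(namedCloser{"events", &log}, namedCloser{"refresh", &log})
	observed := make(chan struct{})
	go func() {
		<-n.OnClose()
		close(observed)
	}()
	_ = n.Close()
	<-observed
	return append(log, "observer released")
}

func main() {
	fmt.Println(demo())
}
```

Closing the channel instead of sending a value lets any number of observers wait on the same event.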

func Publish added in v1.0.0

func Publish(ctx context.Context, name string, messages ...interface{}) error

Publish one or more messages to the pub-service

func Register

func Register(params ...interface{}) error

Register one or more Publisher or Subscriber services. Parameters must be passed as alternating pairs in the order {name, interface}.

Example:

```go
nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)
```
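
One way such a variadic list of {name, interface} pairs could be parsed is sketched below. The types are local stand-ins, `registry`, `register` and `errInvalidParam` are hypothetical names, and the validation rules are assumptions rather than the library's actual behavior.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Publisher and Subscriber are local stand-ins for the interfaces on this
// page. Subscriber is trimmed to one method to keep the sketch short.
type Publisher interface {
	Publish(ctx context.Context, messages ...interface{}) error
}

type Subscriber interface {
	Listen(ctx context.Context) error
}

// errInvalidParam is a hypothetical error value for this sketch.
var errInvalidParam = errors.New("invalid register parameter")

// registry is a hypothetical, simplified name-to-service map.
type registry struct {
	publishers  map[string]Publisher
	subscribers map[string]Subscriber
}

func newRegistry() *registry {
	return &registry{
		publishers:  map[string]Publisher{},
		subscribers: map[string]Subscriber{},
	}
}

// register walks params two at a time: a string name followed by a service.
func (r *registry) register(params ...interface{}) error {
	if len(params) == 0 || len(params)%2 != 0 {
		return errInvalidParam
	}
	for i := 0; i < len(params); i += 2 {
		name, ok := params[i].(string)
		if !ok {
			return errInvalidParam
		}
		switch svc := params[i+1].(type) {
		case Publisher:
			r.publishers[name] = svc
		case Subscriber:
			r.subscribers[name] = svc
		default:
			return errInvalidParam
		}
	}
	return nil
}

type nopPublisher struct{}

func (nopPublisher) Publish(context.Context, ...interface{}) error { return nil }

type nopSubscriber struct{}

func (nopSubscriber) Listen(context.Context) error { return nil }

func main() {
	r := newRegistry()
	err := r.register("events", nopPublisher{}, "refresh", nopSubscriber{})
	fmt.Println(err, len(r.publishers), len(r.subscribers))
	fmt.Println(r.register("events")) // an odd number of parameters is rejected
}
```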

func Subscribe

func Subscribe(ctx context.Context, name string, receiver Receiver) error

Subscribe attaches a new handler to the subscriber interface registered under the given name

Types

type ErrorHandler added in v1.0.0

type ErrorHandler func(msg Message, err error)

ErrorHandler type to process error value

type FuncPublisher added in v1.4.0

type FuncPublisher func(context.Context, ...interface{}) error

FuncPublisher provides custom function wrapper for the custom publisher processor

func (FuncPublisher) Publish added in v1.4.0

func (f FuncPublisher) Publish(ctx context.Context, messages ...interface{}) error

Publish calls the original custom publisher function
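
The function-adapter idea behind FuncPublisher can be illustrated with a self-contained sketch. The `Publisher` and `FuncPublisher` types are redeclared locally with the signatures listed on this page so that the example compiles on its own; `collect` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher mirrors the interface documented on this page.
type Publisher interface {
	Publish(ctx context.Context, messages ...interface{}) error
}

// FuncPublisher is redeclared locally with the documented signature.
type FuncPublisher func(context.Context, ...interface{}) error

// Publish forwards the call to the wrapped function.
func (f FuncPublisher) Publish(ctx context.Context, messages ...interface{}) error {
	return f(ctx, messages...)
}

// collect publishes msgs through a function-backed Publisher and returns
// the string form of every message the function received.
func collect(msgs ...interface{}) []string {
	var seen []string
	var pub Publisher = FuncPublisher(func(_ context.Context, messages ...interface{}) error {
		for _, m := range messages {
			seen = append(seen, fmt.Sprint(m))
		}
		return nil
	})
	_ = pub.Publish(context.Background(), msgs...)
	return seen
}

func main() {
	fmt.Println(collect("event 1", 2))
}
```

A plain function is often enough for tests or for logging-only publishers, which is where this kind of adapter tends to be useful.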

type FuncReceiver added in v1.0.0

type FuncReceiver func(msg Message) error

FuncReceiver implements Receiver interface for a single function

func (FuncReceiver) Receive added in v1.0.0

func (f FuncReceiver) Receive(msg Message) error

Receive message from sub-service to process it with function
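
A minimal sketch of a function-backed receiver handling one message follows. The interfaces are redeclared locally from the signatures on this page; `memMessage` and `handle` are hypothetical names introduced only for the illustration.

```go
package main

import (
	"context"
	"fmt"
)

// Message, Receiver and FuncReceiver mirror the declarations on this page.
type Message interface {
	Context() context.Context
	ID() string
	Body() []byte
	Ack() error
}

type Receiver interface {
	Receive(msg Message) error
}

type FuncReceiver func(msg Message) error

// Receive passes the message to the wrapped function.
func (f FuncReceiver) Receive(msg Message) error { return f(msg) }

// memMessage is a hypothetical in-memory Message used only in this sketch.
type memMessage struct {
	id    string
	body  []byte
	acked bool
}

func (m *memMessage) Context() context.Context { return context.Background() }
func (m *memMessage) ID() string               { return m.id }
func (m *memMessage) Body() []byte             { return m.body }
func (m *memMessage) Ack() error               { m.acked = true; return nil }

// handle passes one message to a function-backed Receiver that reads the
// body and then acknowledges the message.
func handle(id, body string) (string, bool) {
	msg := &memMessage{id: id, body: []byte(body)}
	var out string
	var r Receiver = FuncReceiver(func(m Message) error {
		out = m.ID() + ":" + string(m.Body())
		return m.Ack()
	})
	_ = r.Receive(msg)
	return out, msg.acked
}

func main() {
	fmt.Println(handle("1", "hello"))
}
```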

type Message

type Message interface {
	// Context of the message
	Context() context.Context

	// Unique message ID (depends on transport)
	ID() string

	// Body returns message data as bytes
	Body() []byte

	// Acknowledgment of the message processing
	Ack() error
}

Message describes the access methods to the message original object

type ModelSubscriber added in v1.0.0

type ModelSubscriber struct {

	// Error handler pointer
	ErrorHandler ErrorHandler

	// Panic handler pointer
	PanicHandler PanicHandler
	// contains filtered or unexported fields
}

ModelSubscriber provides a base implementation of the subscribe functionality

func (*ModelSubscriber) Close added in v1.0.0

func (s *ModelSubscriber) Close() error

Close closes all receivers that implement the io.Closer interface

func (*ModelSubscriber) ProcessMessage added in v1.0.0

func (s *ModelSubscriber) ProcessMessage(msg Message) error

ProcessMessage passes the message to all subscribed receivers
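
How a message might be fanned out to several receivers with error and panic handlers is sketched below. This is not the library's code: `modelSubscriber`, `safeReceive`, `textMessage` and `demo` are hypothetical, `Message` is trimmed to two methods, and the choice to keep going after a failing receiver is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a trimmed stand-in for the interface on this page.
type Message interface {
	ID() string
	Body() []byte
}

type Receiver interface {
	Receive(msg Message) error
}

type FuncReceiver func(msg Message) error

func (f FuncReceiver) Receive(msg Message) error { return f(msg) }

type ErrorHandler func(msg Message, err error)
type PanicHandler func(msg Message, recoverData interface{})

// modelSubscriber is a hypothetical local type, not the library's struct.
type modelSubscriber struct {
	ErrorHandler ErrorHandler
	PanicHandler PanicHandler
	receivers    []Receiver
}

// processMessage calls every receiver, reports failures to ErrorHandler
// and returns the first error it saw.
func (s *modelSubscriber) processMessage(msg Message) error {
	var firstErr error
	for _, r := range s.receivers {
		if err := s.safeReceive(r, msg); err != nil {
			if s.ErrorHandler != nil {
				s.ErrorHandler(msg, err)
			}
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

// safeReceive recovers a panic and hands it to PanicHandler when one is
// set; without a handler the panic is raised again.
func (s *modelSubscriber) safeReceive(r Receiver, msg Message) (err error) {
	defer func() {
		if rec := recover(); rec != nil {
			if s.PanicHandler == nil {
				panic(rec)
			}
			s.PanicHandler(msg, rec)
		}
	}()
	return r.Receive(msg)
}

type textMessage string

func (t textMessage) ID() string   { return "" }
func (t textMessage) Body() []byte { return []byte(t) }

// demo runs one message through a succeeding, a failing and a panicking receiver.
func demo() []string {
	var log []string
	s := &modelSubscriber{
		ErrorHandler: func(_ Message, err error) { log = append(log, "error: "+err.Error()) },
		PanicHandler: func(_ Message, rec interface{}) { log = append(log, fmt.Sprintf("panic: %v", rec)) },
		receivers: []Receiver{
			FuncReceiver(func(m Message) error { log = append(log, "got "+string(m.Body())); return nil }),
			FuncReceiver(func(Message) error { return errors.New("boom") }),
			FuncReceiver(func(Message) error { panic("bad state") }),
		},
	}
	_ = s.processMessage(textMessage("hello"))
	return log
}

func main() {
	fmt.Println(demo())
}
```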

func (*ModelSubscriber) Subscribe added in v1.0.0

func (s *ModelSubscriber) Subscribe(ctx context.Context, receiver Receiver) error

Subscribe adds a new receiver that will receive messages from the subscription

type MultiPublisher added in v1.4.0

type MultiPublisher []Publisher

MultiPublisher wrapper

func (MultiPublisher) Publish added in v1.4.0

func (p MultiPublisher) Publish(ctx context.Context, messages ...interface{}) error

Publish one or more messages to the bunch of pub-services
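
A slice-based fan-out publisher can be sketched in a few lines. The local `multiPublisher` below is an illustration, not the library's type, and stopping at the first failing publisher is an assumption about error handling; `fanOut` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
)

// Publisher and FuncPublisher mirror the declarations on this page.
type Publisher interface {
	Publish(ctx context.Context, messages ...interface{}) error
}

type FuncPublisher func(context.Context, ...interface{}) error

func (f FuncPublisher) Publish(ctx context.Context, messages ...interface{}) error {
	return f(ctx, messages...)
}

// multiPublisher is a hypothetical local version of a publisher list.
type multiPublisher []Publisher

// Publish forwards the messages to each publisher in order and stops at
// the first error (an assumption made for this sketch).
func (p multiPublisher) Publish(ctx context.Context, messages ...interface{}) error {
	for _, pub := range p {
		if err := pub.Publish(ctx, messages...); err != nil {
			return err
		}
	}
	return nil
}

// fanOut publishes msgs to two counting publishers and returns how many
// messages each of them received.
func fanOut(msgs ...interface{}) (first, second int) {
	count := func(n *int) Publisher {
		return FuncPublisher(func(_ context.Context, m ...interface{}) error {
			*n += len(m)
			return nil
		})
	}
	p := multiPublisher{count(&first), count(&second)}
	_ = p.Publish(context.Background(), msgs...)
	return first, second
}

func main() {
	fmt.Println(fanOut("x", "y"))
}
```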

type PanicHandler added in v1.0.0

type PanicHandler func(msg Message, recoverData interface{})

PanicHandler type to process panic action

type Publisher added in v1.0.0

type Publisher interface {
	// Publish one or more messages to the pub-service
	Publish(ctx context.Context, messages ...interface{}) error
}

Publisher pipeline base declaration

func PublisherByName added in v1.0.0

func PublisherByName(name string) Publisher

PublisherByName returns the pub interface registered under the given name, or nil if it does not exist

type Receiver added in v1.0.0

type Receiver interface {
	Receive(msg Message) error
}

Receiver describes the message processing interface

func ExtFuncReceiver added in v1.5.0

func ExtFuncReceiver(f interface{}, decs ...decoder.Decoder) Receiver

ExtFuncReceiver wraps function argument with arbitrary input data type
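
The idea of wrapping a handler that takes a typed argument can be approximated as follows. Note the swapped-in technique: this sketch uses Go generics and `encoding/json`, whereas the signature above accepts `interface{}` and `decoder.Decoder` values, so it is not the library's implementation. `typedReceiver`, `event`, `textMessage` and `decodeTitle` are hypothetical, and `Message` is trimmed to one method.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a trimmed stand-in for the interface on this page.
type Message interface {
	Body() []byte
}

type Receiver interface {
	Receive(msg Message) error
}

type FuncReceiver func(msg Message) error

func (f FuncReceiver) Receive(msg Message) error { return f(msg) }

// typedReceiver is a hypothetical helper: it decodes the JSON body into a
// value of type T and passes that value to the handler.
func typedReceiver[T any](f func(T) error) Receiver {
	return FuncReceiver(func(msg Message) error {
		var v T
		if err := json.Unmarshal(msg.Body(), &v); err != nil {
			return err
		}
		return f(v)
	})
}

type textMessage string

func (t textMessage) Body() []byte { return []byte(t) }

// event is a hypothetical payload type.
type event struct {
	Title string `json:"title"`
}

// decodeTitle runs one body through a typed receiver and returns the title
// seen by the handler together with any decoding error.
func decodeTitle(body string) (string, error) {
	var title string
	r := typedReceiver(func(e event) error {
		title = e.Title
		return nil
	})
	err := r.Receive(textMessage(body))
	return title, err
}

func main() {
	fmt.Println(decodeTitle(`{"title":"event 1"}`))
}
```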

func ReceiverFrom added in v1.5.0

func ReceiverFrom(handler interface{}) Receiver

ReceiverFrom converts the incoming handler type to the Receiver interface

type Registry added in v1.0.0

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides functionality of access to pub/sub interfaces by string names.

func NewRegistry added in v1.0.0

func NewRegistry() *Registry

NewRegistry init new registry object

func (*Registry) Close added in v1.0.0

func (r *Registry) Close() error

Close notification center

func (*Registry) Listen added in v1.0.0

func (r *Registry) Listen(ctx context.Context) (err error)

Listen runs subscribers listen interface

func (*Registry) OnClose added in v1.0.0

func (r *Registry) OnClose() <-chan bool

OnClose returns a channel that is signaled only after all interfaces have been closed

This makes it convenient for application code to subscribe to the shutdown event

```go

func myDatabaseObserver() {
  <- notificationcenter.OnClose()
  // ... Do something
}

```

func (*Registry) Publish added in v1.0.0

func (r *Registry) Publish(ctx context.Context, name string, messages ...interface{}) error

Publish one or more messages to the pub-service

func (*Registry) Publisher added in v1.0.0

func (r *Registry) Publisher(name string) Publisher

Publisher returns the pub interface registered under the given name, or nil if it does not exist

func (*Registry) Register added in v1.0.0

func (r *Registry) Register(params ...interface{}) error

Register one or more Publisher or Subscriber services. Parameters must be passed as alternating pairs in the order {name, interface}.

Example:

```go
nc.Register(
  "events", kafka.MustNewSubscriber(),
  "notifications", nats.MustNewSubscriber(),
)
```

func (*Registry) Subscribe added in v1.0.0

func (r *Registry) Subscribe(ctx context.Context, name string, receiver Receiver) error

Subscribe attaches a new handler to the subscriber interface registered under the given name

func (*Registry) Subscriber added in v1.0.0

func (r *Registry) Subscriber(name string) Subscriber

Subscriber returns the sub interface registered under the given name, or nil if it does not exist

type Subscriber

type Subscriber interface {
	io.Closer

	// Subscribe adds a new receiver that will receive messages from the subscription
	Subscribe(ctx context.Context, receiver Receiver) error

	// Start processing queue
	Listen(ctx context.Context) error
}

Subscriber provides methods of working with subscription
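
To show what implementing these three methods can look like, here is a sketch of a subscriber backed by a Go channel. It is an illustration only: `chanSubscriber`, `textMessage` and `demo` are hypothetical, `Message` is trimmed to one method, and returning from Listen on the first receiver error is an assumption.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Message is a trimmed stand-in for the interface on this page.
type Message interface {
	Body() []byte
}

type Receiver interface {
	Receive(msg Message) error
}

type FuncReceiver func(msg Message) error

func (f FuncReceiver) Receive(msg Message) error { return f(msg) }

// chanSubscriber is a hypothetical subscriber fed by a Go channel.
type chanSubscriber struct {
	mu        sync.Mutex
	receivers []Receiver
	queue     chan Message
}

// Subscribe registers one more receiver.
func (s *chanSubscriber) Subscribe(_ context.Context, r Receiver) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.receivers = append(s.receivers, r)
	return nil
}

// Listen delivers queued messages to all receivers until the context is
// cancelled or the queue is closed.
func (s *chanSubscriber) Listen(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg, ok := <-s.queue:
			if !ok {
				return nil
			}
			s.mu.Lock()
			rs := append([]Receiver(nil), s.receivers...)
			s.mu.Unlock()
			for _, r := range rs {
				if err := r.Receive(msg); err != nil {
					return err
				}
			}
		}
	}
}

// Close stops the queue; it must be called at most once in this sketch.
func (s *chanSubscriber) Close() error {
	close(s.queue)
	return nil
}

type textMessage string

func (t textMessage) Body() []byte { return []byte(t) }

// demo queues two messages, closes the queue and drains it with Listen.
func demo() ([]string, error) {
	s := &chanSubscriber{queue: make(chan Message, 2)}
	var got []string
	_ = s.Subscribe(context.Background(), FuncReceiver(func(m Message) error {
		got = append(got, string(m.Body()))
		return nil
	}))
	s.queue <- textMessage("a")
	s.queue <- textMessage("b")
	_ = s.Close()
	err := s.Listen(context.Background())
	return got, err
}

func main() {
	fmt.Println(demo())
}
```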

func SubscriberByName

func SubscriberByName(name string) Subscriber

SubscriberByName returns the sub interface registered under the given name, or nil if it does not exist

Directories

Path Synopsis
internal
Package mocks is a generated GoMock package.
Package pg provides the possibility to subscribe to internal Postgres events.
wrappers
