gbus

package
v0.0.0-...-a50e536 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2019 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (

	//MaxRetryCount defines the max times a retry can run
	MaxRetryCount uint = 3
)

Functions

func GetFqn

func GetFqn(obj interface{}) string

GetFqn gets the "fully qualified name" of an interface. meaning the package path + typename

func GetTypeFQN

func GetTypeFQN(t reflect.Type) string

GetTypeFQN gets the "fully qualified name" of a type. meaning the package path + typename

Types

type AMQPOutbox

type AMQPOutbox struct {
	// contains filtered or unexported fields
}

AMQPOutbox sends messages to the amqp transport

func (*AMQPOutbox) NotifyConfirm

func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)

NotifyConfirm send an amqp notification

func (*AMQPOutbox) Post

func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)

Post implements Outbox.Send

type Builder

type Builder interface {
	PurgeOnStartUp() Builder
	WithDeadlettering(deadletterExchange string) Builder
	/*
		Txnl sets the bus to be transactional using a persisted saga store
		provider: pg for PostgreSQL
		connStr: connection string in the format of the passed in provider
	*/
	Txnl(provider, connStr string) Builder
	//WithSerializer provides the ability to plugin custom serializers
	WithSerializer(serializer Serializer) Builder
	/*
		 		WorkerNum sets the number of worker go routines consuming messages from the queue
				The default value if this option is not set is 1
	*/
	WorkerNum(workers uint, prefetchCount uint) Builder

	/*
	   WithConfirms enables publisher confirms
	*/
	WithConfirms() Builder

	//WithPolicies defines the default policies that are applied for evey outgoing amqp messge
	WithPolicies(policies ...MessagePolicy) Builder

	//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
	ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder

	//Build the bus
	Build(svcName string) Bus
}

Builder is the main interface that should be used to create an instance of a Bus

type Bus

Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus

type BusMessage

type BusMessage struct {
	ID                string
	CorrelationID     string
	SagaID            string
	SagaCorrelationID string
	Semantics         string /*cmd or evt*/
	Payload           Message
	PayloadFQN        string
	RPCID             string
}

BusMessage the structure that gets sent to the underlying transport

func NewBusMessage

func NewBusMessage(payload Message) *BusMessage

NewBusMessage factory method for creating a BusMessage that wraps the given payload

func NewFromAMQPHeaders

func NewFromAMQPHeaders(headers amqp.Table) *BusMessage

NewFromAMQPHeaders creates a BusMessage from headers of an amqp message

func (*BusMessage) GetAMQPHeaders

func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)

GetAMQPHeaders convert to AMQP headers Table everything but a payload

func (*BusMessage) SetFromAMQPHeaders

func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table)

SetFromAMQPHeaders convert from AMQP headers Table everything but a payload

func (*BusMessage) SetPayload

func (bm *BusMessage) SetPayload(payload Message)

SetPayload sets the payload and makes sure that Name is saved

type BusSwitch

type BusSwitch interface {
	/*
		Start starts the bus, once the bus is started messages get consiumed from the queue
		and handlers get invoced.
		Register all handlers prior to calling GBus.Start()
	*/
	Start() error
	/*
		Shutdown the bus and close connection to the underlying broker
	*/
	Shutdown() error
}

BusSwitch starts and shutdowns the bus

type DefaultBus

type DefaultBus struct {
	*Safety
	Outgoing      *AMQPOutbox
	Outbox        TxOutbox
	PrefetchCount uint
	AmqpConnStr   string

	AMQPChannel *amqp.Channel

	SvcName string

	Registrations []*Registration

	RPCHandlers map[string]MessageHandler

	HandlersLock         *sync.Mutex
	RPCLock              *sync.Mutex
	SenderLock           *sync.Mutex
	ConsumerLock         *sync.Mutex
	RegisteredSchemas    map[string]bool
	DelayedSubscriptions [][]string
	PurgeOnStartup       bool

	Glue            SagaRegister
	TxProvider      TxProvider
	IsTxnl          bool
	WorkerNum       uint
	Serializer      Serializer
	DLX             string
	DefaultPolicies []MessagePolicy
	Confirm         bool

	DbPingTimeout time.Duration
	// contains filtered or unexported fields
}

DefaultBus implements the Bus interface

func (*DefaultBus) GetHealth

func (b *DefaultBus) GetHealth() HealthCard

GetHealth implements Health.GetHealth

func (*DefaultBus) HandleDeadletter

func (b *DefaultBus) HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)

HandleDeadletter implements GBus.HandleDeadletter

func (*DefaultBus) HandleEvent

func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error

HandleEvent implements GBus.HandleEvent

func (*DefaultBus) HandleMessage

func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error

HandleMessage implements GBus.HandleMessage

func (*DefaultBus) NotifyHealth

func (b *DefaultBus) NotifyHealth(health chan error)

NotifyHealth implements Health.NotifyHealth

func (*DefaultBus) Publish

func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error

Publish implements GBus.Publish(topic, message)

func (*DefaultBus) RPC

func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)

RPC implements GBus.RPC

func (*DefaultBus) RegisterSaga

func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error

RegisterSaga impements GBus.RegisterSaga

func (*DefaultBus) Send

func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error

Send implements GBus.Send(destination string, message interface{})

func (*DefaultBus) Shutdown

func (b *DefaultBus) Shutdown() (shutdwonErr error)

Shutdown implements GBus.Start()

func (*DefaultBus) Start

func (b *DefaultBus) Start() error

Start implements GBus.Start()

type HandlerRegister

type HandlerRegister interface {
	/*
		HandleMessage registers a handler to a specific message type
		Use this method to register handlers for commands and reply messages
		Use the HandleEvent method to subscribe on events and register a handler
	*/
	HandleMessage(message Message, handler MessageHandler) error
	/*
		HandleEvent registers a handler for a specific message type published
		to an exchange with a specific topic
	*/
	HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}

HandlerRegister registers message handlers to specific messages and events

type Health

type Health interface {
	NotifyHealth(health chan error)
	GetHealth() HealthCard
}

Health reports om health issues in which the bus needs to be restarted

type HealthCard

type HealthCard struct {
	DbConnected        bool
	RabbitConnected    bool
	RabbitBackPressure bool
}

HealthCard that holds the health values of the bus

type Invocation

type Invocation interface {
	Reply(ctx context.Context, message *BusMessage) error
	Bus() Messaging
	Tx() *sql.Tx
	Ctx() context.Context
	Routing() (exchange, routingKey string)
}

Invocation context for a specific processed message

type Message

type Message interface {
	SchemaName() string
}

Message a common interface that passes to the serializers to allow decoding and encoding of content

type MessageFilter

type MessageFilter struct {
	Exchange   string
	RoutingKey string
	MsgName    string
}

func NewMessageFilter

func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter

func (*MessageFilter) Matches

func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool

type MessageHandler

type MessageHandler func(invocation Invocation, message *BusMessage) error

MessageHandler signature for all command handlers

type MessagePolicy

type MessagePolicy interface {
	Apply(publishing *amqp.Publishing)
}

MessagePolicy defines a user policy for out going amqp messages User policies can control message ttl, durability etc..

type Messaging

type Messaging interface {
	/*
		Send a command or a command response to a specific service
		one-to-one semantics
	*/
	Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error

	/*
		Publish and event, one-to-many semantics
	*/
	Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error

	/*
		RPC calls the service passing him the request BusMessage and blocks until a reply is
		received or timeout experied.

	*/
	RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
}

Messaging interface to send and publish messages to the bus

type RegisterDeadletterHandler

type RegisterDeadletterHandler interface {
	HandleDeadletter(handler func(tx *sql.Tx, poision amqp.Delivery) error)
}

RegisterDeadletterHandler provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue

type Registration

type Registration struct {
	Handler MessageHandler
	// contains filtered or unexported fields
}

func NewRegistration

func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration

func (*Registration) Matches

func (sub *Registration) Matches(exchange, routingKey, msgName string) bool

type RequestSagaTimeout

type RequestSagaTimeout interface {
	TimeoutDuration() time.Duration
	Timeout(invocation Invocation, message *BusMessage) error
}

RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess

type Safety

type Safety struct{}

Safety provides utility methods to safly invoke methods

func (*Safety) SafeWithRetries

func (s *Safety) SafeWithRetries(funk func() error, retries uint) error

SafeWithRetries safely invoke the function with the number of retries

type Saga

type Saga interface {
	//StartedBy returns the messages that when received should create a new saga instance
	StartedBy() []Message
	/*
		RegisterAllHandlers passes in the HandlerRegister so that the saga can register
		the messages that it handles
	*/
	RegisterAllHandlers(register HandlerRegister)

	//IsComplete retruns if the saga is complete and can be discarded
	IsComplete() bool

	//New is a factory method used by the bus to crerate new instances of a saga
	New() Saga
}

Saga is the base interface for all Sagas.

type SagaConfFn

type SagaConfFn func(Saga) Saga

SagaConfFn is a function to allow configuration of a saga in the context of the gbus

type SagaRegister

type SagaRegister interface {
	RegisterSaga(saga Saga, conf ...SagaConfFn) error
}

SagaRegister registers sagas to the bus

type SagaTimeoutMessage

type SagaTimeoutMessage struct {
	SagaID string
}

SagaTimeoutMessage is the timeout message for Saga's

func (SagaTimeoutMessage) SchemaName

func (SagaTimeoutMessage) SchemaName() string

SchemaName implements gbus.Message

type Serializer

type Serializer interface {
	Name() string
	Encode(message Message) ([]byte, error)
	Decode(buffer []byte, schemaName string) (Message, error)
	Register(obj Message)
}

Serializer is the base interface for all message serializers

type TxOutbox

type TxOutbox interface {
	Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
	Start(amqpOut *AMQPOutbox) error
	Stop() error
}

TxOutbox abstracts the transactional outgoing channel type

type TxProvider

type TxProvider interface {
	New() (*sql.Tx, error)
	Dispose()
	Ping(timeoutInSeconds time.Duration) bool
}

TxProvider provides a new Tx from the configured driver to the bus

Directories

Path Synopsis
tx
pg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL