gbus

package
v1.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//MaxRetryCount defines the max times a retry can run.
	//Default is 3 but it is configurable
	MaxRetryCount uint = 3
	//BaseRetryDuration defines the basic milliseconds that the retry algorithm uses
	//for a random retry time. Default is 10 but it is configurable.
	BaseRetryDuration = 10 * time.Millisecond
	//RPCHeaderName used to define the header in grabbit for RPC
	RPCHeaderName                  = "x-grabbit-msg-rpc-id"
	ResurrectedHeaderName          = "x-resurrected-from-death"
	FirstDeathRoutingKeyHeaderName = "x-first-death-routing-key"
)

Functions

func GetDeliveryLogEntries added in v1.1.6

func GetDeliveryLogEntries(delivery amqp.Delivery) logrus.Fields

func GetFqn

func GetFqn(obj interface{}) string

GetFqn gets the "fully qualified name" of an interface. meaning the package path + typename

func GetMessageName added in v1.1.1

func GetMessageName(delivery amqp.Delivery) string

GetMessageName extracts the valuee of the custom x-msg-name header from an amq delivery

func GetTypeFQN

func GetTypeFQN(t reflect.Type) string

GetTypeFQN gets the "fully qualified name" of a type. meaning the package path + typename

Types

type AMQPOutbox

type AMQPOutbox struct {
	SvcName string
	// contains filtered or unexported fields
}

AMQPOutbox sends messages to the amqp transport

func (*AMQPOutbox) NotifyConfirm

func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)

NotifyConfirm send an amqp notification

func (*AMQPOutbox) Post

func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)

Post implements Outbox.Send

func (*AMQPOutbox) Shutdown added in v1.1.0

func (out *AMQPOutbox) Shutdown()

Shutdown stops the outbox

type Builder

type Builder interface {
	PurgeOnStartUp() Builder
	WithDeadlettering(deadletterExchange string) Builder
	/*
		Txnl sets the bus to be transactional using a persisted saga store
		provider: mysql for mysql database
		connStr: connection string in the format of the passed in provider
	*/
	Txnl(provider, connStr string) Builder
	//WithSerializer provides the ability to plugin custom serializers
	WithSerializer(serializer Serializer) Builder
	/*
		 		WorkerNum sets the number of worker go routines consuming messages from the queue
				The default value if this option is not set is 1
	*/
	WorkerNum(workers uint, prefetchCount uint) Builder

	/*
	   WithConfirms enables publisher confirms
	*/
	WithConfirms() Builder

	//WithPolicies defines the default policies that are applied for evey outgoing amqp messge
	WithPolicies(policies ...MessagePolicy) Builder

	//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
	ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder

	//RetriesNum defines the number of retries upon error
	WithConfiguration(config BusConfiguration) Builder

	//Build the bus
	Build(svcName string) Bus

	//WithLogger set custom logger instance
	WithLogger(logger logrus.FieldLogger) Builder
}

Builder is the main interface that should be used to create an instance of a Bus

type Bus

Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus

type BusConfiguration added in v1.0.2

type BusConfiguration struct {
	MaxRetryCount     uint
	BaseRetryDuration int //TODO:Change type to uint
	OutboxCfg         OutboxConfiguration
}

BusConfiguration provides configuration passed to the bus builder

type BusMessage

type BusMessage struct {
	ID                string
	IdempotencyKey    string
	CorrelationID     string
	SagaID            string
	SagaCorrelationID string
	Semantics         Semantics /*cmd or evt*/
	Payload           Message
	PayloadFQN        string
	RPCID             string
}

BusMessage the structure that gets sent to the underlying transport

func NewBusMessage

func NewBusMessage(payload Message) *BusMessage

NewBusMessage factory method for creating a BusMessage that wraps the given payload

func NewFromDelivery added in v1.1.1

func NewFromDelivery(delivery amqp.Delivery) (*BusMessage, error)

NewFromDelivery creates a BusMessage from an amqp delivery

func (*BusMessage) GetAMQPHeaders

func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)

func (*BusMessage) GetTraceLog

func (bm *BusMessage) GetTraceLog() (fields []log.Field)

GetTraceLog returns an array of log entires containing all of the message properties

func (*BusMessage) SetFromAMQPHeaders

func (bm *BusMessage) SetFromAMQPHeaders(delivery amqp.Delivery)

SetFromAMQPHeaders convert from AMQP headers Table everything but a payload

func (*BusMessage) SetIdempotencyKey added in v1.1.6

func (bm *BusMessage) SetIdempotencyKey(idempotencyKey string)

func (*BusMessage) SetPayload

func (bm *BusMessage) SetPayload(payload Message)

SetPayload sets the payload and makes sure that Name is saved

func (*BusMessage) TargetSaga added in v1.1.3

func (bm *BusMessage) TargetSaga(sagaID string)

TargetSaga allows sending the message to a specific Saga instance

type BusSwitch

type BusSwitch interface {
	/*
		Start starts the bus, once the bus is started messages get consiumed from the queue
		and handlers get invoced.
		Register all handlers prior to calling GBus.Start()
	*/
	Start() error
	/*
		Shutdown the bus and close connection to the underlying broker
	*/
	Shutdown() error
}

BusSwitch starts and shutdowns the bus

type Deadlettering added in v1.1.0

type Deadlettering interface {
	/*
		HandleDeadletter is deprecated use RawMessageHandling.SetGlobalRawMessageHandler instead.
		This function will be removed in future grabbit releases
	*/
	HandleDeadletter(handler RawMessageHandler)
	ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Deadlettering provides the ability to handle messages that were rejected as poision and arrive to the deadletter queue

type DefaultBus

type DefaultBus struct {
	*Safety
	*Glogged
	Outbox        TxOutbox
	PrefetchCount uint
	AmqpConnStr   string

	SvcName string

	Registrations []*Registration

	RPCHandlers map[string]MessageHandler

	HandlersLock         *sync.Mutex
	RPCLock              *sync.Mutex
	SenderLock           *sync.Mutex
	ConsumerLock         *sync.Mutex
	RegisteredSchemas    map[string]bool
	DelayedSubscriptions [][]string
	PurgeOnStartup       bool

	Glue       SagaGlue
	TxProvider TxProvider

	WorkerNum       uint
	Serializer      Serializer
	DLX             string
	DefaultPolicies []MessagePolicy
	Confirm         bool

	DbPingTimeout time.Duration
	// contains filtered or unexported fields
}

DefaultBus implements the Bus interface

func (*DefaultBus) GetHealth

func (b *DefaultBus) GetHealth() HealthCard

GetHealth implements Health.GetHealth

func (*DefaultBus) HandleDeadletter

func (b *DefaultBus) HandleDeadletter(handler RawMessageHandler)

HandleDeadletter implements Deadlettering.HandleDeadletter

func (*DefaultBus) HandleEvent

func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error

HandleEvent implements GBus.HandleEvent

func (*DefaultBus) HandleMessage

func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error

HandleMessage implements GBus.HandleMessage

func (*DefaultBus) NotifyHealth

func (b *DefaultBus) NotifyHealth(health chan error)

NotifyHealth implements Health.NotifyHealth

func (*DefaultBus) Publish

func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error

Publish implements GBus.Publish(topic, message)

func (*DefaultBus) RPC

func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)

RPC implements GBus.RPC

func (*DefaultBus) RegisterSaga

func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error

RegisterSaga impements GBus.RegisterSaga

func (*DefaultBus) ReturnDeadToQueue added in v1.1.0

func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error

ReturnDeadToQueue returns a message to its original destination

func (*DefaultBus) Send

func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error

Send implements GBus.Send(destination string, message interface{})

func (*DefaultBus) SetGlobalRawMessageHandler added in v1.1.1

func (b *DefaultBus) SetGlobalRawMessageHandler(handler RawMessageHandler)

SetGlobalRawMessageHandler implements RawMessageHandling.SetGlobalRawMessageHandler

func (*DefaultBus) Shutdown

func (b *DefaultBus) Shutdown() (shutdwonErr error)

Shutdown implements GBus.Start()

func (*DefaultBus) Start

func (b *DefaultBus) Start() error

Start implements GBus.Start()

type DeliveryInfo added in v1.1.0

type DeliveryInfo struct {
	Attempt       uint
	MaxRetryCount uint
}

DeliveryInfo provdes information as to the attempted deilvery of the invocation

type Glogged added in v1.0.3

type Glogged struct {
	// contains filtered or unexported fields
}

Glogged provides an easy way for structs with in the grabbit package to participate in the general logging schema of the bus

func (*Glogged) Log added in v1.0.3

func (gl *Glogged) Log() logrus.FieldLogger

Log returns the set default log or a new instance of a logrus.FieldLogger

func (*Glogged) SetLogger added in v1.0.3

func (gl *Glogged) SetLogger(entry logrus.FieldLogger)

SetLogger sets the default logrus.FieldLogger that should be used when logging a new message

type HandlerRegister

type HandlerRegister interface {
	/*
		HandleMessage registers a handler to a specific message type
		Use this method to register handlers for commands and reply messages
		Use the HandleEvent method to subscribe on events and register a handler
	*/
	HandleMessage(message Message, handler MessageHandler) error
	/*
		HandleEvent registers a handler for a specific message type published
		to an exchange with a specific topic
	*/
	HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}

HandlerRegister registers message handlers to specific messages and events

type Health

type Health interface {
	NotifyHealth(health chan error)
	GetHealth() HealthCard
}

Health reports om health issues in which the bus needs to be restarted

type HealthCard

type HealthCard struct {
	DbConnected        bool
	RabbitConnected    bool
	RabbitBackPressure bool
}

HealthCard that holds the health values of the bus

type Invocation

type Invocation interface {
	Logged
	Reply(ctx context.Context, message *BusMessage) error
	Bus() Messaging
	Tx() *sql.Tx
	Ctx() context.Context
	InvokingSvc() string
	Routing() (exchange, routingKey string)
	DeliveryInfo() DeliveryInfo
}

Invocation context for a specific processed message

type Logged added in v1.0.3

type Logged interface {
	SetLogger(entry logrus.FieldLogger)
	Log() logrus.FieldLogger
}

Logged represents a grabbit component that can be logged

type Message

type Message interface {
	SchemaName() string
}

Message a common interface that passes to the serializers to allow decoding and encoding of content

type MessageFilter

type MessageFilter struct {
	Exchange   string
	RoutingKey string
	MsgName    string
}

MessageFilter matches rabbitmq topic patterns

func NewMessageFilter

func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter

NewMessageFilter creates a new MessageFilter

func (*MessageFilter) Matches

func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool

Matches the passed in exchange, routingKey, msgName with the defined filter

type MessageHandler

type MessageHandler func(invocation Invocation, message *BusMessage) error

MessageHandler signature for all command handlers

func (MessageHandler) Name added in v1.1.0

func (mg MessageHandler) Name() string

Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type

type MessagePolicy

type MessagePolicy interface {
	Apply(publishing *amqp.Publishing)
}

MessagePolicy defines a user policy for out going amqp messages User policies can control message ttl, durability etc..

type Messaging

type Messaging interface {
	/*
		Send a command or a command response to a specific service
		one-to-one semantics
	*/
	Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error

	/*
		Publish and event, one-to-many semantics
	*/
	Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error

	/*
		RPC calls the service passing him the request BusMessage and blocks until a reply is
		received or timeout experied.

	*/
	RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
}

Messaging interface to send and publish messages to the bus

type OutboxConfiguration added in v1.1.6

type OutboxConfiguration struct {
	/*
		Ackers the number of goroutines configured to drain incoming ack/nack signals from the broker.
		Increase this value if you are experiencing deadlocks.
		Default is 10
	*/
	Ackers uint
	//PageSize is the amount of pending messsage records the outbox selects from the database every iteration, the default is 500
	PageSize uint
	//MetricsInterval is the duration the outbox waits between each metrics report, default is 15 seconds
	MetricsInterval time.Duration
	//SendInterval is the duration the outbox waits before each iteration, default is 1 second
	SendInterval time.Duration
	/*
		ScavengeInterval is the duration the outbox waits before attempting to re-send messages that
		were already sent to the broker but were not yet confirmed.
		Default is 60 seconds
	*/
	ScavengeInterval time.Duration
}

OutboxConfiguration configures the transactional outbox

type RawMessageHandler added in v1.1.1

type RawMessageHandler func(tx *sql.Tx, delivery *amqp.Delivery) error

RawMessageHandler signature for handlers that handle raw amqp deliveries

func (RawMessageHandler) Name added in v1.1.1

func (dlmg RawMessageHandler) Name() string

Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type

type RawMessageHandling added in v1.1.1

type RawMessageHandling interface {
	/*
				SetGlobalRawMessageHandler registers a handler that gets called for each amqp.Delivery that is delivered
		        to the service queue.
		        The handler will get called with a scoped transaction that is a different transaction than the ones that
		        regular message handlers are scoped by as we want the RawMessage handler to get executed even if the amqp.Delivery
		        can not be serialized by the bus to one of the registered schemas

		        In case a bus has both a raw message handler and regular ones the bus will first call the raw message handler
		        and afterward will call any registered message handlers.
		        if the global raw handler returns an error the message gets rejected and any additional
		        handlers will not be called.
		        You should not use the global raw message handler to drive business logic as it breaks the local transactivity
		        guarantees grabbit provides and should only be used in specialized cases.
		        If you do decide to use this feature try not shooting yourself in the foot.
	*/
	SetGlobalRawMessageHandler(handler RawMessageHandler)
}

RawMessageHandling provides the ability to consume and send raq amqp messages with the transactional guarantees that the bus provides

type Registration

type Registration struct {
	Handler MessageHandler
	// contains filtered or unexported fields
}

Registration represents a message handler's registration for a given exchange, topic and msg combination

func NewRegistration

func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration

NewRegistration creates a new registration

func (*Registration) Matches

func (sub *Registration) Matches(exchange, routingKey, msgName string) bool

Matches the registration with the given xchange, routingKey, msgName

type RequestSagaTimeout

type RequestSagaTimeout interface {
	TimeoutDuration() time.Duration
	Timeout(tx *sql.Tx, bus Messaging) error
}

RequestSagaTimeout is the interface a saga needs to implement to get timeout servicess

type Safety

type Safety struct{}

Safety provides utility methods to safly invoke methods

func (*Safety) SafeWithRetries

func (s *Safety) SafeWithRetries(funk func() error, retries uint) error

SafeWithRetries safely invoke the function with the number of retries

type Saga

type Saga interface {
	//StartedBy returns the messages that when received should create a new saga instance
	StartedBy() []Message
	/*
		RegisterAllHandlers passes in the HandlerRegister so that the saga can register
		the messages that it handles
	*/
	RegisterAllHandlers(register HandlerRegister)

	//IsComplete retruns if the saga is complete and can be discarded
	IsComplete() bool

	//New is a factory method used by the bus to crerate new instances of a saga
	New() Saga
}

Saga is the base interface for all Sagas.

type SagaConfFn

type SagaConfFn func(Saga) Saga

SagaConfFn is a function to allow configuration of a saga in the context of the gbus

type SagaGlue added in v1.1.0

type SagaGlue interface {
	SagaRegister
	Logged
	Start() error
	Stop() error
}

SagaGlue glues together all the parts needed in order to orchistrate saga instances

type SagaInvocation added in v1.1.2

type SagaInvocation interface {
	ReplyToInitiator(ctx context.Context, message *BusMessage) error
	//HostingSvc returns the svc name that is executing the service
	HostingSvc() string

	//SagaID returns the saga id of the currently invoked saga instance
	SagaID() string
}

SagaInvocation allows saga instances to reply to their creator even when not in the conext of handling the message that starts the saga. A message handler that is attached to a saga instance can safly cast the passed in invocation to SagaInvocation and use the ReplyToInitiator function to send a message to the originating service that sent the message that started the saga

type SagaRegister

type SagaRegister interface {
	RegisterSaga(saga Saga, conf ...SagaConfFn) error
}

SagaRegister registers sagas to the bus

type SagaTimeoutMessage

type SagaTimeoutMessage struct {
	SagaID string
}

SagaTimeoutMessage is the timeout message for Saga's

func (SagaTimeoutMessage) SchemaName

func (SagaTimeoutMessage) SchemaName() string

SchemaName implements gbus.Message

type Semantics

type Semantics string

Semantics reopresents the semantics of a grabbit message

const (
	//CMD represenst a messge with command semantics in grabbit
	CMD Semantics = "cmd"
	//EVT represenst a messge with event semantics in grabbit
	EVT Semantics = "evt"
	//REPLY represenst a messge with reply semantics in grabbit
	REPLY Semantics = "reply"
)

type Serializer

type Serializer interface {
	Name() string
	Encode(message Message) ([]byte, error)
	Decode(buffer []byte, schemaName string) (Message, error)
	Register(obj Message)
}

Serializer is the base interface for all message serializers

type TimeoutManager added in v1.1.0

type TimeoutManager interface {
	//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance
	RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
	//ClearTimeout clears a timeout for a specific saga
	ClearTimeout(tx *sql.Tx, sagaID string) error
	//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires
	SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
	//Start starts the timeout manager
	Start() error
	//Stop shuts the timeout manager down
	Stop() error
}

TimeoutManager abstracts the implementation of determining when a saga should be timed out

type TxOutbox

type TxOutbox interface {
	Logged
	Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
	Start(amqpOut *AMQPOutbox) error
	Stop() error
}

TxOutbox abstracts the transactional outgoing channel type

type TxProvider

type TxProvider interface {
	New() (*sql.Tx, error)
	Dispose()
	Ping(timeoutInSeconds time.Duration) bool
}

TxProvider provides a new Tx from the configured driver to the bus

Directories

Path Synopsis
tx

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL