extensions

package
v0.0.4-vb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 10, 2024 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Index

Constants

View Source
const Prefix = "asyncapi-"

Prefix is the prefix used for all context keys in order to avoid collision with other keys that can be present in context.

Variables

View Source
var (
	// ErrAsyncAPI is the generic error for AsyncAPI generated code.
	ErrAsyncAPI = errors.New("error when using AsyncAPI")

	// ErrContextCanceled is given when a given context is canceled.
	ErrContextCanceled = fmt.Errorf("%w: context canceled", ErrAsyncAPI)

	// ErrNilBrokerController is raised when a nil broker controller is user.
	ErrNilBrokerController = fmt.Errorf("%w: nil broker controller has been used", ErrAsyncAPI)

	// ErrNilAppSubscriber is raised when a nil app subscriber is used (asyncapiv2 only).
	ErrNilAppSubscriber = fmt.Errorf("%w: nil app subscriber has been used", ErrAsyncAPI)

	// ErrNilUserSubscriber is raised when a nil user subscriber is used (asyncapiv2 only).
	ErrNilUserSubscriber = fmt.Errorf("%w: nil user subscriber has been used", ErrAsyncAPI)

	// ErrAlreadySubscribedChannel is raised when a subscription is done twice
	// or more without unsubscribing.
	ErrAlreadySubscribedChannel = fmt.Errorf("%w: the channel has already been subscribed", ErrAsyncAPI)

	// ErrSubscriptionCanceled is raised when expecting something and the subscription has been canceled before it happens.
	ErrSubscriptionCanceled = fmt.Errorf("%w: the subscription has been canceled", ErrAsyncAPI)

	// ErrNoCorrelationIDSet is raised when a correlation ID is expected, but none is detected.
	ErrNoCorrelationIDSet = fmt.Errorf("%w: no correlation ID but one is expected", ErrAsyncAPI)

	// ErrChannelAddressEmpty is raised when a given channel address is empty,
	// when dynamically set from message.
	ErrChannelAddressEmpty = fmt.Errorf("%w: channel address empty", ErrAsyncAPI)
)

Functions

func IfContextNotSetWith

func IfContextNotSetWith[T any](ctx context.Context, key ContextKey, fn func())

IfContextNotSetWith executes the function if the key is not set in the context.

func IfContextSetWith

func IfContextSetWith[T any](ctx context.Context, key ContextKey, fn func(value T))

IfContextSetWith executes the function if the key is set in the context.

func IfContextValueEquals

func IfContextValueEquals[T comparable](ctx context.Context, key ContextKey, expected T, fn func())

IfContextValueEquals executes the function if the key is set in the context as a given type and the value is equal to the expected value.

Types

type AcknowledgeableBrokerMessage

type AcknowledgeableBrokerMessage struct {
	BrokerMessage
	// contains filtered or unexported fields
}

AcknowledgeableBrokerMessage is the struct that embeds BrokerMessage and provide a BrokerAcknowledgment to acknowledge a message to the broker depending on the implementation. AcknowledgeableBrokerMessage make sure that only one acknowledgement is sent to the broker.

func NewAcknowledgeableBrokerMessage

func NewAcknowledgeableBrokerMessage(
	bm BrokerMessage,
	acknowledgment BrokerAcknowledgment,
) AcknowledgeableBrokerMessage

NewAcknowledgeableBrokerMessage return a new AcknowledgeableBrokerMessage from BrokerMessage and BrokerAcknowledgment.

func (*AcknowledgeableBrokerMessage) Ack

func (bm *AcknowledgeableBrokerMessage) Ack()

Ack will call the AckMessage of the underlying BrokerAcknowledgment implementation if the message was not already acked.

func (*AcknowledgeableBrokerMessage) Nak

func (bm *AcknowledgeableBrokerMessage) Nak()

Nak will call the NakMessage of the underlying BrokerAcknowledgment implementation if the message was not already acked.

type BrokerAcknowledgment

type BrokerAcknowledgment interface {
	AckMessage()
	NakMessage()
}

BrokerAcknowledgment represents the function that should be implemented to acknowledge a message from subscriber to the broker. Some brokers may do not support naks so is it up to the broker implementation to handle naks correctly.

type BrokerChannelSubscription

type BrokerChannelSubscription struct {
	// contains filtered or unexported fields
}

BrokerChannelSubscription is a struct that contains every returned structures when subscribing a channel.

func NewBrokerChannelSubscription

func NewBrokerChannelSubscription(
	messages chan AcknowledgeableBrokerMessage,
	cancel chan any,
) BrokerChannelSubscription

NewBrokerChannelSubscription creates a new broker channel subscription based on the channels used to receive message and cancel the subscription.

func (BrokerChannelSubscription) Cancel

func (bcs BrokerChannelSubscription) Cancel(ctx context.Context)

Cancel cancels the subscription from user perspective. It will ask for clean up on broker, which will return when finished to avoid dangling resources, such as non-existent queue listeners on (broker) server side.

func (BrokerChannelSubscription) MessagesChannel

func (bcs BrokerChannelSubscription) MessagesChannel() <-chan AcknowledgeableBrokerMessage

MessagesChannel returns the channel that will get the received messages from broker and from which the user should listen.

func (BrokerChannelSubscription) TransmitReceivedMessage

func (bcs BrokerChannelSubscription) TransmitReceivedMessage(msg AcknowledgeableBrokerMessage)

TransmitReceivedMessage should only be used by the broker to transmit the new received messages to the user.

func (BrokerChannelSubscription) WaitForCancellationAsync

func (bcs BrokerChannelSubscription) WaitForCancellationAsync(cleanup func())

WaitForCancellationAsync should be used by the broker only to wait for user request for cancellation. As it is asynchronous, it will return immediately after the call.

type BrokerController

type BrokerController interface {
	// Publish a message to the broker
	Publish(ctx context.Context, channel string, mw BrokerMessage) error

	// Subscribe to messages from the broker
	Subscribe(ctx context.Context, channel string) (BrokerChannelSubscription, error)
}

BrokerController represents the functions that should be implemented to connect the broker to the application or the user.

type BrokerMessage

type BrokerMessage struct {
	Headers map[string][]byte
	Payload []byte
}

BrokerMessage is a wrapper that will contain all information regarding a message.

func (BrokerMessage) IsUninitialized

func (bm BrokerMessage) IsUninitialized() bool

IsUninitialized check if the BrokerMessage is at zero value, i.e. the uninitialized structure. It can be used to check that a channel is closed.

func (BrokerMessage) String

func (bm BrokerMessage) String() string

String returns a string version of the broker message.

type ContextKey

type ContextKey string

ContextKey is the type of the keys used in the context.

const (
	// ContextKeyIsVersion is the AsyncAPI specification version.
	ContextKeyIsVersion ContextKey = Prefix + "version"
	// ContextKeyIsProvider is the name of the provider this data is coming from.
	// When coming from a generated user, it is `asyncapi`.
	ContextKeyIsProvider ContextKey = Prefix + "provider"
	// ContextKeyIsChannel is the name of the channel this data is coming from.
	ContextKeyIsChannel ContextKey = Prefix + "channel"
	// ContextKeyIsDirection is the direction this data is coming from.
	// It can be either "publication" or "reception".
	ContextKeyIsDirection ContextKey = Prefix + "operation"
	// ContextKeyIsBrokerMessage is the message that has been sent or received from/to the broker.
	ContextKeyIsBrokerMessage ContextKey = Prefix + "broker-message"
	// ContextKeyIsCorrelationID is the correlation ID of the message.
	ContextKeyIsCorrelationID ContextKey = Prefix + "correlationID"
)

func (ContextKey) String

func (k ContextKey) String() string

String returns the string representation of the key.

type DummyLogger

type DummyLogger struct {
}

DummyLogger is a logger that does not log anything.

func (DummyLogger) Error

func (dl DummyLogger) Error(_ context.Context, _ string, _ ...LogInfo)

Error logs error based on a message and key-value elements.

func (DummyLogger) Info

func (dl DummyLogger) Info(_ context.Context, _ string, _ ...LogInfo)

Info logs information based on a message and key-value elements.

func (DummyLogger) Warning

func (dl DummyLogger) Warning(_ context.Context, _ string, _ ...LogInfo)

Warning logs information based on a message and key-value elements.

type ErrorHandler

type ErrorHandler func(ctx context.Context, topic string, msg *AcknowledgeableBrokerMessage, err error)

ErrorHandler is the signature of the function that needs to be implemented to use errorhandler functionality.

func DefaultErrorHandler

func DefaultErrorHandler() ErrorHandler

DefaultErrorHandler returns the default error handler, which is a Noop errorhandler.

type LogInfo

type LogInfo struct {
	Key   string
	Value any
}

LogInfo is a key-value pair that will be added to the log.

type Logger

type Logger interface {
	// Info logs information based on a message and key-value elements
	Info(ctx context.Context, msg string, info ...LogInfo)

	// Warning logs information based on a message and key-value elements
	// This levels indicates a non-expected state but that does not prevent the
	// application to work properly
	Warning(ctx context.Context, msg string, info ...LogInfo)

	// Error logs error based on a message and key-value elements
	Error(ctx context.Context, msg string, info ...LogInfo)
}

Logger is the interface that must be implemented by a logger.

type Middleware

type Middleware func(ctx context.Context, msg *BrokerMessage, next NextMiddleware) error

Middleware is the signature of the function that needs to be implemented to use middleware functionnality

Message sent to or received from is passed as an argument. If you modify it, it will be kept as a modification of the sent or received message.

You can call the next middleware (by calling `next(ctx, msg)`) in the middleware code in order to wrap next code execution (for example, to time execution, or recover in case of panic).

type NextMiddleware

type NextMiddleware func(ctx context.Context) error

NextMiddleware represents the next middleware that can be executed during the previous middleware. If this is already the last middleware, it will execute the appropriate autogenerated code for reception/sending of messages.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL