rabbit

package
v0.37.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2023 License: Apache-2.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	BindingKey    = "x-knative-trigger"
	DLQBindingKey = "x-knative-dlq"
)
View Source
const (
	TriggerLabelKey = "eventing.knative.dev/trigger"
	SourceLabelKey  = "eventing.knative.dev/SourceName"
)
View Source
const (
	BrokerURLSecretKey = "brokerURL"
)
View Source
const CA_SECRET_KEYNAME = "caSecretName"

Variables

This section is empty.

Functions

func ChannelConfirm added in v0.34.0

func ChannelConfirm(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error

func ChannelQoS added in v0.34.0

func ChannelQoS(connection RabbitMQConnectionInterface, channel RabbitMQChannelInterface) error

func CloudEventToRabbitMQMessage added in v0.35.0

func CloudEventToRabbitMQMessage(event *cloudevents.Event, tp, ts string) *amqp.Publishing

func ConvertMessageToHTTPRequest added in v0.32.0

func ConvertMessageToHTTPRequest(
	ctx context.Context,
	sourceName, namespace, queueName string,
	msg *amqp.Delivery,
	req *nethttp.Request,
	logger *zap.Logger) error

func ConvertToCloudEvent added in v0.32.0

func ConvertToCloudEvent(event *cloudevents.Event, msg *amqp.Delivery, namespace, sourceName, queueName string) error

func Labels added in v0.31.0

Labels generates the labels for a RabbitMQ resource. Used by exchanges, queues, and bindings created by broker, trigger, and source controllers.

func MakeSecret added in v0.33.0

func MakeSecret(name, typeString, namespace, url string, owner kmeta.OwnerRefable) *corev1.Secret

MakeSecret creates the secret for Broker deployments for Rabbit Broker.

func NewBinding added in v0.31.0

func NewBinding(args *BindingArgs) (*rabbitv1beta1.Binding, error)

func NewBrokerDLXPolicy added in v0.32.0

func NewBrokerDLXPolicy(args *QueueArgs) *rabbitv1beta1.Policy

NewBrokerDLXPolicy configures the broker dead letter exchange for trigger queues that do not have a DLX defined.

func NewExchange added in v0.31.0

func NewExchange(args *ExchangeArgs) *rabbitv1beta1.Exchange

NewExchange returns an `exchange.rabbitmq.com` object used by trigger, broker, and source reconcilers. When used by trigger and broker, exchange properties such as `durable`, `autoDelete`, and `type` are hardcoded.

func NewPolicy added in v0.31.0

func NewPolicy(args *QueueArgs) *rabbitv1beta1.Policy

func NewQueue added in v0.31.0

func NewQueue(args *QueueArgs) *rabbitv1beta1.Queue

func ReconcileSecret added in v0.33.0

func ReconcileSecret(ctx context.Context, secretLister corev1listers.SecretLister, kubeClientSet kubernetes.Interface, s *corev1.Secret) error

ReconcileSecret reconciles the K8s Secret 's'.

func SecretLabels added in v0.33.0

func SecretLabels(resourceName, typeString string) map[string]string

SecretLabels generates the labels present on all resources representing the secret of the given Broker.

func SecretName added in v0.33.0

func SecretName(resourceName, typeString string) string

func VHostHandler added in v0.35.0

func VHostHandler(broker string, vhost string) string

Types

type BindingArgs added in v0.31.0

type BindingArgs struct {
	Name                     string
	Namespace                string
	Owner                    metav1.OwnerReference
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	RabbitMQVhost            string
	Source                   string
	Destination              string
	Labels                   map[string]string
	Filters                  map[string]string
	ClusterName              string
}

type DeleteResourceArgs added in v0.34.0

type DeleteResourceArgs struct {
	Kind      interface{}
	Name      string
	Namespace string
	Owner     metav1.Object
}

type ExchangeArgs added in v0.31.0

type ExchangeArgs struct {
	Name                     string
	Namespace                string
	RabbitMQVhost            string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	RabbitMQURL              *url.URL
	Broker                   *eventingv1.Broker
	Trigger                  *eventingv1.Trigger
	Source                   *v1alpha1.RabbitmqSource
}

ExchangeArgs are the arguments to create a RabbitMQ Exchange.

type Message added in v0.32.0

type Message struct {
	Value       []byte
	Headers     map[string][]byte
	ContentType string
	// contains filtered or unexported fields
}

Message holds a RabbitMQ message. This message *can* be read several times safely.

func NewMessage added in v0.32.0

func NewMessage(value []byte, contentType string, headers map[string][]byte) *Message

NewMessage returns a binding.Message that holds the provided RabbitMQ message components. The returned binding.Message *can* be read several times safely.

func NewMessageFromDelivery added in v0.32.0

func NewMessageFromDelivery(sourceName, namespace, queueName string, msg *amqp.Delivery) *Message

NewMessageFromDelivery returns a binding.Message that holds the provided RabbitMQ Message. The returned binding.Message *can* be read several times safely.

func (*Message) Finish added in v0.32.0

func (m *Message) Finish(error) error

func (*Message) GetAttribute added in v0.32.0

func (m *Message) GetAttribute(k spec.Kind) (spec.Attribute, interface{})

func (*Message) GetExtension added in v0.32.0

func (m *Message) GetExtension(name string) interface{}

func (*Message) ReadBinary added in v0.32.0

func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) (err error)

func (*Message) ReadEncoding added in v0.32.0

func (m *Message) ReadEncoding() binding.Encoding

func (*Message) ReadStructured added in v0.32.0

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error

type QueueArgs added in v0.31.0

type QueueArgs struct {
	Name                     string
	Namespace                string
	RabbitMQVhost            string
	QueueName                string
	RabbitmqClusterReference *rabbitv1beta1.RabbitmqClusterReference
	Owner                    metav1.OwnerReference
	Labels                   map[string]string
	DLXName                  *string
	Source                   *v1alpha1.RabbitmqSource
	BrokerUID                string
	QueueType                eventingv1alpha1.QueueType
}

type Rabbit

type Rabbit struct {
	rabbitclientset.Interface
	// contains filtered or unexported fields
}

func New

func New(ctx context.Context) *Rabbit

func (*Rabbit) DeleteResource added in v0.34.0

func (r *Rabbit) DeleteResource(ctx context.Context, args *DeleteResourceArgs) error

func (*Rabbit) GetRabbitMQCASecret added in v0.35.0

func (r *Rabbit) GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)

func (*Rabbit) RabbitMQURL added in v0.33.0

func (r *Rabbit) RabbitMQURL(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)

func (*Rabbit) ReconcileBinding

func (r *Rabbit) ReconcileBinding(ctx context.Context, args *BindingArgs) (Result, error)

func (*Rabbit) ReconcileBrokerDLXPolicy added in v0.32.0

func (r *Rabbit) ReconcileBrokerDLXPolicy(ctx context.Context, args *QueueArgs) (Result, error)

func (*Rabbit) ReconcileDLQPolicy added in v0.33.0

func (r *Rabbit) ReconcileDLQPolicy(ctx context.Context, args *QueueArgs) (Result, error)

func (*Rabbit) ReconcileExchange

func (r *Rabbit) ReconcileExchange(ctx context.Context, args *ExchangeArgs) (Result, error)

func (*Rabbit) ReconcileQueue

func (r *Rabbit) ReconcileQueue(ctx context.Context, args *QueueArgs) (Result, error)

type RabbitMQBadConnectionMock added in v0.34.0

type RabbitMQBadConnectionMock struct{}

func (*RabbitMQBadConnectionMock) ChannelWrapper added in v0.34.0

func (*RabbitMQBadConnectionMock) Close added in v0.34.0

func (rm *RabbitMQBadConnectionMock) Close() error

func (*RabbitMQBadConnectionMock) IsClosed added in v0.34.0

func (rm *RabbitMQBadConnectionMock) IsClosed() bool

func (*RabbitMQBadConnectionMock) NotifyClose added in v0.34.0

func (rm *RabbitMQBadConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

type RabbitMQChannelInterface added in v0.34.0

type RabbitMQChannelInterface interface {
	IsClosed() bool
	NotifyClose(chan *amqp091.Error) chan *amqp091.Error
	Qos(int, int, bool) error
	Confirm(bool) error
	Consume(string, string, bool, bool, bool, bool, amqp091.Table) (<-chan amqp091.Delivery, error)
	PublishWithDeferredConfirm(string, string, bool, bool, amqp091.Publishing) (*amqp091.DeferredConfirmation, error)
	QueueInspect(string) (amqp091.Queue, error)
}

type RabbitMQChannelMock added in v0.34.0

type RabbitMQChannelMock struct {
	NotifyCloseChannel chan *amqp.Error
	ConsumeChannel     <-chan amqp.Delivery
}

func (*RabbitMQChannelMock) Confirm added in v0.34.0

func (rm *RabbitMQChannelMock) Confirm(a bool) error

func (*RabbitMQChannelMock) Consume added in v0.34.0

func (rm *RabbitMQChannelMock) Consume(a string, b string, c bool, d bool, e bool, f bool, t amqp.Table) (<-chan amqp.Delivery, error)

func (*RabbitMQChannelMock) IsClosed added in v0.36.0

func (rm *RabbitMQChannelMock) IsClosed() bool

func (*RabbitMQChannelMock) NotifyClose added in v0.34.0

func (rm *RabbitMQChannelMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

func (*RabbitMQChannelMock) PublishWithDeferredConfirm added in v0.34.0

func (rm *RabbitMQChannelMock) PublishWithDeferredConfirm(a string, b string, c bool, d bool, p amqp.Publishing) (*amqp.DeferredConfirmation, error)

func (*RabbitMQChannelMock) Qos added in v0.34.0

func (rm *RabbitMQChannelMock) Qos(a int, b int, c bool) error

func (*RabbitMQChannelMock) QueueInspect added in v0.34.0

func (rm *RabbitMQChannelMock) QueueInspect(string) (amqp.Queue, error)

type RabbitMQConnection added in v0.34.0

type RabbitMQConnection struct {
	// contains filtered or unexported fields
}

func NewConnection added in v0.34.0

func NewConnection(conn interface{}) *RabbitMQConnection

func (*RabbitMQConnection) ChannelWrapper added in v0.34.0

func (r *RabbitMQConnection) ChannelWrapper() (RabbitMQChannelInterface, error)

func (*RabbitMQConnection) Close added in v0.34.0

func (r *RabbitMQConnection) Close() error

func (*RabbitMQConnection) IsClosed added in v0.34.0

func (r *RabbitMQConnection) IsClosed() bool

func (*RabbitMQConnection) NotifyClose added in v0.34.0

func (r *RabbitMQConnection) NotifyClose(c chan *amqp091.Error) chan *amqp091.Error

type RabbitMQConnectionHandler added in v0.36.0

type RabbitMQConnectionHandler struct {
	Connection RabbitMQConnectionWrapperInterface
	Channel    RabbitMQChannelInterface
	// contains filtered or unexported fields
}

func (*RabbitMQConnectionHandler) GetChannel added in v0.36.0

func (*RabbitMQConnectionHandler) GetConnection added in v0.36.0

func (*RabbitMQConnectionHandler) Setup added in v0.36.0

type RabbitMQConnectionInterface added in v0.34.0

type RabbitMQConnectionInterface interface {
	NotifyClose(chan *amqp091.Error) chan *amqp091.Error
	Close() error
	IsClosed() bool
}

type RabbitMQConnectionMock added in v0.34.0

type RabbitMQConnectionMock struct{ NotifyCloseChannel chan *amqp.Error }

func (*RabbitMQConnectionMock) ChannelWrapper added in v0.34.0

func (rm *RabbitMQConnectionMock) ChannelWrapper() (RabbitMQChannelInterface, error)

func (*RabbitMQConnectionMock) Close added in v0.34.0

func (rm *RabbitMQConnectionMock) Close() error

func (*RabbitMQConnectionMock) IsClosed added in v0.34.0

func (rm *RabbitMQConnectionMock) IsClosed() bool

func (*RabbitMQConnectionMock) NotifyClose added in v0.34.0

func (rm *RabbitMQConnectionMock) NotifyClose(c chan *amqp.Error) chan *amqp.Error

type RabbitMQConnectionWrapperInterface added in v0.36.0

type RabbitMQConnectionWrapperInterface interface {
	RabbitMQConnectionInterface
	ChannelWrapper() (RabbitMQChannelInterface, error)
}

func BadChannelDial added in v0.36.0

func BadChannelDial(url string) (RabbitMQConnectionWrapperInterface, error)

func BadConnectionDial added in v0.36.0

func BadConnectionDial(url string) (RabbitMQConnectionWrapperInterface, error)

func DialWrapper added in v0.34.0

func ValidDial added in v0.34.0

type RabbitMQConnectionsHandlerInterface added in v0.36.0

type RabbitMQConnectionsHandlerInterface interface {
	GetConnection() RabbitMQConnectionInterface
	GetChannel() RabbitMQChannelInterface
	Setup(context.Context, string, func(RabbitMQConnectionInterface, RabbitMQChannelInterface) error, func(string) (RabbitMQConnectionWrapperInterface, error))
}

func NewRabbitMQConnectionHandler added in v0.36.0

func NewRabbitMQConnectionHandler(cycleDuration time.Duration, logger *zap.SugaredLogger) RabbitMQConnectionsHandlerInterface

type Result

type Result struct {
	Name  string
	Ready bool
}

type Service

type Service interface {
	RabbitMQURL(context.Context, *rabbitv1beta1.RabbitmqClusterReference) (*url.URL, error)
	ReconcileExchange(context.Context, *ExchangeArgs) (Result, error)
	ReconcileQueue(context.Context, *QueueArgs) (Result, error)
	ReconcileBinding(context.Context, *BindingArgs) (Result, error)
	ReconcileBrokerDLXPolicy(context.Context, *QueueArgs) (Result, error)
	ReconcileDLQPolicy(context.Context, *QueueArgs) (Result, error)
	DeleteResource(ctx context.Context, args *DeleteResourceArgs) error
	GetRabbitMQCASecret(ctx context.Context, clusterRef *rabbitv1beta1.RabbitmqClusterReference) (string, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL