amqp

package
v0.0.0-...-51fd5d7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 28, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Queue name constants (identifiers prefixed with Queue).
	//
	// NOTE(review): a few values do not follow the pattern of their
	// neighbours; confirm whether this is intentional before relying on it.
	// The values are runtime names, so changing them alters behavior:
	//   - QueueSNMPDataReq is "snmp_metric_data_req" (value contains "metric",
	//     identifier does not).
	//   - QueueRTSPersistPulling is "rt_persist_pulling" while the other RTS
	//     queues use the "rts_" prefix.
	QueueSNMPDataReq            = "snmp_metric_data_req"
	QueueSNMPMetricsDataReq     = "snmp_metrics_data_req"
	QueueSNMPActionReq          = "snmp_action_req"
	QueueRTSMetricDataReq       = "rts_metric_data_req"
	QueueRTSMetricData          = "rts_metric_data"
	QueueDHSMetricsDataRes      = "dhs_metrics_data_res"
	QueueDHSMetricCreated       = "dhs_metric_created"
	QueueDHSContainerCreated    = "dhs_container_created"
	QueueAlarmCheckMetricAlarm  = "alarm_check_metric_alarm"
	QueueAlarmCheckMetricsAlarm = "alarm_check_metrics_alarm"
	QueueAlarmMetricAlarmed     = "alarm_metric_alarmed"
	QueueAlarmMetricsAlarmed    = "alarm_metrics_alarmed"
	QueueAlarmAck               = "alarm_ack"
	QueueAlarmsCleared          = "alarms_cleared"
	QueueRTSPersistPulling      = "rt_persist_pulling"

	// Exchange name constants annotated as fanout by the trailing comments.
	//
	// NOTE(review): ExchangeAlarmsCleared is "alarm_cleared" (singular) while
	// QueueAlarmsCleared is "alarms_cleared"; confirm this is intentional.
	ExchangeContainerCreated   = "container_created"    // fanout
	ExchangeContainerUpdated   = "container_updated"    // fanout
	ExchangeContainerDeleted   = "container_deleted"    // fanout
	ExchangeMetricCreated      = "metric_created"       // fanout
	ExchangeMetricUpdated      = "metric_updated"       // fanout
	ExchangeMetricDeleted      = "metric_deleted"       // fanout
	ExchangeDataPolicyDeleted  = "datapolicy_deleted"   // fanout
	ExchangeServiceLogs        = "logs"                 // fanout
	ExchangeServicesStatus     = "services_status"      // fanout
	ExchangeServiceRegisterReq = "register_service_req" // fanout
	ExchangeServiceRegisterRes = "register_service_res" // fanout
	ExchangeServiceUnregister  = "unregister_service"   // fanout
	ExchangeCheckMetricsAlarm  = "check_metrics_alarm"  // fanout
	ExchangeCheckMetricAlarm   = "check_metric_alarm"   // fanout
	ExchangeMetricsAlarmed     = "metrics_alarmed"      // fanout
	ExchangeMetricAlarmed      = "metric_alarmed"       // fanout
	ExchangeActionReq          = "action_req"           // fanout
	ExchangeActionRes          = "action_res"           // fanout
	ExchangeAlarmAck           = "alarm_ack"            // fanout
	ExchangePersistPulling     = "persist_pulling"      // fanout
	ExchangeAlarmsCleared      = "alarm_cleared"        // fanout

	// Exchange name constants annotated as direct by the trailing comments.
	ExchangeServicePing    = "ping"             // direct
	ExchangeServicePong    = "pong"             // direct
	ExchangeMetricDataReq  = "metric_data_req"  // direct
	ExchangeMetricsDataReq = "metrics_data_req" // direct
	ExchangeMetricDataRes  = "metric_data_res"  // direct
	ExchangeMetricsDataRes = "metrics_data_res" // direct
)
View Source
// DefaultExp is the default expiration in milliseconds, held as a decimal
// string ("5000") rather than a numeric type.
const DefaultExp = "5000"

DefaultExp is the default expiration in milliseconds.

Variables

View Source
// ErrNoRoutingKey is a sentinel error whose message is "no routing key".
// NOTE(review): presumably returned by the routing-key helpers in this
// package when no key can be determined; confirm against their implementations.
var ErrNoRoutingKey = errors.New("no routing key")

Functions

func Decode

func Decode(data []byte, v any) error

Decode decodes the MessagePack-encoded data and stores the result in the value pointed to by v.

func Dial

func Dial() (*amqp.Connection, error)

Dial connects to the amqp server using the current environment.

func Encode

func Encode(v any) ([]byte, error)

Encode returns the MessagePack encoding of v using a custom configuration for amqp.

func GetDataRoutingKey

func GetDataRoutingKey(t container.ContainerType) (string, error)

func GetRoutingKeyFromHeader

func GetRoutingKeyFromHeader(h amqp091.Table) (string, error)

func HandleAPINotifications

func HandleAPINotifications(conn *Conn, handler *NotificationHandler)

func NotifyContainerCreated

func NotifyContainerCreated(conn *Conn, base models.BaseContainer, protocol any) (err error)

func NotifyContainerDeleted

func NotifyContainerDeleted(conn *Conn, id int32) (err error)

func NotifyContainerUpdated

func NotifyContainerUpdated(conn *Conn, base models.BaseContainer, protocol any) (err error)

func NotifyDataPolicyDeleted

func NotifyDataPolicyDeleted(conn *Conn, id int16) (err error)

func NotifyMetricCreated

func NotifyMetricCreated(conn *Conn, base models.BaseMetric, protocol any) (err error)

func NotifyMetricDeleted

func NotifyMetricDeleted(conn *Conn, containerId int32, id int64) (err error)

func NotifyMetricUpdated

func NotifyMetricUpdated(conn *Conn, base models.BaseMetric, protocol any) (err error)

func RouteHeader

func RouteHeader(serviceIdent string) amqp091.Table

func TestDecode

func TestDecode(t *testing.T)

func TestEncode

func TestEncode(t *testing.T)

Types

type Channel

// Channel is the type that carries the AutoReconnect, Close, Connect, Listen,
// ListenPublishs and Publish methods.
type Channel struct {
	// ReconnectTimeout is a duration setting for reconnection.
	// NOTE(review): presumably the wait between reconnect attempts; confirm
	// against the AutoReconnect implementation.
	ReconnectTimeout time.Duration
	// contains filtered or unexported fields
}

func (*Channel) AutoReconnect

func (ch *Channel) AutoReconnect()

func (*Channel) Close

func (ch *Channel) Close()

func (*Channel) Connect

func (ch *Channel) Connect() error

func (*Channel) Listen

func (ch *Channel) Listen()

func (*Channel) ListenPublishs

func (ch *Channel) ListenPublishs()

func (*Channel) Publish

func (ch *Channel) Publish(publish models.DetailedPublishing) error

type Conn

// Conn is the amqp connection type returned by NewConn.
type Conn struct {
	// ReconnectTimeout is a duration setting for reconnection.
	// NOTE(review): presumably the wait between reconnect attempts; confirm
	// against the AutoReconnect implementation.
	ReconnectTimeout time.Duration
	// contains filtered or unexported fields
}

func NewConn

func NewConn(infoCh chan string, errCh chan string) (conn *Conn, err error)

NewConn returns a new amqp connection.

func (*Conn) AutoReconnect

func (c *Conn) AutoReconnect()

AutoReconnect automatically reconnects to the amqp server when the connection is closed. It keeps retrying the reconnection until the done channel is filled.

func (*Conn) Close

func (c *Conn) Close()

Close closes the amqp connection and stops the auto reconnection. It is safe to call multiple times.

func (*Conn) Connect

func (c *Conn) Connect() error

Connect connects to the amqp server.

func (*Conn) CreatePublishers

func (c *Conn) CreatePublishers(n int) error

func (*Conn) DeclareExchages

func (c *Conn) DeclareExchages()

func (*Conn) Done

func (c *Conn) Done() <-chan any

func (*Conn) IsClosed

func (c *Conn) IsClosed() bool

func (*Conn) Listen

func (c *Conn) Listen(options models.ListenerOptions) (msgs <-chan amqp091.Delivery, err error)

func (*Conn) ListenPings

func (c *Conn) ListenPings(serviceIdent string)

func (*Conn) NewChannel

func (c *Conn) NewChannel(listener bool, options ...models.ListenerOptions) (ch *Channel, err error)

func (*Conn) NewPlublisher

func (c *Conn) NewPlublisher() error

func (*Conn) Publish

func (c *Conn) Publish(publish models.DetailedPublishing)

type ContainerNotification

// ContainerNotification pairs a base container with a protocol value.
// NOTE(review): presumably the message payload for the container
// created/updated notifications; confirm against the Notify* implementations.
type ContainerNotification struct {
	// Base is the base container data.
	Base     models.BaseContainer
	// Protocol is an untyped (any) protocol value accompanying Base.
	Protocol any
}

type MetricNotification

// MetricNotification pairs a base metric with a protocol value.
// NOTE(review): presumably the message payload for the metric
// created/updated notifications; confirm against the Notify* implementations.
type MetricNotification struct {
	// Base is the base metric data.
	Base     models.BaseMetric
	// Protocol is an untyped (any) protocol value accompanying Base.
	Protocol any
}

type NotificationHandler

// NotificationHandler groups two queue names and a set of callback functions.
// It is the handler argument type of HandleAPINotifications.
// NOTE(review): whether nil callbacks are tolerated is not visible here;
// confirm against HandleAPINotifications.
type NotificationHandler struct {
	// Queue names for the container-created and metric-created notifications.
	// NOTE(review): presumably the queues bound for those events; confirm.
	ContainerCreatedQueue string
	MetricCreatedQueue    string

	// Container callbacks; created/updated receive the base container and an
	// untyped protocol value, deleted receives a single int32.
	OnContainerCreated func(base models.BaseContainer, protocol any)
	OnContainerUpdated func(base models.BaseContainer, protocol any)
	OnContainerDeleted func(int32)

	// Metric callbacks; created/updated receive the base metric and an
	// untyped protocol value, deleted receives the container and metric ids.
	OnMetricCreated func(base models.BaseMetric, protocol any)
	OnMetricUpdated func(base models.BaseMetric, protocol any)
	OnMetricDeleted func(containerId int32, metricId int64)

	// Data policy deletion callback; receives a single int16.
	OnDataPolicyDeleted func(int16)

	// Error callback; receives an error value.
	OnError func(error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL