Documentation ¶
Index ¶
- Constants
- Variables
- func Decode(data []byte, v any) error
- func Dial() (*amqp.Connection, error)
- func Encode(v any) ([]byte, error)
- func GetDataRoutingKey(t container.ContainerType) (string, error)
- func GetRoutingKeyFromHeader(h amqp091.Table) (string, error)
- func HandleAPINotifications(conn *Conn, handler *NotificationHandler)
- func NotifyContainerCreated(conn *Conn, base models.BaseContainer, protocol any) (err error)
- func NotifyContainerDeleted(conn *Conn, id int32) (err error)
- func NotifyContainerUpdated(conn *Conn, base models.BaseContainer, protocol any) (err error)
- func NotifyDataPolicyDeleted(conn *Conn, id int16) (err error)
- func NotifyMetricCreated(conn *Conn, base models.BaseMetric, protocol any) (err error)
- func NotifyMetricDeleted(conn *Conn, containerId int32, id int64) (err error)
- func NotifyMetricUpdated(conn *Conn, base models.BaseMetric, protocol any) (err error)
- func RouteHeader(serviceIdent string) amqp091.Table
- func TestDecode(t *testing.T)
- func TestEncode(t *testing.T)
- type Channel
- type Conn
- func (c *Conn) AutoReconnect()
- func (c *Conn) Close()
- func (c *Conn) Connect() error
- func (c *Conn) CreatePublishers(n int) error
- func (c *Conn) DeclareExchages()
- func (c *Conn) Done() <-chan any
- func (c *Conn) IsClosed() bool
- func (c *Conn) Listen(options models.ListenerOptions) (msgs <-chan amqp091.Delivery, err error)
- func (c *Conn) ListenPings(serviceIdent string)
- func (c *Conn) NewChannel(listener bool, options ...models.ListenerOptions) (ch *Channel, err error)
- func (c *Conn) NewPlublisher() error
- func (c *Conn) Publish(publish models.DetailedPublishing)
- type ContainerNotification
- type MetricNotification
- type NotificationHandler
Constants ¶
View Source
const ( QueueSNMPDataReq = "snmp_metric_data_req" QueueSNMPMetricsDataReq = "snmp_metrics_data_req" QueueSNMPActionReq = "snmp_action_req" QueueRTSMetricDataReq = "rts_metric_data_req" QueueRTSMetricData = "rts_metric_data" QueueDHSMetricsDataRes = "dhs_metrics_data_res" QueueDHSMetricCreated = "dhs_metric_created" QueueDHSContainerCreated = "dhs_container_created" QueueAlarmCheckMetricAlarm = "alarm_check_metric_alarm" QueueAlarmCheckMetricsAlarm = "alarm_check_metrics_alarm" QueueAlarmMetricAlarmed = "alarm_metric_alarmed" QueueAlarmMetricsAlarmed = "alarm_metrics_alarmed" QueueAlarmAck = "alarm_ack" QueueAlarmsCleared = "alarms_cleared" QueueRTSPersistPulling = "rt_persist_pulling" ExchangeContainerCreated = "container_created" // fanout ExchangeContainerUpdated = "container_updated" // fanout ExchangeContainerDeleted = "container_deleted" // fanout ExchangeMetricCreated = "metric_created" // fanout ExchangeMetricUpdated = "metric_updated" // fanout ExchangeMetricDeleted = "metric_deleted" // fanout ExchangeDataPolicyDeleted = "datapolicy_deleted" // fanout ExchangeServiceLogs = "logs" // fanout ExchangeServicesStatus = "services_status" // fanout ExchangeServiceRegisterReq = "register_service_req" // fanout ExchangeServiceRegisterRes = "register_service_res" // fanout ExchangeServiceUnregister = "unregister_service" // fanout ExchangeCheckMetricsAlarm = "check_metrics_alarm" // fanout ExchangeCheckMetricAlarm = "check_metric_alarm" // fanout ExchangeMetricsAlarmed = "metrics_alarmed" // fanout ExchangeMetricAlarmed = "metric_alarmed" // fanout ExchangeActionReq = "action_req" // fanout ExchangeActionRes = "action_res" // fanout ExchangeAlarmAck = "alarm_ack" // fanout ExchangePersistPulling = "persist_pulling" // fanout ExchangeAlarmsCleared = "alarm_cleared" // fanout ExchangeServicePing = "ping" // direct ExchangeServicePong = "pong" // direct ExchangeMetricDataReq = "metric_data_req" // direct ExchangeMetricsDataReq = "metrics_data_req" // direct 
ExchangeMetricDataRes = "metric_data_res" // direct ExchangeMetricsDataRes = "metrics_data_res" // direct )
View Source
const DefaultExp = "5000"
DefaultExp is the default expiration in milliseconds.
Variables ¶
View Source
var ErrNoRoutingKey = errors.New("no routing key")
Functions ¶
func Decode ¶
Decode decodes the MessagePack-encoded data and stores the result in the value pointed to by v.
func GetDataRoutingKey ¶
func GetDataRoutingKey(t container.ContainerType) (string, error)
func GetRoutingKeyFromHeader ¶
func HandleAPINotifications ¶
func HandleAPINotifications(conn *Conn, handler *NotificationHandler)
func NotifyContainerCreated ¶
func NotifyContainerCreated(conn *Conn, base models.BaseContainer, protocol any) (err error)
func NotifyContainerDeleted ¶
func NotifyContainerUpdated ¶
func NotifyContainerUpdated(conn *Conn, base models.BaseContainer, protocol any) (err error)
func NotifyDataPolicyDeleted ¶
func NotifyMetricCreated ¶
func NotifyMetricCreated(conn *Conn, base models.BaseMetric, protocol any) (err error)
func NotifyMetricDeleted ¶
func NotifyMetricUpdated ¶
func NotifyMetricUpdated(conn *Conn, base models.BaseMetric, protocol any) (err error)
func RouteHeader ¶
func RouteHeader(serviceIdent string) amqp091.Table
func TestDecode ¶
func TestEncode ¶
Types ¶
type Channel ¶
func (*Channel) AutoReconnect ¶
func (ch *Channel) AutoReconnect()
func (*Channel) ListenPublishs ¶
func (ch *Channel) ListenPublishs()
type Conn ¶
func (*Conn) AutoReconnect ¶
func (c *Conn) AutoReconnect()
AutoReconnect automatically reconnects to the AMQP server when the connection is closed. It keeps retrying the reconnection until the done channel is filled.
func (*Conn) Close ¶
func (c *Conn) Close()
Close closes the AMQP connection and stops the auto reconnection. It is safe to call multiple times.
func (*Conn) CreatePublishers ¶
func (*Conn) DeclareExchages ¶
func (c *Conn) DeclareExchages()
func (*Conn) Listen ¶
func (c *Conn) Listen(options models.ListenerOptions) (msgs <-chan amqp091.Delivery, err error)
func (*Conn) ListenPings ¶
func (*Conn) NewChannel ¶
func (*Conn) NewPlublisher ¶
func (*Conn) Publish ¶
func (c *Conn) Publish(publish models.DetailedPublishing)
type ContainerNotification ¶
type ContainerNotification struct { Base models.BaseContainer Protocol any }
type MetricNotification ¶
type MetricNotification struct { Base models.BaseMetric Protocol any }
type NotificationHandler ¶
type NotificationHandler struct { ContainerCreatedQueue string MetricCreatedQueue string OnContainerCreated func(base models.BaseContainer, protocol any) OnContainerUpdated func(base models.BaseContainer, protocol any) OnContainerDeleted func(int32) OnMetricCreated func(base models.BaseMetric, protocol any) OnMetricUpdated func(base models.BaseMetric, protocol any) OnMetricDeleted func(containerId int32, metricId int64) OnDataPolicyDeleted func(int16) OnError func(error) }
Click to show internal directories.
Click to hide internal directories.