cloud-event-proxy

module
v0.0.0-...-421ad94 Latest Latest
Published: May 1, 2024 License: Apache-2.0

cloud-event-proxy

The cloud-event-proxy project provides a mechanism for delivering events from the K8s infrastructure to CNFs with low latency.
The initial event functionality focuses on the operation of the PTP synchronization protocol, but the mechanism can be extended to any infrastructure event that requires low latency.
The mechanism is an integral part of k8s/OCP RAN deployments, where the PTP protocol is used to provide timing synchronization for the RAN software elements.

Event Transporter

Cloud event proxy supports two transport protocols:

  1. AMQ Protocol
  2. HTTP Protocol

AMQ Protocol

The AMQ protocol requires a qdr router that is deployed and running, either manually or via the AMQ Interconnect operator. Set the transport-host variable to the AMQ instance, for example amqp://localhost:5672. The producer sends events to the bus address specified by the publisher, and consumers connect to those bus addresses.

AMQ producer example

- name: cloud-event-sidecar
  image: quay.io/redhat-cne/cloud-event-proxy
  args:
    - "--metrics-addr=127.0.0.1:9091"
    - "--store-path=/store"
    - "--transport-host=amqp://amq-router.amq-router.svc.cluster.local"
    - "--api-port=9085"

AMQ consumer example

- name: cloud-event-sidecar
  image: quay.io/redhat-cne/cloud-event-proxy
  args:
    - "--metrics-addr=127.0.0.1:9091"
    - "--store-path=/store"
    - "--transport-host=amqp://amq-router.amq-router.svc.cluster.local"
    - "--api-port=8089"

HTTP Protocol

Producer

The CloudEvents HTTP protocol is enabled based on the URL in transport-host. If HTTP is identified, the publisher starts a publisher REST service, which is accessible outside the container via the k8s service name. The publisher service can register consumer endpoints to which events are published.

The transport URL is defined in the format of

- "--transport-host=$(TRANSPORT_PROTOCAL)://$(TRANSPORT_SERVICE).$(TRANSPORT_NAMESPACE).svc.cluster.local:$(TRANSPORT_PORT)"
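
The format above, together with the scheme-based protocol selection described earlier, can be sketched in a few lines of Go. This is a minimal illustration, not the proxy's own code: the function names and the labels "amq", "http" and "unknown" are assumptions made for the example.

```go
package main

import (
	"fmt"
	"net/url"
)

// buildTransportHost assembles a transport-host value from its parts, following
// the protocol://service.namespace.svc.cluster.local:port layout shown above.
func buildTransportHost(protocol, service, namespace string, port int) string {
	return fmt.Sprintf("%s://%s.%s.svc.cluster.local:%d", protocol, service, namespace, port)
}

// transportKind reports which transport a transport-host value selects, based
// on its URL scheme. The returned labels are illustrative only.
func transportKind(transportHost string) string {
	u, err := url.Parse(transportHost)
	if err != nil {
		return "unknown"
	}
	switch u.Scheme {
	case "amqp":
		return "amq"
	case "http":
		return "http"
	default:
		return "unknown"
	}
}

func main() {
	host := buildTransportHost("http", "ptp-event-publisher-service-NODE_NAME", "openshift-ptp", 9043)
	fmt.Println(host, "->", transportKind(host))
	fmt.Println("amqp://localhost:5672", "->", transportKind("amqp://localhost:5672"))
}
```

Keeping the assembly in one helper makes it harder to end up with a value that is missing its scheme, which is the part the protocol selection depends on.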

HTTP producer example

- name: cloud-event-sidecar
  image: quay.io/redhat-cne/cloud-event-proxy
  args:
    - "--metrics-addr=127.0.0.1:9091"
    - "--store-path=/store"
    - "--transport-host=http://ptp-event-publisher-service-NODE_NAME.openshift-ptp.svc.cluster.local:9043"
    - "--api-port=9085"

The event producer uses a pubsubstore to store subscriber information, including the clientID, consumer service endpoint URI, resource ID, etc. This information is stored as one JSON file per registered subscriber. The pubsubstore needs to be mounted on a persistent volume in order to survive a pod restart.
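
To make the "one JSON file per subscriber" layout concrete, here is a minimal Go sketch that saves a record and reads it back. The struct, its field names, and the file naming scheme are hypothetical and are not taken from the proxy's store implementation; a temporary directory stands in for the mounted /store volume.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// subscriber is a hypothetical record; the field names are assumptions.
type subscriber struct {
	ClientID    string `json:"clientID"`
	EndpointURI string `json:"endpointUri"`
	Resource    string `json:"resource"`
}

// saveSubscriber writes one JSON file per subscriber, named after the client ID.
func saveSubscriber(storePath string, s subscriber) (string, error) {
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return "", err
	}
	file := filepath.Join(storePath, s.ClientID+".json")
	if err := os.WriteFile(file, data, 0o600); err != nil {
		return "", err
	}
	return file, nil
}

// loadSubscriber reads a subscriber record back from its file.
func loadSubscriber(file string) (subscriber, error) {
	var s subscriber
	data, err := os.ReadFile(file)
	if err != nil {
		return s, err
	}
	err = json.Unmarshal(data, &s)
	return s, err
}

// roundTrip saves a record to a temporary directory (standing in for the
// mounted /store volume) and loads it again, as would happen after a restart.
func roundTrip(s subscriber) (subscriber, error) {
	dir, err := os.MkdirTemp("", "pubsubstore")
	if err != nil {
		return subscriber{}, err
	}
	defer os.RemoveAll(dir)
	file, err := saveSubscriber(dir, s)
	if err != nil {
		return subscriber{}, err
	}
	return loadSubscriber(file)
}

func main() {
	out, err := roundTrip(subscriber{
		ClientID:    "789be75d-7ac3-472e-bbbc-6d62878aad4a",
		EndpointURI: "http://localhost:9090/event",
		Resource:    "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}
```

Because each subscriber lives in its own file, the directory must outlive the container for registrations to survive, which is why the store path is backed by a persistent volume.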

Example for configuring persistent storage

spec:
  nodeSelector:
    node-role.kubernetes.io/worker: ""
  serviceAccountName: hw-event-proxy-sa
  containers:
    - name: cloud-event-sidecar
      volumeMounts:
        - name: pubsubstore
          mountPath: /store
  volumes:
    - name: pubsubstore
      persistentVolumeClaim:
        claimName: cloud-event-proxy-store-storage-class-http-events

Example PersistentVolumeClaim

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: cloud-event-proxy-store-storage-class-http-events
spec:
  storageClassName: storage-class-http-events
  resources:
    requests:
      storage: 10Mi
  accessModes:
  - ReadWriteOnce

Consumer

The consumer application also sets its own transport-host, which enables cloud-event-proxy to run a service that listens for incoming events posted by the publisher. The consumer also uses the http-event-publishers variable to request registration with publisher endpoints for consuming events.

HTTP consumer example

- name: cloud-event-sidecar
  image: quay.io/redhat-cne/cloud-event-proxy
  args:
    - "--metrics-addr=127.0.0.1:9091"
    - "--store-path=/store"
    - "--transport-host=consumer-events-subscription-service.cloud-events.svc.cluster.local:9043"
    - "--http-event-publishers=ptp-event-publisher-service-NODE_NAME.openshift-ptp.svc.cluster.local:9043"
    - "--api-port=8089"

Creating Publisher

Publisher JSON Example

Create Publisher Resource: JSON request

{
  "Resource": "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
  "UriLocation": "http://localhost:9090/ack/event"
}

Create Publisher Resource: JSON response

{
  "Id": "789be75d-7ac3-472e-bbbc-6d62878aad4a",
  "Resource": "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
  "UriLocation": "http://localhost:9090/ack/event",
  "EndpointUri": "http://localhost:9085/api/ocloudNotifications/v1/publishers/{publisherid}"
}

Creating Publisher Golang Example
Creating publisher golang example with AMQ as transporter protocol

package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/redhat-cne/sdk-go/pkg/channel"
	"github.com/redhat-cne/sdk-go/pkg/types"
	v1amqp "github.com/redhat-cne/sdk-go/v1/amqp"
	v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// apiPath is assumed to be the REST API base path listed in the Rest-API section.
const apiPath = "/api/ocloudNotifications/v1/"

func main() {
	// channel for the transport handler subscribed to get and set events
	eventInCh := make(chan *channel.DataChan, 10)
	pubSubInstance := v1pubsub.GetAPIInstance(".")
	endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:9085", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
	// create publisher
	pub, err := pubSubInstance.CreatePublisher(v1pubsub.NewPubSub(endpointURL, "test/test"))
	if err != nil {
		log.Fatalf("failed to create publisher: %v", err)
	}
	// once the publisher response is received, create a transport sender object to send events.
	v1amqp.CreateSender(eventInCh, pub.GetResource())
}

Creating publisher golang example with HTTP as transporter protocol

package main

import (
	"fmt"
	"log"
	"net/url"
	"github.com/redhat-cne/sdk-go/pkg/channel"
	"github.com/redhat-cne/sdk-go/pkg/types"
	v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// apiPath is assumed to be the REST API base path listed in the Rest-API section.
const apiPath = "/api/ocloudNotifications/v1/"

func main() {
	// channel for the transport handler subscribed to get and set events
	eventInCh := make(chan *channel.DataChan, 10)
	_ = eventInCh // not used further in this minimal example
	pubSubInstance := v1pubsub.GetAPIInstance(".")
	endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:9085", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
	// create publisher
	pub, err := pubSubInstance.CreatePublisher(v1pubsub.NewPubSub(endpointURL, "test/test"))
	if err != nil {
		log.Fatalf("failed to create publisher: %v", err)
	}
	log.Printf("created publisher for resource %s", pub.GetResource())
}

Creating Subscriptions

Subscription JSON Example

Create Subscription Resource: JSON request

{
  "Resource": "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
  "UriLocation": "http://localhost:9090/event"
}

Example Create Subscription Resource: JSON response

{
  "Id": "789be75d-7ac3-472e-bbbc-6d62878aad4a",
  "Resource": "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
  "UriLocation": "http://localhost:9090/ack/event",
  "EndpointUri": "http://localhost:8089/api/ocloudNotifications/v1/subscriptions/{subscriptionid}"
}

Creating Subscription Golang Example
Creating subscription golang example with AMQ as transporter protocol

package main

import (
	"fmt"
	"log"
	"net/url"

	"github.com/redhat-cne/sdk-go/pkg/channel"
	"github.com/redhat-cne/sdk-go/pkg/types"
	v1amqp "github.com/redhat-cne/sdk-go/v1/amqp"
	v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// apiPath is assumed to be the REST API base path listed in the Rest-API section.
const apiPath = "/api/ocloudNotifications/v1/"

func main() {
	// channel for the transport handler subscribed to get and set events
	eventInCh := make(chan *channel.DataChan, 10)
	pubSubInstance := v1pubsub.GetAPIInstance(".")
	endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:8089", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
	// create subscription
	sub, err := pubSubInstance.CreateSubscription(v1pubsub.NewPubSub(endpointURL, "test/test"))
	if err != nil {
		log.Fatalf("failed to create subscription: %v", err)
	}
	// once the subscription response is received, create a transport listener object to receive events.
	v1amqp.CreateListener(eventInCh, sub.GetResource())
}

Creating subscription golang example with HTTP as transporter protocol

package main

import (
	"fmt"
	"log"
	"net/url"
	"github.com/redhat-cne/sdk-go/pkg/channel"
	"github.com/redhat-cne/sdk-go/pkg/types"
	v1pubsub "github.com/redhat-cne/sdk-go/v1/pubsub"
)

// apiPath is assumed to be the REST API base path listed in the Rest-API section.
const apiPath = "/api/ocloudNotifications/v1/"

func main() {
	// channel for the transport handler subscribed to get and set events
	eventInCh := make(chan *channel.DataChan, 10)
	_ = eventInCh // not used further in this minimal example
	pubSubInstance := v1pubsub.GetAPIInstance(".")
	endpointURL := &types.URI{URL: url.URL{Scheme: "http", Host: "localhost:8089", Path: fmt.Sprintf("%s%s", apiPath, "dummy")}}
	// create subscription
	sub, err := pubSubInstance.CreateSubscription(v1pubsub.NewPubSub(endpointURL, "test/test"))
	if err != nil {
		log.Fatalf("failed to create subscription: %v", err)
	}
	log.Printf("created subscription for resource %s", sub.GetResource())
}
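
The UriLocation in the subscription request names the endpoint in the consumer application where events are expected to arrive. As a rough sketch of that receiving side, the following standard-library handler accepts a POSTed JSON event and acknowledges it. It assumes events are delivered as an HTTP POST with a JSON body; the 204 acknowledgement, the handler name, and the deliver helper are choices made for this example, not behaviour confirmed by the proxy.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// eventHandler accepts events POSTed to the consumer endpoint and hands the
// decoded JSON body to the application through the received channel.
func eventHandler(received chan<- map[string]any) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST only", http.StatusMethodNotAllowed)
			return
		}
		var event map[string]any
		if err := json.NewDecoder(r.Body).Decode(&event); err != nil {
			http.Error(w, "invalid JSON", http.StatusBadRequest)
			return
		}
		received <- event
		w.WriteHeader(http.StatusNoContent) // assumed acknowledgement status
	}
}

// deliver simulates one delivery against the handler without opening a socket
// and returns the response status plus the event the handler accepted, if any.
func deliver(method, body string) (int, map[string]any) {
	received := make(chan map[string]any, 1)
	req := httptest.NewRequest(method, "/event", strings.NewReader(body))
	rec := httptest.NewRecorder()
	eventHandler(received).ServeHTTP(rec, req)
	select {
	case e := <-received:
		return rec.Code, e
	default:
		return rec.Code, nil
	}
}

func main() {
	code, event := deliver(http.MethodPost, `{"id":"5ce55d17-9234-4fee-a589-d0f10cb32b8e","time":"2021-02-05T17:31:00Z"}`)
	fmt.Println(code, event["id"])
}
```

In a real consumer the same handler would be registered on a mux and served on the host and port given in UriLocation, for example localhost:9090.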

Rest-API

Rest-API to create a Publisher and Subscription

A cloud-event-proxy container running with the REST API plugin runs a web service that exposes the following endpoints.


POST /api/ocloudNotifications/v1/subscriptions
POST /api/ocloudNotifications/v1/publishers
GET /api/ocloudNotifications/v1/subscriptions
GET /api/ocloudNotifications/v1/publishers
GET /api/ocloudNotifications/v1/subscriptions/$subscriptionid
GET /api/ocloudNotifications/v1/publishers/$publisherid
GET /api/ocloudNotifications/v1/health
POST /api/ocloudNotifications/v1/log
POST /api/ocloudNotifications/v1/create/event
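
As an illustration of calling one of these endpoints, the sketch below POSTs a subscription request with Go's standard HTTP client and decodes the reply. The struct mirrors the field names used in the JSON examples in this README; the helper names, the in-process fake server, and the acceptance of any 2xx status are assumptions made so that the example runs without a cluster.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

const apiBase = "/api/ocloudNotifications/v1"

// pubSub mirrors the fields shown in the JSON request and response examples.
type pubSub struct {
	ID          string `json:"Id,omitempty"`
	Resource    string `json:"Resource"`
	URILocation string `json:"UriLocation"`
	EndpointURI string `json:"EndpointUri,omitempty"`
}

// createSubscription POSTs the request to the subscriptions endpoint and
// decodes the response body.
func createSubscription(baseURL string, req pubSub) (pubSub, error) {
	var out pubSub
	body, err := json.Marshal(req)
	if err != nil {
		return out, err
	}
	resp, err := http.Post(baseURL+apiBase+"/subscriptions", "application/json", bytes.NewReader(body))
	if err != nil {
		return out, err
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return out, fmt.Errorf("unexpected status %s", resp.Status)
	}
	err = json.NewDecoder(resp.Body).Decode(&out)
	return out, err
}

// newFakeProxy starts an in-process stand-in for the sidecar's REST API.
// It is hypothetical and only echoes the request with an Id and EndpointUri.
func newFakeProxy() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc(apiBase+"/subscriptions", func(w http.ResponseWriter, r *http.Request) {
		var in pubSub
		if r.Method != http.MethodPost || json.NewDecoder(r.Body).Decode(&in) != nil {
			http.Error(w, "bad request", http.StatusBadRequest)
			return
		}
		in.ID = "789be75d-7ac3-472e-bbbc-6d62878aad4a"
		in.EndpointURI = "http://" + r.Host + apiBase + "/subscriptions/" + in.ID
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		_ = json.NewEncoder(w).Encode(in)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := newFakeProxy()
	defer srv.Close()
	sub, err := createSubscription(srv.URL, pubSub{
		Resource:    "/east-edge-10/vdu3/o-ran-sync/sync-group/sync-status/sync-state",
		URILocation: "http://localhost:9090/event",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(sub.ID, sub.EndpointURI)
}
```

Against a running sidecar, baseURL would be the address of its API port, for example http://localhost:8089 in the consumer examples above.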

Create Status Listener

// 1. Create the status listener function (onStatusRequestFn is the action performed when a status ping is received)
onStatusRequestFn := func(e v2.Event) error {
	log.Printf("got status check call,fire events for above publisher")
	event, _ := createPTPEvent(pub)
	_ = common.PublishEvent(config, event)
	return nil
}
// 2. Create the listener object
v1amqp.CreateNewStatusListener(config.EventInCh, fmt.Sprintf("%s/%s", pub.Resource, "status"), onStatusRequestFn, nil)

Cloud Native Events

The following example shows a Cloud Native Event serialized as JSON. The JSON should validate against the Cloud Native Events event_spec.json schema.

{
    "id": "5ce55d17-9234-4fee-a589-d0f10cb32b8e",
    "type": "event.synchronization-state-change",
    "time": "2021-02-05T17:31:00Z",
    "data": {
        "version": "v1.0",
        "values": [{
            "resource": "/cluster/node/ptp",
            "dataType": "notification",
            "valueType": "enumeration",
            "value": "ACQUIRING-SYNC"
        }, {
            "resource": "/cluster/node/clock",
            "dataType": "metric",
            "valueType": "decimal64.3",
            "value": 100.3
        }]
    }
}
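
A consumer that does not use the SDK types could decode such an event with the standard library alone. The sketch below is one way to do that; the struct definitions are written for this example from the JSON shown above and are not the SDK's own types. Note that "value" carries a string for enumerations and a number for metrics, so it is decoded into an untyped field.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// dataValue is one entry of data.values in the event JSON.
type dataValue struct {
	Resource  string `json:"resource"`
	DataType  string `json:"dataType"`
	ValueType string `json:"valueType"`
	Value     any    `json:"value"` // string for enumerations, float64 for decimals
}

// eventData is the "data" object of the event.
type eventData struct {
	Version string      `json:"version"`
	Values  []dataValue `json:"values"`
}

// cloudNativeEvent models the top-level fields of the example event.
type cloudNativeEvent struct {
	ID   string    `json:"id"`
	Type string    `json:"type"`
	Time time.Time `json:"time"`
	Data eventData `json:"data"`
}

// sample is the example event from this README in compact form.
const sample = `{
  "id": "5ce55d17-9234-4fee-a589-d0f10cb32b8e",
  "type": "event.synchronization-state-change",
  "time": "2021-02-05T17:31:00Z",
  "data": {
    "version": "v1.0",
    "values": [
      {"resource": "/cluster/node/ptp", "dataType": "notification", "valueType": "enumeration", "value": "ACQUIRING-SYNC"},
      {"resource": "/cluster/node/clock", "dataType": "metric", "valueType": "decimal64.3", "value": 100.3}
    ]
  }
}`

// parseEvent decodes one serialized event.
func parseEvent(raw string) (cloudNativeEvent, error) {
	var e cloudNativeEvent
	err := json.Unmarshal([]byte(raw), &e)
	return e, err
}

func main() {
	e, err := parseEvent(sample)
	if err != nil {
		panic(err)
	}
	for _, v := range e.Data.Values {
		fmt.Printf("%s: %s (%s) = %v\n", e.ID, v.Resource, v.ValueType, v.Value)
	}
}
```

This sketch only checks that the JSON is well formed; validation against event_spec.json would need a JSON Schema validator on top of it.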

An event can be created via the REST API or by calling SDK methods. To produce or consume an event, the producer and consumer must first create publisher and subscription objects and have access to the ID of the publisher or subscription data object.

import (
	"time"

	cneevent "github.com/redhat-cne/sdk-go/pkg/event"
	"github.com/redhat-cne/sdk-go/pkg/event/ptp"
	"github.com/redhat-cne/sdk-go/pkg/types"
	v1event "github.com/redhat-cne/sdk-go/v1/event"
)

// create an event
event := v1event.CloudNativeEvent()
event.SetID(pub.ID)
event.Type = string(ptp.PtpStateChange)
event.SetTime(types.Timestamp{Time: time.Now().UTC()}.Time)
event.SetDataContentType(cneevent.ApplicationJSON)
data := cneevent.Data{
	Version: "v1",
	Values: []cneevent.DataValue{
		{
			Resource:  "/cluster/node/ptp",
			DataType:  cneevent.NOTIFICATION,
			ValueType: cneevent.ENUMERATION,
			Value:     cneevent.GNSS_ACQUIRING_SYNC,
		},
	},
}
data.SetVersion("v1")
event.SetData(data)

Publisher event create via go-sdk

cloudEvent, _ := v1event.CreateCloudEvents(event, pub)
// send event to AMQP (rest API does this action by default)
v1event.SendNewEventToDataChannel(eventInCh, pub.Resource, cloudEvent)

Publisher event create via rest-api

// POST /api/ocloudNotifications/v1/create/event
if pub, err := pubSubInstance.GetPublisher(publisherID); err == nil {
	url = fmt.Sprintf("%s%s", server.HostPath, "create/event")
	restClient.PostEvent(pub.EndPointURI.String(), event)
}

Metrics

sdk-go metrics

Cloud native events sdk-go comes with the following metrics collectors.

  1. Number of events received by the transport.
  2. Number of events published by the transport.
  3. Number of connection resets.
  4. Number of senders created.
  5. Number of receivers created.

rest-api metrics

Cloud native events rest API comes with the following metrics collectors.

  1. Number of events published by the rest api.
  2. Number of active subscriptions.
  3. Number of active publishers.

cloud-event-proxy metrics

  1. Number of events produced.
  2. Number of events received.

Metrics details
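
For readers unfamiliar with counter-style collectors, the two cloud-event-proxy counters listed above can be pictured with a small standard-library sketch. It is not how the proxy implements its metrics: the type, the metric names, and the hand-written text rendering are all hypothetical and only show the idea of monotonically increasing counters exposed as text.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// proxyMetrics holds two counters, one per cloud-event-proxy metric listed above.
// Atomic counters allow concurrent goroutines to increment them safely.
type proxyMetrics struct {
	produced atomic.Uint64
	received atomic.Uint64
}

// render returns the counters in a Prometheus-style text format.
// The metric names are hypothetical.
func (m *proxyMetrics) render() string {
	return fmt.Sprintf(
		"# TYPE events_produced_total counter\nevents_produced_total %d\n"+
			"# TYPE events_received_total counter\nevents_received_total %d\n",
		m.produced.Load(), m.received.Load())
}

func main() {
	var m proxyMetrics
	m.produced.Add(2) // two events produced
	m.received.Add(1) // one event received
	fmt.Print(m.render())
}
```

A handler serving this text on the address given by --metrics-addr would be the natural next step, though the actual collectors and names are documented in the metrics details.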

Plugin

Plugin details

Directories

Path                      Synopsis
                          Package main ...
examples
  producer                Package main ...
  simplehttp              Package main ...
  simplehttp/common       Package common ...
pkg
  common                  Package common ...
  plugins                 Package plugins ...
  version                 Package version provides information about the current semantic version for cloud-event-proxy
plugins
  amqp                    Package main ...
  http                    Package main ...
  ptp_operator            Package main defines ptp-operator plugin
  ptp_operator/stats      Package stats ...
  ptp_operator/types      Package types ...
test
  cne                     Package cne ...
  utils/client            Package client ...
  utils/execute           Package execute ...
  utils/namespaces        Package namespaces ...
  utils/nodes             Package nodes ...
  utils/pods              Package pods ...
