communications

package
v1.7.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 2, 2021 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Store is a reference to the Storage being used

Functions

func ActivateObjects

func ActivateObjects()

ActivateObjects looks for objects that are ready to be activated, marks them as active, and sends object notifications to their destinations

func IsTransportError

func IsTransportError(pResp *http.Response, err error) bool

func PrepareDeleteNotifications

func PrepareDeleteNotifications(metaData common.MetaData) ([]common.NotificationInfo, common.SyncServiceError)

PrepareDeleteNotifications prepares the delete notification message

func PrepareNotificationsForDestinations

func PrepareNotificationsForDestinations(metaData common.MetaData, destinations []common.StoreDestinationStatus, topic string) ([]common.NotificationInfo,
	common.SyncServiceError)

PrepareNotificationsForDestinations prepares notification messages for the destinations if necessary

func PrepareObjectNotifications

func PrepareObjectNotifications(metaData common.MetaData) ([]common.NotificationInfo, common.SyncServiceError)

PrepareObjectNotifications sends notifications to object’s destinations

func PrepareObjectStatusNotification

func PrepareObjectStatusNotification(metaData common.MetaData, status string) ([]common.NotificationInfo, common.SyncServiceError)

PrepareObjectStatusNotification sends an object status message to the other side

func PrepareUpdateNotification

func PrepareUpdateNotification(metaData common.MetaData, destinations []common.Destination) ([]common.NotificationInfo, common.SyncServiceError)

PrepareUpdateNotification prepares the notification message from object's meta data

func ResendNotifications

func ResendNotifications() common.SyncServiceError

ResendNotifications resends notications that haven't been acknowledged

func ResendObjects

func ResendObjects() common.SyncServiceError

ResendObjects requests to resend all the relevant objects

func SendErrorResponse

func SendErrorResponse(writer http.ResponseWriter, err error, message string, statusCode int)

SendErrorResponse common code to send HTTP error codes

func SendNotifications

func SendNotifications(notifications []common.NotificationInfo) common.SyncServiceError

SendNotifications calls the communication to send the notification messages

Types

type Communicator

type Communicator interface {
	// StartCommunication starts communications
	StartCommunication() common.SyncServiceError

	// StopCommunication stops communications
	StopCommunication() common.SyncServiceError

	// SendNotificationMessage sends a notification message from the CSS to the ESS or from the ESS to the CSS
	SendNotificationMessage(notificationTopic string, destType string, destID string, instanceID int64, dataID int64, metaData *common.MetaData) common.SyncServiceError

	// SendFeedbackMessage sends a feedback message from the ESS to the CSS or from the CSS to the ESS
	SendFeedbackMessage(code int, retryInterval int32, reason string, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

	// SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS
	SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

	// Register sends a registration message to be sent by an ESS
	Register() common.SyncServiceError

	// RegisterAck sends a registration acknowledgement message from the CSS
	RegisterAck(destination common.Destination) common.SyncServiceError

	// HandleRegAck handles a registration acknowledgement message from the CSS
	HandleRegAck()

	// RegisterAsNew send a notification from a CSS to a ESS that the ESS has to send a registerNew message in order
	// to register
	RegisterAsNew(destination common.Destination) common.SyncServiceError

	// RegisterNew sends a new registration message to be sent by an ESS
	RegisterNew() common.SyncServiceError

	// Unregister ESS
	Unregister() common.SyncServiceError

	// SendPing sends a ping message from ESS to CSS
	SendPing() common.SyncServiceError

	// GetData requests data to be sent from the CSS to the ESS or from the ESS to the CSS
	GetData(metaData common.MetaData, offset int64) common.SyncServiceError

	// SendData sends data from the CSS to the ESS or from the ESS to the CSS
	SendData(orgID string, destType string, destID string, message []byte, chunked bool) common.SyncServiceError

	// ResendObjects requests to resend all the relevant objects
	ResendObjects() common.SyncServiceError

	// SendAckResendObjects sends ack to resend objects request
	SendAckResendObjects(destination common.Destination) common.SyncServiceError

	// UpdateOrganization adds or updates an organization
	UpdateOrganization(org common.Organization, timestamp time.Time) common.SyncServiceError

	// DeleteOrganization removes an organization
	DeleteOrganization(orgID string) common.SyncServiceError

	// LockDataChunks locks one of the data chunks locks
	LockDataChunks(index uint32, metadata *common.MetaData)

	// UnlockDataChunks unlocks one of the data chunks locks
	UnlockDataChunks(index uint32, metadata *common.MetaData)
}

Communicator defines the interface for communications between the CSS and the CSS

var Comm Communicator

Comm is the selected communications struct

type DestinationRequestQueue

type DestinationRequestQueue struct {
	// contains filtered or unexported fields
}
var DestReqQueue *DestinationRequestQueue

func NewDestinationRequestQueue

func NewDestinationRequestQueue(bufferSize uint64) *DestinationRequestQueue

func (*DestinationRequestQueue) Close

func (q *DestinationRequestQueue) Close()

func (*DestinationRequestQueue) SendDestReqToQueue

func (q *DestinationRequestQueue) SendDestReqToQueue(destReqInQueue common.DestinationRequestInQueue)

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error is the error struct used by the communications code

func (*Error) Error

func (e *Error) Error() string

type HTTP

type HTTP struct {
	// contains filtered or unexported fields
}

HTTP is the struct for the HTTP communications layer

func (*HTTP) ChangeLeadership

func (communication *HTTP) ChangeLeadership(isLeader bool) common.SyncServiceError

ChangeLeadership changes the leader

func (*HTTP) DeleteOrganization

func (communication *HTTP) DeleteOrganization(orgID string) common.SyncServiceError

DeleteOrganization removes an organization

func (*HTTP) GetData

func (communication *HTTP) GetData(metaData common.MetaData, offset int64) common.SyncServiceError

GetData requests data to be sent from the CSS to the ESS

func (*HTTP) HandleRegAck

func (communication *HTTP) HandleRegAck()

HandleRegAck handles a registration acknowledgement message from the CSS

func (*HTTP) LockDataChunks

func (communication *HTTP) LockDataChunks(index uint32, metadata *common.MetaData)

LockDataChunks locks one of the data chunks locks

func (*HTTP) Poll

func (communication *HTTP) Poll() bool

Poll polls the CSS for updates, notifications, etc

func (*HTTP) Register

func (communication *HTTP) Register() common.SyncServiceError

Register sends a registration message to be sent by an ESS

func (*HTTP) RegisterAck

func (communication *HTTP) RegisterAck(destination common.Destination) common.SyncServiceError

RegisterAck sends a registration acknowledgement message from the CSS

func (*HTTP) RegisterAsNew

func (communication *HTTP) RegisterAsNew(destination common.Destination) common.SyncServiceError

RegisterAsNew send a notification from a CSS to a ESS that the ESS has to send a registerNew message in order to register

func (*HTTP) RegisterNew

func (communication *HTTP) RegisterNew() common.SyncServiceError

RegisterNew sends a new registration message to be sent by an ESS

func (*HTTP) ResendObjects

func (communication *HTTP) ResendObjects() common.SyncServiceError

ResendObjects requests to resend all the relevant objects

func (*HTTP) SendAckResendObjects

func (communication *HTTP) SendAckResendObjects(destination common.Destination) common.SyncServiceError

SendAckResendObjects sends ack to resend objects request

func (*HTTP) SendData

func (communication *HTTP) SendData(orgID string, destType string, destID string, message []byte, chunked bool) common.SyncServiceError

SendData sends data from the CSS to the ESS or from the ESS to the CSS

func (*HTTP) SendErrorMessage

func (communication *HTTP) SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS

func (*HTTP) SendFeedbackMessage

func (communication *HTTP) SendFeedbackMessage(code int, retryInterval int32, reason string, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendFeedbackMessage sends a feedback message from the ESS to the CSS or from the CSS to the ESS

func (*HTTP) SendNotificationMessage

func (communication *HTTP) SendNotificationMessage(notificationTopic string, destType string, destID string, instanceID int64, dataID int64,
	metaData *common.MetaData) common.SyncServiceError

SendNotificationMessage sends a notification message from the CSS to the ESS or from the ESS to the CSS

func (*HTTP) SendPing

func (communication *HTTP) SendPing() common.SyncServiceError

SendPing sends a ping message from ESS to CSS

func (*HTTP) StartCommunication

func (communication *HTTP) StartCommunication() common.SyncServiceError

StartCommunication starts communications

func (*HTTP) StopCommunication

func (communication *HTTP) StopCommunication() common.SyncServiceError

StopCommunication stops communications

func (*HTTP) UnlockDataChunks

func (communication *HTTP) UnlockDataChunks(index uint32, metadata *common.MetaData)

UnlockDataChunks unlocks one of the data chunks locks

func (*HTTP) Unregister

func (communication *HTTP) Unregister() common.SyncServiceError

Unregister ESS

func (*HTTP) Unsubscribe

func (communication *HTTP) Unsubscribe() common.SyncServiceError

Unsubscribe unsubcribes the node from its MQTT subscriptions TODO: Maybe we should do something for HTTP too

func (*HTTP) UpdateOrganization

func (communication *HTTP) UpdateOrganization(org common.Organization, timestamp time.Time) common.SyncServiceError

UpdateOrganization adds or updates an organization

type MQTT

type MQTT struct {
	// contains filtered or unexported fields
}

MQTT is the struct for MQTT based communications between a CSS and an ESS

func (*MQTT) DeleteOrganization

func (communication *MQTT) DeleteOrganization(orgID string) common.SyncServiceError

DeleteOrganization removes an organization

func (*MQTT) GetData

func (communication *MQTT) GetData(metaData common.MetaData, offset int64) common.SyncServiceError

GetData requests data to be sent from the CSS to the ESS or from the ESS to the CSS

func (*MQTT) HandleRegAck

func (communication *MQTT) HandleRegAck()

HandleRegAck handles a registration acknowledgement message from the CSS

func (*MQTT) LockDataChunks

func (communication *MQTT) LockDataChunks(index uint32, metadata *common.MetaData)

LockDataChunks locks one of the data chunks locks

func (*MQTT) Register

func (communication *MQTT) Register() common.SyncServiceError

Register sends a registration message to be sent by an ESS or from the CSS to the ESS

func (*MQTT) RegisterAck

func (communication *MQTT) RegisterAck(destination common.Destination) common.SyncServiceError

RegisterAck sends a registration acknowledgement message from the CSS

func (*MQTT) RegisterAsNew

func (communication *MQTT) RegisterAsNew(destination common.Destination) common.SyncServiceError

RegisterAsNew send a notification from a CSS to a ESS that the ESS has to send a registerNew message in order to register

func (*MQTT) RegisterNew

func (communication *MQTT) RegisterNew() common.SyncServiceError

RegisterNew sends a new registration message to be sent by an ESS

func (*MQTT) ResendObjects

func (communication *MQTT) ResendObjects() common.SyncServiceError

ResendObjects requests to resend all the relevant objects

func (*MQTT) SendAckResendObjects

func (communication *MQTT) SendAckResendObjects(destination common.Destination) common.SyncServiceError

SendAckResendObjects sends ack to resend objects request

func (*MQTT) SendData

func (communication *MQTT) SendData(orgID string, destType string, destID string, message []byte, chunked bool) common.SyncServiceError

SendData sends data from the CSS to the ESS or from the ESS to the CSS

func (*MQTT) SendErrorMessage

func (communication *MQTT) SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS

func (*MQTT) SendFeedbackMessage

func (communication *MQTT) SendFeedbackMessage(code int, retryInterval int32, reason string, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendFeedbackMessage sends a feedback message from the ESS to the CSS or from the CSS to the ESS

func (*MQTT) SendNotificationMessage

func (communication *MQTT) SendNotificationMessage(notificationTopic string, destType string, destID string, instanceID int64, dataID int64,
	metaData *common.MetaData) common.SyncServiceError

SendNotificationMessage sends a notification message from the CSS to the ESS or from the ESS to the CSS

func (*MQTT) SendPing

func (communication *MQTT) SendPing() common.SyncServiceError

SendPing sends a ping message from ESS to CSS

func (*MQTT) StartCommunication

func (communication *MQTT) StartCommunication() common.SyncServiceError

StartCommunication starts communications

func (*MQTT) StopCommunication

func (communication *MQTT) StopCommunication() common.SyncServiceError

StopCommunication stops communications

func (*MQTT) UnlockDataChunks

func (communication *MQTT) UnlockDataChunks(index uint32, metadata *common.MetaData)

UnlockDataChunks unlocks one of the data chunks locks

func (*MQTT) Unregister

func (communication *MQTT) Unregister() common.SyncServiceError

Unregister ESS

func (*MQTT) UpdateOrganization

func (communication *MQTT) UpdateOrganization(org common.Organization, timestamp time.Time) common.SyncServiceError

UpdateOrganization adds or updates an organization

type ObjectWorkQueue

type ObjectWorkQueue struct {
	// contains filtered or unexported fields
}

func NewObjectWorkQueue

func NewObjectWorkQueue(bufferSize uint64) *ObjectWorkQueue

func (*ObjectWorkQueue) Close

func (q *ObjectWorkQueue) Close()

func (*ObjectWorkQueue) SendObjectToQueue

func (q *ObjectWorkQueue) SendObjectToQueue(objectInQueue common.ObjectInQueue)

type TestComm

type TestComm struct {
}

TestComm is a communicator used for unit testing

func (*TestComm) DeleteOrganization

func (communication *TestComm) DeleteOrganization(orgID string) common.SyncServiceError

DeleteOrganization removes an organization

func (*TestComm) GetData

func (communication *TestComm) GetData(metaData common.MetaData, offset int64) common.SyncServiceError

GetData requests data to be sent from the CSS to the ESS or from the ESS to the CSS

func (*TestComm) HandleRegAck

func (communication *TestComm) HandleRegAck()

HandleRegAck handles a registration acknowledgement message from the CSS

func (*TestComm) LockDataChunks

func (communication *TestComm) LockDataChunks(index uint32, metadata *common.MetaData)

LockDataChunks locks one of the data chunks locks

func (*TestComm) Register

func (communication *TestComm) Register() common.SyncServiceError

Register sends a registration message to be sent by an ESS

func (*TestComm) RegisterAck

func (communication *TestComm) RegisterAck(destination common.Destination) common.SyncServiceError

RegisterAck sends a registration acknowledgement message from the CSS

func (*TestComm) RegisterAsNew

func (communication *TestComm) RegisterAsNew(destination common.Destination) common.SyncServiceError

RegisterAsNew send a notification from a CSS to a ESS that the ESS has to send a registerNew message in order to register

func (*TestComm) RegisterNew

func (communication *TestComm) RegisterNew() common.SyncServiceError

RegisterNew sends a new registration message to be sent by an ESS

func (*TestComm) ResendObjects

func (communication *TestComm) ResendObjects() common.SyncServiceError

ResendObjects requests to resend all the relevant objects

func (*TestComm) SendAckResendObjects

func (communication *TestComm) SendAckResendObjects(destination common.Destination) common.SyncServiceError

SendAckResendObjects sends ack to resend objects request

func (*TestComm) SendData

func (communication *TestComm) SendData(orgID string, destType string, destID string, message []byte, chunked bool) common.SyncServiceError

SendData sends data from the CSS to the ESS or from the ESS to the CSS

func (*TestComm) SendErrorMessage

func (communication *TestComm) SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS

func (*TestComm) SendFeedbackMessage

func (communication *TestComm) SendFeedbackMessage(code int, retryInterval int32, reason string, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendFeedbackMessage sends a feedback message from the ESS to the CSS or from the CSS to the ESS

func (*TestComm) SendNotificationMessage

func (communication *TestComm) SendNotificationMessage(notificationTopic string, destType string,
	destID string, instanceID int64, dataID int64, metaData *common.MetaData) common.SyncServiceError

SendNotificationMessage sends a notification message from the CSS to the ESS or from the ESS to the CSS

func (*TestComm) SendPing

func (communication *TestComm) SendPing() common.SyncServiceError

SendPing sends a ping message from ESS to CSS

func (*TestComm) StartCommunication

func (communication *TestComm) StartCommunication() common.SyncServiceError

StartCommunication starts communications

func (*TestComm) StopCommunication

func (communication *TestComm) StopCommunication() common.SyncServiceError

StopCommunication stops communications

func (*TestComm) UnlockDataChunks

func (communication *TestComm) UnlockDataChunks(index uint32, metadata *common.MetaData)

UnlockDataChunks unlocks one of the data chunks locks

func (*TestComm) Unregister

func (communication *TestComm) Unregister() common.SyncServiceError

Unregister ESS

func (*TestComm) UpdateOrganization

func (communication *TestComm) UpdateOrganization(org common.Organization, timestamp time.Time) common.SyncServiceError

UpdateOrganization adds or updates an organization

type Wrapper

type Wrapper struct {
	// contains filtered or unexported fields
}

Wrapper is the struct for a wrapper around the MQTT and HTTP communications between the CSS and ESS

func NewWrapper

func NewWrapper(httpComm *HTTP, mqttComm *MQTT) *Wrapper

NewWrapper creates a new Wrapper struct

func (*Wrapper) DeleteOrganization

func (communication *Wrapper) DeleteOrganization(orgID string) common.SyncServiceError

DeleteOrganization removes an organization

func (*Wrapper) GetData

func (communication *Wrapper) GetData(metaData common.MetaData, offset int64) common.SyncServiceError

GetData requests data to be sent from the CSS to the ESS or from the ESS to the CSS

func (*Wrapper) HandleRegAck

func (communication *Wrapper) HandleRegAck()

HandleRegAck handles a registration acknowledgement message from the CSS

func (*Wrapper) LockDataChunks

func (communication *Wrapper) LockDataChunks(index uint32, metadata *common.MetaData)

LockDataChunks locks one of the data chunks locks

func (*Wrapper) Register

func (communication *Wrapper) Register() common.SyncServiceError

Register sends a registration message to be sent by an ESS

func (*Wrapper) RegisterAck

func (communication *Wrapper) RegisterAck(destination common.Destination) common.SyncServiceError

RegisterAck sends a registration acknowledgement message from the CSS

func (*Wrapper) RegisterAsNew

func (communication *Wrapper) RegisterAsNew(destination common.Destination) common.SyncServiceError

RegisterAsNew send a notification from a CSS to a ESS that the ESS has to send a registerNew message in order to register

func (*Wrapper) RegisterNew

func (communication *Wrapper) RegisterNew() common.SyncServiceError

RegisterNew sends a new registration message to be sent by an ESS

func (*Wrapper) ResendObjects

func (communication *Wrapper) ResendObjects() common.SyncServiceError

ResendObjects requests to resend all the relevant objects

func (*Wrapper) SendAckResendObjects

func (communication *Wrapper) SendAckResendObjects(destination common.Destination) common.SyncServiceError

SendAckResendObjects sends ack to resend objects request

func (*Wrapper) SendData

func (communication *Wrapper) SendData(orgID string, destType string, destID string, message []byte, chunked bool) common.SyncServiceError

SendData sends data from the CSS to the ESS or from the ESS to the CSS

func (*Wrapper) SendErrorMessage

func (communication *Wrapper) SendErrorMessage(err common.SyncServiceError, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendErrorMessage sends an error message from the ESS to the CSS or from the CSS to the ESS

func (*Wrapper) SendFeedbackMessage

func (communication *Wrapper) SendFeedbackMessage(code int, retryInterval int32, reason string, metaData *common.MetaData, sendToOrigin bool) common.SyncServiceError

SendFeedbackMessage sends a feedback message from the ESS to the CSS or from the CSS to the ESS

func (*Wrapper) SendNotificationMessage

func (communication *Wrapper) SendNotificationMessage(notificationTopic string, destType string, destID string, instanceID int64, dataID int64,
	metaData *common.MetaData) common.SyncServiceError

SendNotificationMessage sends a notification message from the CSS to the ESS or from the ESS to the CSS

func (*Wrapper) SendPing

func (communication *Wrapper) SendPing() common.SyncServiceError

SendPing sends a ping message from ESS to CSS

func (*Wrapper) StartCommunication

func (communication *Wrapper) StartCommunication() common.SyncServiceError

StartCommunication starts communications

func (*Wrapper) StopCommunication

func (communication *Wrapper) StopCommunication() common.SyncServiceError

StopCommunication stops communications

func (*Wrapper) UnlockDataChunks

func (communication *Wrapper) UnlockDataChunks(index uint32, metadata *common.MetaData)

UnlockDataChunks unlocks one of the data chunks locks

func (*Wrapper) Unregister

func (communication *Wrapper) Unregister() common.SyncServiceError

Unregister ESS TODO: implement Unregister() method for mqttCommunication

func (*Wrapper) UpdateOrganization

func (communication *Wrapper) UpdateOrganization(org common.Organization, timestamp time.Time) common.SyncServiceError

UpdateOrganization adds or updates an organization

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL