datachannel

package
v0.0.0-...-955c50f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2024 License: Apache-2.0 Imports: 25 Imported by: 10

Documentation

Overview

Package datachannel implements the data channel that is used to run commands interactively.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DataChannel

// DataChannel is used for session communication between the message gateway
// service and the agent. It carries the channel's identifiers together with
// the sequence numbers, message buffers and retransmission timing used for
// stream data sent over the channel.
type DataChannel struct {
	// Service and identifier fields of the channel.
	// NOTE(review): presumably populated by Initialize from its mgsService,
	// sessionId, clientId, instanceId and role arguments; the exact meaning of
	// Role and Pause is not visible here - confirm against the implementation.
	Service    service.Service
	ChannelId  string
	ClientId   string
	InstanceId string
	Role       string
	Pause      bool
	// ExpectedSequenceNumber records the sequence number of the last
	// acknowledged message received over the data channel.
	// NOTE(review): the field name suggests "next expected incoming sequence
	// number" - confirm which of the two the implementation stores.
	ExpectedSequenceNumber int64
	// StreamDataSequenceNumber records the sequence number of the last stream
	// data message sent over the data channel.
	StreamDataSequenceNumber int64
	// StreamDataSequenceNumberMutex ensures that only one goroutine at a time
	// sends a message with the current StreamDataSequenceNumber.
	// Check carefully for deadlocks in any code path that sends stream data
	// without going through SendStreamDataMessage.
	StreamDataSequenceNumberMutex *sync.Mutex
	// OutgoingMessageBuffer stores outgoing stream messages until they are
	// acknowledged. A linked list is used because access to the oldest message
	// is required and a list supports fast deletion from any position.
	OutgoingMessageBuffer ListMessageBuffer
	// IncomingMessageBuffer stores incoming stream messages that were received
	// out of sequence. A map is used because incoming messages can arrive out
	// of order and lookup by sequence number is faster.
	IncomingMessageBuffer MapMessageBuffer
	// RoundTripTime is the round trip time of the latest acknowledged message.
	// NOTE(review): unit is not visible here - confirm.
	RoundTripTime float64
	// RoundTripTimeVariation is the round trip time variation of the latest
	// acknowledged message.
	RoundTripTimeVariation float64
	// RetransmissionTimeout is the timeout used for resending an
	// unacknowledged message.
	RetransmissionTimeout time.Duration
	// contains filtered or unexported fields
}

DataChannel is used for session communication between the message gateway service and the agent.

func NewDataChannel

func NewDataChannel(context context.T,
	channelId string,
	clientId string,
	inputStreamMessageHandler InputStreamMessageHandler,
	cancelFlag task.CancelFlag) (*DataChannel, error)

NewDataChannel constructs datachannel objects.

func (*DataChannel) AddDataToIncomingMessageBuffer

func (dataChannel *DataChannel) AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)

AddDataToIncomingMessageBuffer adds given message to IncomingMessageBuffer if it has capacity.

func (*DataChannel) AddDataToOutgoingMessageBuffer

func (dataChannel *DataChannel) AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)

AddDataToOutgoingMessageBuffer adds the given message to the end of OutgoingMessageBuffer if it has capacity.

func (*DataChannel) Close

func (dataChannel *DataChannel) Close(log log.T) error

Close closes datachannel - its web socket connection.

func (*DataChannel) GetClientVersion

func (dataChannel *DataChannel) GetClientVersion() string

GetClientVersion returns version of the client

func (*DataChannel) GetInstanceId

func (dataChannel *DataChannel) GetInstanceId() string

GetInstanceId returns id of the target

func (*DataChannel) GetRegion

func (dataChannel *DataChannel) GetRegion() string

GetRegion returns the AWS region of the target.

func (*DataChannel) GetSeparateOutputPayload

func (dataChannel *DataChannel) GetSeparateOutputPayload() bool

GetSeparateOutputPayload returns a boolean value indicating whether stdout and stderr output are kept separate for a non-interactive session.

func (*DataChannel) Initialize

func (dataChannel *DataChannel) Initialize(context context.T,
	mgsService service.Service,
	sessionId string,
	clientId string,
	instanceId string,
	role string,
	cancelFlag task.CancelFlag,
	inputStreamMessageHandler InputStreamMessageHandler)

Initialize populates datachannel object.

func (*DataChannel) IsActive

func (dataChannel *DataChannel) IsActive() bool

IsActive returns a boolean value indicating whether the datachannel is actively listening and communicating with the service.

func (*DataChannel) Open

func (dataChannel *DataChannel) Open(log log.T) error

Open opens the websocket connection and sends the token for service to acknowledge the connection.

func (*DataChannel) PerformHandshake

func (dataChannel *DataChannel) PerformHandshake(log log.T,
	kmsKeyId string,
	encryptionEnabled bool,
	sessionTypeRequest mgsContracts.SessionTypeRequest) (err error)

PerformHandshake performs handshake to share version string and encryption information with clients like cli/console

func (*DataChannel) PrepareToCloseChannel

func (dataChannel *DataChannel) PrepareToCloseChannel(log log.T)

PrepareToCloseChannel waits for all messages to be sent to MGS

func (*DataChannel) ProcessAcknowledgedMessage

func (dataChannel *DataChannel) ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent mgsContracts.AcknowledgeContent)

ProcessAcknowledgedMessage processes acknowledge messages by deleting them from OutgoingMessageBuffer.

func (*DataChannel) Reconnect

func (dataChannel *DataChannel) Reconnect(log log.T) error

Reconnect reconnects datachannel to service endpoint.

func (*DataChannel) RemoveDataFromIncomingMessageBuffer

func (dataChannel *DataChannel) RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)

RemoveDataFromIncomingMessageBuffer removes the message with the given sequence number from IncomingMessageBuffer.

func (*DataChannel) RemoveDataFromOutgoingMessageBuffer

func (dataChannel *DataChannel) RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)

RemoveDataFromOutgoingMessageBuffer removes given element from OutgoingMessageBuffer.

func (*DataChannel) ResendStreamDataMessageScheduler

func (dataChannel *DataChannel) ResendStreamDataMessageScheduler(log log.T) error

ResendStreamDataMessageScheduler spawns a separate goroutine that checks OutgoingMessageBuffer at a fixed interval and resends the first message if the time elapsed since that message's lastSentTime exceeds the acknowledge wait time.

func (*DataChannel) SendAcknowledgeMessage

func (dataChannel *DataChannel) SendAcknowledgeMessage(log log.T, streamDataMessage mgsContracts.AgentMessage) error

SendAcknowledgeMessage sends acknowledge message for stream data over data channel

func (*DataChannel) SendAgentSessionStateMessage

func (dataChannel *DataChannel) SendAgentSessionStateMessage(log log.T, sessionStatus mgsContracts.SessionStatus) error

SendAgentSessionStateMessage sends agent session state to MGS

func (*DataChannel) SendMessage

func (dataChannel *DataChannel) SendMessage(log log.T, input []byte, inputType int) error

SendMessage sends a message to the service through datachannel.

func (*DataChannel) SendStreamDataMessage

func (dataChannel *DataChannel) SendStreamDataMessage(log log.T, payloadType mgsContracts.PayloadType, inputData []byte) (err error)

SendStreamDataMessage sends a data message in a form of AgentMessage for streaming.

func (*DataChannel) SetSeparateOutputPayload

func (dataChannel *DataChannel) SetSeparateOutputPayload(separateOutputPayload bool)

SetSeparateOutputPayload sets the separateOutputPayload value.

func (*DataChannel) SetWebSocket

func (dataChannel *DataChannel) SetWebSocket(context context.T,
	mgsService service.Service,
	sessionId string,
	clientId string,
	onMessageHandler func(input []byte)) error

SetWebSocket populates webchannel object.

func (*DataChannel) SkipHandshake

func (dataChannel *DataChannel) SkipHandshake(log log.T)

SkipHandshake is used to skip handshake if the plugin decides it is not necessary

type Handshake

// Handshake has only unexported fields, so its contents are not shown here.
// NOTE(review): presumably holds the handshake state used by
// DataChannel.PerformHandshake and DataChannel.SkipHandshake - confirm
// against the implementation.
type Handshake struct {
	// contains filtered or unexported fields
}

type IDataChannel

// IDataChannel is the interface form of the data channel. Every method listed
// here has a same-named method documented on *DataChannel: setup
// (Initialize, SetWebSocket), connection lifecycle (Open, Close, Reconnect,
// PrepareToCloseChannel, IsActive), sending and acknowledging messages,
// management of the outgoing and incoming message buffers, the handshake with
// the client, and simple accessors.
type IDataChannel interface {
	Initialize(context context.T, mgsService service.Service, sessionId string, clientId string, instanceId string, role string, cancelFlag task.CancelFlag, inputStreamMessageHandler InputStreamMessageHandler)
	SetWebSocket(context context.T, mgsService service.Service, sessionId string, clientId string, onMessageHandler func(input []byte)) error
	Open(log log.T) error
	Close(log log.T) error
	Reconnect(log log.T) error
	SendMessage(log log.T, input []byte, inputType int) error
	SendStreamDataMessage(log log.T, dataType mgsContracts.PayloadType, inputData []byte) error
	ResendStreamDataMessageScheduler(log log.T) error
	ProcessAcknowledgedMessage(log log.T, acknowledgeMessageContent mgsContracts.AcknowledgeContent)
	SendAcknowledgeMessage(log log.T, agentMessage mgsContracts.AgentMessage) error
	SendAgentSessionStateMessage(log log.T, sessionStatus mgsContracts.SessionStatus) error
	AddDataToOutgoingMessageBuffer(streamMessage StreamingMessage)
	RemoveDataFromOutgoingMessageBuffer(streamMessageElement *list.Element)
	AddDataToIncomingMessageBuffer(streamMessage StreamingMessage)
	RemoveDataFromIncomingMessageBuffer(sequenceNumber int64)
	SkipHandshake(log log.T)
	PerformHandshake(log log.T, kmsKeyId string, encryptionEnabled bool, sessionTypeRequest mgsContracts.SessionTypeRequest) (err error)
	GetClientVersion() string
	GetInstanceId() string
	GetRegion() string
	IsActive() bool
	PrepareToCloseChannel(log log.T)
	GetSeparateOutputPayload() bool
	SetSeparateOutputPayload(separateOutputPayload bool)
}

type InputStreamMessageHandler

// InputStreamMessageHandler is a callback that receives a logger and a stream
// data AgentMessage and returns an error. NewDataChannel and Initialize both
// accept one as their inputStreamMessageHandler argument.
// NOTE(review): presumably invoked for each incoming stream data message -
// confirm against the implementation.
type InputStreamMessageHandler func(log log.T, streamDataMessage mgsContracts.AgentMessage) error

type ListMessageBuffer

// ListMessageBuffer is a message buffer whose messages are held in a
// *list.List. It is the type of DataChannel.OutgoingMessageBuffer, which
// stores outgoing stream messages until they are acknowledged.
//
// Capacity is the buffer's size limit: AddDataToOutgoingMessageBuffer only
// adds a message if the buffer has capacity.
// NOTE(review): Mutex presumably guards access to Messages - confirm.
type ListMessageBuffer struct {
	Messages *list.List
	Capacity int
	Mutex    *sync.Mutex
}

type MapMessageBuffer

// MapMessageBuffer is a message buffer whose messages are held in a map of
// StreamingMessage values keyed by int64. It is the type of
// DataChannel.IncomingMessageBuffer, which stores incoming stream messages
// that were received out of sequence so they can be looked up by sequence
// number.
//
// Capacity is the buffer's size limit: AddDataToIncomingMessageBuffer only
// adds a message if the buffer has capacity.
// NOTE(review): Mutex presumably guards access to Messages - confirm.
type MapMessageBuffer struct {
	Messages map[int64]StreamingMessage
	Capacity int
	Mutex    *sync.Mutex
}

type StreamingMessage

// StreamingMessage is the message value passed to
// AddDataToOutgoingMessageBuffer and AddDataToIncomingMessageBuffer and
// stored in MapMessageBuffer.Messages.
//
// Content holds the message bytes, SequenceNumber is the message's sequence
// number, and LastSentTime is the timestamp that
// ResendStreamDataMessageScheduler is documented to compare against the
// acknowledge wait time when deciding whether to resend.
// NOTE(review): whether Content is the serialized AgentMessage or only its
// payload is not visible here - confirm.
type StreamingMessage struct {
	Content        []byte
	SequenceNumber int64
	LastSentTime   time.Time
}

Directories

Path Synopsis
Code generated by mockery 2.7.4.
Code generated by mockery 2.7.4.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL