p2p

package
v0.0.0-...-49d3c07 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewMessageServer

func NewMessageServer(
	selfID NodeID,
	_credential *security.Credential,
	opts ...MessageServerOpt,
) *p2pImpl.MessageServer

NewMessageServer creates a new message server from the given configs.

Types

type Config

Config alias to p2pImpl.MessageServerConfig

type HandlerFunc

type HandlerFunc = func(sender NodeID, value MessageValue) error

HandlerFunc alias to message handler function

type MessageHandlerManager

type MessageHandlerManager interface {
	RegisterHandler(ctx context.Context, topic Topic, tpi TypeInformation, fn HandlerFunc) (bool, error)
	UnregisterHandler(ctx context.Context, topic Topic) (bool, error)
	CheckError(ctx context.Context) error

	// Clean unregisters all existing handlers.
	Clean(ctx context.Context) error

	// SetTimeout sets the timeout for handler operations.
	// A timeout is needed because the underlying handler operations are
	// asynchronous in the MessageServer.
	SetTimeout(timeout time.Duration)
}

MessageHandlerManager is for managing message topic handlers. NOTE: for each topic, only one handler is allowed.

type MessageRPCService

type MessageRPCService struct {
	// contains filtered or unexported fields
}

MessageRPCService is a background service wrapping a MessageServer instance.

func NewDependentMessageRPCService

func NewDependentMessageRPCService(
	selfID NodeID,
	_credential *security.Credential,
	grpcSvr *grpc.Server,
	opts ...MessageServerOpt,
) (*MessageRPCService, error)

NewDependentMessageRPCService creates a new MessageRPCService that DOES NOT own a `grpc.Server`. TODO: refactor the design.

func NewMessageRPCService

func NewMessageRPCService(
	selfID NodeID,
	_credential *security.Credential,
	opts ...MessageServerOpt,
) (*MessageRPCService, error)

NewMessageRPCService creates a new MessageRPCService. Note: TLS is not supported for now.

func NewMessageRPCServiceWithRPCServer

func NewMessageRPCServiceWithRPCServer(
	selfID NodeID,
	_credential *security.Credential,
	grpcSvr *grpc.Server,
	opts ...MessageServerOpt,
) *MessageRPCService

NewMessageRPCServiceWithRPCServer creates a new MessageRPCService with an existing gRPC server.

func (*MessageRPCService) GetMessageServer

func (s *MessageRPCService) GetMessageServer() *p2pImpl.MessageServer

GetMessageServer returns the internal message server

func (*MessageRPCService) MakeHandlerManager

func (s *MessageRPCService) MakeHandlerManager() MessageHandlerManager

MakeHandlerManager returns a MessageHandlerManager

func (*MessageRPCService) Serve

Serve listens on `l` and creates the background goroutine for the message server.

type MessageRouter

type MessageRouter = p2p.MessageRouter

MessageRouter alias to p2p.MessageRouter

func NewMessageRouter

func NewMessageRouter(nodeID NodeID, advertisedAddr string) MessageRouter

NewMessageRouter creates a new MessageRouter instance via the tiflow p2p API.

type MessageSender

type MessageSender interface {

	// SendToNode sends a message to a given node. Returns whether it is successful and a possible error.
	// A `would-block` error will not be returned. (false, nil) would be returned instead.
	SendToNode(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) (bool, error)

	// SendToNodeB sends a message to a given node in a blocking way
	SendToNodeB(ctx context.Context, targetNodeID NodeID, topic Topic, message interface{}) error
}

MessageSender is used to send a message of a given topic to a given node.

func NewMessageSender

func NewMessageSender(router MessageRouter) MessageSender

NewMessageSender returns a new message sender.

type MessageServerOpt

type MessageServerOpt = func(*Config)

MessageServerOpt alias to the option setter function

type MessageValue

type MessageValue = interface{}

MessageValue is used to hold message object

type MockMessageHandlerManager

type MockMessageHandlerManager struct {
	// contains filtered or unexported fields
}

MockMessageHandlerManager is used in unit tests only; it simulates a message handler manager.

func NewMockMessageHandlerManager

func NewMockMessageHandlerManager() *MockMessageHandlerManager

NewMockMessageHandlerManager creates a new MockMessageHandlerManager instance

func (*MockMessageHandlerManager) AssertHasHandler

func (m *MockMessageHandlerManager) AssertHasHandler(t *testing.T, topic Topic, tpi TypeInformation)

AssertHasHandler checks that the given topic is registered.

func (*MockMessageHandlerManager) AssertNoHandler

func (m *MockMessageHandlerManager) AssertNoHandler(t *testing.T, topic Topic)

AssertNoHandler checks that the given topic is not registered.

func (*MockMessageHandlerManager) CheckError

func (m *MockMessageHandlerManager) CheckError(ctx context.Context) error

CheckError implements MessageHandlerManager.CheckError

func (*MockMessageHandlerManager) Clean

Clean implements MessageHandlerManager.Clean

func (*MockMessageHandlerManager) InjectError

func (m *MockMessageHandlerManager) InjectError(err error)

InjectError injects an error into the mock message handler

func (*MockMessageHandlerManager) InvokeHandler

func (m *MockMessageHandlerManager) InvokeHandler(t *testing.T, topic Topic, senderID NodeID, message interface{}) error

InvokeHandler gets the handler of the given topic and invokes it to simulate a message being sent from the given sender.

func (*MockMessageHandlerManager) RegisterHandler

func (m *MockMessageHandlerManager) RegisterHandler(
	ctx context.Context,
	topic Topic,
	tpi TypeInformation,
	fn HandlerFunc,
) (bool, error)

RegisterHandler implements MessageHandlerManager.RegisterHandler

func (*MockMessageHandlerManager) SetTimeout

func (m *MockMessageHandlerManager) SetTimeout(timeout time.Duration)

SetTimeout implements MessageHandlerManager.SetTimeout

func (*MockMessageHandlerManager) UnregisterHandler

func (m *MockMessageHandlerManager) UnregisterHandler(ctx context.Context, topic Topic) (bool, error)

UnregisterHandler implements MessageHandlerManager.UnregisterHandler

type MockMessageSender

type MockMessageSender struct {
	// contains filtered or unexported fields
}

MockMessageSender defines a mock message sender

func NewMockMessageSender

func NewMockMessageSender() *MockMessageSender

NewMockMessageSender creates a new MockMessageSender instance

func (*MockMessageSender) InjectError

func (m *MockMessageSender) InjectError(err error)

InjectError injects an error to simulate an error scenario.

func (*MockMessageSender) MarkNodeOffline

func (m *MockMessageSender) MarkNodeOffline(nodeID NodeID)

MarkNodeOffline marks a node as offline.

func (*MockMessageSender) MarkNodeOnline

func (m *MockMessageSender) MarkNodeOnline(nodeID NodeID)

MarkNodeOnline marks a node as online. Note that by default all nodes are treated as online to facilitate testing.

func (*MockMessageSender) SendToNode

func (m *MockMessageSender) SendToNode(
	_ context.Context,
	targetNodeID NodeID,
	topic Topic,
	message interface{},
) (bool, error)

SendToNode implements pkg/p2p.MessageSender.SendToNode

func (*MockMessageSender) SendToNodeB

func (m *MockMessageSender) SendToNodeB(
	ctx context.Context,
	targetNodeID NodeID,
	topic Topic,
	message interface{},
) error

SendToNodeB implements pkg/p2p.MessageSender.SendToNodeB

func (*MockMessageSender) SetBlocked

func (m *MockMessageSender) SetBlocked(isBlocked bool)

SetBlocked sets whether message sending is blocked.

func (*MockMessageSender) TryPop

func (m *MockMessageSender) TryPop(targetNodeID NodeID, topic Topic) (interface{}, bool)

TryPop tries to get a message from the message sender.

type NodeID

type NodeID = p2pImpl.NodeID

NodeID alias to p2pImpl.NodeID

type Topic

type Topic = p2pImpl.Topic

Topic alias to p2pImpl.Topic

type TypeInformation

type TypeInformation = interface{}

TypeInformation is used to hold type data

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL