Documentation ¶
Overview ¶
Package net is responsible for abstracting away all of the inter-replica communication in the system. This module doesn't handle communication between replica and client.
This package provides a Demuxer and a Sender. The Demuxer is responsible for handling incoming connections, from which it receives messages and passes them on to the interested parties. The Sender is responsible for establishing connections to other replicas, as well as the sending of messages to other replicas.
For the Demuxer to know which parties are interested in receiving which types of messages, those parties must first be registered, before the networking subsystem starts up:
	prepareChan := make(chan px.PrepareMsg, 8)
	a.dmx.RegisterChannel(prepareChan)
The Demuxer communicates with an actor through channels, as shown above. First the actor creates a channel, in this case one for receiving PrepareMsg messages. Then it calls the RegisterChannel method with that channel. This tells the Demuxer to send any message of that type it receives over this channel. Multiple channels of the same type may be registered with the Demuxer.
NB: All channels should be registered before the networking subsystem is started; no channels should be registered afterwards. The GxConnection goroutines pass messages to the Demuxer, so registering a channel after network start-up would run concurrently with message delivery, which is not safe.
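The registration scheme described above can be illustrated with a small, self-contained sketch. It is not the package's implementation: it assumes the Demuxer keys channels by their element type using reflection, and the names sketchDemuxer, newSketchDemuxer and the local PrepareMsg type are hypothetical. Unlike the real RegisterChannel, the sketch returns an error so that a bad argument is visible.

```go
package main

import (
	"fmt"
	"reflect"
)

// PrepareMsg is a hypothetical stand-in for px.PrepareMsg.
type PrepareMsg struct{ Round int }

// sketchDemuxer maps a message type to the channels registered for it.
// Assumption: the real Demuxer may be organised differently.
type sketchDemuxer struct {
	chans map[reflect.Type][]reflect.Value
}

func newSketchDemuxer() *sketchDemuxer {
	return &sketchDemuxer{chans: make(map[reflect.Type][]reflect.Value)}
}

// RegisterChannel records ch under its element type. It must be called
// before any goroutine calls HandleMessage, because the map is not
// protected by a lock.
func (d *sketchDemuxer) RegisterChannel(ch interface{}) error {
	v := reflect.ValueOf(ch)
	if v.Kind() != reflect.Chan || v.Type().ChanDir()&reflect.SendDir == 0 {
		return fmt.Errorf("not a sendable channel: %T", ch)
	}
	t := v.Type().Elem()
	d.chans[t] = append(d.chans[t], v)
	return nil
}

// HandleMessage sends msg on every channel registered for its type and
// reports how many channels it was sent on.
func (d *sketchDemuxer) HandleMessage(msg interface{}) int {
	targets := d.chans[reflect.TypeOf(msg)]
	for _, ch := range targets {
		ch.Send(reflect.ValueOf(msg))
	}
	return len(targets)
}

func main() {
	dmx := newSketchDemuxer()
	prepareChan := make(chan PrepareMsg, 8)
	if err := dmx.RegisterChannel(prepareChan); err != nil {
		panic(err)
	}
	dmx.HandleMessage(PrepareMsg{Round: 1})
	fmt.Println((<-prepareChan).Round)
}
```

Because the map is written only during registration and read only during dispatch, the sketch needs no locking as long as registration finishes before dispatch begins, which mirrors the ordering rule in the note above.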
Index ¶
- Variables
- func AddToConnections(gc *GxConnection, lrArEnabled bool) error
- func CheckConnections(conf []grp.ID) (notconn []grp.ID)
- func IsSocketClosed(err error) bool
- func SetHeartbeatChan(hbChan chan<- grp.ID)
- func UpdateConnID(oldID grp.ID, newEpoch grp.Epoch) bool
- type Connection
- type Demuxer
- type GxConnection
- type IDExchange
- type IDResponse
- type MockDemuxer
- type Packet
- type Sender
- type TcpDemuxer
Constants ¶
This section is empty.
Variables ¶
var (
	ErrIDIsEqual     = errors.New("id is equal to this node")
	ErrIDOutOfBounds = errors.New("id is out of bounds for current cluster size")
)
Functions ¶
func AddToConnections ¶
func AddToConnections(gc *GxConnection, lrArEnabled bool) error
Add a GxConnection to the connection map.
func IsSocketClosed ¶
func SetHeartbeatChan ¶
Types ¶
type Connection ¶
type Connection struct {
	Dec *gob.Decoder
	Enc *gob.Encoder
	// contains filtered or unexported fields
}
A Connection represents a base connection between two replicas in Goxos.
func ConnectToAddr ¶
func ConnectToAddr(addr string) (*Connection, error)
Connect to another replica based on the address of the replica in the form hostname:port. Returns a Connection.
func ConnectToNode ¶
func ConnectToNode(node grp.Node) (*Connection, error)
Connect to another replica based on the address in the configuration file.
func GxConnectEphemeral ¶
func NewConnection ¶
func NewConnection(conn net.Conn) *Connection
Creates a new base connection between two replicas. The required argument is a low-level connection to another replica.
func NewMockConnection ¶
func NewMockConnection(conn io.ReadWriteCloser) *Connection
Create a new mock Connection, used for testing purposes.
func (*Connection) Read ¶
func (c *Connection) Read(msg interface{}) error
Read a message off of the connection. The message is placed in the location passed to the method. Returns nil or an error.
func (Connection) String ¶
func (c Connection) String() string
Returns a string-based representation of the Connection.
func (*Connection) Write ¶
func (c *Connection) Write(msg interface{}) error
Write a message to the connection. Returns nil or an error.
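The exported Dec and Enc fields suggest that Read and Write are thin wrappers around gob decoding and encoding. The following is a minimal sketch under that assumption; sketchConn, newSketchConn, roundTrip and the local PrepareMsg type are hypothetical names, and an in-memory net.Pipe stands in for a real TCP connection.

```go
package main

import (
	"encoding/gob"
	"fmt"
	"net"
)

// PrepareMsg is a hypothetical message type used only in this sketch.
type PrepareMsg struct {
	Round int
}

// sketchConn pairs a gob decoder and encoder over one stream. Assumption:
// the real Connection has further unexported fields not modelled here.
type sketchConn struct {
	Dec *gob.Decoder
	Enc *gob.Encoder
}

func newSketchConn(c net.Conn) *sketchConn {
	return &sketchConn{Dec: gob.NewDecoder(c), Enc: gob.NewEncoder(c)}
}

// Write encodes msg onto the stream.
func (c *sketchConn) Write(msg interface{}) error { return c.Enc.Encode(msg) }

// Read decodes the next value into msg, which must be a pointer.
func (c *sketchConn) Read(msg interface{}) error { return c.Dec.Decode(msg) }

// roundTrip sends msg across an in-memory pipe and returns what arrives.
func roundTrip(msg PrepareMsg) (PrepareMsg, error) {
	a, b := net.Pipe()
	defer a.Close()
	defer b.Close()

	// net.Pipe is unbuffered, so the write must run concurrently with the read.
	errc := make(chan error, 1)
	go func() { errc <- newSketchConn(a).Write(msg) }()

	var got PrepareMsg
	if err := newSketchConn(b).Read(&got); err != nil {
		return got, err
	}
	return got, <-errc
}

func main() {
	got, err := roundTrip(PrepareMsg{Round: 5})
	fmt.Println(got.Round, err)
}
```

Note that the caller passes a pointer to Read, matching the description that the message is placed in the location passed to the method.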
type Demuxer ¶
type Demuxer interface {
	Start()
	Stop()
	RegisterChannel(ch interface{})
	HandleMessage(msg interface{})
}
type GxConnection ¶
type GxConnection struct {
	*Connection
	// contains filtered or unexported fields
}
A GxConnection represents a connection between two replicas. A GxConnection is set up after the replica has been properly validated.
func GxConnectTo ¶
Connect to another replica, and verify the ids are correct. Returns a GxConnection.
func NewGxConnection ¶
func NewGxConnection(conn *Connection, id grp.ID, dmx Demuxer) *GxConnection
Create a new GxConnection. The low-level connection as well as the Goxos id and Demuxer must be passed in as arguments.
func (*GxConnection) Outgoing ¶
func (gc *GxConnection) Outgoing() chan<- interface{}
func (GxConnection) String ¶
func (gc GxConnection) String() string
Returns a string-based representation of the GxConnection.
type IDExchange ¶
The IDExchange message is used for verifying a replica in the connection phase.
type IDResponse ¶
The IDResponse message is used for verifying a replica in the connection phase.
type MockDemuxer ¶
type MockDemuxer struct{}
func NewMockDemuxer ¶
func NewMockDemuxer() *MockDemuxer
func (*MockDemuxer) HandleMessage ¶
func (dmx *MockDemuxer) HandleMessage(msg interface{})
func (*MockDemuxer) RegisterChannel ¶
func (dmx *MockDemuxer) RegisterChannel(ch interface{})
func (*MockDemuxer) RegisterFD ¶
func (dmx *MockDemuxer) RegisterFD() <-chan interface{}
func (*MockDemuxer) Start ¶
func (dmx *MockDemuxer) Start()
func (*MockDemuxer) Stop ¶
func (dmx *MockDemuxer) Stop()
type Packet ¶
A Packet contains a message, as well as the destination id of the replica. Used to send a unicast message to a replica.
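To make the unicast idea concrete, here is a sketch of a routing loop that delivers a packet to one destination, contrasted with a broadcast message that goes to every peer. The packet struct and its DestID and Msg fields are hypothetical, as is the route function; the real Packet definition is not shown on this page.

```go
package main

import "fmt"

// packet is a hypothetical shape for Packet: a message plus the id of the
// replica it is addressed to.
type packet struct {
	DestID int
	Msg    interface{}
}

// route forwards unicast packets to one peer and broadcast messages to all
// peers, returning once both input channels are closed and drained.
func route(outU <-chan packet, outB <-chan interface{}, peers map[int]chan<- interface{}) {
	for outU != nil || outB != nil {
		select {
		case p, ok := <-outU:
			if !ok {
				outU = nil // a nil channel is never selected again
				continue
			}
			if ch, found := peers[p.DestID]; found {
				ch <- p.Msg
			}
		case m, ok := <-outB:
			if !ok {
				outB = nil
				continue
			}
			for _, ch := range peers {
				ch <- m
			}
		}
	}
}

func main() {
	p1 := make(chan interface{}, 4)
	p2 := make(chan interface{}, 4)
	peers := map[int]chan<- interface{}{1: p1, 2: p2}

	outU := make(chan packet, 1)
	outB := make(chan interface{}, 1)
	outU <- packet{DestID: 2, Msg: "only for replica 2"}
	outB <- "for everyone"
	close(outU)
	close(outB)

	route(outU, outB, peers)
	fmt.Println(len(p1), len(p2))
}
```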
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
A Sender is responsible for outward communication from one replica to all others. Goroutines in a replica pass messages to the Sender through channels, and the Sender then sends those messages out onto the network.
func NewSender ¶
func NewSender(id grp.ID, gm grp.GroupManager, outU <-chan Packet, outB, outP, outA, outL <-chan interface{}, dmx Demuxer, stopCheckIn *sync.WaitGroup) (snd *Sender)
Create a new Sender for the given replica id. Also passed in are channels which the sender receives messages from.
func (*Sender) InitialConnect ¶
func (snd *Sender) InitialConnect()
Start the initial connection phase, in which the sender tries to connect to all replicas in the system with a higher id than its own, and waits for incoming connections from all replicas with a lower id.
This function blocks until we establish connections to all replicas in the Goxos configuration.
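The connection rule above amounts to splitting the other replicas into two groups by id. The sketch below shows only that split; it does no networking, treats ids as plain integers for simplicity, and connectPlan is a hypothetical helper rather than part of the package.

```go
package main

import (
	"fmt"
	"sort"
)

// connectPlan splits the replica ids into those this replica dials (higher
// ids) and those it expects to be dialled by (lower ids). Its own id is
// skipped. Assumption: ids are plain ints, unlike the real grp.ID.
func connectPlan(self int, ids []int) (dial, await []int) {
	for _, id := range ids {
		switch {
		case id > self:
			dial = append(dial, id)
		case id < self:
			await = append(await, id)
		}
	}
	sort.Ints(dial)
	sort.Ints(await)
	return dial, await
}

func main() {
	dial, await := connectPlan(1, []int{0, 1, 2, 3})
	fmt.Println("dial:", dial, "await:", await)
}
```

A rule of this kind means each pair of replicas ends up with a single connection, since only the lower-id replica of any pair initiates it.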
type TcpDemuxer ¶
type TcpDemuxer struct {
// contains filtered or unexported fields
}
A TcpDemuxer handles all incoming messages to a replica, excluding client communication.
func NewTcpDemuxer ¶
func NewTcpDemuxer(id grp.ID, gm grp.GroupManager, stopCheckIn *sync.WaitGroup) *TcpDemuxer
NewTcpDemuxer creates a new TcpDemuxer for a replica. A valid id from the configuration must be passed in.
func (*TcpDemuxer) HandleMessage ¶
func (dmx *TcpDemuxer) HandleMessage(msg interface{})
Delegate handling of a message to the channels previously registered for its type.
func (*TcpDemuxer) RegisterChannel ¶
func (dmx *TcpDemuxer) RegisterChannel(ch interface{})
Register a channel for receiving messages of the channel's element type.
func (*TcpDemuxer) Start ¶
func (dmx *TcpDemuxer) Start()
Start handling new connections from other replicas.
func (*TcpDemuxer) Stop ¶
func (dmx *TcpDemuxer) Stop()
Shut the Demuxer down. Stops the main Demuxer goroutine.