Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Bind() (err error)
- func (b *Broker) ClientMsg(sender string, msg []string)
- func (b *Broker) Close() (err error)
- func (b *Broker) GetWorkerInfo() []WorkerInfo
- func (b *Broker) Purge()
- func (b *Broker) Run(done chan bool)
- func (b *Broker) ServiceRequire(serviceFrame string) (service *Service)
- func (b *Broker) WorkerMsg(sender string, msg []string)
- type BrokerMetrics
- type Client
- type ClientMetrics
- type Event
- type Service
- type ServiceMetrics
- type Worker
- func (w *Worker) Close()
- func (w *Worker) ConnectToBroker() (err error)
- func (w *Worker) Recv(reply []string) (msg []string, err error)
- func (w *Worker) SendToBroker(command string, option string, msg []string) (err error)
- func (w *Worker) SetHeartbeat(heartbeat time.Duration)
- func (w *Worker) SetReconnect(reconnect time.Duration)
- type WorkerInfo
- type WorkerMetrics
Constants ¶
const (
	// MdpcClient is the version of MDP/Client we implement
	MdpcClient = "MDPC01"
	// MdpwWorker is the version of MDP/Worker we implement
	MdpwWorker = "MDPW01"
	// HeartbeatLiveness is the number of heartbeat cycles after which a worker is deemed to be dead; initially set to 3, 5 is also reasonable
	HeartbeatLiveness = 3
	// HeartbeatInterval is the interval at which the broker sends heartbeats to workers, initially set to 2500 ms
	HeartbeatInterval = 2500 * time.Millisecond
	// HeartbeatExpiry is the total duration after which a worker is deemed to be dead
	HeartbeatExpiry = HeartbeatInterval * HeartbeatLiveness
)
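The relationship between the three heartbeat constants can be checked with a small program. This is only a sketch: the constant values are copied from the block above, while `isExpired` is a hypothetical helper that is not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the package constants documented above.
const (
	HeartbeatLiveness = 3
	HeartbeatInterval = 2500 * time.Millisecond
	HeartbeatExpiry   = HeartbeatInterval * HeartbeatLiveness
)

// isExpired is a hypothetical helper (not part of the package): it reports
// whether a worker that has been silent for the given duration counts as dead.
func isExpired(silence time.Duration) bool {
	return silence >= HeartbeatExpiry
}

func main() {
	fmt.Println("expiry:", HeartbeatExpiry)
	fmt.Println("silent for one interval:", isExpired(HeartbeatInterval))
	fmt.Println("silent for three intervals:", isExpired(3*HeartbeatInterval))
}
```

With the documented values the expiry works out to 2500 ms × 3 = 7.5 s, so a worker survives one or two missed heartbeats but not three.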
const (
	MdpwReady = string(rune(iota + 1))
	MdpwRequest
	MdpwReply
	MdpwHeartbeat
	MdpwDisconnect
)
MDP/Server commands, as strings
const (
	// BrokerEvent indicates that the event originated from the broker.
	BrokerEvent = iota + 1
	// ClientEvent indicates that the event originated from the client.
	ClientEvent
	// WorkerEvent indicates that the event originated from the worker.
	WorkerEvent
)
Variables ¶
var (
	// MdpsCommands are the commands that are understood by the broker devices
	MdpsCommands = map[string]string{
		MdpwReady:      "READY",
		MdpwRequest:    "REQUEST",
		MdpwReply:      "REPLY",
		MdpwHeartbeat:  "HEARTBEAT",
		MdpwDisconnect: "DISCONNECT",
	}
)
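Because the command constants are one-byte strings ("\x01" through "\x05"), the map is handy for turning a raw command frame into a readable name, for example when logging. The sketch below copies the constants and the map from this page; `commandName` is a hypothetical helper and not part of the package.

```go
package main

import "fmt"

// Command constants and lookup table, copied from the declarations above.
const (
	MdpwReady = string(rune(iota + 1))
	MdpwRequest
	MdpwReply
	MdpwHeartbeat
	MdpwDisconnect
)

var MdpsCommands = map[string]string{
	MdpwReady:      "READY",
	MdpwRequest:    "REQUEST",
	MdpwReply:      "REPLY",
	MdpwHeartbeat:  "HEARTBEAT",
	MdpwDisconnect: "DISCONNECT",
}

// commandName is a hypothetical helper: it maps a raw command frame to its
// readable name, or "UNKNOWN" when the frame is not a known command.
func commandName(frame string) string {
	if name, ok := MdpsCommands[frame]; ok {
		return name
	}
	return "UNKNOWN"
}

func main() {
	for _, frame := range []string{MdpwReady, "\x04", "bogus"} {
		fmt.Printf("%q -> %s\n", frame, commandName(frame))
	}
}
```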
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
	Socket       *czmq.Sock      // Socket for clients & workers
	Waiting      []*brokerWorker // list of waiting workers
	HeartbeatAt  time.Time       // when to send HEARTBEAT
	ErrorChannel chan error
	EventChannel chan Event
	// contains filtered or unexported fields
}
Broker defines a single broker instance
func (*Broker) Bind ¶
Bind binds the broker instance to an endpoint. We can call this multiple times. Note that MDP uses a single socket for both clients and workers.
func (*Broker) ClientMsg ¶
ClientMsg processes a request coming from a client. We implement MMI requests directly here (at present, we implement only the mmi.service request).
func (*Broker) GetWorkerInfo ¶
func (b *Broker) GetWorkerInfo() []WorkerInfo
GetWorkerInfo is used to request all information about connected workers.
func (*Broker) Purge ¶
func (b *Broker) Purge()
Purge deletes any idle workers that haven't pinged us in a while. We hold workers from oldest to most recent, so we can stop scanning whenever we find a live worker. This means we'll mainly stop at the first worker, which is essential when we have large numbers of workers (since we call this method in our critical path).
func (*Broker) ServiceRequire ¶
ServiceRequire is a lazy constructor that locates a service by name, or creates a new service if there is no service already with that name.
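A lazy constructor of this kind is commonly a map lookup followed by an insert on a miss. The sketch below illustrates the pattern only; `registry`, `service`, and `require` are hypothetical names, and the real Broker keeps its services in unexported fields that this page does not show.

```go
package main

import "fmt"

// service is a hypothetical, simplified stand-in for the package's Service.
type service struct {
	name string
}

// registry is a hypothetical holder for services, keyed by name.
type registry struct {
	services map[string]*service
}

func newRegistry() *registry {
	return &registry{services: make(map[string]*service)}
}

// require returns the service registered under name, creating and storing
// a new one first if none exists yet.
func (r *registry) require(name string) *service {
	if s, ok := r.services[name]; ok {
		return s
	}
	s := &service{name: name}
	r.services[name] = s
	return s
}

func main() {
	r := newRegistry()
	first := r.require("echo")
	second := r.require("echo")
	fmt.Println("same instance:", first == second)
	fmt.Println("registered services:", len(r.services))
}
```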
type BrokerMetrics ¶
type BrokerMetrics struct { }
BrokerMetrics holds metric values about a broker instance.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client defines a single MDP client instance.
func (*Client) ConnectToBroker ¶
ConnectToBroker is used to connect or reconnect to a broker. This asynchronous client uses a DEALER socket instead of a REQ socket, which lets us send any number of requests without waiting for a reply.
func (*Client) Recv ¶
Recv waits for a reply message and returns it to the caller. It returns the reply message, or nil if there was no reply. It does not attempt to recover from a broker failure; that is not possible without storing all unanswered requests and resending them.
func (*Client) Send ¶
Send sends one message without waiting for a reply. Since we're using a DEALER socket, we have to send an empty frame at the start, to create the same envelope that the REQ socket would normally make for us.
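To make the envelope concrete, the sketch below assembles the frames of a client request with the leading empty frame in place. The layout (empty frame, protocol header, service name, body) follows the MDP/Client 0.1 request format as an assumption; `buildRequest` is a hypothetical helper, and the frames actually produced by Send in this package may differ.

```go
package main

import "fmt"

// mdpcClient mirrors the MdpcClient constant documented above.
const mdpcClient = "MDPC01"

// buildRequest is a hypothetical helper: it prepends the empty delimiter
// frame that a REQ socket would add automatically, then the protocol
// header and service name, followed by the request body frames.
func buildRequest(service string, body ...string) []string {
	frames := make([]string, 0, 3+len(body))
	frames = append(frames, "", mdpcClient, service)
	return append(frames, body...)
}

func main() {
	frames := buildRequest("echo", "hello", "world")
	for i, f := range frames {
		fmt.Printf("frame %d: %q\n", i, f)
	}
}
```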
func (*Client) SetTimeout ¶
SetTimeout sets the request timeout.
type ClientMetrics ¶
type ClientMetrics struct { }
ClientMetrics holds metric values about a single client instance.
type Event ¶
Event instances are passed up through a channel.
func NewBrokerEvent ¶
NewBrokerEvent instantiates an event with the type set to broker.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service defines a single service instance.
type ServiceMetrics ¶
type ServiceMetrics struct { }
ServiceMetrics holds metrics for a service that is connected to a broker and one or more workers.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker defines a single MDP worker instance.
func (*Worker) ConnectToBroker ¶
ConnectToBroker connects or reconnects to the broker.
func (*Worker) SendToBroker ¶
SendToBroker sends a message to the broker.
func (*Worker) SetHeartbeat ¶
SetHeartbeat sets the heartbeat delay.
func (*Worker) SetReconnect ¶
SetReconnect sets the reconnection delay.
type WorkerInfo ¶
type WorkerInfo struct {
	ID            string `json:"id"`
	Identity      string `json:"identity"`
	ServiceName   string `json:"service-name"`
	TotalRequests int64  `json:"total-requests"`
}
WorkerInfo is used to return certain information about a worker.
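The struct tags determine the JSON keys when a WorkerInfo value is serialized, for instance to expose the result of GetWorkerInfo over HTTP. The sketch below copies the struct from this page; `encode` is a hypothetical helper and the field values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerInfo is copied from the type declaration above.
type WorkerInfo struct {
	ID            string `json:"id"`
	Identity      string `json:"identity"`
	ServiceName   string `json:"service-name"`
	TotalRequests int64  `json:"total-requests"`
}

// encode is a hypothetical helper that serializes a WorkerInfo to JSON.
func encode(info WorkerInfo) (string, error) {
	out, err := json.Marshal(info)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	// Illustrative values only.
	s, err := encode(WorkerInfo{ID: "w-1", Identity: "00ab", ServiceName: "echo", TotalRequests: 42})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
}
```

Note that the keys use hyphens ("service-name", "total-requests"), so consumers in languages that map keys to identifiers may need bracket access or an explicit field mapping.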
type WorkerMetrics ¶
type WorkerMetrics struct { }
WorkerMetrics holds metric values about a single worker instance.