mdp

Version: v0.2.5

Warning: This package is not in the latest version of its module.
Published: Sep 8, 2021 License: MIT Imports: 7 Imported by: 2

Documentation

Constants

const (
	// MdpcClient is the version of MDP/Client we implement
	MdpcClient = "MDPC01"

	// MdpwWorker is the version of MDP/Worker we implement
	MdpwWorker = "MDPW01"

	// HeartbeatLiveness is the number of heartbeat cycles after which a worker is deemed to be dead. It is initially
	// set to 3 (3 to 5 is reasonable)
	HeartbeatLiveness = 3

	// HeartbeatInterval is the interval at which the broker sends heartbeats to workers, initially set to 2500 ms (2.5 s)
	HeartbeatInterval = 2500 * time.Millisecond

	// HeartbeatExpiry is the total duration for a worker until it is deemed to be dead
	HeartbeatExpiry = HeartbeatInterval * HeartbeatLiveness
)
const (
	MdpwReady = string(rune(iota + 1))
	MdpwRequest
	MdpwReply
	MdpwHeartbeat
	MdpwDisconnect
)

MDP/Server commands, as strings
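
Because each command is built with `string(rune(iota + 1))`, every constant is a one-byte string rather than a readable word. The sketch below redeclares the block locally and prints the resulting values; it is an illustration only and does not import the package.

```go
package main

import "fmt"

// Same declaration as above, repeated so the example is self-contained.
const (
	MdpwReady = string(rune(iota + 1))
	MdpwRequest
	MdpwReply
	MdpwHeartbeat
	MdpwDisconnect
)

func main() {
	cmds := []string{MdpwReady, MdpwRequest, MdpwReply, MdpwHeartbeat, MdpwDisconnect}
	for _, c := range cmds {
		// %q makes the non-printable byte visible, e.g. "\x01" for MdpwReady.
		fmt.Printf("%q (length %d)\n", c, len(c))
	}
}
```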

const (
	// BrokerEvent indicates that the event originated from the broker.
	BrokerEvent = iota + 1

	// ClientEvent indicates that the event originated from the client.
	ClientEvent

	// WorkerEvent indicates that the event originated from the worker.
	WorkerEvent
)

Variables

var (
	// MdpsCommands are the commands that are understood by the broker devices
	MdpsCommands = map[string]string{
		MdpwReady:      "READY",
		MdpwRequest:    "REQUEST",
		MdpwReply:      "REPLY",
		MdpwHeartbeat:  "HEARTBEAT",
		MdpwDisconnect: "DISCONNECT",
	}
)
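
One plausible use of this map is turning a raw command frame into a readable name for logging. The sketch below redeclares the constants and the map locally; `commandName` and its fallback format are hypothetical and not part of the package.

```go
package main

import "fmt"

const (
	MdpwReady = string(rune(iota + 1))
	MdpwRequest
	MdpwReply
	MdpwHeartbeat
	MdpwDisconnect
)

// MdpsCommands mirrors the variable documented above.
var MdpsCommands = map[string]string{
	MdpwReady:      "READY",
	MdpwRequest:    "REQUEST",
	MdpwReply:      "REPLY",
	MdpwHeartbeat:  "HEARTBEAT",
	MdpwDisconnect: "DISCONNECT",
}

// commandName is a hypothetical helper: it maps a command frame to its
// name and falls back to a quoted form for frames the map does not know.
func commandName(frame string) string {
	if name, ok := MdpsCommands[frame]; ok {
		return name
	}
	return fmt.Sprintf("UNKNOWN(%q)", frame)
}

func main() {
	fmt.Println(commandName(MdpwHeartbeat)) // HEARTBEAT
	fmt.Println(commandName("\x7f"))        // not a known command
}
```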

Functions

This section is empty.

Types

type Broker

type Broker struct {
	Socket *czmq.Sock // Socket for clients & workers

	Waiting     []*brokerWorker // list of waiting workers
	HeartbeatAt time.Time       // when to send HEARTBEAT

	ErrorChannel chan error
	EventChannel chan Event
	// contains filtered or unexported fields
}

Broker defines a single broker instance

func NewBroker

func NewBroker(endpoint string) (broker *Broker, err error)

NewBroker creates a new broker instance.

func (*Broker) Bind

func (b *Broker) Bind() (err error)

Bind the broker instance to an endpoint. We can call this multiple times. Note that MDP uses a single socket for both clients and workers.

func (*Broker) ClientMsg

func (b *Broker) ClientMsg(sender string, msg []string)

ClientMsg processes a request coming from a client. We implement MMI requests directly here (at present, we implement only the mmi.service request).

func (*Broker) Close

func (b *Broker) Close() (err error)

Close is used to terminate the broker socket.

func (*Broker) GetWorkerInfo

func (b *Broker) GetWorkerInfo() []WorkerInfo

GetWorkerInfo is used to request all information about connected workers.

func (*Broker) Purge

func (b *Broker) Purge()

Purge deletes any idle workers that haven't pinged us in a while. We hold workers from oldest to most recent, so we can stop scanning whenever we find a live worker. This means we'll mainly stop at the first worker, which is essential when we have large numbers of workers (since we call this method in our critical path).

func (*Broker) Run

func (b *Broker) Run(done chan bool)

Run the service

func (*Broker) ServiceRequire

func (b *Broker) ServiceRequire(serviceFrame string) (service *Service)

ServiceRequire is a lazy constructor that locates a service by name, or creates a new service if there is no service already with that name.
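
The lazy-constructor pattern can be illustrated with a plain map lookup. This is a sketch under assumptions: the `registry` and `service` types and the `require` method are hypothetical, and the real Broker keeps its services in unexported fields that are not shown in this documentation.

```go
package main

import "fmt"

// service is a hypothetical stand-in for the package's Service type.
type service struct {
	name string
}

// registry is a hypothetical holder for services, keyed by name.
type registry struct {
	services map[string]*service
}

// require returns the service registered under name, creating and
// storing a new one on first use.
func (r *registry) require(name string) *service {
	if s, ok := r.services[name]; ok {
		return s
	}
	s := &service{name: name}
	r.services[name] = s
	return s
}

func main() {
	r := &registry{services: make(map[string]*service)}
	first := r.require("echo")
	second := r.require("echo")
	fmt.Println(first == second, len(r.services)) // true 1
}
```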

func (*Broker) WorkerMsg

func (b *Broker) WorkerMsg(sender string, msg []string)

WorkerMsg processes one READY, REPLY, HEARTBEAT or DISCONNECT message sent to the broker by a worker.

type BrokerMetrics

type BrokerMetrics struct {
}

BrokerMetrics holds metric values about a broker instance.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client defines a single MDP client instance.

func NewClient

func NewClient(broker string) (c *Client, err error)

NewClient creates a new instance of an MDP client.

func (*Client) Close

func (c *Client) Close() (err error)

Close the client socket.

func (*Client) ConnectToBroker

func (c *Client) ConnectToBroker() (err error)

ConnectToBroker is used to connect or reconnect to a broker. In this asynchronous class we use a DEALER socket instead of a REQ socket; this lets us send any number of requests without waiting for a reply.

func (*Client) Recv

func (c *Client) Recv() (msg []string, err error)

Recv waits for a reply message and returns it to the caller. It returns the reply message, or nil if there was no reply. It does not attempt to recover from a broker failure; that is not possible without storing all unanswered requests and resending them all.

func (*Client) Send

func (c *Client) Send(service string, request ...string) (err error)

Send just sends one message, without waiting for a reply. Since we're using a DEALER socket we have to send an empty frame at the start, to create the same envelope that the REQ socket would normally make for us.
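
The envelope described above can be sketched as a frame list. `buildRequest` is a hypothetical helper, not the package's implementation, and the order of the frames after the empty one (protocol header, service name, request body) is assumed from the MDP/Client 0.1 specification rather than from this documentation.

```go
package main

import "fmt"

// MdpcClient is the protocol header documented in the constants above.
const MdpcClient = "MDPC01"

// buildRequest is a hypothetical helper that assembles the frames a
// DEALER socket would send: an empty delimiter frame (which a REQ socket
// would add automatically), the protocol header, the service name, and
// then the request body frames.
func buildRequest(service string, request ...string) []string {
	frames := make([]string, 0, 3+len(request))
	frames = append(frames, "", MdpcClient, service)
	return append(frames, request...)
}

func main() {
	fmt.Printf("%q\n", buildRequest("echo", "hello", "world"))
	// ["" "MDPC01" "echo" "hello" "world"]
}
```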

func (*Client) SetTimeout

func (c *Client) SetTimeout(timeout time.Duration)

SetTimeout sets the request timeout.

type ClientMetrics

type ClientMetrics struct {
}

ClientMetrics holds metric values about a single client instance.

type Event

type Event struct {
	Type    int    `json:"type"`
	Message string `json:"message"`
}

Event instances are passed up through a channel.
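
A consumer of such a channel might label each event by its origin. The sketch below redeclares the event-type constants and the Event struct locally; `originName` and the buffered channel are hypothetical and merely stand in for reading a broker's EventChannel.

```go
package main

import "fmt"

// Event-type constants and struct copied from the declarations above.
const (
	BrokerEvent = iota + 1
	ClientEvent
	WorkerEvent
)

type Event struct {
	Type    int    `json:"type"`
	Message string `json:"message"`
}

// originName is a hypothetical helper that maps an event type to a label.
func originName(t int) string {
	switch t {
	case BrokerEvent:
		return "broker"
	case ClientEvent:
		return "client"
	case WorkerEvent:
		return "worker"
	default:
		return "unknown"
	}
}

func main() {
	// A buffered channel stands in for a broker's EventChannel.
	events := make(chan Event, 2)
	events <- Event{Type: BrokerEvent, Message: "broker started"}
	events <- Event{Type: WorkerEvent, Message: "worker ready"}
	close(events)

	for e := range events {
		fmt.Printf("[%s] %s\n", originName(e.Type), e.Message)
	}
}
```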

func NewBrokerEvent

func NewBrokerEvent(message string) Event

NewBrokerEvent instantiates an event with the type set to broker.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service defines a single service instance.

func (*Service) Dispatch

func (s *Service) Dispatch(msg []string)

Dispatch sends requests to waiting workers.

type ServiceMetrics

type ServiceMetrics struct {
}

ServiceMetrics holds metrics for a service that is connected to a broker and one or more workers.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker defines a single MDP worker instance.

func NewWorker

func NewWorker(broker, service string) (w *Worker, err error)

NewWorker creates a new instance of the worker class.

func (*Worker) Close

func (w *Worker) Close()

Close the worker socket.

func (*Worker) ConnectToBroker

func (w *Worker) ConnectToBroker() (err error)

ConnectToBroker connects or reconnects to the broker.

func (*Worker) Recv

func (w *Worker) Recv(reply []string) (msg []string, err error)

Recv sends a reply, if any, to the broker and waits for the next request.
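
Because a single call both delivers the previous reply and fetches the next request, a worker loop typically carries the reply over from one iteration to the next. The following sketch shows that shape against an interface containing only the documented Recv signature; `receiver`, `fakeWorker`, `errDone`, and `serve` are hypothetical and exist only so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
)

// receiver captures only the Recv signature documented above.
type receiver interface {
	Recv(reply []string) (msg []string, err error)
}

// errDone is a hypothetical sentinel returned when the fake runs dry.
var errDone = errors.New("no more requests")

// fakeWorker is a hypothetical in-memory stand-in for a real Worker.
type fakeWorker struct {
	requests [][]string // requests still to be handed out
	replies  [][]string // replies received from the loop
}

func (f *fakeWorker) Recv(reply []string) ([]string, error) {
	if reply != nil {
		f.replies = append(f.replies, reply)
	}
	if len(f.requests) == 0 {
		return nil, errDone
	}
	msg := f.requests[0]
	f.requests = f.requests[1:]
	return msg, nil
}

// serve passes a nil reply on the first call, then hands each computed
// reply to the following Recv call.
func serve(w receiver, handle func([]string) []string) error {
	var reply []string
	for {
		req, err := w.Recv(reply)
		if err != nil {
			return err
		}
		reply = handle(req)
	}
}

func main() {
	w := &fakeWorker{requests: [][]string{{"ping"}, {"hello"}}}
	err := serve(w, func(req []string) []string { return req }) // echo
	fmt.Println(w.replies, err)
}
```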

func (*Worker) SendToBroker

func (w *Worker) SendToBroker(command string, option string, msg []string) (err error)

SendToBroker sends a message to the broker.

func (*Worker) SetHeartbeat

func (w *Worker) SetHeartbeat(heartbeat time.Duration)

SetHeartbeat sets the heartbeat delay.

func (*Worker) SetReconnect

func (w *Worker) SetReconnect(reconnect time.Duration)

SetReconnect sets the reconnection delay.

type WorkerInfo

type WorkerInfo struct {
	ID            string `json:"id"`
	Identity      string `json:"identity"`
	ServiceName   string `json:"service-name"`
	TotalRequests int64  `json:"total-requests"`
}

WorkerInfo is used to return certain information about a worker.

type WorkerMetrics

type WorkerMetrics struct {
}

WorkerMetrics holds metric values about a single worker instance.
