Damrey provides a message-based distributed collaboration approach.
Damrey refers to the unit that processes the message as a worker.
The workflow of each worker is as follows:
message => worker => message
Workers work together to form a network. The typical network is as follows:
message(type_0) => worker_0 => worker_1 => worker_2 => worker_3
message(type_1) => worker_1 => worker_3 => worker_2
A message is processed by a series of workers; different types of messages may follow different collaboration lines.
The message format is as follows:
tag type [arg0] [arg1] ...
The tag is predefined. The following tags are currently available:
- [REQUEST] - user-defined request
- [RESPONSE] - user-defined response
- [ERROR] - an error occurred while processing the request
- [STOP] - worker has stopped, both type and arguments are empty
- [TIMEOUT] - request processing timeout, both type and arguments are empty
For REQUEST, RESPONSE and ERROR, type and arguments are user-defined.
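To make the layout concrete, here is a small validation sketch for the `tag type [arg0] [arg1] ...` format. The tag strings and the `validate` helper are assumptions made for illustration (the real constants live in Damrey's message package and may differ), and requiring a non-empty type for REQUEST, RESPONSE, and ERROR is likewise an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Tag values are assumptions for this sketch; the real constants may use
// different strings.
const (
	REQUEST  = "REQUEST"
	RESPONSE = "RESPONSE"
	ERROR    = "ERROR"
	STOP     = "STOP"
	TIMEOUT  = "TIMEOUT"
)

// validate checks a message laid out as: tag type [arg0] [arg1] ...
func validate(msg []string) error {
	if len(msg) == 0 {
		return errors.New("message has no tag")
	}
	switch msg[0] {
	case REQUEST, RESPONSE, ERROR:
		// A user-defined type (and optional arguments) follows the tag.
		if len(msg) < 2 || msg[1] == "" {
			return fmt.Errorf("%s message needs a type", msg[0])
		}
	case STOP, TIMEOUT:
		// Type and arguments are empty for these tags.
		for _, field := range msg[1:] {
			if field != "" {
				return fmt.Errorf("%s message must not carry type or arguments", msg[0])
			}
		}
	default:
		return fmt.Errorf("unknown tag %q", msg[0])
	}
	return nil
}

func main() {
	fmt.Println(validate([]string{REQUEST, "hello", "hello world"}))
	fmt.Println(validate([]string{STOP}))
	fmt.Println(validate([]string{"PING"}))
}
```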
programming interface
To create a worker, call:
w := worker.New(worker.Config{Rc: 3, Name: "test", Timeout: 2 * time.Second}, logger.New(os.Stderr, nil))
The Config structure is as follows:
type Config struct {
    Rc      int
    Name    string
    Timeout time.Duration
}
Rc is the number of times the worker attempts to push a message,
Name is the name of the worker,
and Timeout is the time limit for the worker to process a message.
w implements the Worker interface, which is as follows:
type Worker interface {
    SendChannel() chan<- *Message // the caller should close this channel after Stop
    RecvChannel() <-chan *Message // this channel is closed automatically after Stop
}
You can run a worker by calling its Run method, usually in its own goroutine (go w.Run()).
You can stop a worker by calling its Stop method (w.Stop()).
Note: before stopping a worker, you need to remove the worker from the proxy.
Message is the message format accepted and sent by the worker.
Its structure is as follows:
type Message struct {
    Msg     []string
    Rch     chan []string
    Timeout time.Duration
}
If you want to receive a reply (response, error, stop, or timeout), you need to set Rch and
Timeout (the maximum time allowed for processing Msg).
You can send a message with the following code:
msg := &Message{
    Timeout: 1 * time.Second,
    Rch:     make(chan []string),
    Msg:     []string{REQUEST, "hello", "hello world"},
}
w.SendChannel() <- msg
resp := <-msg.Rch
You can receive a message with the following code:
msg := <- w.RecvChannel()
example code
testworker is an example module for sending and receiving messages. Its code looks as follows:
type testWorker struct {
    ch chan struct{}
    wk worker.Worker
}

func New() *testWorker {
    log := logger.New(os.Stderr, nil)
    return &testWorker{
        ch: make(chan struct{}),
        wk: worker.New(worker.Config{Rc: 3, Name: "testWorker", Timeout: 5 * time.Second}, log),
    }
}

func (t *testWorker) Run() {
    go t.wk.Run()
    for {
        select {
        case <-t.ch:
            // Stop was requested: acknowledge it and leave the loop.
            t.ch <- struct{}{}
            return
        case msg := <-t.wk.RecvChannel():
            msg.Rch <- t.process(msg.Msg)
        case <-time.After(10 * time.Second):
            // Nothing arrived for 10 seconds: send a test request.
            t.wk.SendChannel() <- &message.Message{
                Msg: []string{message.REQUEST, "test"},
            }
        }
    }
}

func (t *testWorker) Stop() {
    t.ch <- struct{}{}
    <-t.ch // wait for Run to acknowledge
}

func (t *testWorker) process(msg []string) []string {
    return []string{message.ERROR, "don't accept any message"}
}
The Damrey proxy maintains configuration information, monitoring information,
and alarms for each worker.
cd cmd/proxy
go build
./proxy -p port
The following is an example of a worker's configuration information:
{
    "mpu": "100",
    "name": "entry",
    "agent": "",
    "commands": [
        {"name": "test", "worker": "testWorker", "mpu": "1"}
    ]
}
mpu is currently unused, name is the name of the worker,
and agent is the address of the agent on the worker's machine.
commands is the list of commands supported by the worker.
Each command has three fields: name is the type name of the command,
worker is the worker that receives the result, and mpu is currently unused.
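For readers who want to handle this configuration from Go, the fields described above map naturally onto a pair of structs. `WorkerConfig`, `Command`, and `parseConfig` are hypothetical names introduced for this sketch; the proxy's own types are not shown in this document and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Command describes one entry of the "commands" list.
type Command struct {
	Name   string `json:"name"`   // type name of the command
	Worker string `json:"worker"` // worker that receives the result
	MPU    string `json:"mpu"`    // currently unused
}

// WorkerConfig matches the JSON keys of the example configuration.
type WorkerConfig struct {
	MPU      string    `json:"mpu"`   // currently unused
	Name     string    `json:"name"`  // name of the worker
	Agent    string    `json:"agent"` // address of the agent on the machine
	Commands []Command `json:"commands"`
}

// sample repeats the example configuration from the text.
const sample = `{
  "mpu": "100",
  "name": "entry",
  "agent": "",
  "commands": [
    {"name": "test", "worker": "testWorker", "mpu": "1"}
  ]
}`

// parseConfig decodes a worker configuration from JSON.
func parseConfig(data []byte) (WorkerConfig, error) {
	var cfg WorkerConfig
	err := json.Unmarshal(data, &cfg)
	return cfg, err
}

func main() {
	cfg, err := parseConfig([]byte(sample))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(cfg.Name, len(cfg.Commands), cfg.Commands[0].Worker) // prints "entry 1 testWorker"
}
```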
Monitoring: not supported yet.
Alarms: not supported yet.
restful api
- add worker - POST - http://ip:port/worker/add - post body (the worker's full JSON)
- del worker - POST - http://ip:port/worker/del - post body (the worker's JSON, only name is required)
- chg worker - POST - http://ip:port/worker/chg - post body (the worker's full JSON)
- list workers - GET - http://ip:port/workers/get
- The current version uses a single database; you can replace it with a distributed database for robustness.
- In the current version, a registered worker is a single unit of work; in the future, a worker will be a group of units of work.
- The service is called a proxy because it is mainly intended to provide load balancing,
although the current version does not yet show this aspect.
Each machine must run one agent alongside one or more workers.
The agent communicates with remote agents and with local workers.
cd cmd/agent
go build
./agent -p port -a ip:port -pr proxy's ip:port -r push retries
This example computes the square of the sum of two values.
It uses three workers, chained as follows:
http request => entry => add => square
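The shape of this chain can be mimicked in a single process with goroutines and channels. The sketch below is only an analogy for the entry, add, and square workers: it does not use Damrey's worker, agent, or proxy, and `add`, `square`, and `compute` are hypothetical names.

```go
package main

import "fmt"

// add stands in for the "add" worker: it sums each pair it receives.
func add(in <-chan [2]int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for p := range in {
			out <- p[0] + p[1]
		}
	}()
	return out
}

// square stands in for the "square" worker: it squares each value it receives.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// compute plays the role of the "entry" worker: it feeds one request into
// the chain and waits for the final result.
func compute(a, b int) int {
	in := make(chan [2]int, 1)
	in <- [2]int{a, b}
	close(in)
	return <-square(add(in))
}

func main() {
	fmt.Println(compute(2, 3)) // prints "25"
}
```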
cd example
sh example/run
cd example
sh example/stop