damrey

module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 11, 2019 License: GPL-3.0

README

Damrey

Damrey provides a message-based distributed collaboration approach.

Worker

Damrey refers to the unit that processes the message as a worker. The workflow of each worker is as follows:

    message => worker => message

Workers work together to form a network. The typical network is as follows:

    message(type_0) => worker_0 =>  worker_1 => worker_2 => worker_3
    message(type_1) => worker_1 =>  worker_3 => worker_2

The processing of a message is done by a series of workers, different type of messages may have different collaboration lines.

The message format is as follows:

tag type [arg0] [arg1] ...

The tag is a pre-defined tag and currently has the following tags:

  • [REQUEST] - user-defined request
  • [RESPONSE] - user-defined response
  • [ERROR] - an error occurred while processing the request
  • [STOP] - worker has stopped, both type and arguments are empty
  • [TIMEOUT] - request processing timeout, both type and arguments are empty

For REQUEST, RESPONSE and ERROR, type and arguments are user-defined.

programming interface

In order to create a worker, you need to call the following code:

w := worker.New(worker.Config{3, "test", 2 * time.Second}, logger.New(os.Stderr))

The structure of config is as follows:

type Config struct {
	Rc      int
	Name    string
	Timeout time.Duration
}

Rc is the number of attempts by the worker to push the message, Name is the name of the worker, and timeout is the time when the worker processes the message.

w is an interface provided by the worker is as follows:

type Worker interface {
	Run()
	Stop()
	SendChannel() chan<- *Message // channel should close after Stop by caller
	RecvChannel() <-chan *Message // channel will automatically close after stop
}

you can run a worker with the following code:

w.Run()

you can stop a worker with the following code:

w.Stop()

note. before stopping a worker, you need to remove the worker from the proxy.

Message is the message format accepted and send by the worker, the structure is as follows:

type Message struct {
	Msg     []string
	Rch     chan []string
	Timeout time.Duration
}

If you want to receive a reply(response, error, stop or timeout), you need to set Rch and Timeout(which represents the maximum time limit for processing the Msg).

you can send a message with the following code:

msg := &Message{
    Timeout: 1 * time.Second,
    Rch: make(chan []string),
    Msg: []string{REQUEST, "hello", "hello world"}, 
}
w.SendChannel() <- msg
resp := <- msg.Rch

you can recv a message with the following code:

msg := <- w.RecvChannel()
example code

testworker is a example module for send and recv message, code of testworker looks like follows:

type testWorker struct{
    logger.Log
    ch chan struct{}
    wk worker.Worker
}

func New() *testWorker{
    log := logger.New(os.Stderr)
    return &testWorker{
        Log: log, 
        ch: make(chan struct{}), 
        wk: worker.New(worker.Config{3, "testWorker", 5 * time.Second}, log),
    }
}

func (t *testWorker) Run() {
	go t.wk.Run()
	for {
		select {
		case <-t.ch:
			t.wk.Stop()
			t.ch <- struct{}{}
			return
		case msg := <-t.wk.RecvChannel():
			msg.Rch <- t.process(msg.Msg)
		case <-time.After(10 * time.Second):
            t.wk.SendChannel() <- &message.Message{
                Msg: []string{message.REQUEST, "test"},
            }
		}
	}
}

func (t *testWorker) Stop() {
	t.ch <- struct{}{}
	<-t.ch
	close(t.ch)
}

func (t *testWorker) process(msg []string) []string {
	return []string{message.ERROR, "don't accept any message"}
}

Proxy

Proxy of damrey is used to maintain configuration information, monitoring information, and alarms for each worker.

start
cd cmd/proxy
go build
./proxy -p port
configuration information

The following is an example of a worker's configuration information:

{
    "mpu": "100",
    "name": "entry",
    "agent": "10.1.2.3:8080",
    "commands": [
        {"name":"test", "worker": "testWorker", "mpu": "1"}
    ]
}

mpu is not used temporarily, name is the name of the worker, agent is the address of the agent on the machine. commands is a list of commands supported by the worker. Each member consists of three elements, name is the type name of the command, worker is the worker that accepts the result, and mpu is not used.

monitoring information

Not support now.

alarms

Not support now.

restful api
  • add worker - Post - http://ip:port/worker/add - post body(worker's json, whole)
  • del worker - Post - http://ip:port/worker/del - post body(worker's json, only name need)
  • chg worker - Post - http://ip:port/worker/chg - post body(worker's json, whole)
  • list workers - Get - http://ip:port/workers/get
note
  • the current version uses a single database, you can replace it with a distributed database for robustness.
  • in the current version, a registered worker is a unit of work, and in future worker will be a group of units of work.
  • the reason why it is called proxy is because the service is mainly used to provide load balancing services, although the current version of the service does not show this aspect of the service.

Agent

Each machine must have an agent, and then there is one or more workers, the agent is used to communicate with remote agents and local workers.

start
cd cmd/agent
go build
./agent -p port -a ip:port -pr proxy's ip:port -r push retries

Example

This example is used to solve for the square of the sum of two values. There are three workers for this example, as follows:

http request => entry => add => square
start
cd example
sh example/run
stop
cd example
sh example/stop

Directories

Path Synopsis
cmd
example
add

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL