README ¶
Conduction
Description
Work in progress! Event based Kafka router between protocols. A centralized router that can send messages between protocols. This allows easy rerouting without changing client code and a high level view to understand the communication between clients.
./protoc -I=/Users/edmundfung/workspace/go/src/github.com/edfungus/conduction/messenger --go_out=/Users/edmundfung/workspace/go/src/github.com/edfungus/conduction/messenger/ /Users/edmundfung/workspace/go/src/github.com/edfungus/conduction/messenger/message.proto
Examples ... aka thinking out loud
This is for me to get my thoughts together to make a better architecture. There are a fair number of features and requirements I want to support and it is more complicated than I anticipated.
mqtt to mqtt (pass through, one step)
-
mqtt --> mqtt connector
-
mqtt connector --> conduction
MQTT connector gets message and makes this:
Message { Path origin = {route=in, type=1} bytes payload }
-
conduction <--> database
Using <origin.path>_<origin.type>, get list of Flows to execute:
[]Flows = [ Flow { Path path = {route=out, type=1} } ]
-
conduction --> mqtt connector
Using the Path given, create a new Message to pass back to the mqtt connector
Message { Path destination = {route=out, type=1} bytes payload }
-
mqtt connector --> mqtt
rest to rest (return call, two step)
-
rest --> rest connector
-
rest connector --> conduction
Coverts call to Message. REST will always have a return Path to identify the controller:
Message { Path origin = {route=GET_/home, type=0} Path return = {type=0, metadata.controllerID = 1} }
-
conduction <--> database
Get the list of Flow to execute:
[]Flows = [ Flow { Path path = {route=GET_/catpics, type=0} } ]
-
conduction --> rest connector
Making the call to get a cat picture. Notice return get passed to the connector because returns are handled by the connector (perhaps not...might just insert into
incomingTempPaths
table):Message { Path destination = {route=GET_/catpics, type=1} Path return = {type=0, metadata.controllerID = 1} }
-
rest connector --> conduction
Afer getting the cat piture, send back to conduction. If there was a return, the connector will place it in destination when completely its task:
Message { Path origin = {route=GET_/catpics, type=1} Path destination = {type=0, metadata.controllerID = 1} bytes payload = `catpic` }
-
conduction <--> database
Because there is an origin, conduction will check to see if this will launch a flow. Yes, infinite loops may be possible :(. In this case there are none
[]Flows = nil
-
conduction --> rest connector
When are no Flows, conduction would usually stop, but since there is a destination, it will send the Message to the destination
Message { Path destination = {type=0, metadata.controllerID = 1} bytes payload = `catpic` }
mqtt to rest to mqtt (two step, return call, two types)
- mqtt --> mqtt connector
- mqtt connector --> conduction
- conduction <--> database
- conduction --> rest connector
- rest connector <--> rest
- rest connector --> conduction
- conduction <--> database
- conduction --> mqtt connector
- mqtt connector --> mqtt
mqtt to 2 mqtt (multistep, dependents, two types)
This one is very similar to jst mqtt to mqtt
- mqtt --> mqtt connector
- mqtt connector --> conduction
- conduction <--> database
- conduction --> mqtt connector (x2)
- mqtt connector -- mqtt (x2)
rest to mqtt to rest (two step, return call, waiting)
-
rest --> rest connector
-
rest connector --> conduction Creates:
Message { Path origin = {route=GET_/home, type=0} Path return = {type=0, metadata.controllerID = 1} bytes payload = 255 }
-
conduction <--> database
[]Flows = [ Flow { Path path = {route=lightOn, type=1, dependentFlows=Flow { Path path = {route=turnOnLight, type=1} }, wait=true } }, Flow { Path path = {type=0, metadata.controllerID = 1} } ]
-
conduction --> database Because there is more than one Flow, find Flow we are going to do first and remove from the object. Check now that the next Flow's
wait
If it is true, put an entry to database with the rest of the FlowFor `tempFLows`, insert `Flow` (the rest of it) For `incomingTempPaths`, insert `path`, `type`, `identity`(created by conduction), `tempFlow` (uuid)
-
conduction --> mqtt connector Note that identity is set.. but MQTT doesn't support identity...so what ever next message matches the expect return will continue to Flow
Message { Path destination = {route=turnOnLight, type=0, identity=uuid} bytes payload = 255 }
-
mqtt connector --> mqtt Light sets to 255
-
mqtt --> mqtt connector Light sends back "DONE"
-
mqtt connector --> conduction
Message { Path origin = {route=lightOn, type=0} bytes payload = "DONE" }
-
conduction <--> database Note, there is no path to flow relationship for this incoming mqtt message. The reason it will get routed is because conduction put in a temporary entry to wait for response. Because we can tell that there are no more entries with emtpy payload for the
tempFlow
uuid inincomingTempPaths
, we know we can consolidate all the payload and pass them together to the parent Flow. I guess the combination will be JSON key-valueincomingPaths --> []Flow = nil incomingTempPaths --> tempFlows --> []Flow = [ Flow { Path path = {route=turnOnLight, type=1, wait=true} }, Flow { Path path = {type=0, metadata.controllerID = 1} } ]
-
conduction --> rest controller Because the next Flows is
wait
=true, we don't actually execute. We usually consolidate all the payloads and send to next Flow which is REST but since one iswait
=true, we onl take the payload of thewait
Flow. Note, if the parent was a Lambda, we would send the consolidated JSON to the lambda and then send the output of the Lambda to REST.Message { Path destination = {type=0, metadata.controllerID = 1} bytes payload = "DONE" }
-
rest controller --> rest
mqtt to rest (x2) to mqtt
This triggers two rest calls which then merges the results together and sends to mqtt. Ideally, the parent flow of the two rest calls would be a lamdba call which can merge the results. This is a far off feature...
mqtt to mqtt or mqtt
The []Flows has no logic and will jsut run the next one, but what if you want logic? Should that be imbedded in the Flow object or fired via lambda??