zbus

v1.0.1
Published: Jan 26, 2023 License: Apache-2.0 Imports: 12 Imported by: 30

README

Motivation

A lightweight bus replacement for local inter-process communication. The main goal is to decouple separate components from each other by using a lightweight message bus (currently implemented on top of redis) to queue messages and deliver them to the component that can serve them.

The keyword here is local: zbus is not intended to be used over the network, because it is intended ONLY for local inter-process communication. It allows local processes to talk to each other.

To keep it light, zbus does not do any authentication or permission checks.

A public-facing component can then expose a public API and internally make calls to other local components.

Overview

  • Each module has a name. A single module can host one or more objects.
  • While it's not required, an object can implement one or more interfaces.
  • Each object must have a name and a version.
  • Interfaces are mainly used to generate client stubs, but it's totally fine not to have one. In that case the client must know the exact method signature (name, number and types of arguments), and the same goes for the return values.
  • A consumer that has a connection to the message broker can call methods on remote objects knowing only the module name, object name, method name, and argument list. The current implementation of the client supports only synchronous calls. In that respect it's similar to RPC.
  • A consumer of the component can use a stub to abstract the calls to the remote module.
  • Events are supported: clients can listen to announcements from different components.

zbus was built with golang only in mind, so zbusc was only built to generate golang client stubs from the service interface, which is a golang interface. The underlying protocol itself is simple enough to implement clients and servers in other languages, for example rust: rbus is 100% compatible with the go implementation. Hence it's possible for services built in rust to be called from golang and vice versa.

Installation

Installing the zbus compiler zbusc

go install github.com/threefoldtech/zbus/zbusc

zbusc is only needed to generate stub code.

Walk-through

Let's build a service from scratch, say a calculator service. First we create a project and initialize it:

mkdir calc
cd calc
go mod init github.com/example/calc
go get github.com/threefoldtech/zbus

This initializes the directory as a go project (module).

Please use a proper module name when doing mod init.

All new files are created under the calc directory. Let's start with the service file: create a new file api.go

package calc

type Calculator interface {
	Add(a, b float64) float64
	Multiply(n ...float64) float64
	Divide(a, b float64) (float64, error)
}

While it's very simple, it shows that the implementation supports variadic arguments and multiple return values.

zbus can also return channels for event streams, but let's leave that for another example.

The next step is to actually implement this interface and start our zbus server.

We create the file server/server.go

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/example/calc"
	"github.com/threefoldtech/zbus"
)

// the service implementation structure
type myCalculator struct{}

// this is just to verify that myCalculator actually
// implements the calc.Calculator interface. If the interface
// changes this should give a compile error
var _ calc.Calculator = (*myCalculator)(nil)

func (c *myCalculator) Add(a, b float64) float64 {
	return a + b
}

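func (c *myCalculator) Multiply(n ...float64) float64 {
	// with no arguments there is nothing to multiply; slicing n[1:]
	// on an empty slice would panic, so return early
	if len(n) == 0 {
		return 0
	}

	v := n[0]
	for _, x := range n[1:] {
		v *= x
	}

	return v
}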

func (c *myCalculator) Divide(a, b float64) (float64, error) {
	if b == 0 {
		return 0, fmt.Errorf("cannot divide by zero")
	}

	return a / b, nil
}

func app() error {
	const module = "calc"
	const address = "tcp://localhost:6379"
	server, err := zbus.NewRedisServer(module, address, 10)
	if err != nil {
		return err
	}

	impl := &myCalculator{}

	// a single module (in this case calc) can serve multiple objects
	// it can also serve multiple objects with the same name but different versions
	// hence it's important when you register an object to give it a name and a version
	// it's not possible to register the same name@version twice.
	if err := server.Register(zbus.ObjectIDFromString("calculator@1.0.0"), impl); err != nil {
		return err
	}

	// once you are done registering ALL your objects it's time to start your server

	return server.Run(context.Background())
}

func main() {
	if err := app(); err != nil {
		fmt.Fprintf(os.Stderr, "%v", err)
		os.Exit(1)
	}
}

You can now run the zbus server simply by doing

go run server/server.go

Generating a stub client and using it

While you are still inside the calc project, create a stubs directory

mkdir stubs

Then run the following command

zbusc -module calc -name calculator -version 1.0.0 -package stubs github.com/example/calc+Calculator stubs/calculator_stub.go

The command line is simple: it takes the module name, object name, object version, and the package name to use in the generated code. Then it needs to know which interface to generate code for (in this case the Calculator interface). It requires the full import path, hence the interface is given as github.com/example/calc+Calculator. The last argument is where to write the generated code; here we output the generated stub to stubs/calculator_stub.go

To avoid typing this command every time you change the interface or add new methods, edit the api.go file and add these lines above the Calculator interface


//go:generate mkdir -p stubs
//go:generate zbusc -module calc -name calculator -version 1.0.0 -package stubs github.com/example/calc+Calculator stubs/calculator_stub.go

type Calculator interface {

Now each time you want to regenerate the stubs do

go generate ./...

Testing the generated stub

While under the calc project, create a client directory

mkdir client

Then create the file client/client.go

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/example/calc/stubs"
	"github.com/threefoldtech/zbus"
)

func app(ctx context.Context) error {
	const address = "tcp://localhost:6379"

	client, err := zbus.NewRedisClient(address)
	if err != nil {
		return err
	}

	stub := stubs.NewCalculatorStub(client)

	// calling the add function
	fmt.Printf("adding 2 numbers: %f \n", stub.Add(ctx, 20, 30))

	_, err = stub.Divide(ctx, 100, 0)
	if err != nil {
		fmt.Println("got error: ", err)
	}

	return nil
}

func main() {
	if err := app(context.Background()); err != nil {
		fmt.Fprintf(os.Stderr, "%v", err)
		os.Exit(1)
	}
}

You notice the following:

  • You create a generic low-level zbus client once, then use it to create as many stubs (to other services and modules) as you want.
  • The client code does not have to know about the interface, just the stub, and it can then make calls normally as with any other service.
  • Generated stub calls always take ctx as the first argument, which lets you control timeouts and cancellation if a call is taking too long (service down?!).

To test this, first start the server

go run server/server.go

Then in another terminal do

go run client/client.go

This should output the following

adding 2 numbers: 50.000000
got error:  cannot divide by zero

Specs

Please check specs here

Usage

It's very simple, check the examples

The api.go file has some go generate lines that run the zbusc tool

Projects using zbus

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseServer

type BaseServer struct {
	// contains filtered or unexported fields
}

BaseServer implements the basic server functionality, in case you are building your own zbus server.

func (*BaseServer) Register

func (s *BaseServer) Register(id ObjectID, object interface{}) error

Register registers an object on server

func (*BaseServer) Start

func (s *BaseServer) Start(ctx context.Context, wg *sync.WaitGroup, workers uint, cb Callback) chan<- *Request

Start starts the workers. Workers will call cb with the results of requests. The caller feeds requests to the workers by sending them to the returned channel. Panics if the number of workers is zero.

func (*BaseServer) StartStreams

func (s *BaseServer) StartStreams(ctx context.Context, cb EventCallback)

StartStreams starts the stream (events) workers in the background. Use ctx to cancel the stream workers.

func (*BaseServer) Status

func (s *BaseServer) Status() Status

Status returns a copy of the internal worker status

type CallError added in v1.0.0

type CallError struct {
	Message string
}

CallError is a concrete type used to wrap all errors returned by services. For example, if a method `f` returns an `error`, the result of its Error() method is stored in a CallError struct.

func (*CallError) Error added in v1.0.0

func (r *CallError) Error() string

type Callback

type Callback func(request *Request, response *Response)

Callback defines a callback method signature for responses

type Client

type Client interface {
	// Request [DEPRECATED] makes a request and return the response data
	Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)

	RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error)

	// Stream listens to a stream of events from the server
	Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error)

	Status(ctx context.Context, module string) (Status, error)
}

Client defines client interface

func NewRedisClient

func NewRedisClient(address string) (Client, error)

NewRedisClient creates a new redis client

type Event

type Event []byte

func (Event) Unmarshal

func (e Event) Unmarshal(o interface{}) error

type EventCallback

type EventCallback func(key string, event interface{})

EventCallback is called by the base server once an event is available

type Loader added in v1.0.0

type Loader []interface{}

type ObjectID

type ObjectID struct {
	Name    string
	Version Version
}

ObjectID defines an object id

func ObjectIDFromString

func ObjectIDFromString(id string) ObjectID

ObjectIDFromString parses an object id from string
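
The walk-through uses ids of the form name@version, such as calculator@1.0.0. The sketch below parses that layout with a hypothetical parseObjectID helper. It is an assumption about the format, not the library's parser; in particular, how ObjectIDFromString treats an id without a version is not stated here, and this sketch simply leaves the version empty.

```go
package main

import (
	"fmt"
	"strings"
)

// objectID is a local stand-in for zbus.ObjectID.
type objectID struct {
	Name    string
	Version string
}

// parseObjectID splits "name@version" at the first '@'.
// Assumption of this sketch: an id without '@' gets an empty version.
func parseObjectID(id string) objectID {
	name, version, _ := strings.Cut(id, "@")
	return objectID{Name: name, Version: version}
}

func main() {
	id := parseObjectID("calculator@1.0.0")
	fmt.Println(id.Name, id.Version)
}
```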

func (ObjectID) String

func (o ObjectID) String() string

type Output added in v1.0.0

type Output struct {
	Data  []byte
	Error *CallError
}

Output results from a call

func (*Output) Unmarshal added in v1.0.0

func (t *Output) Unmarshal(v *Loader) error

Unmarshal argument at position i into value

type RedisClient

type RedisClient struct {
	// contains filtered or unexported fields
}

RedisClient is client implementation for redis broker

func (*RedisClient) Request

func (c *RedisClient) Request(module string, object ObjectID, method string, args ...interface{}) (*Response, error)

Request makes a request to object.Method hosted by module. A module name is the queue name used in the server part.

func (*RedisClient) RequestContext

func (c *RedisClient) RequestContext(ctx context.Context, module string, object ObjectID, method string, args ...interface{}) (*Response, error)

RequestContext makes a request to object.Method hosted by module. A module name is the queue name used in the server part.

func (*RedisClient) Status

func (c *RedisClient) Status(ctx context.Context, module string) (Status, error)

Status return module status

func (*RedisClient) Stream

func (c *RedisClient) Stream(ctx context.Context, module string, object ObjectID, event string) (<-chan Event, error)

Stream listens to a stream of events from the server

type RedisServer

type RedisServer struct {
	BaseServer
	// contains filtered or unexported fields
}

RedisServer implementation for Redis

func (*RedisServer) Run

func (s *RedisServer) Run(ctx context.Context) error

Run starts the ZBus server

type Request

type Request struct {
	ID      string
	Inputs  Tuple
	Object  ObjectID
	ReplyTo string
	Method  string
}

Request is carrier of byte data. It does not assume any encoding types used for individual objects

var (
	// NoOP request will cause the worker to try polling again from the queue
	// without doing anything. The idea is that we can use this to check
	// if there are any free workers, by pushing this to the channel in a select
	// and seeing if any of the workers receives it.
	NoOP Request
)

func LoadRequest

func LoadRequest(data []byte) (*Request, error)

LoadRequest from bytes

func NewRequest

func NewRequest(id, replyTo string, object ObjectID, method string, args ...interface{}) (*Request, error)

NewRequest creates a message that carries the given values

func (*Request) Argument added in v1.0.0

func (m *Request) Argument(i int, t reflect.Type) (value reflect.Value, err error)

Argument loads an argument into a reflect.Value of type t

func (*Request) Encode

func (m *Request) Encode() ([]byte, error)

Encode converts a message into byte data suitable to send over the wire. Encode will always use msgpack.

func (*Request) NumArguments added in v1.0.0

func (m *Request) NumArguments() int

NumArguments returns the length of the argument list

func (*Request) Unmarshal added in v1.0.0

func (m *Request) Unmarshal(i int, v interface{}) error

Unmarshal argument at position i into value

func (*Request) Value added in v1.0.0

func (m *Request) Value(i int, t reflect.Type) (interface{}, error)

Value gets the concrete value stored at argument index i

type Response

type Response struct {
	// ID of response
	ID string
	// Output is returned data by call
	Output Output
	// Error here is any protocol error that is
	// not related to error returned by the remote call
	Error *string
}

Response object

func LoadResponse

func LoadResponse(data []byte) (*Response, error)

LoadResponse loads response from data

func NewResponse

func NewResponse(id string, ret Output, errMsg string) *Response

NewResponse creates a response with id, errMsg, and return values. Note that errMsg carries protocol-level errors (no such method, unknown object, etc.); errors returned by the service method itself should be encapsulated in the return values.
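
The two error levels described here (protocol errors on the response, service errors inside the output) can be told apart on the receiving side. The sketch below mirrors the documented field layout of Response, Output and CallError with local types; the check and isServiceError helpers are hypothetical and only illustrate the distinction.

```go
package main

import (
	"errors"
	"fmt"
)

// callError mirrors the shape of zbus.CallError.
type callError struct {
	Message string
}

func (e *callError) Error() string { return e.Message }

// output and response mirror the documented field layout.
type output struct {
	Data  []byte
	Error *callError
}

type response struct {
	ID     string
	Output output
	Error  *string
}

// check reports a protocol error first, then an error returned by the
// service method, and nil when the call succeeded.
func check(r *response) error {
	if r.Error != nil {
		return errors.New("protocol error: " + *r.Error)
	}
	if r.Output.Error != nil {
		// guard against returning a typed nil pointer as a non-nil error
		return r.Output.Error
	}
	return nil
}

// isServiceError tells whether err came from the service method itself.
func isServiceError(err error) bool {
	var ce *callError
	return errors.As(err, &ce)
}

func main() {
	msg := "no such method"
	protocol := check(&response{ID: "1", Error: &msg})
	service := check(&response{ID: "2", Output: output{Error: &callError{Message: "cannot divide by zero"}}})
	ok := check(&response{ID: "3", Output: output{Data: []byte("ok")}})

	fmt.Println(protocol, isServiceError(protocol))
	fmt.Println(service, isServiceError(service))
	fmt.Println(ok)
}
```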

func (*Response) CallError added in v1.0.0

func (m *Response) CallError() error

func (*Response) Encode

func (m *Response) Encode() ([]byte, error)

Encode converts a response into byte data suitable to send over the wire. Encode will always use msgpack.

func (*Response) PanicOnError added in v1.0.0

func (m *Response) PanicOnError()

PanicOnError causes this response to panic in case of a protocol error. Such an error indicates a problem with the code, hence a panic is okay.

func (*Response) Unmarshal added in v1.0.0

func (m *Response) Unmarshal(v *Loader) error

Unmarshal argument at position i into value

type Server

type Server interface {
	Register(id ObjectID, object interface{}) error
	Run(ctx context.Context) error
}

Server is server interface

func NewRedisServer

func NewRedisServer(module, address string, workers uint) (Server, error)

NewRedisServer builds a new ZBus server that uses redis as message broker

type Status

type Status struct {
	Objects []ObjectID     `json:"objects" yaml:"objects"`
	Workers []WorkerStatus `json:"workers" yaml:"workers"`
}

Status is returned by the server Status method

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream represents a channel of events

func (*Stream) Name

func (s *Stream) Name() string

func (*Stream) Run

func (s *Stream) Run(ctx context.Context) <-chan interface{}

Run stream to completion

type Surrogate

type Surrogate struct {
	// contains filtered or unexported fields
}

Surrogate is a wrapper around an object to support dynamic method calls

func NewSurrogate

func NewSurrogate(object interface{}) *Surrogate

NewSurrogate creates a new surrogate object

func (*Surrogate) Call

func (s *Surrogate) Call(name string, args ...interface{}) (ret Output, err error)

Call dynamically calls a method
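
Dynamic calls by method name can be sketched with the reflect package. The call helper below is a hypothetical illustration of the idea, including variadic methods; it is not the Surrogate implementation, which works on encoded arguments and returns an Output. The calculator type repeats the walk-through example so the sketch is self-contained.

```go
package main

import (
	"fmt"
	"reflect"
)

type calculator struct{}

func (calculator) Add(a, b float64) float64 { return a + b }

func (calculator) Multiply(n ...float64) float64 {
	if len(n) == 0 {
		return 0
	}
	v := n[0]
	for _, x := range n[1:] {
		v *= x
	}
	return v
}

// call looks up an exported method by name, validates the arguments
// against its signature, and invokes it.
func call(object interface{}, name string, args ...interface{}) ([]interface{}, error) {
	method := reflect.ValueOf(object).MethodByName(name)
	if !method.IsValid() {
		return nil, fmt.Errorf("unknown method %q", name)
	}

	t := method.Type()
	fixed := t.NumIn()
	if t.IsVariadic() {
		fixed-- // the last parameter is the variadic slice
	}
	if len(args) < fixed || (!t.IsVariadic() && len(args) > fixed) {
		return nil, fmt.Errorf("method %q: wrong number of arguments", name)
	}

	in := make([]reflect.Value, len(args))
	for i, arg := range args {
		var want reflect.Type
		if i < fixed {
			want = t.In(i)
		} else {
			want = t.In(fixed).Elem() // element type of the variadic slice
		}
		v := reflect.ValueOf(arg)
		if !v.IsValid() || !v.Type().AssignableTo(want) {
			return nil, fmt.Errorf("method %q: argument %d is not a %s", name, i, want)
		}
		in[i] = v
	}

	var out []interface{}
	for _, v := range method.Call(in) {
		out = append(out, v.Interface())
	}
	return out, nil
}

func main() {
	sum, _ := call(calculator{}, "Add", 20.0, 30.0)
	product, _ := call(calculator{}, "Multiply", 2.0, 3.0, 4.0)
	_, err := call(calculator{}, "Subtract", 1.0, 2.0)
	fmt.Println(sum[0], product[0], err)
}
```

Validating argument count and types before Call matters because reflect panics on a mismatch instead of returning an error.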

func (*Surrogate) CallRequest

func (s *Surrogate) CallRequest(request *Request) (ret Output, err error)

CallRequest calls a method defined by request

func (*Surrogate) Streams

func (s *Surrogate) Streams() []Stream

Streams returns all stream objects associated with this object. Stream methods take only one argument (a context) and must return a single value: a chan of a static type (struct or primitive).

type Tuple added in v1.0.0

type Tuple [][]byte

func (Tuple) Unmarshal added in v1.0.0

func (t Tuple) Unmarshal(i int, v interface{}) error

Unmarshal argument at position i into value

type Version

type Version string

Version defines the object version

type WorkerState

type WorkerState string

WorkerState represents the current worker state (free or busy)

const (
	// WorkerFree free state
	WorkerFree WorkerState = "free"
	// WorkerBusy busy state
	WorkerBusy WorkerState = "busy"
)

type WorkerStatus

type WorkerStatus struct {
	State     WorkerState `json:"state" yaml:"state"`
	StartTime time.Time   `json:"time,omitempty" yaml:"time,omitempty"`
	Action    string      `json:"action,omitempty" yaml:"action,omitempty"`
}

WorkerStatus represents the full worker status including request time and method that it is working on.

Directories

Path Synopsis
examples
