wrpc

package
v0.0.0-...-5b96247 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2020 License: MIT Imports: 10 Imported by: 0

README

wrpc as in Web RPC. A package that enables running any function implementing type RemoteCall func(in io.Reader, out io.WriteCloser) remotely on a mesh of web workers.

Communication between the workers is done through pipes. This allows combinations of any concurrently active pipelines. Since worker's can reschedule their work to another worker, automatic load balancing should be possible in theory.

All this is achieved by having the manager (browser main thread) and workers run the same binary and sending common function pointers defined throughout the codebase as calls are scheduled. The function must be statically declared and cannot access caller scope (where the call function is defined).

RemoteCall has 2 parameters: in and out as in input and output. If main thread would want to get a result from a call to a worker, it would have to create a pair of piped ports using wrpc.Pipe(). One end is given to the worker into which it writes the result and from the other end we can read it back.

It is possible for the worker to create a new pipe and call subworkers and so on in any combination by just connecting the pipes together using io.Copy for example.

The package provides easy interfaces for launching remote calls on workers: Go(in io.Reader, out io.WriteCloser, f RemoteCall) and for chaining workers by connecting each output to next input and passing out directly to the final worker: GoChain(in io.Reader, out io.WriteCloser, calls ...RemoteCall)

By having such an interface combined with the mesh network, it allows to implement any protocols on top of it. Even to go as far as to run a gRPC server as a worker call and having it schedule a call [containing the gRPC client calling the server back] to another worker. I tried a pure net.Conn approach at first and ran a gRPC setup on top of that. Although it worked well with gogoproto custom marshaling (In a raw audio application there was a noticable difference over reflection based marshaling). I instead implemented the MessagePort API directly as blocking io.ReadWriteCloser pipes keeping the higher abstractions open.

Demos available at: https://github.com/mgnsk/go-wasm-demos

Documentation

Rendered for js/wasm

Index

Constants

This section is empty.

Variables

View Source
var CallCount uint64 = 0

CallCount specifies how many calls are currently processing.

View Source
var CreateTimeout = 10 * time.Second

CreateTimeout specifies timeout for waiting for webworker hello.

View Source
var GlobalScheduler = NewScheduler()

GlobalScheduler is main scheduler to schedule to workers.

View Source
var IndexJS []byte

IndexJS boots up webworker go main.

Functions

func Go

func Go(in io.Reader, out io.WriteCloser, f RemoteCall)

Go provides a familiar interface for wRPC calls.

Here are some rules: 1) f runs in a new goroutine on the first worker that receives it. 2) f can call Go with a new RemoteCall. Workers can then act like a mesh where any chain of stream is concurrently active

func GoChain

func GoChain(in io.Reader, out io.WriteCloser, calls ...RemoteCall)

GoChain runs goroutines in a chain, piping each worker's output into next input.

func Pipe

func Pipe() (*MessagePort, *MessagePort)

Pipe returns a message channel pipe connection between ports.

func RunServer

func RunServer(ctx context.Context)

RunServer runs on the webworker side to start the server implementing the WebRPC.

Types

type Call

type Call struct {
	// RemoteCall will be run in a remote webworker.
	RemoteCall RemoteCall
	// InputReader is a port where the worker can read its input data from.
	Input *MessagePort
	// ResultPort is the port where the result gets written into.
	Output *MessagePort
}

Call is a remote call that can be scheduled to a worker.

type MessagePort

type MessagePort struct {
	// contains filtered or unexported fields
}

MessagePort enables duplex communication with any js object implementing the onmessage event and postMessage method.

func NewMessagePort

func NewMessagePort(value js.Value) *MessagePort

NewMessagePort constructor.

func (*MessagePort) Close

func (port *MessagePort) Close() error

Close the port.

func (*MessagePort) JSValue

func (port *MessagePort) JSValue() js.Value

JSValue returns the underlying js value.

func (*MessagePort) PostMessage

func (port *MessagePort) PostMessage(args ...interface{})

PostMessage sends a raw js message to remote end.

func (*MessagePort) Read

func (port *MessagePort) Read(p []byte) (n int, err error)

Read from port.

func (*MessagePort) RemoteReady

func (port *MessagePort) RemoteReady() <-chan struct{}

RemoteReady returns a channel that is closed when the remote end starts listening.

func (*MessagePort) Write

func (port *MessagePort) Write(p []byte) (n int, err error)

Write to port.

type RemoteCall

type RemoteCall func(in io.Reader, out io.WriteCloser)

RemoteCall is a function which must be statically declared so that it's pointer could be sent to another machine to run.

Arguments: input is a reader which is piped into the worker's input. outputPort is call's output that must be closed when not being written into anymore. All writes to out block until a corresponding read from its other side.

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler schedules calls to ports.

func NewScheduler

func NewScheduler() *Scheduler

NewScheduler constructor.

func (*Scheduler) Call

func (s *Scheduler) Call(ctx context.Context, call Call) error

Call sends the remote call to first worker who receives it.

func (*Scheduler) RunScheduler

func (s *Scheduler) RunScheduler(ctx context.Context, port *MessagePort) error

RunScheduler starts a scheduler to schedule calls to port. Runs sync on a single port.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is a browser thread that communicates through net.Conn interface.

func CreateWorkerFromSource

func CreateWorkerFromSource(indexJS []byte) (*Worker, error)

CreateWorkerFromSource creates a Worker from js source. The worker is terminated when context is canceled.

func SpawnWorker

func SpawnWorker(ctx context.Context) *Worker

SpawnWorker spawns and connects a new webworker.

func (*Worker) ACK

func (w *Worker) ACK() <-chan struct{}

ACK channel.

func (*Worker) JSValue

func (w *Worker) JSValue() js.Value

JSValue returns the underlying js value.

func (*Worker) MessagePort

func (w *Worker) MessagePort() *MessagePort

MessagePort returns the worker's port.

func (*Worker) StartRemoteScheduler

func (w *Worker) StartRemoteScheduler(to *MessagePort)

StartRemoteScheduler starts a scheduler on the remote end that schedules to 'to'.

func (*Worker) Terminate

func (w *Worker) Terminate()

Terminate the webworker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL