rsocket

Version: v0.0.6
Published: May 8, 2019 License: Apache-2.0 Imports: 19 Imported by: 30

README

rsocket-go

rsocket-go is an implementation of the RSocket protocol in Go. It is still under development: the APIs are unstable and may change at any time before the v1.0.0 release. Please do not use it in a production environment.

Features

  • Designed for Go.
  • Thin reactive-streams implementation.
  • API modeled on the Java SDK.

Getting started

Start an echo server

package main

import (
	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Create and serve
	err := rsocket.Receive().
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.EnhancedRSocket) rsocket.RSocket {
			// bind responder
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) rx.Mono {
					return rx.JustMono(msg)
				}),
			)
		}).
		Transport("tcp://127.0.0.1:7878").
		Serve()
	panic(err)
}

Connect to echo server

package main

import (
	"context"
	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"log"
)

func main() {
	// Connect to server
	client, err := rsocket.Connect().
		SetupPayload(payload.NewString("Hello", "World")).
		Transport("tcp://127.0.0.1:7878").
		Start()
	if err != nil {
		panic(err)
	}
	defer client.Close()
	// Send request
	client.RequestResponse(payload.NewString("你好", "世界")).
		DoOnSuccess(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			log.Println("receive response:", elem)
		}).
		Subscribe(context.Background())
}

NOTICE: more server examples are here

Advanced

Load Balance

Basic load balancing is in preview; check out the current master branch to try it. It is a client-side load balancer.

Here is an example:

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Use a channel as the discovery source for brokers.
	// Writing a string slice to it refreshes the load balancer's broker list.
	brokers := make(chan []string)
	// Load Balance Example
	// It's EASY!!! Just set more than one transport URI.
	setup := payload.NewString("こんにちは、世界!", "Go")
	clientLB, err := rsocket.Connect().
		SetupPayload(setup).
		Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
			return rsocket.NewAbstractSocket(
				rsocket.FireAndForget(func(msg payload.Payload) {
					// For example:
					// You can refresh brokers list using FNF payload from server.
					brokers <- strings.Split(msg.DataUTF8(), ",")
				}),
			)
		}).
		Transports(brokers, rsocket.WithInitTransports("tcp://127.0.0.1:8000", "tcp://127.0.0.1:7878")).
		Start()
	if err != nil {
		panic(err)
	}
	defer func() {
		_ = clientLB.Close()
	}()
	for i := 0; i < 100; i++ {
		clientLB.RequestResponse(payload.NewString("Hello World!", fmt.Sprintf("%d", i))).
			DoOnError(func(ctx context.Context, err error) {
				log.Println("oops:", err)
			}).
			DoOnSuccess(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
				log.Println("okey:", elem)
			}).
			Subscribe(context.Background())
	}
}

NOTICE: more example code is here

Reactor API

The Reactor API has two parts: Mono and Flux.

Mono

Mono completes successfully by emitting an element, or with an error. Here is a tiny example:

package main

import (
	"context"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
)

func main() {
	// Create a Mono that produces a simple payload.
	mono := rx.NewMono(func(ctx context.Context, sink rx.MonoProducer) {
		// Use context API if you want.
		sink.Success(payload.NewString("foo", "bar"))
	})

	done := make(chan struct{})

	mono.
		DoFinally(func(ctx context.Context, st rx.SignalType) {
			close(done)
		}).
		DoOnSuccess(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			// Handle and consume payload.
			// Do something here...
		}).
		SubscribeOn(rx.ElasticScheduler()).
		Subscribe(context.Background())

	<-done
}

Flux

Flux emits 0 to N elements, and then completes (successfully or with an error). Here is a tiny example:

package main

import (
	"context"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"time"
)

func main() {
	// Create a Flux and produce 10 elements.
	flux := rx.NewFlux(func(ctx context.Context, producer rx.Producer) {
		for i := 0; i < 10; i++ {
			producer.Next(payload.NewString("hello", time.Now().String()))
		}
		producer.Complete()
	})
	flux.
		DoOnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
			// Handle and consume elements
			// Do something here...
		}).
		Subscribe(context.Background())
}

Backpressure & RequestN

Flux supports backpressure.

You can call the Request method on the Subscription, or use LimitRate before subscribing.

// Here is an example that consumes payloads one by one.
flux.Subscribe(
    context.Background(),
    rx.OnSubscribe(func(ctx context.Context, s rx.Subscription) {
        // Initially request 1 element.
        s.Request(1)
    }),
    rx.OnNext(func(ctx context.Context, s rx.Subscription, elem payload.Payload) {
        // Consume element, do something...

        // Request for next one manually.
        s.Request(1)
    }),
)
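
To make the request-driven flow above concrete, here is a minimal, stdlib-only sketch of the idea. It does not use the rx package: the `source` type, its `Request` method, and the `drain` helper are hypothetical names invented for illustration, and the publisher is modeled synchronously for simplicity.

```go
package main

import "fmt"

// source is a hypothetical synchronous publisher: it hands out an element
// only while the consumer has outstanding demand.
type source struct {
	items    []string
	next     int  // index of the next element to emit
	pending  int  // demand granted by the consumer but not yet used
	draining bool // true while the emit loop is running
	onNext   func(item string)
}

// Request grants n more elements. Calls made from inside onNext only add
// demand; the emit loop that is already running picks it up.
func (s *source) Request(n int) {
	s.pending += n
	if s.draining {
		return
	}
	s.draining = true
	defer func() { s.draining = false }()
	for s.pending > 0 && s.next < len(s.items) {
		item := s.items[s.next]
		s.next++
		s.pending--
		s.onNext(item)
	}
}

// drain requests `initial` elements up front and `perItem` more after each
// element it handles, and returns everything it received.
func drain(items []string, initial, perItem int) []string {
	var received []string
	src := &source{items: items}
	src.onNext = func(item string) {
		received = append(received, item)
		if perItem > 0 {
			src.Request(perItem)
		}
	}
	src.Request(initial)
	return received
}

func main() {
	items := []string{"a", "b", "c"}
	fmt.Println(drain(items, 1, 1)) // one at a time, re-requesting after each: [a b c]
	fmt.Println(drain(items, 2, 0)) // demand of 2, no further requests: [a b]
}
```

The second call shows the point of backpressure: once the granted demand is used up and nothing more is requested, the producer stops emitting even though elements remain.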

Dependencies

TODO

Transport

  • TCP
  • Websocket
  • HTTP/2

Duplex Socket

  • MetadataPush
  • RequestFNF
  • RequestResponse
  • RequestStream
  • RequestChannel

Others

  • Resume
  • Keepalive
  • Fragmentation
  • Thin Reactor
  • Cancel
  • Error
  • Flow Control: RequestN
  • Flow Control: Lease
  • Load Balance
  • Reconnect

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus added in v0.0.6

type Bus interface {
	Put(id string, first RSocket, others ...RSocket)
	Get(id string) (socket RSocket, ok bool)
	Remove(id string, socket RSocket) bool
}

func NewBus added in v0.0.6

func NewBus() Bus

type ClientBuilder

type ClientBuilder interface {
	ClientTransportBuilder
	// Fragment set fragmentation size which default is 16_777_215(16MB).
	Fragment(mtu int) ClientBuilder
	// KeepAlive defines current client keepalive settings.
	KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
	// DataMimeType is used to set payload data MIME type.
	// Default MIME type is `application/binary`.
	DataMimeType(mime string) ClientBuilder
	// MetadataMimeType is used to set payload metadata MIME type.
	// Default MIME type is `application/binary`.
	MetadataMimeType(mime string) ClientBuilder
	// SetupPayload set the setup payload.
	SetupPayload(setup payload.Payload) ClientBuilder
	// OnClose register handler when client socket closed.
	OnClose(fn func()) ClientBuilder
	// Acceptor set acceptor for RSocket client.
	Acceptor(acceptor ClientSocketAcceptor) ClientTransportBuilder
	// contains filtered or unexported methods
}

ClientBuilder can be used to build a RSocket client.
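
The default Fragment size of 16_777_215 equals 2^24 - 1, which matches the largest value a 24-bit frame length field can hold, so it is one byte short of 16 MiB. The sketch below checks that arithmetic and estimates how many fragments a payload would need for a given size limit. `fragmentCount` is a hypothetical helper, not part of rsocket-go, and it deliberately ignores frame header overhead.

```go
package main

import "fmt"

// maxFrameLength is the largest value a 24-bit unsigned length field can hold.
const maxFrameLength = 1<<24 - 1

// fragmentCount estimates how many fragments a payload of size bytes needs
// when each fragment carries at most mtu bytes.
// Assumption: header overhead is ignored, which a real implementation cannot do.
func fragmentCount(size, mtu int) int {
	if mtu <= 0 || size <= mtu {
		return 1 // fits in a single frame
	}
	return (size + mtu - 1) / mtu // ceiling division
}

func main() {
	fmt.Println(maxFrameLength)               // 16777215
	fmt.Println(fragmentCount(10, 4))         // 3
	fmt.Println(fragmentCount(1<<20, 64<<10)) // 16
}
```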

func Connect

func Connect() ClientBuilder

Connect creates a new RSocket client builder with default settings.

type ClientSocket

type ClientSocket interface {
	io.Closer
	RSocket
}

ClientSocket is the client side of an RSocket connection. It sends frames to an RSocket server.

type ClientSocketAcceptor

type ClientSocketAcceptor = func(socket RSocket) RSocket

ClientSocketAcceptor is an alias for the RSocket handler function.

type ClientStarter

type ClientStarter interface {
	// Start start a client socket.
	Start() (ClientSocket, error)
}

ClientStarter can be used to start a client.

type ClientTransportBuilder

type ClientTransportBuilder interface {
	// Transport set Transport for current RSocket client.
	// URI is used to create RSocket Transport:
	// Example:
	// "tcp://127.0.0.1:7878" means a TCP RSocket transport.
	// "ws://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport. (NOTICE: Websocket will be supported in the future).
	Transport(uri string) ClientStarter
	// Transports set transports with load balancer.
	// Client will watch discovery and change current transports.
	// You can custom balancer options use functions: WithInitTransports, WithQuantile, WithPendings and WithActives.
	Transports(discovery <-chan []string, options ...OptBalancer) ClientStarter
}

ClientTransportBuilder is used to build a RSocket client with custom Transport string.
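
The transport strings documented above follow the usual `scheme://host:port/path` shape. As an illustration only, the sketch below splits such a string with the standard `net/url` package. `splitTransport` is a hypothetical helper; how rsocket-go itself parses the string is not shown in this document.

```go
package main

import (
	"fmt"
	"net/url"
)

// splitTransport breaks a transport URI into scheme, host and path.
// It rejects strings that lack a scheme or host, such as "127.0.0.1:7878".
func splitTransport(uri string) (scheme, host, path string, err error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", "", "", err
	}
	if u.Scheme == "" || u.Host == "" {
		return "", "", "", fmt.Errorf("transport %q needs a scheme and a host", uri)
	}
	return u.Scheme, u.Host, u.Path, nil
}

func main() {
	for _, uri := range []string{
		"tcp://127.0.0.1:7878",
		"ws://127.0.0.1:8080/a/b/c",
		"127.0.0.1:7878", // no scheme
	} {
		scheme, host, path, err := splitTransport(uri)
		if err != nil {
			fmt.Println("rejected:", uri)
			continue
		}
		fmt.Printf("scheme=%s host=%s path=%q\n", scheme, host, path)
	}
}
```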

type EnhancedRSocket added in v0.0.5

type EnhancedRSocket interface {
	io.Closer
	RSocket
	// OnClose bind handler when socket disconnected.
	OnClose(fn func())
}

EnhancedRSocket is an RSocket that supports additional events.

type OptAbstractSocket

type OptAbstractSocket func(*abstractRSocket)

OptAbstractSocket is an option for the abstract socket.

func FireAndForget

func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket

FireAndForget registers the request handler for FireAndForget.

func MetadataPush

func MetadataPush(fn func(payload payload.Payload)) OptAbstractSocket

MetadataPush registers the request handler for MetadataPush.

func RequestChannel

func RequestChannel(fn func(msgs rx.Publisher) rx.Flux) OptAbstractSocket

RequestChannel registers the request handler for RequestChannel.

func RequestResponse

func RequestResponse(fn func(msg payload.Payload) rx.Mono) OptAbstractSocket

RequestResponse registers the request handler for RequestResponse.

func RequestStream

func RequestStream(fn func(msg payload.Payload) rx.Flux) OptAbstractSocket

RequestStream registers the request handler for RequestStream.

type OptBalancer added in v0.0.4

type OptBalancer func(opts *balancerOpts)

OptBalancer can be used to set options for balancer.

func WithActives added in v0.0.4

func WithActives(min, max int) OptBalancer

WithActives limits the number of active sockets for a balancer. (default: 3 ~ 100)

func WithInitTransports added in v0.0.4

func WithInitTransports(uris ...string) OptBalancer

WithInitTransports sets the initial transport URIs.

func WithPendings added in v0.0.4

func WithPendings(min, max float64) OptBalancer

WithPendings sets pendings range for a balancer. (default: 1.0 ~ 2.0)

func WithQuantile added in v0.0.4

func WithQuantile(lower, higher float64) OptBalancer

WithQuantile sets quantile range of a balancer. (default: 0.2 ~ 0.8)
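
OptBalancer and the With* functions follow Go's functional options pattern: each option is a function that mutates a settings struct pre-filled with defaults. The sketch below shows the pattern in isolation, using the default ranges documented above. The `settings` type, its field names, and the lowercase helpers are hypothetical stand-ins, since the library's `balancerOpts` is unexported and its fields are not shown here.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the unexported balancerOpts.
type settings struct {
	minActives, maxActives        int
	minPendings, maxPendings      float64
	lowerQuantile, higherQuantile float64
	initTransports                []string
}

// option mutates settings, in the same shape as OptBalancer.
type option func(*settings)

func withActives(lo, hi int) option {
	return func(s *settings) { s.minActives, s.maxActives = lo, hi }
}

func withInitTransports(uris ...string) option {
	return func(s *settings) { s.initTransports = append(s.initTransports, uris...) }
}

// newSettings starts from the documented defaults and applies options in order.
func newSettings(opts ...option) settings {
	s := settings{
		minActives: 3, maxActives: 100,
		minPendings: 1.0, maxPendings: 2.0,
		lowerQuantile: 0.2, higherQuantile: 0.8,
	}
	for _, apply := range opts {
		apply(&s)
	}
	return s
}

func main() {
	s := newSettings(withActives(1, 5), withInitTransports("tcp://127.0.0.1:7878"))
	fmt.Println(s.minActives, s.maxActives, s.maxPendings, s.initTransports)
}
```

Options that are not passed leave the corresponding default untouched, which is why callers only need to name the settings they want to change.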

type RSocket

type RSocket interface {
	// FireAndForget is a single one-way message.
	FireAndForget(msg payload.Payload)
	// MetadataPush sends asynchronous Metadata frame.
	MetadataPush(msg payload.Payload)
	// RequestResponse request single response.
	RequestResponse(msg payload.Payload) rx.Mono
	// RequestStream request a completable stream.
	RequestStream(msg payload.Payload) rx.Flux
	// RequestChannel request a completable stream in both directions.
	RequestChannel(msgs rx.Publisher) rx.Flux
}

RSocket is a contract providing the different interaction models of the RSocket protocol.

func NewAbstractSocket

func NewAbstractSocket(opts ...OptAbstractSocket) RSocket

NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.

type ServerAcceptor

type ServerAcceptor = func(setup payload.SetupPayload, sendingSocket EnhancedRSocket) RSocket

ServerAcceptor is an alias for the server acceptor function.

type ServerBuilder

type ServerBuilder interface {
	// Fragment set fragmentation size which default is 16_777_215(16MB).
	Fragment(mtu int) ServerBuilder
	// Acceptor register server acceptor which is used to handle incoming RSockets.
	Acceptor(acceptor ServerAcceptor) ServerTransportBuilder
}

ServerBuilder can be used to build a RSocket server.

func Receive

func Receive() ServerBuilder

Receive receives server connections from client RSockets.

type ServerTransportBuilder

type ServerTransportBuilder interface {
	// Transport specify transport string.
	Transport(transport string) Start
}

ServerTransportBuilder is used to build a RSocket server with custom Transport string.

type Start

type Start interface {
	// Serve serve RSocket server.
	Serve() error
}

Start starts an RSocket server.

Directories

Path Synopsis
example
