rsocket

package module
v0.5.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2020 License: Apache-2.0 Imports: 16 Imported by: 30

README

rsocket-go

logo

Build Status Slack GoDoc Go Report Card License GitHub Release

rsocket-go is an implementation of the RSocket protocol in Go.
🚧🚧🚧 IT IS UNDER ACTIVE DEVELOPMENT, APIs are unstable and maybe change at any time until release of v1.0.0.
⚠️⚠️⚠️ DO NOT USE IN ANY PRODUCTION ENVIRONMENT!!!

Features

  • Design For Golang.
  • Thin reactive-streams implementation.
  • Simulate Java SDK API.
  • Fast CLI (Compatible with https://github.com/rsocket/rsocket-cli).
    • Installation: go get github.com/rsocket/rsocket-go/cmd/rsocket-cli
    • Example: rsocket-cli --request -i hello_world --setup setup_me tcp://127.0.0.1:7878

Getting started

Start an echo server

package main

import (
	"context"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	err := rsocket.Receive().
		Resume().
		Fragment(1024).
		Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
			// bind responder
			return rsocket.NewAbstractSocket(
				rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
					return mono.Just(msg)
				}),
			), nil
		}).
		Transport("tcp://127.0.0.1:7878").
		Serve(context.Background())
	panic(err)
}

Connect to echo server

package main

import (
	"context"
	"log"

	"github.com/rsocket/rsocket-go"
	"github.com/rsocket/rsocket-go/payload"
)

func main() {
	// Connect to server
	cli, err := rsocket.Connect().
		Resume().
		Fragment(1024).
		SetupPayload(payload.NewString("Hello", "World")).
		Transport("tcp://127.0.0.1:7878").
		Start(context.Background())
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Send request
	result, err := cli.RequestResponse(payload.NewString("你好", "世界")).Block(context.Background())
	if err != nil {
		panic(err)
	}
	log.Println("response:", result)
}

NOTICE: more server examples are Here

Advanced

Load Balance

Basic load balance feature, see here.

Reactor API

Mono and Flux are two parts of Reactor API. They are based on my another project reactor-go.

Mono

Mono completes successfully by emitting an element, or with an error. Here is a tiny example:

package main

import (
	"context"
	"fmt"

	"github.com/jjeffcaii/reactor-go/scheduler"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/mono"
)

func main() {
	// Create a Mono using Just.
	m := mono.Just(payload.NewString("Hello World!", "text/plain"))

	// More create
	//m := mono.Create(func(i context.Context, sink mono.Sink) {
	//	sink.Success(payload.NewString("Hello World!", "text/plain"))
	//})

	done := make(chan struct{})

	m.
		DoFinally(func(s rx.SignalType) {
			close(done)
		}).
		DoOnSuccess(func(input payload.Payload) {
			// Handle and consume payload.
			// Do something here...
			fmt.Println("bingo:", input)
		}).
		SubscribeOn(scheduler.Elastic()).
		Subscribe(context.Background())

	<-done
}
Flux

Flux emits 0 to N elements, and then completes (successfully or with an error). Here is tiny example:

package main

import (
	"context"
	"fmt"

	flxx "github.com/jjeffcaii/reactor-go/flux"
	"github.com/rsocket/rsocket-go/extension"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx/flux"
)

func main() {
	// Create a Flux and produce 10 elements.
	f := flux.Create(func(ctx context.Context, sink flux.Sink) {
		for i := 0; i < 10; i++ {
			sink.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
		}
		sink.Complete()
	})

	// Or use Just.
	//f := flux.Just(
	//	payload.NewString("foo", extension.TextPlain.String()),
	//	payload.NewString("bar", extension.TextPlain.String()),
	//	payload.NewString("qux", extension.TextPlain.String()),
	//)

	f.
		DoOnNext(func(elem payload.Payload) {
			// Handle and consume elements
			// Do something here...
			fmt.Println("bingo:", elem)
		}).
		Subscribe(context.Background())

	// Or you can use Raw reactor-go API. :-D
	f2 := flux.Raw(flxx.Range(0, 10).Map(func(i interface{}) interface{} {
		return payload.NewString(fmt.Sprintf("Hello@%d", i.(int)), extension.TextPlain.String())
	}))
	f2.
		DoOnNext(func(input payload.Payload) {
			fmt.Println("bingo:", input)
		}).
		BlockLast(context.Background())
}
Backpressure & RequestN

Flux support backpressure.

You can call func Request in Subscription or use LimitRate before subscribe.

package main

import (
	"context"
	"fmt"

	"github.com/rsocket/rsocket-go/extension"
	"github.com/rsocket/rsocket-go/payload"
	"github.com/rsocket/rsocket-go/rx"
	"github.com/rsocket/rsocket-go/rx/flux"
)

func main() {
	// Here is an example which consume Payload one by one.
	f := flux.Create(func(ctx context.Context, s flux.Sink) {
		for i := 0; i < 5; i++ {
			s.Next(payload.NewString(fmt.Sprintf("Hello@%d", i), extension.TextPlain.String()))
		}
		s.Complete()
	})

	var su rx.Subscription
	f.
		DoOnRequest(func(n int) {
			fmt.Printf("requesting next %d element......\n", n)
		}).
		Subscribe(
			context.Background(),
			rx.OnSubscribe(func(s rx.Subscription) {
				// Init Request 1 element.
				su = s
				su.Request(1)
			}),
			rx.OnNext(func(elem payload.Payload) {
				// Consume element, do something...
				fmt.Println("bingo:", elem)
				// Request for next one manually.
				su.Request(1)
			}),
		)
}

Logging

We do not use a specific log implementation. You can register your own log implementation. For example:

package main

import (
	"log"

	"github.com/rsocket/rsocket-go/logger"
)

func init() {
	logger.SetFunc(logger.LevelDebug|logger.LevelInfo|logger.LevelWarn|logger.LevelError, func(template string, args ...interface{}) {
		// Implement your own logger here...
		log.Printf(template, args...)
	})
	logger.SetLevel(logger.LevelInfo)
}

Dependencies
TODO
Transport
  • TCP
  • Websocket
Duplex Socket
  • MetadataPush
  • RequestFNF
  • RequestResponse
  • RequestStream
  • RequestChannel
Others
  • Resume
  • Keepalive
  • Fragmentation
  • Thin Reactor
  • Cancel
  • Error
  • Flow Control: RequestN
  • Flow Control: Lease
  • Load Balance

Documentation

Overview

Example
// Serve a server
err := rsocket.Receive().
	Resume(). // Enable RESUME
	//Lease().
	Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
		return rsocket.NewAbstractSocket(
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				log.Println("incoming request:", msg)
				return mono.Just(payload.NewString("Pong", time.Now().String()))
			}),
		), nil
	}).
	Transport("tcp://127.0.0.1:7878").
	Serve(context.Background())
if err != nil {
	panic(err)
}

// Connect to a server.
cli, err := rsocket.Connect().
	SetupPayload(payload.NewString("Hello World", "From Golang")).
	Transport("tcp://127.0.0.1:7878").
	Start(context.Background())
if err != nil {
	panic(err)
}
defer func() {
	_ = cli.Close()
}()
cli.RequestResponse(payload.NewString("Ping", time.Now().String())).
	DoOnSuccess(func(elem payload.Payload) {
		log.Println("incoming response:", elem)
	}).
	Subscribe(context.Background())
Output:

Index

Examples

Constants

View Source
const (
	// ErrorCodeInvalidSetup means the setup frame is invalid for the server.
	ErrorCodeInvalidSetup = common.ErrorCodeInvalidSetup
	// ErrorCodeUnsupportedSetup means some (or all) of the parameters specified by the client are unsupported by the server.
	ErrorCodeUnsupportedSetup = common.ErrorCodeUnsupportedSetup
	// ErrorCodeRejectedSetup means server rejected the setup, it can specify the reason in the payload.
	ErrorCodeRejectedSetup = common.ErrorCodeRejectedSetup
	// ErrorCodeRejectedResume means server rejected the resume, it can specify the reason in the payload.
	ErrorCodeRejectedResume = common.ErrorCodeRejectedResume
	// ErrorCodeConnectionError means the connection is being terminated.
	ErrorCodeConnectionError = common.ErrorCodeConnectionError
	// ErrorCodeConnectionClose means the connection is being terminated.
	ErrorCodeConnectionClose = common.ErrorCodeConnectionClose
	// ErrorCodeApplicationError means application layer logic generating a Reactive Streams onError event.
	ErrorCodeApplicationError = common.ErrorCodeApplicationError
	// ErrorCodeRejected means Responder reject it.
	ErrorCodeRejected = common.ErrorCodeRejected
	// ErrorCodeCanceled means the Responder canceled the request but may have started processing it (similar to REJECTED but doesn't guarantee lack of side-effects).
	ErrorCodeCanceled = common.ErrorCodeCanceled
	// ErrorCodeInvalid means the request is invalid.
	ErrorCodeInvalid = common.ErrorCodeInvalid
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client added in v0.2.0

type Client interface {
	CloseableRSocket
}

Client is Client Side of a RSocket socket. Sends Frames to a RSocket Server.

type ClientBuilder

type ClientBuilder interface {
	ClientTransportBuilder
	// Fragment set fragmentation size which default is 16_777_215(16MB).
	Fragment(mtu int) ClientBuilder

	// KeepAlive defines current client keepalive settings.
	KeepAlive(tickPeriod, ackTimeout time.Duration, missedAcks int) ClientBuilder
	// Resume enable resume for current RSocket.
	Resume(opts ...ClientResumeOptions) ClientBuilder
	Lease() ClientBuilder
	// DataMimeType is used to set payload data MIME type.
	// Default MIME type is `application/binary`.
	DataMimeType(mime string) ClientBuilder
	// MetadataMimeType is used to set payload metadata MIME type.
	// Default MIME type is `application/binary`.
	MetadataMimeType(mime string) ClientBuilder
	// SetupPayload set the setup payload.
	SetupPayload(setup payload.Payload) ClientBuilder
	// OnClose register handler when client socket closed.
	OnClose(fn func(error)) ClientBuilder
	// Acceptor set acceptor for RSocket client.
	Acceptor(acceptor ClientSocketAcceptor) ClientTransportBuilder
}

ClientBuilder can be used to build a RSocket client.

func Connect

func Connect() ClientBuilder

Connect create a new RSocket client builder with default settings.

Example
cli, err := rsocket.Connect().
	Resume(). // Enable RESUME.
	Lease().  // Enable LEASE.
	Fragment(4096).
	SetupPayload(payload.NewString("Hello", "World")).
	Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
		return rsocket.NewAbstractSocket(
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				return mono.Just(payload.NewString("Pong", time.Now().String()))
			}),
		)
	}).
	Transport("tcp://127.0.0.1:7878").
	Start(context.Background())
if err != nil {
	panic(err)
}
defer func() {
	_ = cli.Close()
}()
// Simple FireAndForget.
cli.FireAndForget(payload.NewString("This is a FNF message.", ""))
// Simple RequestResponse.
cli.RequestResponse(payload.NewString("This is a RequestResponse message.", "")).
	DoOnSuccess(func(elem payload.Payload) {
		log.Println("response:", elem)
	}).
	Subscribe(context.Background())
var s rx.Subscription
// RequestStream with backpressure. (one by one)
cli.RequestStream(payload.NewString("This is a RequestStream message.", "")).
	DoOnNext(func(elem payload.Payload) {
		log.Println("next element in stream:", elem)
		s.Request(1)
	}).
	Subscribe(context.Background(), rx.OnSubscribe(func(s rx.Subscription) {
		s.Request(1)
	}))
// Simple RequestChannel.
sendFlux := flux.Create(func(ctx context.Context, s flux.Sink) {
	for i := 0; i < 3; i++ {
		s.Next(payload.NewString(fmt.Sprintf("This is a RequestChannel message #%d.", i), ""))
	}
	s.Complete()
})
cli.RequestChannel(sendFlux).
	DoOnNext(func(elem payload.Payload) {
		log.Println("next element in channel:", elem)
	}).
	Subscribe(context.Background())
Output:

type ClientResumeOptions added in v0.2.0

type ClientResumeOptions func(opts *resumeOpts)

ClientResumeOptions represents resume options for client.

func WithClientResumeToken added in v0.2.0

func WithClientResumeToken(gen func() []byte) ClientResumeOptions

WithClientResumeToken creates a resume token generator.

type ClientSocketAcceptor

type ClientSocketAcceptor = func(socket RSocket) RSocket

ClientSocketAcceptor is alias for RSocket handler function.

type ClientStarter

type ClientStarter interface {
	// Start start a client socket.
	Start(ctx context.Context) (Client, error)
	// Start start a client socket with TLS.
	// Here's an example:
	// tc := &tls.Config {
	//	InsecureSkipVerify: true,
	// }
	StartTLS(ctx context.Context, tc *tls.Config) (Client, error)
}

ClientStarter can be used to start a client.

type ClientTransportBuilder

type ClientTransportBuilder interface {
	// Transport set Transport for current RSocket client.
	// URI is used to create RSocket Transport:
	// Example:
	// "tcp://127.0.0.1:7878" means a TCP RSocket transport.
	// "ws://127.0.0.1:8080/a/b/c" means a Websocket RSocket transport.
	// "wss://127.0.0.1:8080/a/b/c" means a  Websocket RSocket transport with HTTPS.
	Transport(uri string, opts ...TransportOpts) ClientStarter
}

ClientTransportBuilder is used to build a RSocket client with custom Transport string.

type CloseableRSocket added in v0.2.0

type CloseableRSocket interface {
	socket.Closeable
	RSocket
}

CloseableRSocket is a RSocket which support more events.

type Error added in v0.5.10

type Error = common.CustomError

Error provides a method of accessing code and data.

type ErrorCode added in v0.5.10

type ErrorCode = common.ErrorCode

ErrorCode is code for RSocket error.

type OpServerResume added in v0.2.0

type OpServerResume func(o *serverResumeOptions)

OpServerResume represents resume options for RSocket server.

func WithServerResumeSessionDuration added in v0.2.0

func WithServerResumeSessionDuration(duration time.Duration) OpServerResume

WithServerResumeSessionDuration sets resume session duration for RSocket server.

type OptAbstractSocket

type OptAbstractSocket func(*socket.AbstractRSocket)

OptAbstractSocket is option for abstract socket.

func FireAndForget

func FireAndForget(fn func(msg payload.Payload)) OptAbstractSocket

FireAndForget register request handler for FireAndForget.

func MetadataPush

func MetadataPush(fn func(msg payload.Payload)) OptAbstractSocket

MetadataPush register request handler for MetadataPush.

func RequestChannel

func RequestChannel(fn func(msgs rx.Publisher) flux.Flux) OptAbstractSocket

RequestChannel register request handler for RequestChannel.

func RequestResponse

func RequestResponse(fn func(msg payload.Payload) mono.Mono) OptAbstractSocket

RequestResponse register request handler for RequestResponse.

func RequestStream

func RequestStream(fn func(msg payload.Payload) flux.Flux) OptAbstractSocket

RequestStream register request handler for RequestStream.

type RSocket

type RSocket interface {
	// FireAndForget is a single one-way message.
	FireAndForget(message payload.Payload)
	// MetadataPush sends asynchronous Metadata frame.
	MetadataPush(message payload.Payload)
	// RequestResponse request single response.
	RequestResponse(message payload.Payload) mono.Mono
	// RequestStream request a completable stream.
	RequestStream(message payload.Payload) flux.Flux
	// RequestChannel request a completable stream in both directions.
	RequestChannel(messages rx.Publisher) flux.Flux
}

RSocket is a contract providing different interaction models for RSocket protocol.

func NewAbstractSocket

func NewAbstractSocket(opts ...OptAbstractSocket) RSocket

NewAbstractSocket returns an abstract implementation of RSocket. You can specify the actual implementation of any request.

type ServerAcceptor

type ServerAcceptor = func(setup payload.SetupPayload, sendingSocket CloseableRSocket) (RSocket, error)

ServerAcceptor is alias for server acceptor.

type ServerBuilder

type ServerBuilder interface {
	// Fragment set fragmentation size which default is 16_777_215(16MB).
	Fragment(mtu int) ServerBuilder
	// Lease enable feature of Lease.
	Lease(leases lease.Leases) ServerBuilder
	// Resume enable resume for current server.
	Resume(opts ...OpServerResume) ServerBuilder
	// Acceptor register server acceptor which is used to handle incoming RSockets.
	Acceptor(acceptor ServerAcceptor) ServerTransportBuilder
	// OnStart register a handler when serve success.
	OnStart(onStart func()) ServerBuilder
}

ServerBuilder can be used to build a RSocket server.

func Receive

func Receive() ServerBuilder

Receive receives server connections from client RSockets.

Example
err := rsocket.Receive().
	Resume(rsocket.WithServerResumeSessionDuration(30 * time.Second)).
	Fragment(65535).
	Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
		// Handle close.
		sendingSocket.OnClose(func(err error) {
			log.Println("sending socket is closed")
		})

		// You can reject connection. For example, do some authorization.
		// return nil, errors.New("ACCESS_DENY")

		// Request to client.
		sendingSocket.RequestResponse(payload.NewString("Ping", time.Now().String())).
			DoOnSuccess(func(elem payload.Payload) {
				log.Println("response of Ping from client:", elem)
			}).
			SubscribeOn(scheduler.Elastic()).
			Subscribe(context.Background())
		// Return responser which just echo.
		return rsocket.NewAbstractSocket(
			rsocket.FireAndForget(func(msg payload.Payload) {
				log.Println("receive fnf:", msg)
			}),
			rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
				return mono.Just(msg)
			}),
			rsocket.RequestStream(func(msg payload.Payload) flux.Flux {
				return flux.Create(func(ctx context.Context, s flux.Sink) {
					for i := 0; i < 3; i++ {
						s.Next(payload.NewString(msg.DataUTF8(), fmt.Sprintf("This is response #%04d", i)))
					}
					s.Complete()
				})
			}),
			rsocket.RequestChannel(func(msgs rx.Publisher) flux.Flux {
				return msgs.(flux.Flux)
			}),
		), nil
	}).
	Transport("tcp://0.0.0.0:7878").
	Serve(context.Background())
panic(err)
Output:

type ServerTransportBuilder

type ServerTransportBuilder interface {
	// Transport specify transport string.
	Transport(transport string) Start
}

ServerTransportBuilder is used to build a RSocket server with custom Transport string.

type Start

type Start interface {
	// Serve serve RSocket server.
	Serve(ctx context.Context) error
	// Serve serve RSocket server with TLS.
	//
	// You can generate cert.pem and key.pem for local testing:
	//
	//	 go run $GOROOT/src/crypto/tls/generate_cert.go --host localhost
	//
	//	 Load X509
	//	cert, err := tls.LoadX509KeyPair("cert.pem", "key.pem")
	//	if err != nil {
	//		panic(err)
	//	}
	//	// Init TLS configuration.
	//	tc := &tls.Config{
	//		MinVersion:               tls.VersionTLS12,
	//		CurvePreferences:         []tls.CurveID{tls.CurveP521, tls.CurveP384, tls.CurveP256},
	//		PreferServerCipherSuites: true,
	//		CipherSuites: []uint16{
	//			tls.TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,
	//			tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
	//			tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
	//			tls.TLS_RSA_WITH_AES_256_CBC_SHA,
	//		},
	//		Certificates: []tls.Certificate{cert},
	//	}
	ServeTLS(ctx context.Context, c *tls.Config) error
}

Start start a RSocket server.

type TransportOpts added in v0.4.0

type TransportOpts = func(*transportOpts)

TransportOpts represents options of transport.

func WithWebsocketHeaders added in v0.4.0

func WithWebsocketHeaders(headers map[string][]string) TransportOpts

WithWebsocketHeaders attach headers for websocket transport.

Directories

Path Synopsis
Package balancer defines APIs for load balancing in RSocket.
Package balancer defines APIs for load balancing in RSocket.
cmd
examples
internal
rx

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL