Documentation ¶
Index ¶
- type AbstractRSocket
- func (p AbstractRSocket) FireAndForget(message payload.Payload)
- func (p AbstractRSocket) MetadataPush(message payload.Payload)
- func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux
- func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
- func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
- type ClientSocket
- type Closeable
- type DuplexRSocket
- func (p *DuplexRSocket) Close() error
- func (p *DuplexRSocket) FireAndForget(sending payload.Payload)
- func (p *DuplexRSocket) MetadataPush(payload payload.Payload)
- func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux)
- func (p *DuplexRSocket) RequestResponse(pl payload.Payload) (mo mono.Mono)
- func (p *DuplexRSocket) RequestStream(sending payload.Payload) (ret flux.Flux)
- func (p *DuplexRSocket) SetError(e error)
- func (p *DuplexRSocket) SetResponder(responder Responder)
- func (p *DuplexRSocket) SetTransport(tp *transport.Transport)
- type Responder
- type ServerSocket
- type SetupInfo
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AbstractRSocket ¶
type AbstractRSocket struct { FF func(payload.Payload) MP func(payload.Payload) RR func(payload.Payload) mono.Mono RS func(payload.Payload) flux.Flux RC func(rx.Publisher) flux.Flux }
AbstractRSocket represents an abstract RSocket.
func (AbstractRSocket) FireAndForget ¶
func (p AbstractRSocket) FireAndForget(message payload.Payload)
FireAndForget starts a request of FireAndForget.
func (AbstractRSocket) MetadataPush ¶
func (p AbstractRSocket) MetadataPush(message payload.Payload)
MetadataPush starts a request of MetadataPush.
func (AbstractRSocket) RequestChannel ¶
func (p AbstractRSocket) RequestChannel(messages rx.Publisher) flux.Flux
RequestChannel starts a request of RequestChannel.
func (AbstractRSocket) RequestResponse ¶
func (p AbstractRSocket) RequestResponse(message payload.Payload) mono.Mono
RequestResponse starts a request of RequestResponse.
func (AbstractRSocket) RequestStream ¶
func (p AbstractRSocket) RequestStream(message payload.Payload) flux.Flux
RequestStream starts a request of RequestStream.
type ClientSocket ¶
type ClientSocket interface { Closeable Responder // Setup setups current socket. Setup(ctx context.Context, setup *SetupInfo) (err error) }
ClientSocket represents a client-side socket.
func NewClient ¶
func NewClient(uri *transport.URI, socket *DuplexRSocket, tc *tls.Config, headers map[string][]string) ClientSocket
NewClient create a simple client-side socket.
func NewClientResume ¶
func NewClientResume(uri *transport.URI, socket *DuplexRSocket, tc *tls.Config, headers map[string][]string) ClientSocket
NewClientResume creates a client-side socket with resume support.
type Closeable ¶
type Closeable interface { io.Closer // OnClose bind a handler when closing. OnClose(closer func(error)) }
Closeable represents a closeable target.
type DuplexRSocket ¶
type DuplexRSocket struct {
// contains filtered or unexported fields
}
DuplexRSocket represents a socket of RSocket which can be a requester or a responder.
func NewClientDuplexRSocket ¶
func NewClientDuplexRSocket( mtu int, keepaliveInterval time.Duration, ) (s *DuplexRSocket)
NewClientDuplexRSocket creates a new client-side DuplexRSocket.
func NewServerDuplexRSocket ¶
func NewServerDuplexRSocket(mtu int, leases lease.Leases) *DuplexRSocket
NewServerDuplexRSocket creates a new server-side DuplexRSocket.
func (*DuplexRSocket) FireAndForget ¶
func (p *DuplexRSocket) FireAndForget(sending payload.Payload)
FireAndForget start a request of FireAndForget.
func (*DuplexRSocket) MetadataPush ¶
func (p *DuplexRSocket) MetadataPush(payload payload.Payload)
MetadataPush start a request of MetadataPush.
func (*DuplexRSocket) RequestChannel ¶
func (p *DuplexRSocket) RequestChannel(publisher rx.Publisher) (ret flux.Flux)
RequestChannel start a request of RequestChannel.
func (*DuplexRSocket) RequestResponse ¶
func (p *DuplexRSocket) RequestResponse(pl payload.Payload) (mo mono.Mono)
RequestResponse start a request of RequestResponse.
func (*DuplexRSocket) RequestStream ¶
func (p *DuplexRSocket) RequestStream(sending payload.Payload) (ret flux.Flux)
RequestStream start a request of RequestStream.
func (*DuplexRSocket) SetError ¶ added in v0.4.0
func (p *DuplexRSocket) SetError(e error)
SetError sets error for current socket.
func (*DuplexRSocket) SetResponder ¶
func (p *DuplexRSocket) SetResponder(responder Responder)
SetResponder sets a responder for current socket.
func (*DuplexRSocket) SetTransport ¶
func (p *DuplexRSocket) SetTransport(tp *transport.Transport)
SetTransport sets a transport for current socket.
type Responder ¶
type Responder interface { // FireAndForget is a single one-way message. FireAndForget(message payload.Payload) // MetadataPush sends asynchronous Metadata frame. MetadataPush(message payload.Payload) // RequestResponse request single response. RequestResponse(message payload.Payload) mono.Mono // RequestStream request a completable stream. RequestStream(message payload.Payload) flux.Flux // RequestChannel request a completable stream in both directions. RequestChannel(messages rx.Publisher) flux.Flux }
Responder is a contract providing different interaction models for RSocket protocol.
type ServerSocket ¶
type ServerSocket interface { Closeable Responder // SetResponder sets a responder for current socket. SetResponder(responder Responder) // SetTransport sets a transport for current socket. SetTransport(tp *transport.Transport) // Pause pause current socket. Pause() bool // Start starts current socket. Start(ctx context.Context) error // Token returns token of socket. Token() (token []byte, ok bool) }
ServerSocket represents a server-side socket.
func NewServer ¶
func NewServer(socket *DuplexRSocket) ServerSocket
NewServer creates a new server-side socket.
func NewServerResume ¶
func NewServerResume(socket *DuplexRSocket, token []byte) ServerSocket
NewServerResume creates a new server-side socket with resume support.