balancer

package
v0.8.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 15, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Overview

Package balancer defines APIs for load balancing in RSocket.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Balancer

type Balancer interface {
	io.Closer
	// Put puts a new client.
	Put(client rsocket.Client) error
	// PutLabel puts a new client with a label.
	PutLabel(label string, client rsocket.Client) error
	// Next returns next balanced RSocket client.
	Next(context.Context) (rsocket.Client, bool)
	// OnLeave handle events when a client exit.
	OnLeave(fn func(label string))
	//Returns the balancer length
	Len() int
}

Balancer manage input RSocket clients.

func NewRoundRobinBalancer

func NewRoundRobinBalancer() Balancer

NewRoundRobinBalancer returns a new Round-Robin Balancer.

type Group

type Group struct {
	// contains filtered or unexported fields
}

Group manage a group of Balancer. Group can be used to create a simple RSocket Broker.

func NewGroup

func NewGroup(gen func() Balancer) *Group

NewGroup returns a new Group.

Example
group := NewGroup(func() Balancer {
	return NewRoundRobinBalancer()
})
defer func() {
	_ = group.Close()
}()
// Create a broker with resume.
err := rsocket.Receive().
	Resume(rsocket.WithServerResumeSessionDuration(10 * time.Second)).
	Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
		// Register service using Setup Metadata as service ID.
		if serviceID, ok := setup.MetadataUTF8(); ok {
			_ = group.Get(serviceID).Put(sendingSocket)
		}
		// Proxy requests by group.
		return rsocket.NewAbstractSocket(rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
			requestServiceID, ok := msg.MetadataUTF8()
			if !ok {
				panic(errors.New("missing service ID in metadata"))
			}
			fmt.Println("[broker] redirect request to service", requestServiceID)
			upstream, _ := group.Get(requestServiceID).Next(context.Background())
			fmt.Println("[broker] choose upstream:", upstream)
			return upstream.RequestResponse(msg)
		})), nil
	}).
	Transport(rsocket.TCPServer().SetAddr(":7878").Build()).
	Serve(context.Background())
if err != nil {
	panic(err)
}
Output:

func (*Group) Close

func (p *Group) Close() (err error)

Close close current RSocket group.

func (*Group) Get

func (p *Group) Get(id string) Balancer

Get returns a Balancer with custom id.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL