call

package
v0.4.26
Published: Mar 8, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package call implements an RPC mechanism.

Constants

const (
	// CommunicationError is the type of the error returned by a call when some
	// communication error is encountered, typically a process or network
	// error. Check for it via errors.Is(err, call.CommunicationError).
	CommunicationError transportError = iota

	// Unreachable is the type of the error returned by a call when every
	// server is unreachable. Check for it via errors.Is(err, call.Unreachable).
	Unreachable
)
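
The check described in the comments above can be sketched as follows. Because this package is internal and cannot be imported from outside its module, the sketch declares a local stand-in for the unexported transportError type. The Error method, its messages, and the classify helper are assumptions made for illustration; only the errors.Is pattern is taken from the documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// transportError is a local stand-in for the package's unexported error type.
type transportError int

const (
	CommunicationError transportError = iota
	Unreachable
)

// Error implements the error interface. The messages are assumptions.
func (e transportError) Error() string {
	switch e {
	case CommunicationError:
		return "communication error"
	case Unreachable:
		return "unreachable"
	default:
		return "unknown transport error"
	}
}

// classify is a hypothetical helper that reports how a caller might
// categorize an error returned by a call.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, Unreachable):
		return "unreachable"
	case errors.Is(err, CommunicationError):
		return "communication"
	default:
		return "application"
	}
}

func main() {
	// errors.Is also sees through errors wrapped with %w.
	wrapped := fmt.Errorf("call failed: %w", CommunicationError)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(Unreachable))
	fmt.Println(classify(errors.New("bad input")))
}
```

Note that errors.Is takes the error being inspected as its first argument and the sentinel as its second.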

Variables

var Missing = Version{"__tombstone__"}

Missing is the version associated with a value that does not exist in the store.

TODO(rgrandl): this should be the same as the one in gke/internal/store/store.go. Should we move the version into a common file? Right now we have a duplicate version struct that is used both by the gke/store and stub/resolver.

Functions

func NewStub

func NewStub(name string, reg *codegen.Registration, conn Connection, tracer trace.Tracer, injectRetries int) codegen.Stub

NewStub creates a client-side stub of the type matching reg. Calls on the stub are sent on conn to the component with the specified name.

func RoundRobin

func RoundRobin() *roundRobin

RoundRobin returns a round-robin balancer.

func Serve

func Serve(ctx context.Context, l Listener, opts ServerOptions) error

Serve starts listening for connections and requests on l. It always returns a non-nil error and closes l.

func ServeOn

func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)

ServeOn serves client requests received over an already established network connection with a client. This can be useful in tests or when using custom networking transports.

Types

type Balancer

type Balancer interface {
	// Add adds a ReplicaConnection to the set of connections.
	Add(ReplicaConnection)

	// Remove removes a ReplicaConnection from the set of connections.
	Remove(ReplicaConnection)

	// Pick picks a ReplicaConnection from the set of connections.
	// Pick returns _, false if no connections are available.
	Pick(CallOptions) (ReplicaConnection, bool)
}

Balancer manages a set of ReplicaConnections and picks one of them per call. A Balancer requires external synchronization (no concurrent calls should be made to the same Balancer).

TODO(mwhittaker): Right now, balancers have no load information about endpoints. In the short term, we can at least add information about the number of pending requests for every endpoint.
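
To illustrate the Balancer contract, here is a minimal round-robin sketch. It is not the package's RoundRobin implementation. The local ReplicaConnection and CallOptions declarations mirror the ones on this page so the example compiles on its own, and the replica type, the rrBalancer name, and the choice to match connections by Address in Remove are assumptions. Like any Balancer, it relies on the caller for synchronization.

```go
package main

import "fmt"

// Local copies of the declarations documented on this page.
type ReplicaConnection interface{ Address() string }

type CallOptions struct {
	Retry    bool
	ShardKey uint64
}

// replica is a hypothetical ReplicaConnection used only for illustration.
type replica string

func (r replica) Address() string { return string(r) }

// rrBalancer is an illustrative round-robin balancer. It is not safe for
// concurrent use; callers must synchronize access.
type rrBalancer struct {
	conns []ReplicaConnection
	next  int
}

// Add appends a connection to the set.
func (b *rrBalancer) Add(c ReplicaConnection) { b.conns = append(b.conns, c) }

// Remove drops the first connection with a matching address (an assumption;
// a real balancer might compare connections by identity instead).
func (b *rrBalancer) Remove(c ReplicaConnection) {
	for i, x := range b.conns {
		if x.Address() == c.Address() {
			b.conns = append(b.conns[:i], b.conns[i+1:]...)
			return
		}
	}
}

// Pick returns connections in rotation, or false if the set is empty.
func (b *rrBalancer) Pick(CallOptions) (ReplicaConnection, bool) {
	if len(b.conns) == 0 {
		return nil, false
	}
	if b.next >= len(b.conns) {
		b.next = 0 // the set shrank since the last pick
	}
	c := b.conns[b.next]
	b.next = (b.next + 1) % len(b.conns)
	return c, true
}

func main() {
	b := &rrBalancer{}
	b.Add(replica("10.0.0.1:80"))
	b.Add(replica("10.0.0.2:80"))
	for i := 0; i < 3; i++ {
		c, _ := b.Pick(CallOptions{})
		fmt.Println(c.Address())
	}
}
```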

func BalancerFunc

func BalancerFunc(pick func([]ReplicaConnection, CallOptions) (ReplicaConnection, bool)) Balancer

BalancerFunc returns a stateless, purely functional load balancer that calls pick to pick the connection to use.
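
As a sketch of the kind of function that could be passed as pick, the following routes calls by ShardKey and falls back to the first connection when no key is set. The types are local copies of the declarations on this page, and replica and pickByShard are hypothetical names. Plain modulo routing is used for brevity; it reshuffles most keys whenever the replica set changes, so a real deployment would likely want something more stable.

```go
package main

import "fmt"

// Local copies of the declarations documented on this page.
type ReplicaConnection interface{ Address() string }

type CallOptions struct {
	Retry    bool
	ShardKey uint64
}

// replica is a hypothetical ReplicaConnection used only for illustration.
type replica string

func (r replica) Address() string { return string(r) }

// pickByShard is a stateless pick function with the signature BalancerFunc
// expects. It keeps no state between calls.
func pickByShard(conns []ReplicaConnection, opts CallOptions) (ReplicaConnection, bool) {
	if len(conns) == 0 {
		return nil, false
	}
	// A ShardKey of 0 means "no shard key", so use the first connection.
	if opts.ShardKey == 0 {
		return conns[0], true
	}
	return conns[opts.ShardKey%uint64(len(conns))], true
}

func main() {
	conns := []ReplicaConnection{replica("a"), replica("b"), replica("c")}
	for _, key := range []uint64{0, 4, 5} {
		c, _ := pickByShard(conns, CallOptions{ShardKey: key})
		fmt.Printf("shard %d -> %s\n", key, c.Address())
	}
}
```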

type CallOptions

type CallOptions struct {
	// Retry indicates whether or not calls that failed due to communication
	// errors should be retried.
	Retry bool

	// ShardKey, if not 0, is the shard key that a Balancer can use to route a
	// call. A Balancer can always choose to ignore the ShardKey.
	//
	// TODO(mwhittaker): Figure out a way to have 0 be a valid shard key. Could
	// change to *uint64 for example.
	ShardKey uint64
}

CallOptions are call-specific options.

type ClientOptions

type ClientOptions struct {
	// Load balancer. Defaults to RoundRobin() if nil.
	Balancer Balancer

	// Logger. Defaults to a logger that logs to stderr.
	Logger *slog.Logger

	// If non-zero, each call will optimistically spin for a given duration
	// before blocking, waiting for the results.
	OptimisticSpinDuration time.Duration

	// All writes smaller than this limit are flattened into a single
	// buffer before being written on the connection. If zero, an appropriate
	// value is picked automatically. If negative, no flattening is done.
	WriteFlattenLimit int
}

ClientOptions are the options to configure an RPC client.

type Connection

type Connection interface {
	// Call makes an RPC over a Connection.
	Call(context.Context, MethodKey, []byte, CallOptions) ([]byte, error)

	// Close closes a connection. Pending invocations of Call are cancelled and
	// return an error. All future invocations of Call fail and return an error
	// immediately. Close can be called more than once.
	Close()
}

Connection allows a client to send RPCs.

func Connect

func Connect(ctx context.Context, resolver Resolver, opts ClientOptions) (Connection, error)

Connect creates a connection to the servers at the endpoints returned by the resolver.

type Endpoint

type Endpoint interface {
	// Dial returns a network connection to the endpoint.
	Dial(ctx context.Context) (net.Conn, error)

	// Address returns the address of the endpoint. If two endpoints have the
	// same address, then they are guaranteed to represent the same endpoint.
	// But, two endpoints with different addresses may also represent the same
	// endpoint (e.g., TCP("golang.org:http") and TCP("golang.org:80")).
	Address() string
}

An endpoint is a dialable entity with an address. For example, TCP("localhost:8000") returns an endpoint that dials the TCP server running on localhost:8000, and Unix("/tmp/unix.sock") returns an endpoint that dials the Unix socket /tmp/unix.sock.

func MTLS

func MTLS(config *tls.Config, ep Endpoint) Endpoint

MTLS returns an endpoint that performs MTLS authentication over the underlying endpoint. For example:

MTLS(&tls.Config{...}, TCP("golang.org:http"))
MTLS(&tls.Config{...}, Unix("unix.sock"))

REQUIRES: config is not nil

type Handler

type Handler func(ctx context.Context, args []byte) ([]byte, error)

Handler is a function that handles remote procedure calls. Regular application errors should be serialized in the returned bytes. A Handler should only return a non-nil error if the handler was not able to execute successfully.
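
The distinction between application errors and handler failures might look like this in practice. This is a sketch only: the JSON encoding, the divArgs and divReply types, and the call helper are assumptions made for illustration, and nothing on this page says which serialization format generated handlers actually use.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Handler mirrors the declaration documented on this page.
type Handler func(ctx context.Context, args []byte) ([]byte, error)

// divArgs and divReply are hypothetical wire types.
type divArgs struct{ A, B int }

type divReply struct {
	Quotient int    `json:"quotient"`
	Err      string `json:"err,omitempty"`
}

// divide handles a hypothetical integer-division method.
var divide Handler = func(ctx context.Context, args []byte) ([]byte, error) {
	var in divArgs
	if err := json.Unmarshal(args, &in); err != nil {
		// The handler could not execute at all: return a non-nil error.
		return nil, fmt.Errorf("decode args: %w", err)
	}
	if in.B == 0 {
		// An application error travels inside the reply bytes.
		return json.Marshal(divReply{Err: "division by zero"})
	}
	return json.Marshal(divReply{Quotient: in.A / in.B})
}

// call is a hypothetical helper that invokes h and reports whether the
// handler itself succeeded.
func call(h Handler, args string) (string, bool) {
	out, err := h(context.Background(), []byte(args))
	return string(out), err == nil
}

func main() {
	for _, args := range []string{`{"A":6,"B":3}`, `{"A":1,"B":0}`, `not json`} {
		out, ok := call(divide, args)
		fmt.Printf("ok=%v reply=%s\n", ok, out)
	}
}
```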

type HandlerMap

type HandlerMap struct {
	// contains filtered or unexported fields
}

HandlerMap is a mapping from MethodKey to a Handler. The zero value for a HandlerMap is an empty map.

func NewHandlerMap

func NewHandlerMap() *HandlerMap

NewHandlerMap returns a handler map to which the server handlers can be added. A "ready" handler is automatically registered in the new returned map.

func (*HandlerMap) AddHandlers

func (hm *HandlerMap) AddHandlers(name string, impl any) error

AddHandlers adds handlers for all methods of the component with the specified name. The handlers invoke methods on the specified impl.

func (*HandlerMap) Set

func (hm *HandlerMap) Set(component, method string, handler Handler)

Set registers a handler for the specified method of component.

type Listener

type Listener interface {
	Accept() (net.Conn, *HandlerMap, error)
	Close() error
	Addr() net.Addr
}

Listener allows the server to accept RPCs.

type MethodKey

type MethodKey [16]byte

MethodKey identifies a particular method on a component (formed by fingerprinting the component and method name).

func MakeMethodKey

func MakeMethodKey(component, method string) MethodKey

MakeMethodKey returns the fingerprint for the specified method on component.
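
One plausible way to derive such a 16-byte fingerprint is to hash the joined names and truncate the digest. The sketch below does that with SHA-256 and a "." separator; both choices are assumptions for illustration and may not match what MakeMethodKey actually does. The makeKey name and the component path are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// MethodKey mirrors the declaration documented on this page.
type MethodKey [16]byte

// makeKey fingerprints a component and method name. The hash function and
// the "." separator are assumptions, not the package's documented behavior.
func makeKey(component, method string) MethodKey {
	sum := sha256.Sum256([]byte(component + "." + method))
	var key MethodKey
	copy(key[:], sum[:]) // keep the first 16 bytes of the digest
	return key
}

func main() {
	get := makeKey("example.com/app/Cache", "Get")
	put := makeKey("example.com/app/Cache", "Put")
	fmt.Printf("%x\n%x\n", get, put)
	// Keys are plain arrays, so they are comparable and usable as map keys.
	fmt.Println(get == makeKey("example.com/app/Cache", "Get"))
}
```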

type NetEndpoint

type NetEndpoint struct {
	Net  string // e.g., "tcp", "udp", "unix"
	Addr string // e.g., "localhost:8000", "/tmp/unix.sock"
}

NetEndpoint is an Endpoint that implements Dial using net.Dial.

func ParseNetEndpoint

func ParseNetEndpoint(endpoint string) (NetEndpoint, error)

ParseNetEndpoint parses a string with a format of net://addr into a NetEndpoint. For example:

ParseNetEndpoint("tcp://localhost:80") // NetEndpoint{"tcp", "localhost:80"}
ParseNetEndpoint("unix://unix.sock")   // NetEndpoint{"unix", "unix.sock"}
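
A minimal sketch of parsing the net://addr format, for readers who want to see the split spelled out. This is not the package's implementation: the parse name is hypothetical, and rejecting an empty network or address is an assumption about how malformed input should be treated.

```go
package main

import (
	"fmt"
	"strings"
)

// NetEndpoint mirrors the struct documented on this page.
type NetEndpoint struct {
	Net  string
	Addr string
}

// parse splits a string of the form net://addr at the first "://".
func parse(endpoint string) (NetEndpoint, error) {
	network, addr, ok := strings.Cut(endpoint, "://")
	if !ok || network == "" || addr == "" {
		return NetEndpoint{}, fmt.Errorf("endpoint %q does not have format net://addr", endpoint)
	}
	return NetEndpoint{Net: network, Addr: addr}, nil
}

func main() {
	for _, s := range []string{"tcp://localhost:80", "unix://unix.sock", "localhost:80"} {
		ep, err := parse(s)
		fmt.Printf("%q -> %+v, err=%v\n", s, ep, err)
	}
}
```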

func TCP

func TCP(address string) NetEndpoint

TCP returns a TCP endpoint. The provided address is passed to net.Dial. For example:

TCP("golang.org:http")
TCP("192.0.2.1:http")
TCP("198.51.100.1:80")
TCP("[2001:db8::1]:domain")
TCP("[fe80::1%lo0]:53")
TCP(":80")

func Unix

func Unix(filename string) NetEndpoint

Unix returns an endpoint that uses Unix sockets. The provided filename is passed to net.Dial. For example:

Unix("unix.sock")
Unix("/tmp/unix.sock")

func (NetEndpoint) Address

func (ne NetEndpoint) Address() string

Address implements the Endpoint interface.

func (NetEndpoint) Dial

func (ne NetEndpoint) Dial(ctx context.Context) (net.Conn, error)

Dial implements the Endpoint interface.

func (NetEndpoint) String

func (ne NetEndpoint) String() string

type ReplicaConnection

type ReplicaConnection interface {
	// Address returns the name of the endpoint to which the ReplicaConnection
	// is connected.
	Address() string
}

ReplicaConnection is a connection to a single replica. A single Connection may consist of many ReplicaConnections (typically one per replica).

type Resolver

type Resolver interface {
	// IsConstant returns whether a resolver is constant. A constant resolver
	// returns a fixed set of endpoints that doesn't change over time. A
	// non-constant resolver manages a set of endpoints that does change over
	// time.
	IsConstant() bool

	// Resolve returns a resolver's set of dialable endpoints. For non-constant
	// resolvers, this set of endpoints may change over time. Every snapshot of
	// the set of endpoints is assigned a unique version. If you call the
	// Resolve method with a nil version, Resolve returns the current set of
	// endpoints and its version. If you call the Resolve method with a non-nil
	// version, then a Resolver either:
	//    1. Blocks until the latest set of endpoints has a version newer than
	//       the one provided, returning the new set of endpoints and a new
	//       version.
	//    2. Returns the same version, indicating that the Resolve should
	//       be called again after an appropriate delay.
	//
	// Example:
	//     if !resolver.IsConstant() {
	//         // Perform an unversioned, non-blocking Resolve to get the
	//         // latest set of endpoints and its version.
	//         endpoints, version, err := resolver.Resolve(ctx, nil)
	//
	//         // Perform a versioned Resolve that either (1) blocks until a set
	//         // of endpoints exists with a version newer than `version`, or
	//         // (2) returns `version`, indicating that the Resolve should be
	//         // called again after an appropriate delay.
	//         newEndpoints, newVersion, err := resolver.Resolve(ctx, version)
	//     }
	//
	// If the resolver is constant, then Resolve only needs to be called once
	// with a nil version. The returned set of endpoints will never change, and
	// the returned version is nil.
	//
	//     if resolver.IsConstant() {
	//         // endpoints1 == endpoints2, and version1 == version2 == nil.
	//         endpoints1, version1, err := resolver.Resolve(ctx, nil)
	//         endpoints2, version2, err := resolver.Resolve(ctx, nil)
	//     }
	Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error)
}

A Resolver manages a potentially changing set of endpoints. For example:

  • A DNS resolver might resolve a hostname like "google.com" into a set of IP addresses like ["74.125.142.106", "74.125.142.103", "74.125.142.99"].
  • A Kubernetes Service resolver might resolve a service name like "backend" into the IP addresses of the pods that implement the service.
  • A unix resolver might resolve a directory name like "/tmp/workers" into the set of unix socket files within the directory.

A Resolver can safely be used concurrently from multiple goroutines.

Example usage:

func printAddrs(ctx context.Context, resolver Resolver) error {
    var version *Version
    for ctx.Err() == nil {
        endpoints, newVersion, err := resolver.Resolve(ctx, version)
        if err != nil {
            return err
        }
        version = newVersion

        for _, endpoint := range endpoints {
            fmt.Println(endpoint.Address())
        }

        if resolver.IsConstant() {
            return nil
        }
    }
    return ctx.Err()
}

func NewConstantResolver

func NewConstantResolver(endpoints ...Endpoint) Resolver

NewConstantResolver returns a new resolver that returns the provided set of endpoints.
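
To make the constant-resolver contract concrete, the following sketch implements the Resolver interface for a fixed set of endpoints. It is an illustration, not the package's implementation: the interfaces are local copies of the declarations on this page, and tcpEndpoint, constantResolver, addresses, and the decision to reject a non-nil version are all assumptions.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// Local copies of the declarations documented on this page.
type Version struct{ Opaque string }

type Endpoint interface {
	Dial(ctx context.Context) (net.Conn, error)
	Address() string
}

type Resolver interface {
	IsConstant() bool
	Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error)
}

// tcpEndpoint is a hypothetical Endpoint that dials a TCP address.
type tcpEndpoint string

func (e tcpEndpoint) Dial(ctx context.Context) (net.Conn, error) {
	var d net.Dialer
	return d.DialContext(ctx, "tcp", string(e))
}

func (e tcpEndpoint) Address() string { return string(e) }

// constantResolver always returns the same endpoints and a nil version.
type constantResolver struct{ endpoints []Endpoint }

func (constantResolver) IsConstant() bool { return true }

func (r constantResolver) Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error) {
	if version != nil {
		// Assumption: a constant resolver never hands out versions, so a
		// non-nil version must have come from a different resolver.
		return nil, nil, fmt.Errorf("unexpected non-nil version %q", version.Opaque)
	}
	return r.endpoints, nil, nil
}

// addresses resolves once and collects the endpoint addresses.
func addresses(r Resolver) ([]string, error) {
	endpoints, _, err := r.Resolve(context.Background(), nil)
	if err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(endpoints))
	for _, ep := range endpoints {
		addrs = append(addrs, ep.Address())
	}
	return addrs, nil
}

func main() {
	r := constantResolver{endpoints: []Endpoint{
		tcpEndpoint("localhost:8000"),
		tcpEndpoint("localhost:8001"),
	}}
	fmt.Println(addresses(r))
}
```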

type ServerOptions

type ServerOptions struct {
	// Logger. Defaults to a logger that logs to stderr.
	Logger *slog.Logger

	// Tracer. Defaults to a discarding tracer.
	Tracer trace.Tracer

	// If positive, calls on the server are inlined and a new goroutine is
	// launched only if the call takes longer than the provided duration.
	// If zero, the system inlines call execution and automatically picks a
	// reasonable delay before the new goroutine is launched.
	// If negative, handlers are always started in a new goroutine.
	InlineHandlerDuration time.Duration

	// All writes smaller than this limit are flattened into a single
	// buffer before being written on the connection. If zero, an appropriate
	// value is picked automatically. If negative, no flattening is done.
	WriteFlattenLimit int
}

ServerOptions are the options to configure an RPC server.

type Version

type Version struct {
	Opaque string
}

Version is the version associated with a resolver's set of endpoints. Versions are opaque entities and should not be inspected or interpreted. Versions should only ever be constructed by calling a resolver's Resolve method and should only ever be used by being passed to the same resolver's Resolve method.
