rinq

package
v0.0.0-...-335d32b

This package is not in the latest version of its module.

Published: May 29, 2018 License: MIT Imports: 11 Imported by: 2

Documentation

Overview

Package rinq is a cross-language command bus and distributed ephemeral data store.

Example (MathService)

This example shows how to issue a command call from one peer to another.

There is a "server" peer, which performs basic mathematical operations, and a "client" peer which invokes those operations.

In the example both the client peer and the server peer are running in the same process. Outside of an example, these peers would typically be running on separate servers.

//go:build !without_amqp && !without_examples
// +build !without_amqp,!without_examples

package main

import (
	"context"
	"fmt"

	"github.com/rinq/rinq-go/src/rinq"
	"github.com/rinq/rinq-go/src/rinqamqp"
)

// arguments contains the parameters for the commands in the "math" namespace
type arguments struct {
	Left, Right int
}

// mathHandler is the command handler for the "math" namespace
func mathHandler(
	ctx context.Context,
	req rinq.Request,
	res rinq.Response,
) {
	defer req.Payload.Close()

	// decode the request payload into the arguments struct
	var args arguments
	if err := req.Payload.Decode(&args); err != nil {
		res.Fail("invalid-arguments", "could not decode arguments")
		return
	}

	var result int

	switch req.Command {
	case "add":
		result = args.Left + args.Right
	case "sub":
		result = args.Left - args.Right
	default:
		res.Fail("unknown-command", "no such command: "+req.Command)
		return
	}

	// send the result in the response payload
	payload := rinq.NewPayload(result)
	defer payload.Close()

	res.Done(payload)
}

// This example shows how to issue a command call from one peer to another.
//
// There is a "server" peer, which performs basic mathematical operations,
// and a "client" peer which invokes those operations.
//
// In the example both the client peer and the server peer are running in the
// same process. Outside of an example, these peers would typically be running
// on separate servers.
func main() {
	// create a new peer to act as the "server" and start listening for commands
	// in the "math" namespace.
	serverPeer, err := rinqamqp.DialEnv()
	if err != nil {
		panic(err)
	}
	defer serverPeer.Stop()
	if err := serverPeer.Listen("math", mathHandler); err != nil {
		panic(err)
	}

	// create a new peer to act as the "client", and a session to make the
	// call.
	clientPeer, err := rinqamqp.DialEnv()
	if err != nil {
		panic(err)
	}
	defer clientPeer.Stop()

	sess := clientPeer.Session()
	defer sess.Destroy()

	// call the "math::add" command
	ctx := context.Background()
	args := rinq.NewPayload(arguments{1, 2})
	defer args.Close()

	result, err := sess.Call(ctx, "math", "add", args)
	defer result.Close() // payloads returned by Call must be closed, even on error
	if err != nil {
		panic(err)
	}

	fmt.Printf("1 + 2 = %s\n", result)
}
Output:

1 + 2 = 3

Constants

const Version = "0.7.0"

Version is the rinq-go library version.

Variables

This section is empty.

Functions

func FailureType

func FailureType(err error) string

FailureType returns the failure type of err; or an empty string if err is not a Failure.

func IsCommandError

func IsCommandError(err error) bool

IsCommandError returns true if err was sent in response to a command request, as opposed to a local error that occurred when attempting to send the request.

func IsFailure

func IsFailure(err error) bool

IsFailure returns true if err is a Failure.

func IsFailureType

func IsFailureType(t string, err error) bool

IsFailureType returns true if err is a Failure with a type of t.

func IsNotFound

func IsNotFound(err error) bool

IsNotFound returns true if err is a NotFoundError.

func ShouldRetry

func ShouldRetry(err error) bool

ShouldRetry returns true if a call to Revision.Get(), GetMany(), Update() or Destroy() failed because the revision is out of date.

The operation should be retried on the latest revision of the session, which can be retrieved with Revision.Refresh().

Types

type AsyncHandler

type AsyncHandler func(
	ctx context.Context,
	sess Session, msgID ident.MessageID,
	ns, cmd string,
	in *Payload, err error,
)

AsyncHandler is a callback function invoked when a response is received to a command call made with Session.CallAsync().

If err is non-nil, it always represents a server-side error.

IsFailure(err) returns true if the error is an application-defined failure. Failures are server-side errors that are part of the command's public API, as opposed to unexpected errors. If err is a failure, in contains the failure's application-defined payload; for this reason in.Close() must be called, even if err is non-nil.

The handler is responsible for closing the in payload; however, there is no requirement that the payload be closed during the execution of the handler.
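
The documentation for Session.CallAsync() notes that asynchronous requests are not tracked by the session, so the application has to correlate responses and handle timeouts itself. The following is a minimal, stand-alone sketch of one way to do that. It does not use the rinq package: pendingCalls, add, resolve and demo are hypothetical names, and a plain string stands in for ident.MessageID.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// pendingCalls tracks outstanding asynchronous requests by message ID.
// The string key is a stand-in for ident.MessageID (an assumption).
type pendingCalls struct {
	mu      sync.Mutex
	pending map[string]*time.Timer
}

func newPendingCalls() *pendingCalls {
	return &pendingCalls{pending: map[string]*time.Timer{}}
}

// add records a request and arranges for onTimeout to run if the request
// has not been resolved within d.
func (p *pendingCalls) add(id string, d time.Duration, onTimeout func(id string)) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.pending[id] = time.AfterFunc(d, func() {
		if p.resolve(id) {
			onTimeout(id)
		}
	})
}

// resolve removes the request and reports whether it was still pending.
// An async handler would call this before processing a response.
func (p *pendingCalls) resolve(id string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	t, ok := p.pending[id]
	if !ok {
		return false
	}
	t.Stop()
	delete(p.pending, id)
	return true
}

// demo resolves one request in time and lets another one time out.
func demo() (first, second bool, expired string) {
	calls := newPendingCalls()
	timedOut := make(chan string, 1)
	onTimeout := func(id string) { timedOut <- id }

	calls.add("msg-1", time.Minute, onTimeout)
	calls.add("msg-2", 10*time.Millisecond, onTimeout)

	first = calls.resolve("msg-1")  // the response arrived in time
	second = calls.resolve("msg-1") // a duplicate or late response is ignored
	expired = <-timedOut            // msg-2 never receives a response
	return first, second, expired
}

func main() {
	fmt.Println(demo()) // true false msg-2
}
```

In a real handler, a false result from resolve would indicate a response that arrived after the application had already given up on the request; the in payload would still need to be closed in that case.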

type Attr

type Attr struct {
	// Key is an application-defined identifier for the attribute. Keys are
	// unique within a session. Any valid UTF-8 string can be used as a key,
	// including the empty string.
	Key string `json:"k"`

	// Value is the attribute's value. Any valid UTF-8 string can be used as a
	// value, including the empty string.
	Value string `json:"v,omitempty"`

	// IsFrozen is true if the attribute is "frozen" such that it can never be
	// altered again (for a given session).
	IsFrozen bool `json:"f,omitempty"`
}

Attr is a session attribute.

Sessions contain a versioned key/value store. See the Session interface for more information.

func Freeze

func Freeze(key, value string) Attr

Freeze is a convenience method that returns an Attr with the specified key and value, and the IsFrozen flag set to true.

Example
attr := Freeze("foo", "bar")

fmt.Println(attr)
Output:

foo@bar

func Set

func Set(key, value string) Attr

Set is a convenience method that creates an Attr with the specified key and value.

Example
attr := Set("foo", "bar")

fmt.Println(attr)
Output:

foo=bar

func (Attr) String

func (attr Attr) String() string

type AttrTable

type AttrTable interface {
	// Get returns the attribute with key k.
	Get(k string) (Attr, bool)

	// Each calls fn for each attribute in the collection. Iteration stops
	// when fn returns false.
	Each(fn func(Attr) bool)

	// IsEmpty returns true if there are no attributes in the table.
	IsEmpty() bool

	String() string
}

AttrTable is a read-only table of session attributes.
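
The early-exit contract of Each (iteration stops when fn returns false) can be illustrated with a small stand-alone sketch. It does not use the rinq package: attr, sliceTable and firstFrozen are hypothetical stand-ins, and the slice-backed table is only an assumption made for the example.

```go
package main

import "fmt"

// attr is a local stand-in for rinq.Attr.
type attr struct {
	Key, Value string
	IsFrozen   bool
}

// sliceTable is a hypothetical, slice-backed stand-in for an AttrTable.
type sliceTable []attr

// Each calls fn for each attribute, stopping when fn returns false.
func (t sliceTable) Each(fn func(attr) bool) {
	for _, a := range t {
		if !fn(a) {
			return
		}
	}
}

// firstFrozen returns the key of the first frozen attribute, along with the
// number of attributes visited before iteration stopped.
func firstFrozen(t sliceTable) (key string, visited int) {
	t.Each(func(a attr) bool {
		visited++
		if a.IsFrozen {
			key = a.Key
			return false // stop iterating
		}
		return true // keep going
	})
	return key, visited
}

func main() {
	tbl := sliceTable{
		{Key: "user-id", Value: "123"},
		{Key: "role", Value: "admin", IsFrozen: true},
		{Key: "theme", Value: "dark"},
	}
	key, visited := firstFrozen(tbl)
	fmt.Println(key, visited) // role 2
}
```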

type CommandError

type CommandError string

CommandError is an error (as opposed to a Failure) sent in response to a command.

func (CommandError) Error

func (err CommandError) Error() string

type CommandHandler

type CommandHandler func(
	ctx context.Context,
	req Request,
	res Response,
)

CommandHandler is a callback function invoked when a command request is received by the peer.

Command requests can only be received for namespaces that a peer is listening to. See Peer.Listen() to start listening.

The handler MUST close the response by calling res.Done(), res.Error() or res.Close(); otherwise the request may be redelivered, possibly to a different peer.

The handler is responsible for closing req.Payload; however, there is no requirement that the payload be closed during the execution of the handler.

type Failure

type Failure struct {
	// Type is an application-defined string identifying the failure.
	// They serve the same purpose as an error code. They should be concise
	// and easily understandable within the context of the application's API.
	Type string

	// Message is an optional human-readable description of the failure.
	Message string

	// Payload is an optional application-defined payload.
	Payload *Payload
}

Failure is an application-defined command error.

Failures are used to indicate an error that is "expected" within the domain of the command that produced it. Failures form part of the command's API and should usually be handled by the caller.

Failures can be produced in a command handler by calling Response.Fail() or passing a Failure value to Response.Error().
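
As a rough illustration of how a caller might separate expected failures from unexpected errors, here is a stand-alone sketch. It does not use the rinq package: failure, failureType, describe and errNetwork are hypothetical local stand-ins, the failure type string is made up, and errors.As is simply the mechanism chosen for this sketch rather than a statement about how the library inspects errors.

```go
package main

import (
	"errors"
	"fmt"
)

// failure is a local stand-in for rinq.Failure (payload omitted).
type failure struct {
	Type    string
	Message string
}

func (f failure) Error() string { return f.Type + ": " + f.Message }

// errNetwork stands in for an unexpected, non-failure error.
var errNetwork = errors.New("connection lost")

// failureType mirrors the documented behavior of FailureType: it returns the
// failure type, or an empty string if err is not a failure.
func failureType(err error) string {
	var f failure
	if errors.As(err, &f) {
		return f.Type
	}
	return ""
}

// describe shows how a caller might branch on the kind of error it received.
func describe(err error) string {
	switch t := failureType(err); {
	case err == nil:
		return "ok"
	case t == "insufficient-funds":
		return "expected failure: ask the user to top up"
	case t != "":
		return "unhandled failure type: " + t
	default:
		return "unexpected error: " + err.Error()
	}
}

func main() {
	fmt.Println(describe(nil))
	fmt.Println(describe(failure{Type: "insufficient-funds", Message: "balance too low"}))
	fmt.Println(describe(failure{Type: "account-locked", Message: "too many attempts"}))
	fmt.Println(describe(errNetwork))
}
```

With the real package, the equivalent checks would be made with IsFailure(), IsFailureType() and FailureType() on the error returned by Session.Call().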

func (Failure) Error

func (err Failure) Error() string

type FrozenAttributesError

type FrozenAttributesError struct {
	Ref ident.Ref
}

FrozenAttributesError indicates a failure to update a session because at least one of the attributes being updated is frozen.

func (FrozenAttributesError) Error

func (err FrozenAttributesError) Error() string

type NotFoundError

type NotFoundError struct {
	ID ident.SessionID
}

NotFoundError indicates that an operation failed because the session does not exist.

func (NotFoundError) Error

func (err NotFoundError) Error() string

type Notification

type Notification struct {
	// ID uniquely identifies the notification.
	ID ident.MessageID

	// Source refers to the session that sent the notification.
	Source Revision

	// Namespace is the notification namespace. Namespaces are used to route
	// notifications to only those sessions that intend to handle them.
	Namespace string

	// Type is an application-defined notification type.
	Type string

	// Payload contains optional application-defined information. The handler
	// that accepts the notification is responsible for closing the payload,
	// however there is no requirement that the payload be closed during the
	// execution of the handler.
	Payload *Payload

	// IsMulticast is true if the notification was (potentially) sent to more
	// than one session.
	IsMulticast bool

	// For multicast notifications, Constraint contains the attributes used as
	// criteria for selecting which sessions receive the notification. The
	// constraint is nil if IsMulticast is false.
	Constraint constraint.Constraint
}

Notification holds information about an inter-session notification.

type NotificationHandler

type NotificationHandler func(
	ctx context.Context,
	target Session,
	n Notification,
)

NotificationHandler is a callback function invoked when an inter-session notification is received.

Notifications can only be received for namespaces that a session is listening to. See Session.Listen() to start listening.

The handler is responsible for closing n.Payload; however, there is no requirement that the payload be closed during the execution of the handler.

type Payload

type Payload struct {
	// contains filtered or unexported fields
}

Payload is an immutable, application-defined value that is included in a command request, command response, or inter-session notification.

A nil payload pointer is equivalent to a payload with a value of nil.

Payloads must be closed by the application when no longer required. This includes payloads constructed by calling NewPayload() or NewPayloadFromBytes(), as well as any payload returned by a Rinq operation (such as Session.Call()), or passed to a callback function that was provided by the application.

Payloads are NOT safe for concurrent use. To share a payload across multiple goroutines, call Payload.Clone() to obtain a second payload that references the same underlying data.

Payload values can be any value that can be represented using CBOR encoding. See http://cbor.io/ for more information.

Payloads are modeled in this way to allow an application to forward incoming payloads without the need to decode and re-encode them.

func NewPayload

func NewPayload(v interface{}) *Payload

NewPayload creates a new payload from an arbitrary value.

func NewPayloadFromBytes

func NewPayloadFromBytes(buf []byte) *Payload

NewPayloadFromBytes creates a new payload from a binary representation. Ownership of the byte-slice is transferred to the payload. An empty byte-slice is equivalent to the nil value.

func (*Payload) Bytes

func (p *Payload) Bytes() []byte

Bytes returns the binary representation of the payload, in CBOR encoding.

The returned byte-slice is invalidated when the payload is closed; it must be copied if it is intended to be used for longer than the lifetime of the payload.

If the payload was created from a non-empty byte-slice, the return value is always that same byte-slice, unless the payload has been closed.

If the payload was created from a nil value, the returned byte-slice is nil.

func (*Payload) Clone

func (p *Payload) Clone() *Payload

Clone returns a copy of this payload.

func (*Payload) Close

func (p *Payload) Close()

Close releases any resources held by the payload, resetting the payload to represent the nil value.

func (*Payload) Decode

func (p *Payload) Decode(value interface{}) error

Decode unpacks the payload into the given value.

func (*Payload) Len

func (p *Payload) Len() int

Len returns the encoded payload length, in bytes. A length of zero indicates a nil payload value.

func (*Payload) String

func (p *Payload) String() string

String returns a human-readable representation of the payload. No guarantees are made about the format of the string.

func (*Payload) Value

func (p *Payload) Value() interface{}

Value returns the payload value.

type Peer

type Peer interface {
	// ID returns the peer's unique identifier.
	ID() ident.PeerID

	// Session returns a new session owned by this peer.
	//
	// Creating a session does not perform any network IO. The only limit to the
	// number of sessions is the memory required to store them.
	//
	// Sessions created after the peer has been stopped are unusable. Any
	// operation will fail immediately.
	Session() Session

	// Listen starts listening for command requests in the given namespace.
	//
	// When a command request is received with a namespace equal to ns, the
	// handler h is invoked.
	//
	// Repeated calls to Listen() with the same namespace simply change the
	// handler associated with that namespace.
	//
	// h is invoked on its own goroutine for each command request.
	Listen(ns string, h CommandHandler) error

	// Unlisten stops listening for command requests in the given namespace.
	//
	// If the peer is not currently listening to ns, nil is returned immediately.
	Unlisten(ns string) error

	// Done returns a channel that is closed when the peer is stopped.
	//
	// Err() may be called to obtain the error that caused the peer to stop, if
	// any occurred.
	Done() <-chan struct{}

	// Err returns the error that caused the Done() channel to close.
	//
	// A nil return value indicates that the peer was stopped because Stop() or
	// GracefulStop() has been called.
	Err() error

	// Stop instructs the peer to disconnect from the network immediately.
	//
	// Stop does NOT block until the peer is disconnected. Use the Done()
	// channel to wait for the peer to disconnect.
	Stop()

	// GracefulStop() instructs the peer to disconnect from the network once
	// all pending operations have completed.
	//
	// Any calls to Session.Call(), command handlers or notification handlers
	// must return before the peer has stopped.
	//
	// GracefulStop does NOT block until the peer is disconnected. Use the
	// Done() channel to wait for the peer to disconnect.
	GracefulStop()
}

Peer represents a connection to a Rinq network.

Peers can act as a server, responding to application-defined commands. Use Peer.Listen() to start accepting incoming command requests.

Command requests are sent by sessions, represented by the Session interface. Sessions can also send notifications to other sessions. Sessions are created by calling Peer.Session().

Each peer is assigned a unique ID, which is represented by the PeerID struct. All IDs generated by the peer, such as session IDs and message IDs, contain the peer ID, so that they can be traced to their origin easily.
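
Stop() and GracefulStop() do not block, so callers wait on Done() and then inspect Err(). The stand-alone sketch below models only that contract. It does not use the rinq package: stubPeer, shutdown, waitForStop and errConnLost are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errConnLost stands in for an error that causes a peer to stop.
var errConnLost = errors.New("connection lost")

// stubPeer is a local stand-in that models only the Stop/Done/Err contract.
type stubPeer struct {
	mu      sync.Mutex
	done    chan struct{}
	stopped bool
	err     error
}

func newStubPeer() *stubPeer { return &stubPeer{done: make(chan struct{})} }

// shutdown closes the done channel exactly once and records the cause.
func (p *stubPeer) shutdown(err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.stopped {
		return
	}
	p.stopped = true
	p.err = err
	close(p.done)
}

// Stop returns immediately; the disconnect happens in the background.
func (p *stubPeer) Stop() { go p.shutdown(nil) }

// Done returns a channel that is closed when the peer has stopped.
func (p *stubPeer) Done() <-chan struct{} { return p.done }

// Err returns the error that caused the peer to stop, or nil for a
// deliberate stop.
func (p *stubPeer) Err() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.err
}

// waitForStop blocks until the peer has stopped and reports why.
func waitForStop(p *stubPeer) string {
	<-p.Done()
	if err := p.Err(); err != nil {
		return "stopped with error: " + err.Error()
	}
	return "stopped cleanly"
}

func main() {
	p := newStubPeer()
	p.Stop()                    // returns immediately
	fmt.Println(waitForStop(p)) // stopped cleanly

	q := newStubPeer()
	q.shutdown(errConnLost)     // simulate a network failure
	fmt.Println(waitForStop(q)) // stopped with error: connection lost
}
```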

Example (Listen)

This example illustrates how to listen for incoming command requests.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

peer.Listen("my-api", func(
	ctx context.Context,
	req Request,
	res Response,
) {
	defer req.Payload.Close()
	// handle the command
	res.Close()
})

if false { // prevent the example from blocking forever.
	<-peer.Done()
}
Output:

Example (Session)

This example illustrates how to establish a new session.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

sess := peer.Session()
defer sess.Destroy()

fmt.Printf("created session #%d\n", sess.ID().Seq)
Output:

created session #1

type Request

type Request struct {
	// ID uniquely identifies the command request.
	ID ident.MessageID

	// Source is the revision of the session that sent the request, at the time
	// it was sent (which is not necessarily the latest).
	Source Revision

	// Namespace is the command namespace. Namespaces are used to route command
	// requests to the appropriate peer and command handler.
	Namespace string

	// Command is the application-defined command name for the request. The
	// command is logged for each request.
	Command string

	// Payload contains optional application-defined information about the
	// request, such as arguments to the command. The handler that accepts the
	// request is responsible for closing the payload, however there is no
	// requirement that the payload be closed during the execution of the handler.
	Payload *Payload
}

Request holds information about an incoming command request.

type Response

type Response interface {
	// IsRequired returns true if the sender is waiting for the response.
	//
	// If the response is not required, any payload data sent is discarded.
	// The response must always be closed, even if IsRequired() returns false.
	IsRequired() bool

	// IsClosed returns true if the response has already been closed.
	IsClosed() bool

	// Done sends a payload to the source session and closes the response.
	//
	// A panic occurs if the response has already been closed.
	Done(*Payload)

	// Error sends an error to the source session and closes the response.
	//
	// A panic occurs if the response has already been closed.
	Error(error)

	// Fail is a convenience method that creates a Failure and passes it to the
	// Error() method. The created failure is returned.
	//
	// The failure type t is used verbatim. The failure message is formatted
	// according to the format specifier f, interpolated with values from v.
	//
	// A panic occurs if the response has already been closed or if t is empty.
	Fail(t, f string, v ...interface{}) Failure

	// Close finalizes the response.
	//
	// If the origin session is expecting a response, it will receive a nil payload.
	//
	// It is not an error to close a response multiple times. The return value
	// is true the first time Close() is called, and false on subsequent calls.
	Close() bool
}

Response sends a reply to incoming command requests.
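
Because Close() may be called more than once while Done() and Error() panic on a closed response, one possible pattern is to defer Close() at the top of a handler so that every return path finalizes the response. The sketch below models that contract with a stand-alone type. It does not use the rinq package: stubResponse and handle are hypothetical, and a string stands in for the payload.

```go
package main

import "fmt"

// stubResponse is a local stand-in that models only the close-once contract
// described for Response.
type stubResponse struct {
	closed bool
	sent   string
}

// IsClosed reports whether the response has already been closed.
func (r *stubResponse) IsClosed() bool { return r.closed }

// Done sends a value and closes the response; it panics if already closed.
func (r *stubResponse) Done(v string) {
	if r.closed {
		panic("response already closed")
	}
	r.sent, r.closed = v, true
}

// Close finalizes the response; it returns true only on the first call.
func (r *stubResponse) Close() bool {
	if r.closed {
		return false
	}
	r.closed = true
	return true
}

// handle is a hypothetical handler body. The deferred Close guarantees that
// the response is finalized on every return path, including early returns.
func handle(res *stubResponse, cmd string) {
	defer res.Close()

	if cmd != "ping" {
		return // no explicit reply; the deferred Close finalizes the response
	}
	res.Done("pong")
}

func main() {
	ping := &stubResponse{}
	handle(ping, "ping")
	fmt.Println(ping.sent, ping.IsClosed(), ping.Close()) // pong true false

	other := &stubResponse{}
	handle(other, "unknown")
	fmt.Printf("%q %v\n", other.sent, other.IsClosed()) // "" true
}
```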

Example (Fail)

This example illustrates how to respond to a command request with an application-defined failure.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

peer.Listen("my-api", func(
	ctx context.Context,
	req Request,
	res Response,
) {
	defer req.Payload.Close()

	res.Fail(
		"my-api-error",
		"the call to %s failed spectacularly!",
		req.Command,
	)
})

sess := peer.Session()
defer sess.Destroy()

in, err := sess.Call(context.Background(), "my-api", "test", nil)
defer in.Close()

fmt.Println(err)
Output:

my-api-error: the call to test failed spectacularly!

type Revision

type Revision interface {
	// SessionID returns the ID of the underlying session.
	SessionID() ident.SessionID

	// Refresh returns the latest revision of the session.
	//
	// If the session has been destroyed, err is nil, but any operations on rev
	// will return an error e such that IsNotFound(e) is true. This means that
	// any non-nil err indicates an unexpected error occurred when querying the
	// session state.
	Refresh(ctx context.Context) (rev Revision, err error)

	// Get returns the attribute with key k within the ns namespace of the
	// attribute table.
	//
	// The returned attribute is guaranteed to be correct as of Ref().Rev.
	// Non-existent attributes are equivalent to empty attributes, therefore it
	// is not an error to request a key that has never been created.
	//
	// Peers do not always have a complete copy of the attribute table. If the
	// attribute value is unknown it is fetched from the owning peer.
	//
	// If the attribute can not be retrieved because it has already been
	// modified, ShouldRetry(err) returns true. To fetch the attribute value at
	// the later revision, first call Refresh() then retry the Get() on the
	// newer revision.
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// revision can not be queried.
	Get(ctx context.Context, ns, k string) (attr Attr, err error)

	// GetMany returns the attributes with keys in k within the ns namespace of
	// the attribute table.
	//
	// The returned attributes are guaranteed to be correct as of Ref().Rev.
	// Non-existent attributes are equivalent to empty attributes, therefore it
	// is not an error to request keys that have never been created.
	//
	// Peers do not always have a complete copy of the attribute table. If any
	// of the attribute values are unknown they are fetched from the owning peer.
	//
	// If any of the attributes can not be retrieved because they have already
	// been modified, ShouldRetry(err) returns true. To fetch the attribute
	// values at the later revision, first call Refresh() then retry the
	// GetMany() on the newer revision.
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// revision can not be queried.
	//
	// If err is nil, t contains all of the attributes specified in k.
	GetMany(ctx context.Context, ns string, k ...string) (t AttrTable, err error)

	// Update atomically modifies a set of attributes within the ns namespace of
	// the attribute table.
	//
	// A successful update produces a new revision.
	//
	// Each update is atomic; either all of the attributes in attrs are updated,
	// or the attribute table remains unchanged. On success, rev is the newly
	// created revision.
	//
	// The following conditions must be met for an update to succeed:
	//
	// 1. The session revision represented by this instance must be the latest
	//    revision. If Ref().Rev is not the latest revision the update fails;
	//    ShouldRetry(err) returns true.
	//
	// 2. All attribute changes must reference non-frozen attributes. If any of
	//    the attributes being updated are already frozen the update fails and
	//    ShouldRetry(err) returns false.
	//
	// If attrs is empty no update occurs, rev is this revision and err is nil.
	//
	// As a convenience, if the update fails for any reason, rev is this
	// revision. This allows the caller to assign the return value to an
	// existing variable without first checking for errors.
	Update(ctx context.Context, ns string, attrs ...Attr) (rev Revision, err error)

	// Clear is an update operation that atomically sets the value of each
	// attribute within the ns namespace to the empty string.
	//
	// The semantics are the same as for Update(). This means the operation fails
	// if ANY attribute in the ns namespace is frozen.
	//
	// As a convenience, if the clear operation fails for any reason, rev is
	// this revision. This allows the caller to assign the return value to an
	// existing variable without first checking for errors.
	Clear(ctx context.Context, ns string) (rev Revision, err error)

	// Destroy terminates the session.
	//
	// The session revision represented by this instance must be the latest
	// revision. If Ref().Rev is not the latest revision the destroy fails;
	// ShouldRetry(err) returns true.
	Destroy(ctx context.Context) (err error)
}

Revision represents a specific revision of a session.

Revision is the sole interface for manipulating a session's attribute table.

The underlying session may be "local", i.e. owned by a peer running in this process, or "remote", owned by a different peer.

For remote sessions, operations may require network IO. Deadlines are honored for all methods that accept a context.

Example (Get)

This example illustrates how to read an attribute from a session.

It includes logic necessary to fetch the attribute even if the Revision in use is out-of-date, by retrying on the latest revision.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

sess := peer.Session()
defer sess.Destroy()

rev := sess.CurrentRevision()

ctx := context.Background()
var attr Attr
for {
	attr, err = rev.Get(ctx, "my-api", "user-id")
	if err != nil {
		if ShouldRetry(err) {
			// the attribute could not be fetched because it has been
			// updated since rev was obtained
			rev, err = rev.Refresh(ctx)
			if err == nil {
				continue
			}
		}
		panic(err)
	}

	break
}

if attr.Value == "" {
	fmt.Println("user is not logged in")
}
Output:

user is not logged in

Example (Update)

This example illustrates how to modify an attribute in a session.

It includes logic to retry in the face of an optimistic-lock failure, which occurs if the revision is out of date.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

sess := peer.Session()
defer sess.Destroy()

rev := sess.CurrentRevision()

ctx := context.Background()

for {
	rev, err = rev.Update(ctx, "my-api", Set("user-id", "123"))
	if err != nil {
		if ShouldRetry(err) {
			// the session could not be updated because rev is out of date
			rev, err = rev.Refresh(ctx)
			if err == nil {
				continue
			}
		}
		panic(err)
	}

	fmt.Println("updated to new revision")
	break
}
Output:

updated to new revision

type Session

type Session interface {
	// ID returns the session's unique identifier.
	ID() ident.SessionID

	// CurrentRevision returns the current revision of this session.
	CurrentRevision() Revision

	// Call sends a command request to the next available peer listening to the
	// ns namespace and waits for a response.
	//
	// In the context of the call, the session's owning peer is the "client" and
	// the listening peer is the "server". The client and server may be the same
	// peer.
	//
	// cmd and out are an application-defined command name and request payload,
	// respectively. Both are passed to the command handler on the server.
	//
	// Calls always use a deadline; if ctx does not have a deadline, a timeout
	// described by options.DefaultTimeout() is used.
	//
	// If the call completes successfully, err is nil and in is the
	// application-defined response payload sent by the server.
	//
	// If err is non-nil, it may represent either a client-side error or a
	// server-side error. IsCommandError(err) returns true if the error was
	// sent by the server in response to the command request.
	//
	// IsFailure(err) returns true if the error is an application-defined
	// failure. Failures are server-side errors that are part of the command's
	// public API, as opposed to unexpected errors. If err is a failure, in
	// contains the failure's application-defined payload; for this reason
	// in.Close() must always be called, even if err is non-nil.
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// command request can not be sent.
	Call(ctx context.Context, ns, cmd string, out *Payload) (in *Payload, err error)

	// CallAsync sends a command request to the next available peer listening to
	// the ns namespace and instructs it to send a response, but does not block.
	//
	// cmd and out are an application-defined command name and request payload,
	// respectively. Both are passed to the command handler on the server.
	//
	// id is a value identifying the outgoing command request.
	//
	// When a response is received, the handler specified by SetAsyncHandler()
	// is invoked. It is passed the id, namespace and command name of the
	// request, along with the response payload and error.
	//
	// It is the application's responsibility to correlate the request with the
	// response and handle the context deadline. The request is NOT tracked by
	// the session and as such the handler is never invoked in the event of a
	// timeout.
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// command request can not be sent.
	CallAsync(ctx context.Context, ns, cmd string, out *Payload) (id ident.MessageID, err error)

	// SetAsyncHandler sets the asynchronous call handler.
	//
	// h is invoked for each command response received to a command request made
	// with CallAsync().
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// handler can not be set.
	SetAsyncHandler(h AsyncHandler) error

	// Execute sends a command request to the next available peer listening to
	// the ns namespace and returns immediately.
	//
	// cmd and out are an application-defined command name and request payload,
	// respectively. Both are passed to the command handler on the server.
	//
	// If IsNotFound(err) returns true, the session has been destroyed and the
	// command request can not be sent.
	Execute(ctx context.Context, ns, cmd string, out *Payload) (err error)

	// Notify sends a message directly to another session listening to the ns
	// namespace.
	//
	// t and out are an application-defined notification type and payload,
	// respectively. Both are passed to the notification handler configured on
	// the session identified by s.
	//
	// If IsNotFound(err) returns true, this session has been destroyed and the
	// notification can not be sent.
	Notify(ctx context.Context, ns, t string, s ident.SessionID, out *Payload) (err error)

	// NotifyMany sends a message to multiple sessions that are listening to the
	// ns namespace.
	//
	// The constraint c is a set of attribute key/value pairs that a session
	// must have in the ns namespace of its attribute table in order to receive
	// the notification.
	//
	// t and out are an application-defined notification type and payload,
	// respectively. Both are passed to the notification handlers configured on
	// those sessions that match c.
	//
	// If IsNotFound(err) returns true, this session has been destroyed and the
	// notification can not be sent.
	NotifyMany(ctx context.Context, ns, t string, c constraint.Constraint, out *Payload) error

	// Listen begins listening for notifications sent to this session in the ns
	// namespace.
	//
	// When a notification is received with a namespace equal to ns, h is invoked.
	//
	// h is invoked on its own goroutine for each notification.
	Listen(ns string, h NotificationHandler) error

	// Unlisten stops listening for notifications from the ns namespace.
	//
	// If the session is not currently listening for notifications, nil is
	// returned immediately.
	Unlisten(ns string) error

	// Destroy terminates the session.
	//
	// Destroy does NOT block until the session is destroyed; use the
	// Session.Done() channel to wait for the session to be destroyed.
	Destroy()

	// Done returns a channel that is closed when the session is destroyed and
	// any pending Session.Call() operations have completed.
	//
	// The session may be destroyed directly with Destroy(), or via a Revision
	// that refers to this session, either locally or remotely.
	//
	// All sessions are destroyed when their owning peer is stopped.
	Done() <-chan struct{}
}

Session is an interface representing a "local" session, that is, a session created by a peer running in this process.

Sessions are the "clients" on a Rinq network, able to issue command requests and send notifications to other sessions.

Sessions are created by calling Peer.Session(). The peer that creates a session is called the "owning peer".

Each session has an in-memory attribute table, which can be used to store application-defined key/value pairs. A session's attribute table can be modified locally, as well as remotely by peers that have received a command request or notification from the session.

The attribute table is namespaced. Any operation performed on the attribute table occurs within a single namespace.

The attribute table is versioned. Each revision of the attribute table is represented by the Revision interface.

An optimistic-locking strategy is employed to protect the attribute table against concurrent writes. In order for a write to succeed, it must be made through a Revision value that represents the current (most recent) revision.

Individual attributes in the table can be "frozen", preventing any further changes to that attribute.
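
As a rough illustration of freezing, the sketch below models an attribute as a value plus a frozen flag. The `attr` type, `set` function and `errFrozen` are assumptions made for this example and are not part of rinq. The toy rejects every write to a frozen key; how rinq treats a write that leaves a frozen attribute unchanged is not covered here.

```go
package main

import (
	"errors"
	"fmt"
)

// attr is a hypothetical attribute record; rinq's own type may differ.
type attr struct {
	value  string
	frozen bool
}

// errFrozen is a hypothetical error for writes to a frozen attribute.
var errFrozen = errors.New("attribute is frozen")

// set stores value under key, optionally freezing it. Any write to a key
// that is already frozen is rejected and leaves the table unchanged.
func set(attrs map[string]attr, key, value string, freeze bool) error {
	if cur, ok := attrs[key]; ok && cur.frozen {
		return errFrozen
	}
	attrs[key] = attr{value: value, frozen: freeze}
	return nil
}

func main() {
	attrs := map[string]attr{}

	fmt.Println(set(attrs, "foo", "bar", true))  // set and freeze "foo"
	fmt.Println(set(attrs, "foo", "baz", false)) // rejected, "foo" is frozen
	fmt.Println(attrs["foo"].value)              // still "bar"
}
```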

Example (CallAsync)

This example shows how to make an asynchronous command call.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

// listen for command requests
peer.Listen("my-api", func(
	ctx context.Context,
	req Request,
	res Response,
) {
	defer req.Payload.Close()

	payload := NewPayload("<payload>")
	defer payload.Close()

	res.Done(payload)
})

sess := peer.Session()
defer sess.Destroy()

// set up the asynchronous response handler
if err := sess.SetAsyncHandler(func(
	ctx context.Context,
	s Session, _ ident.MessageID,
	ns, cmd string,
	in *Payload, err error,
) {
	defer in.Close()
	peer.Stop()

	fmt.Printf("received %s::%s response with %s payload\n", ns, cmd, in.Value())
}); err != nil {
	panic(err)
}

// send the command request
if _, err := sess.CallAsync(
	context.Background(),
	"my-api",
	"test",
	nil,
); err != nil {
	panic(err)
}

<-peer.Done()

Output:

received my-api::test response with <payload> payload

Example (Notify)

This example shows how to send a notification from one session to another.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}
defer peer.Stop()

// create a session to receive the notification
recv := peer.Session()
defer recv.Destroy()

if err := recv.Listen(
	"my-api",
	func(
		ctx context.Context,
		target Session,
		n Notification,
	) {
		defer n.Payload.Close()
		peer.Stop()

		fmt.Printf("received %s::%s with %s payload\n", n.Namespace, n.Type, n.Payload.Value())
	},
); err != nil {
	panic(err)
}

// create a session to send the notification to recv
send := peer.Session()
defer send.Destroy()

payload := NewPayload("<payload>")
defer payload.Close()

if err := send.Notify(
	context.Background(),
	"my-api",
	"<type>",
	recv.ID(),
	payload,
); err != nil {
	panic(err)
}

<-peer.Done()

Output:

received my-api::<type> with <payload> payload

Example (NotifyMany)

This example shows how to send a notification from one session to several sessions that contain specific attribute values.

peer, err := rinqamqp.DialEnv()
if err != nil {
	panic(err)
}

// create three sessions for receiving notifications
recv1 := peer.Session()
recv2 := peer.Session()
recv3 := peer.Session()

// create a notification handler that stops the peer once TWO notifications
// have been received
var recvCount int32
handler := func(ctx context.Context, target Session, n Notification) {
	defer n.Payload.Close()

	if target == recv3 {
		panic("message delivered to unexpected session")
	}

	fmt.Printf("received %s::%s with %s payload\n", n.Namespace, n.Type, n.Payload.Value())

	if atomic.AddInt32(&recvCount, 1) == 2 {
		peer.Stop()
	}
}

// configure all three sessions to listen for notifications
for _, s := range []Session{recv1, recv2, recv3} {
	if err := s.Listen("my-api", handler); err != nil {
		panic(err)
	}
}

// update the first TWO sessions with a "foo" attribute
for _, s := range []Session{recv1, recv2} {
	rev := s.CurrentRevision()

	if _, err := rev.Update(
		context.Background(),
		"my-api",
		Freeze("foo", "bar"),
	); err != nil {
		panic(err)
	}
}

// create a session to send the notification to the matching sessions
send := peer.Session()

payload := NewPayload("<payload>")
defer payload.Close()

// constrain the notification to only those sessions that have a "foo"
// attribute with a value of "bar"
con := constraint.Equal("foo", "bar")

if err := send.NotifyMany(
	context.Background(),
	"my-api",
	"<type>",
	con,
	payload,
); err != nil {
	panic(err)
}

<-peer.Done()

Output:

received my-api::<type> with <payload> payload
received my-api::<type> with <payload> payload

type StaleFetchError

type StaleFetchError struct {
	Ref ident.Ref
}

StaleFetchError indicates a failure to fetch an attribute for a specific revision because the attribute has been modified after that revision.

func (StaleFetchError) Error

func (err StaleFetchError) Error() string

type StaleUpdateError

type StaleUpdateError struct {
	Ref ident.Ref
}

StaleUpdateError indicates a failure to update or destroy a session through a specific revision because the session has been modified after that revision.

func (StaleUpdateError) Error

func (err StaleUpdateError) Error() string
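
Because both stale errors are plain struct types with value receivers, callers can detect them with `errors.As`. The sketch below shows one way to do that and to retry on staleness. It uses local stand-ins (`ref`, `staleUpdateError`) shaped like the types above rather than importing rinq, and the `isStale` and `retryOnStale` helpers are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ref is a hypothetical stand-in for ident.Ref; its fields are assumptions.
type ref struct {
	Session string
	Rev     uint32
}

// staleUpdateError mirrors the shape of StaleUpdateError for this sketch.
type staleUpdateError struct {
	Ref ref
}

func (err staleUpdateError) Error() string {
	return fmt.Sprintf("revision %d of session %s is stale", err.Ref.Rev, err.Ref.Session)
}

// isStale reports whether err is, or wraps, a staleUpdateError.
func isStale(err error) bool {
	var stale staleUpdateError
	return errors.As(err, &stale)
}

// retryOnStale calls attempt until it succeeds, fails with an error that is
// not a stale-revision error, or limit attempts have been made.
func retryOnStale(limit int, attempt func() error) error {
	var err error
	for i := 0; i < limit; i++ {
		if err = attempt(); err == nil || !isStale(err) {
			return err
		}
	}
	return err
}

func main() {
	calls := 0
	err := retryOnStale(3, func() error {
		calls++
		if calls < 3 {
			// A real attempt would refresh its revision before retrying.
			stale := staleUpdateError{Ref: ref{Session: "sess", Rev: uint32(calls)}}
			return fmt.Errorf("update failed: %w", stale)
		}
		return nil
	})

	fmt.Println(calls, err)
}
```

Bounding the number of attempts keeps a heavily contended session from retrying forever; the limit of 3 is arbitrary.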

Directories

Path        Synopsis
constraint  Package constraint provides functions for building session constraints for use with rinq.Session.NotifyMany().
ident       Package ident contains types that represent various Rinq identifiers.
options     Package options defines options that can be customized when creating a Peer.
trace       Package trace provides functions for configuring custom trace identifiers.
