electron

package
v0.0.0-...-a2e9eb6
Published: Apr 19, 2024 License: Apache-2.0 Imports: 14 Imported by: 7

Documentation

Overview

Package electron lets you write concurrent AMQP 1.0 messaging clients and servers.

This package requires the [proton-C library](http://github.com/apache/qpid-proton/go/pkg/proton) to be installed.

Start by creating a Container with NewContainer. An AMQP Container represents a single AMQP "application" and can contain client and server connections.

You can enable AMQP over any connection that implements the standard net.Conn interface. Typically you can connect with net.Dial() or listen for server connections with net.Listen. Enable AMQP by passing the net.Conn to Container.Connection().

AMQP allows bidirectional peer-to-peer message exchange as well as client-to-broker exchange. Messages are sent over "links". Each link is one-way and has a Sender and Receiver end. Connection.Sender() and Connection.Receiver() open links to Send() and Receive() messages. Connection.Incoming() lets you accept incoming links opened by the remote peer. You can open and accept multiple links in both directions on a single Connection.

Some of the documentation examples show client and server side by side in a single program, in separate goroutines. This is only for example purposes; real AMQP applications would run in separate processes on the network.

Example (ClientServer)

Example client sending messages to a server running in a goroutine.

package main

import (
	"fmt"
	"github.com/apache/qpid-proton/go/pkg/amqp"
	"github.com/apache/qpid-proton/go/pkg/electron"
	"log"
	"net"
	"sync"
)

// Example Server that accepts a single Connection, Session and Receiver link
// and prints messages received until the link closes.
func Server(l net.Listener) {
	cont := electron.NewContainer("server")
	c, err := cont.Accept(l)
	if err != nil {
		log.Fatal(err)
	}
	l.Close() // This server only accepts one connection
	// Process incoming endpoints till we get a Receiver link
	var r electron.Receiver
	for r == nil {
		in := <-c.Incoming()
		switch in := in.(type) {
		case *electron.IncomingSession, *electron.IncomingConnection:
			in.Accept() // Accept the incoming connection and session for the receiver
		case *electron.IncomingReceiver:
			in.SetCapacity(10)
			in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages.
			r = in.Accept().(electron.Receiver)
		case nil:
			return // Connection is closed
		default:
			in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
		}
	}
	go func() { // Reject any further incoming endpoints
		for in := range c.Incoming() {
			in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
		}
	}()
	// Receive messages till the Receiver closes
	rm, err := r.Receive()
	for ; err == nil; rm, err = r.Receive() {
		fmt.Printf("server received: %q\n", rm.Message.Body())
		rm.Accept() // Signal to the client that the message was accepted
	}
	fmt.Printf("server receiver closed: %v\n", err)
}

// Example client sending messages to a server running in a goroutine.
func main() {
	l, err := net.Listen("tcp4", "127.0.0.1:0") // tcp4 so example will work on ipv6-disabled platforms
	if err != nil {
		log.Fatal(err)
	}

	// SERVER: start the server running in a separate goroutine
	var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
	waitServer.Add(1)
	go func() { // Run the server in the background
		defer waitServer.Done()
		Server(l)
	}()

	// CLIENT: Send messages to the server
	addr := l.Addr()
	c, err := electron.Dial(addr.Network(), addr.String())
	if err != nil {
		log.Fatal(err)
	}
	s, err := c.Sender()
	if err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 3; i++ {
		msg := fmt.Sprintf("hello %v", i)
		// Send and wait for the Outcome from the server.
		// Note: For higher throughput, use SendAsync() to send a stream of messages
		// and process the returning stream of Outcomes concurrently.
		s.SendSync(amqp.NewMessageWith(msg))
	}
	c.Close(nil) // Closing the connection will stop the server

	waitServer.Wait() // Let the server finish

}
Output:

server received: "hello 0"
server received: "hello 1"
server received: "hello 2"
server receiver closed: EOF

Constants

const (
	// Messages are sent unsettled
	SndUnsettled = SndSettleMode(proton.SndUnsettled)
	// Messages are sent already settled
	SndSettled = SndSettleMode(proton.SndSettled)
	// Sender can send either unsettled or settled messages.
	SndMixed = SndSettleMode(proton.SndMixed)
)
const (
	// Receiver settles first.
	RcvFirst = RcvSettleMode(proton.RcvFirst)
	// Receiver waits for sender to settle before settling.
	RcvSecond = RcvSettleMode(proton.RcvSecond)
)

Forever can be used as a timeout parameter to indicate that an operation should wait indefinitely.

Variables

var Closed = io.EOF

Closed is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.

var EOF = io.EOF

EOF is an alias for io.EOF. It is returned as an error when an endpoint was closed cleanly.

var Timeout = fmt.Errorf("timeout")

Timeout is the error returned if an operation does not complete on time.

Methods named *Timeout in this package take a time.Duration timeout parameter.

If timeout > 0 and there is no result available before the timeout, they return a zero or nil value and Timeout as an error.

If timeout == 0 they will return a result if one is immediately available or nil/zero and Timeout as an error if not.

If timeout == Forever the function will return only when there is a result or some non-timeout error occurs.
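The three timeout cases above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: forever, errTimeout, shortWait and receiveTimeout are hypothetical local names, and the value chosen for forever is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// forever and errTimeout are hypothetical local stand-ins for the package's
// Forever and Timeout; the negative value is an assumption of this sketch.
const forever time.Duration = -1

// shortWait is an arbitrary small timeout used by the demonstration.
const shortWait = 20 * time.Millisecond

var errTimeout = errors.New("timeout")

// receiveTimeout waits for a value on results, following the three cases
// described for *Timeout methods.
func receiveTimeout(results <-chan string, timeout time.Duration) (string, error) {
	switch {
	case timeout == forever:
		// Block until a result arrives.
		return <-results, nil
	case timeout == 0:
		// Return only a result that is immediately available.
		select {
		case r := <-results:
			return r, nil
		default:
			return "", errTimeout
		}
	default:
		// Wait up to timeout for a result.
		select {
		case r := <-results:
			return r, nil
		case <-time.After(timeout):
			return "", errTimeout
		}
	}
}

func main() {
	results := make(chan string, 1)

	_, err := receiveTimeout(results, 0)
	fmt.Println("empty channel, timeout 0:", err)

	results <- "hello"
	r, err := receiveTimeout(results, forever)
	fmt.Println("value ready, forever:", r, err)

	_, err = receiveTimeout(results, shortWait)
	fmt.Println("empty channel, short wait:", err)
}
```

Callers can compare the returned error with the timeout value to tell "nothing yet" apart from a real failure.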

Functions

func After

func After(timeout time.Duration) <-chan time.Time

After is like time.After but returns a nil channel if timeout == Forever since selecting on a nil channel will never return.
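The nil-channel behaviour can be seen in a small standard-library sketch. The names forever, shortWait, after and wait are hypothetical and only mirror the documented behaviour; the value of forever is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// forever is a hypothetical stand-in for the package's Forever constant;
// its value here is an assumption of this sketch.
const forever time.Duration = -1

// shortWait is an arbitrary small timeout used by the demonstration.
const shortWait = 20 * time.Millisecond

// after mirrors the documented behaviour of After: a nil channel for
// forever, otherwise a timer channel from time.After.
func after(timeout time.Duration) <-chan time.Time {
	if timeout == forever {
		return nil
	}
	return time.After(timeout)
}

// wait returns the next value from results, or ok == false if the timeout
// expires first. With forever the timer case is a nil channel, so the
// select can only complete through the results case.
func wait(results <-chan int, timeout time.Duration) (v int, ok bool) {
	select {
	case v = <-results:
		return v, true
	case <-after(timeout):
		return 0, false
	}
}

func main() {
	results := make(chan int, 1)
	go func() {
		time.Sleep(shortWait)
		results <- 42
	}()

	v, ok := wait(results, forever) // waits for the delayed value
	fmt.Println(v, ok)

	v, ok = wait(results, shortWait) // nothing left, so the timer fires
	fmt.Println(v, ok)
}
```

Because one select statement covers both the bounded and the unbounded case, the calling code needs no separate branch for Forever.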

func GlobalSASLConfigDir

func GlobalSASLConfigDir(dir string)

GlobalSASLConfigDir sets the SASL configuration directory for every Connection created in this process. If not called, the default is determined by your SASL installation.

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

Must be called at most once, before any connections are created.

func GlobalSASLConfigName

func GlobalSASLConfigName(name string)

GlobalSASLConfigName sets the SASL configuration name for every Connection created in this process. If not called the default is "proton-server".

The complete configuration file name is

<sasl-config-dir>/<sasl-config-name>.conf

You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.

Must be called at most once, before any connections are created.

func NewConnection

func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)

NewConnection creates a connection with the given options. Options are applied in order.

func SASLExtended

func SASLExtended() bool

Do we support extended SASL negotiation? All implementations of Proton support ANONYMOUS and EXTERNAL on both client and server sides and PLAIN on the client side.

Extended SASL implementations use an external library (Cyrus SASL) to support other mechanisms beyond these basic ones.

Types

type Connection

type Connection interface {
	Endpoint
	ConnectionSettings

	// Sender opens a new sender on the DefaultSession.
	Sender(...LinkOption) (Sender, error)

	// Receiver opens a new Receiver on the DefaultSession().
	Receiver(...LinkOption) (Receiver, error)

	// DefaultSession() returns a default session for the connection. It is opened
	// on the first call to DefaultSession and returned on subsequent calls.
	DefaultSession() (Session, error)

	// Session opens a new session.
	Session(...SessionOption) (Session, error)

	// Container for the connection.
	Container() Container

	// Disconnect the connection abruptly with an error.
	Disconnect(error)

	// Wait waits for the connection to be disconnected.
	Wait() error

	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
	WaitTimeout(time.Duration) error

	// Incoming returns a channel for incoming endpoints opened by the remote peer.
	// See the Incoming interface for more detail.
	//
	// Note: this channel will first return an *IncomingConnection for the
	// connection itself which allows you to look at security information and
	// decide whether to Accept() or Reject() the connection. Then it will return
	// *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
	// by the remote end.
	//
	// Note 2: you must receive from Incoming() and call Accept/Reject to avoid
	// blocking the electron event loop. Normally you would run a loop in a goroutine
	// to handle incoming types that interest you and Accept() those that don't.
	Incoming() <-chan Incoming
}

Connection is an AMQP connection, created by a Container.

func Dial

func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error)

Dial is shorthand for using net.Dial() then NewConnection(). See net.Dial() for the meaning of the network, address arguments.

func DialWithDialer

func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error)

DialWithDialer is shorthand for using dialer.Dial() then NewConnection(). See net.Dial() for the meaning of the network, address arguments.

type ConnectionOption

type ConnectionOption func(*connection)

ConnectionOption arguments can be passed when creating a connection to configure it.
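ConnectionOption follows the functional-options pattern: each option is a function that mutates the value being configured. The sketch below reproduces that shape with the standard library only. The settings type and the user, heartbeat and newSettings helpers are hypothetical stand-ins, not the package's own code.

```go
package main

import (
	"fmt"
	"time"
)

// settings is a hypothetical stand-in for the unexported connection type.
type settings struct {
	user      string
	heartbeat time.Duration
}

// option has the same shape as ConnectionOption: a function that mutates
// the value being configured.
type option func(*settings)

// user and heartbeat are illustrative option constructors for this sketch.
func user(name string) option {
	return func(s *settings) { s.user = name }
}

func heartbeat(d time.Duration) option {
	return func(s *settings) { s.heartbeat = d }
}

// newSettings applies the options in order, so a later option overrides an
// earlier one that sets the same field.
func newSettings(opts ...option) *settings {
	s := &settings{}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	s := newSettings(user("alice"), heartbeat(5*time.Second), user("bob"))
	fmt.Println(s.user, s.heartbeat)
}
```

Since options are applied in order, the second user option wins in this sketch; the same ordering rule is documented for NewConnection.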

func AllowIncoming

func AllowIncoming() ConnectionOption

AllowIncoming returns a ConnectionOption to enable incoming endpoints, see Connection.Incoming(). This is automatically set for Server() connections.

func ContainerId

func ContainerId(id string) ConnectionOption

ContainerId returns a ConnectionOption that creates a new Container with id and associates it with the connection.

func Heartbeat

func Heartbeat(delay time.Duration) ConnectionOption

Heartbeat returns a ConnectionOption that requests the maximum delay between sending frames for the remote peer. If we don't receive any frames within 2*delay we will close the connection.

func Parent

func Parent(cont Container) ConnectionOption

Parent returns a ConnectionOption that associates the Connection with its Container. If not set, a connection will create its own default container.

func Password

func Password(password []byte) ConnectionOption

Password returns a ConnectionOption to set the password used to establish a connection. Only applies to outbound client connections.

The connection will erase its copy of the password from memory as soon as it has been used to authenticate. If you are concerned about passwords staying in memory you should never store them as strings, and should overwrite your copy as soon as you are done with it.

func SASLAllowInsecure

func SASLAllowInsecure(b bool) ConnectionOption

SASLAllowInsecure returns a ConnectionOption that allows or disallows clear text SASL authentication mechanisms.

By default the SASL layer is configured not to allow mechanisms that disclose the clear text of the password over an unencrypted AMQP connection. This specifically will disallow the use of the PLAIN mechanism without using SSL encryption.

This default is to avoid disclosing password information accidentally over an insecure network.

func SASLAllowedMechs

func SASLAllowedMechs(mechs string) ConnectionOption

SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL mechanisms.

Can be used on the client or the server to restrict the SASL for a connection. mechs is a space-separated list of mechanism names.

The mechanisms allowed by default are determined by your SASL library and system configuration, with two exceptions: GSSAPI and GSS-SPNEGO are disabled by default. To enable them, you must explicitly add them using this option.

Clients must set the allowed mechanisms before the outgoing connection is attempted. Servers must set them before the listening connection is set up.

func SASLEnable

func SASLEnable() ConnectionOption

SASLEnable returns a ConnectionOption that enables SASL authentication. Only required if you don't set any other SASL options.

func Server

func Server() ConnectionOption

Server returns a ConnectionOption to put the connection in server mode for incoming connections.

A server connection will do protocol negotiation to accept an incoming AMQP connection. Normally you would call this for a connection created by net.Listener.Accept().

func User

func User(user string) ConnectionOption

User returns a ConnectionOption that sets the user name for a connection.

func VirtualHost

func VirtualHost(virtualHost string) ConnectionOption

VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. Only applies to outbound client connections.

type ConnectionSettings

type ConnectionSettings interface {
	// Authenticated user name associated with the connection.
	User() string

	// The AMQP virtual host name for the connection.
	//
	// Optional, useful when the server has multiple names and provides different
	// service based on the name the client uses to connect.
	//
	// By default it is set to the DNS host name that the client uses to connect,
	// but it can be set to something different at the client side with the
	// VirtualHost() option.
	//
	// Returns error if the connection fails to authenticate.
	VirtualHost() string

	// Heartbeat is the maximum delay between sending frames that the remote peer
	// has requested of us. If the interval expires an empty "heartbeat" frame
	// will be sent automatically to keep the connection open.
	Heartbeat() time.Duration
}

Settings associated with a Connection.

type Container

type Container interface {
	// Id is a unique identifier for the container in your distributed application.
	Id() string

	// Connection creates a connection associated with this container.
	Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)

	// Dial is shorthand for
	//     conn, err := net.Dial(); c, err := Connection(conn, opts...)
	// See net.Dial() for the meaning of the network, address arguments.
	Dial(network string, address string, opts ...ConnectionOption) (Connection, error)

	// Accept is shorthand for:
	//     conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server())...)
	Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)

	// String returns Id()
	String() string
}

Container is an AMQP container. It represents a single AMQP "application" which can have multiple client or server connections.

Each Container in a distributed AMQP application must have a unique container-id which is applied to its connections.

Create with NewContainer().

func NewContainer

func NewContainer(id string) Container

NewContainer creates a new container. The id must be unique in your distributed application. All connections created by the container will have this container-id.

If id == "" a random UUID will be generated for the id.

type Endpoint

type Endpoint interface {
	// Close an endpoint and signal an error to the remote end if error != nil.
	Close(error)

	// String is a human readable identifier, useful for debugging and logging.
	String() string

	// Error returns nil if the endpoint is open, otherwise returns an error.
	// Error() == Closed means the endpoint was closed without error.
	Error() error

	// Connection is the connection associated with this endpoint.
	Connection() Connection

	// Done returns a channel that will close when the endpoint closes.
	// After Done() has closed, Error() will return the reason for closing.
	Done() <-chan struct{}

	// Sync() waits for the remote peer to confirm the endpoint is active or
	// reject it with an error. You can call it immediately on new endpoints
	// for more predictable error handling.
	//
	// AMQP is an asynchronous protocol. It is legal to create an endpoint and
	// start using it without waiting for confirmation. This avoids a needless
	// delay in the non-error case and improves throughput by "assuming the best".
	//
	// However if there *is* an error, these "optimistic" actions will fail. The
	// endpoint and its children will be closed with an error. The error will only
	// be detected when you try to use one of these endpoints or call Sync().
	Sync() error
}

Endpoint is the local end of a communications channel to the remote peer process. The following interfaces implement Endpoint: Connection, Session, Sender and Receiver.

You can create an endpoint with functions on Container, Connection and Session. You can accept incoming endpoints from the remote peer using Connection.Incoming().
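The relationship between Close(), Done() and Error() can be sketched without the package. The endpoint type below is a hypothetical, standard-library-only stand-in that follows the documented contract: Error() is nil while open, and a clean close is reported as the io.EOF alias.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// closed mirrors the package's Closed variable, an alias for io.EOF.
var closed = io.EOF

// endpoint is a hypothetical, stdlib-only stand-in for an Endpoint.
type endpoint struct {
	mu   sync.Mutex
	err  error
	done chan struct{}
}

func newEndpoint() *endpoint {
	return &endpoint{done: make(chan struct{})}
}

// Close records the reason for closing and closes the done channel once.
// A nil error is recorded as closed, meaning a clean close.
func (e *endpoint) Close(err error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.err != nil {
		return // already closed
	}
	if err == nil {
		err = closed
	}
	e.err = err
	close(e.done)
}

// Error returns nil while the endpoint is open, otherwise the close reason.
func (e *endpoint) Error() error {
	e.mu.Lock()
	defer e.mu.Unlock()
	return e.err
}

// Done returns a channel that is closed when the endpoint closes.
func (e *endpoint) Done() <-chan struct{} { return e.done }

func main() {
	e := newEndpoint()
	go e.Close(errors.New("link detached"))
	<-e.Done() // blocks until Close has run
	if err := e.Error(); err == closed {
		fmt.Println("closed cleanly")
	} else {
		fmt.Println("closed with error:", err)
	}
}
```

Waiting on Done() in a select alongside other channels is a common way to react to an endpoint closing without polling Error().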

type Incoming

type Incoming interface {
	// Accept and open the endpoint.
	Accept() Endpoint

	// Reject the endpoint with an error
	Reject(error)
	// contains filtered or unexported methods
}

Incoming is the interface for incoming endpoints, see Connection.Incoming().

Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it with an optional error.

Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender and *IncomingReceiver. Each type provides methods to examine the incoming endpoint request and set configuration options for the local endpoint before calling Accept() or Reject().

type IncomingConnection

type IncomingConnection struct {
	// contains filtered or unexported fields
}

func (*IncomingConnection) Accept

func (in *IncomingConnection) Accept() Endpoint

func (*IncomingConnection) AcceptConnection

func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection

AcceptConnection is like Accept() but takes ConnectionOption arguments like NewConnection(). For example you can set the Heartbeat() for the incoming connection.

func (IncomingConnection) Heartbeat

func (c IncomingConnection) Heartbeat() time.Duration

func (*IncomingConnection) Reject

func (in *IncomingConnection) Reject(err error)

func (*IncomingConnection) String

func (in *IncomingConnection) String() string

func (IncomingConnection) User

func (c IncomingConnection) User() string

func (IncomingConnection) VirtualHost

func (c IncomingConnection) VirtualHost() string

type IncomingReceiver

type IncomingReceiver struct {
	// contains filtered or unexported fields
}

IncomingReceiver is sent on the Connection.Incoming() channel when there is an incoming request to open a receiver link.

func (*IncomingReceiver) Accept

func (in *IncomingReceiver) Accept() Endpoint

Accept accepts an incoming receiver endpoint.

func (*IncomingReceiver) Filter

func (l *IncomingReceiver) Filter() map[amqp.Symbol]interface{}

func (*IncomingReceiver) IsReceiver

func (l *IncomingReceiver) IsReceiver() bool

func (*IncomingReceiver) IsSender

func (l *IncomingReceiver) IsSender() bool

func (*IncomingReceiver) LinkName

func (l *IncomingReceiver) LinkName() string

func (*IncomingReceiver) RcvSettle

func (l *IncomingReceiver) RcvSettle() RcvSettleMode

func (*IncomingReceiver) Reject

func (in *IncomingReceiver) Reject(err error)

func (*IncomingReceiver) SetCapacity

func (in *IncomingReceiver) SetCapacity(capacity int)

SetCapacity sets the capacity of the incoming receiver, call before Accept()

func (*IncomingReceiver) SetPrefetch

func (in *IncomingReceiver) SetPrefetch(prefetch bool)

SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()

func (*IncomingReceiver) SndSettle

func (l *IncomingReceiver) SndSettle() SndSettleMode

func (*IncomingReceiver) Source

func (l *IncomingReceiver) Source() string

func (*IncomingReceiver) SourceSettings

func (l *IncomingReceiver) SourceSettings() TerminusSettings

func (*IncomingReceiver) String

func (in *IncomingReceiver) String() string

func (*IncomingReceiver) Target

func (l *IncomingReceiver) Target() string

func (*IncomingReceiver) TargetSettings

func (l *IncomingReceiver) TargetSettings() TerminusSettings

type IncomingSender

type IncomingSender struct {
	// contains filtered or unexported fields
}

IncomingSender is sent on the Connection.Incoming() channel when there is an incoming request to open a sender link.

func (*IncomingSender) Accept

func (in *IncomingSender) Accept() Endpoint

Accept accepts an incoming sender endpoint.

func (*IncomingSender) Filter

func (l *IncomingSender) Filter() map[amqp.Symbol]interface{}

func (*IncomingSender) IsReceiver

func (l *IncomingSender) IsReceiver() bool

func (*IncomingSender) IsSender

func (l *IncomingSender) IsSender() bool

func (*IncomingSender) LinkName

func (l *IncomingSender) LinkName() string

func (*IncomingSender) RcvSettle

func (l *IncomingSender) RcvSettle() RcvSettleMode

func (*IncomingSender) Reject

func (in *IncomingSender) Reject(err error)

func (*IncomingSender) SndSettle

func (l *IncomingSender) SndSettle() SndSettleMode

func (*IncomingSender) Source

func (l *IncomingSender) Source() string

func (*IncomingSender) SourceSettings

func (l *IncomingSender) SourceSettings() TerminusSettings

func (*IncomingSender) String

func (in *IncomingSender) String() string

func (*IncomingSender) Target

func (l *IncomingSender) Target() string

func (*IncomingSender) TargetSettings

func (l *IncomingSender) TargetSettings() TerminusSettings

type IncomingSession

type IncomingSession struct {
	// contains filtered or unexported fields
}

IncomingSession is sent on the Connection.Incoming() channel when there is an incoming request to open a session.

func (*IncomingSession) Accept

func (in *IncomingSession) Accept() Endpoint

Accept an incoming session endpoint.

func (*IncomingSession) Reject

func (in *IncomingSession) Reject(err error)

func (*IncomingSession) SetIncomingCapacity

func (in *IncomingSession) SetIncomingCapacity(bytes uint)

SetIncomingCapacity sets the session buffer capacity of an incoming session in bytes.

func (*IncomingSession) SetOutgoingWindow

func (in *IncomingSession) SetOutgoingWindow(frames uint)

SetOutgoingWindow sets the session outgoing window of an incoming session in frames.

func (*IncomingSession) String

func (in *IncomingSession) String() string

type LinkOption

type LinkOption func(*linkSettings)

LinkOption can be passed when creating a sender or receiver link to set optional configuration.

func AtLeastOnce

func AtLeastOnce() LinkOption

AtLeastOnce returns a LinkOption that requests acknowledgment for every message. An acknowledgment indicates the message was definitely received. In the event of a failure, unacknowledged messages can be re-sent, but there is a chance that a re-sent message will be received twice. Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst.

func AtMostOnce

func AtMostOnce() LinkOption

AtMostOnce returns a LinkOption that sets "fire and forget" mode: messages are sent but no acknowledgment is received, so messages can be lost if there is a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst.

func Capacity

func Capacity(n int) LinkOption

Capacity returns a LinkOption that sets the link capacity.

func DurableSubscription

func DurableSubscription(name string) LinkOption

DurableSubscription returns a LinkOption that configures a Receiver as a named durable subscription. The name overrides (and is overridden by) LinkName() so you should normally only use one of these options.

func Filter

func Filter(m map[amqp.Symbol]interface{}) LinkOption

Filter returns a LinkOption that sets a filter.

func LinkName

func LinkName(s string) LinkOption

LinkName returns a LinkOption that sets the link name.

func Prefetch

func Prefetch(p bool) LinkOption

Prefetch returns a LinkOption that sets a receiver's pre-fetch flag. Not relevant for a sender.

func RcvSettle

func RcvSettle(m RcvSettleMode) LinkOption

RcvSettle returns a LinkOption that sets the receive settle mode.

func SndSettle

func SndSettle(m SndSettleMode) LinkOption

SndSettle returns a LinkOption that sets the send settle mode.

func Source

func Source(s string) LinkOption

Source returns a LinkOption that sets the address that messages are coming from.

func SourceSettings

func SourceSettings(ts TerminusSettings) LinkOption

SourceSettings returns a LinkOption that sets all the SourceSettings. Note: it will override the source address set by a Source() option.

func Target

func Target(s string) LinkOption

Target returns a LinkOption that sets the address that messages are going to.

func TargetSettings

func TargetSettings(ts TerminusSettings) LinkOption

TargetSettings returns a LinkOption that sets all the TargetSettings. Note: it will override the target address set by a Target() option.

type LinkSettings

type LinkSettings interface {
	// Source address that messages are coming from.
	Source() string

	// Target address that messages are going to.
	Target() string

	// Name is a unique name for the link among links between the same
	// containers in the same direction. By default generated automatically.
	LinkName() string

	// IsSender is true if this is the sending end of the link.
	IsSender() bool

	// IsReceiver is true if this is the receiving end of the link.
	IsReceiver() bool

	// SndSettle defines when the sending end of the link settles message delivery.
	SndSettle() SndSettleMode

	// RcvSettle defines when the receiving end of the link settles message delivery.
	RcvSettle() RcvSettleMode

	// Session containing the Link
	Session() Session

	// Filter for the link
	Filter() map[amqp.Symbol]interface{}

	// Advanced settings for the source
	SourceSettings() TerminusSettings

	// Advanced settings for the target
	TargetSettings() TerminusSettings
}

Settings associated with a link.

type Outcome

type Outcome struct {
	// Status of the message: was it sent, how was it acknowledged.
	Status SentStatus
	// Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
	Error error
	// Value provided by the application in SendAsync()
	Value interface{}
}

Outcome provides information about the outcome of sending a message.

type RcvSettleMode

type RcvSettleMode proton.RcvSettleMode

RcvSettleMode defines when the receiving end of the link settles message delivery.

type ReceivedMessage

type ReceivedMessage struct {
	// Message is the received message.
	Message amqp.Message
	// contains filtered or unexported fields
}

ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.

func (*ReceivedMessage) Accept

func (rm *ReceivedMessage) Accept() error

Accept tells the sender that we take responsibility for processing the message.

func (*ReceivedMessage) Reject

func (rm *ReceivedMessage) Reject() error

Reject tells the sender we consider the message invalid and unusable.

func (*ReceivedMessage) Release

func (rm *ReceivedMessage) Release() error

Release tells the sender we will not process the message but some other receiver might.

type Receiver

type Receiver interface {
	Endpoint
	LinkSettings

	// Receive blocks until a message is available or until the Receiver is closed
	// and has no more buffered messages.
	Receive() (ReceivedMessage, error)

	// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
	//
	// Note that if Prefetch is false, after a Timeout the credit issued by
	// Receive remains on the link. It will be used by the next call to Receive.
	ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)

	// Prefetch==true means the Receiver will automatically issue credit to the
	// remote sender to keep its buffer as full as possible, i.e. it will
	// "pre-fetch" messages independently of the application calling
	// Receive(). This gives good throughput for applications that handle a
	// continuous stream of messages. Larger capacity may improve throughput, the
	// optimal value depends on the characteristics of your application.
	//
	// Prefetch==false means the Receiver will only issue credit when you
	// call Receive(), and will only issue enough credit to satisfy the calls
	// actually made. This gives lower throughput but will not fetch any messages
	// in advance. It is good for synchronous applications that need to evaluate
	// each message before deciding whether to receive another. The
	// request-response pattern is a typical example.  If you make concurrent
	// calls to Receive with pre-fetch disabled, you can improve performance by
	// setting the capacity close to the expected number of concurrent calls.
	//
	Prefetch() bool

	// Capacity is the size (number of messages) of the local message buffer.
	// These are messages received but not yet returned to the application by a call to Receive().
	Capacity() int
}

Receiver is a Link that receives messages.

type Sender

type Sender interface {
	Endpoint
	LinkSettings

	// SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
	// Returns an Outcome, which may contain an error if the message could not be sent.
	SendSync(m amqp.Message) Outcome

	// SendWaitable puts a message in the send buffer and returns a channel that
	// you can use to wait for the Outcome of just that message. The channel is
	// buffered so you can receive from it whenever you want without blocking.
	//
	// Note: can block if there is no space to buffer the message.
	SendWaitable(m amqp.Message) <-chan Outcome

	// SendForget buffers a message for sending and returns, with no notification of the outcome.
	//
	// Note: can block if there is no space to buffer the message.
	SendForget(m amqp.Message)

	// SendAsync puts a message in the send buffer and returns immediately.  An
	// Outcome with Value = value will be sent to the ack channel when the remote
	// receiver has acknowledged the message or if there is an error.
	//
	// You can use the same ack channel for many calls to SendAsync(), possibly on
	// many Senders. The channel will receive the outcomes in the order they
	// become available. The channel should be buffered and/or served by dedicated
	// goroutines to avoid blocking the connection.
	//
	// If ack == nil no Outcome is sent.
	//
	// Note: can block if there is no space to buffer the message.
	SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})

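	// SendAsyncTimeout is like SendAsync but gives up after timeout and sets
	// Timeout as Outcome.Error, see Timeout.
	SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)

	// SendWaitableTimeout is like SendWaitable but gives up after timeout, see Timeout.
	SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome

	// SendForgetTimeout is like SendForget but gives up after timeout, see Timeout.
	SendForgetTimeout(m amqp.Message, timeout time.Duration)

	// SendSyncTimeout is like SendSync but gives up after timeout, see Timeout.
	SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome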
}

Sender is a Link that sends messages.

The result of sending a message is provided by an Outcome value.

A sender can buffer messages up to the credit limit provided by the remote receiver. All the Send* methods will block if the buffer is full until there is space. Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
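The shared ack channel described for SendAsync() can be illustrated without a broker. In the sketch below, fakeSender, outcome, sendAsync and sendAll are hypothetical names. The fake sender acknowledges messages from goroutines and is not the package's Sender. It only shows how a buffered ack channel and the correlation value fit together.

```go
package main

import (
	"fmt"
	"sync"
)

// outcome is a hypothetical stand-in for Outcome, keeping only the fields
// this sketch needs.
type outcome struct {
	accepted bool
	value    interface{}
}

// fakeSender is NOT the package's Sender. It acknowledges every message
// from a background goroutine so the pattern can run without a broker.
type fakeSender struct{ wg sync.WaitGroup }

// sendAsync mimics the shape of SendAsync: it returns immediately and later
// delivers an outcome carrying the caller's value on ack.
func (s *fakeSender) sendAsync(body string, ack chan<- outcome, value interface{}) {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		// For illustration only, an empty body counts as not accepted.
		ack <- outcome{accepted: body != "", value: value}
	}()
}

// sendAll sends every body and collects one outcome per message, keyed by
// the index passed as the correlation value.
func sendAll(s *fakeSender, bodies []string) map[int]bool {
	// Buffer the channel for every in-flight message so the acknowledging
	// side never blocks.
	ack := make(chan outcome, len(bodies))
	for i, b := range bodies {
		s.sendAsync(b, ack, i)
	}
	results := make(map[int]bool, len(bodies))
	for range bodies {
		out := <-ack // outcomes arrive in completion order, not send order
		results[out.value.(int)] = out.accepted
	}
	s.wg.Wait()
	return results
}

func main() {
	results := sendAll(&fakeSender{}, []string{"a", "", "c"})
	for i := 0; i < len(results); i++ {
		fmt.Printf("message %d accepted: %v\n", i, results[i])
	}
}
```

Passing the message index as the value lets the caller match each outcome to its message even though outcomes may arrive out of order.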

type SentStatus

type SentStatus int

SentStatus indicates the status of a sent message.

const (
	// Message was never sent
	Unsent SentStatus = iota
	// Message was sent but never acknowledged. It may or may not have been received.
	Unacknowledged
	// Message was accepted by the receiver (or was sent pre-settled, accept is assumed)
	Accepted
	// Message was rejected as invalid by the receiver
	Rejected
	// Message was not processed by the receiver but may be valid for a different receiver
	Released
	// Receiver responded with an unrecognized status.
	Unknown
)
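As an illustration of acting on these statuses, the sketch below shows one possible resend policy for an at-least-once sender. The policy is an assumption of this example and not behaviour of the package. sentStatus and its constants are local mirrors of the documented names so that the code compiles on its own.

```go
package main

import "fmt"

// sentStatus mirrors the documented SentStatus constants so the sketch
// compiles without the package.
type sentStatus int

const (
	unsent sentStatus = iota
	unacknowledged
	accepted
	rejected
	released
	unknown
)

// shouldResend is one possible at-least-once policy, an assumption of this
// sketch and not behaviour of the package.
func shouldResend(s sentStatus) bool {
	switch s {
	case unsent, released:
		// Not processed by this receiver, so sending again is safe.
		return true
	case unacknowledged:
		// May or may not have been received: resending risks a duplicate.
		return true
	default:
		// accepted needs no resend; rejected and unknown are not retried.
		return false
	}
}

func main() {
	for s := unsent; s <= unknown; s++ {
		fmt.Printf("status %d resend=%v\n", s, shouldResend(s))
	}
}
```

A stricter application might treat unacknowledged differently, for example by de-duplicating on the receiving side before resending.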

func (SentStatus) String

func (s SentStatus) String() string

String returns a human-readable name for SentStatus.

type Session

type Session interface {
	Endpoint

	// Sender opens a new sender.
	Sender(...LinkOption) (Sender, error)

	// Receiver opens a new Receiver.
	Receiver(...LinkOption) (Receiver, error)
}

Session is an AMQP session. It contains Senders and Receivers.

type SessionOption

type SessionOption func(*session)

SessionOption can be passed when creating a Session.

func IncomingCapacity

func IncomingCapacity(bytes uint) SessionOption

IncomingCapacity returns a SessionOption that sets the size (in bytes) of the session's incoming data buffer.

func OutgoingWindow

func OutgoingWindow(frames uint) SessionOption

OutgoingWindow returns a SessionOption that sets the outgoing window size (in frames).

type SndSettleMode

type SndSettleMode proton.SndSettleMode

SndSettleMode defines when the sending end of the link settles message delivery.

type TerminusSettings

type TerminusSettings struct {
	Durability proton.Durability
	Expiry     proton.ExpiryPolicy
	Timeout    time.Duration
	Dynamic    bool
}

Advanced AMQP settings for the source or target of a link. Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription() and do not need to be set/examined directly.
