Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultDialer(network, addr string) (net.Conn, error)
- type Certificates
- type Client
- func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
- func (c *Client) Send(r *Request) (*amqp.Delivery, error)
- func (c *Client) Stop()
- func (c *Client) WithConsumeSettings(s ConsumeSettings) *Client
- func (c *Client) WithDebugLogger(f LogFunc) *Client
- func (c *Client) WithDialConfig(dc amqp.Config) *Client
- func (c *Client) WithErrorLogger(f LogFunc) *Client
- func (c *Client) WithQueueDeclareSettings(s QueueDeclareSettings) *Client
- func (c *Client) WithTLS(cert Certificates) *Client
- func (c *Client) WithTimeout(t time.Duration) *Client
- type ClientMiddlewareFunc
- type ConsumeSettings
- type Dialer
- type ExchangeDeclareSettings
- type HandlerBinding
- func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
- func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding
- func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
- func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
- type HandlerFunc
- type LogFunc
- type OnStartedFunc
- type PublishSettings
- type QosConfig
- type QueueDeclareSettings
- type Request
- func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
- func (r *Request) WithBody(b string) *Request
- func (r *Request) WithContentType(ct string) *Request
- func (r *Request) WithContext(ctx context.Context) *Request
- func (r *Request) WithCorrelationID(id string) *Request
- func (r *Request) WithExchange(e string) *Request
- func (r *Request) WithHeaders(h amqp.Table) *Request
- func (r *Request) WithResponse(wr bool) *Request
- func (r *Request) WithRoutingKey(rk string) *Request
- func (r *Request) WithTimeout(t time.Duration) *Request
- func (r *Request) Write(p []byte) (int, error)
- func (r *Request) WriteHeader(header string, value interface{})
- type ResponseWriter
- type SendFunc
- type Server
- func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
- func (s *Server) Bind(binding HandlerBinding)
- func (s *Server) ListenAndServe()
- func (s *Server) OnStarted(f OnStartedFunc)
- func (s *Server) Stop()
- func (s *Server) WithDebugLogger(f LogFunc) *Server
- func (s *Server) WithDialConfig(c amqp.Config) *Server
- func (s *Server) WithErrorLogger(f LogFunc) *Server
- type ServerMiddlewareFunc
Constants ¶
const (
    // CtxQueueName can be used to get the queue name from the context.Context
    // inside the HandlerFunc.
    CtxQueueName ctxKey = "queue_name"
)
Variables ¶
var (
    // ErrUnexpectedConnClosed is returned by ListenAndServe() if the server
    // shuts down without calling Stop() and if AMQP does not give an error
    // when said shutdown happens.
    ErrUnexpectedConnClosed = errors.New("unexpected connection close without specific error")

    // ErrTimeout is an error returned when a client request does not
    // receive a response within the client timeout duration.
    ErrTimeout = errors.New("request timed out")
)
Functions ¶
Types ¶
type Certificates ¶
Certificates holds the certificate, the key and the CA to use when connecting to RabbitMQ over TLS, or the certificate and key when used as the TLS configuration for an RPC server. Each field should be the path to a file stored on disk; the paths will be passed to ioutil.ReadFile and tls.LoadX509KeyPair.
func (*Certificates) TLSConfig ¶
func (c *Certificates) TLSConfig() *tls.Config
TLSConfig will return a *tls.Config type based on the files set in the Certificates type.
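For readers who prefer to build the *tls.Config by hand (for example to pass it in a custom amqp.Config), the file handling described above can be sketched with the standard library alone. This is not the package's implementation: tlsConfigFromFiles and its parameters are hypothetical, it returns an error where TLSConfig does not, and os.ReadFile (Go 1.16+) is used in place of ioutil.ReadFile.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// tlsConfigFromFiles is a hypothetical helper that loads a certificate/key
// pair and, when caFile is non-empty, a CA bundle from disk.
func tlsConfigFromFiles(certFile, keyFile, caFile string) (*tls.Config, error) {
	cert, err := tls.LoadX509KeyPair(certFile, keyFile)
	if err != nil {
		return nil, fmt.Errorf("load key pair: %w", err)
	}

	cfg := &tls.Config{Certificates: []tls.Certificate{cert}}

	if caFile != "" {
		pem, err := os.ReadFile(caFile)
		if err != nil {
			return nil, fmt.Errorf("read CA file: %w", err)
		}

		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in CA file")
		}

		// RootCAs is what a client uses to verify the server certificate.
		cfg.RootCAs = pool
	}

	return cfg, nil
}

func main() {
	// The paths are placeholders; if the files are missing an error is
	// printed instead of a config being returned.
	if _, err := tlsConfigFromFiles("client.crt", "client.key", "ca.crt"); err != nil {
		fmt.Println("could not build TLS config:", err)
	}
}
```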
type Client ¶
type Client struct {
    // Sender is the main send function called after all middlewares have been
    // chained and called. This field can be overridden to simplify testing.
    Sender SendFunc
    // contains filtered or unexported fields
}
Client represents an AMQP client used within an RPC framework. This client can be used to communicate with RPC servers.
func NewClient ¶
NewClient will return a pointer to a new Client. There are two ways to manage the connection that will be used by the client (e.g. when using TLS).
The first one is to use the Certificates type and just pass the filenames of the client certificate, key and the server CA. If this is done the function will handle the reading of the files.
It is also possible to create a custom amqp.Config with whatever configuration is desired; it will be used as the dial configuration when connecting to the message bus.
func (*Client) AddMiddleware ¶
func (c *Client) AddMiddleware(m ClientMiddlewareFunc) *Client
AddMiddleware will add a middleware which will be executed on request.
func (*Client) Stop ¶
func (c *Client) Stop()
Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete; you should instead wait for ListenAndServe to exit.
func (*Client) WithConsumeSettings ¶
func (c *Client) WithConsumeSettings(s ConsumeSettings) *Client
WithConsumeSettings will set the settings used when consuming in the client globally.
func (*Client) WithDebugLogger ¶
WithDebugLogger sets the logger to use for debug logging.
func (*Client) WithDialConfig ¶
WithDialConfig sets the dial config used for the client.
func (*Client) WithErrorLogger ¶
WithErrorLogger sets the logger to use for error logging.
func (*Client) WithQueueDeclareSettings ¶
func (c *Client) WithQueueDeclareSettings(s QueueDeclareSettings) *Client
WithQueueDeclareSettings will set the settings used when declaring queues for the client globally.
func (*Client) WithTLS ¶
func (c *Client) WithTLS(cert Certificates) *Client
WithTLS sets the TLS config in the dial config for the client.
type ClientMiddlewareFunc ¶
ClientMiddlewareFunc represents a function that can be used as a middleware.
type ConsumeSettings ¶
type ConsumeSettings struct {
    Consumer  string
    AutoAck   bool
    Exclusive bool
    NoLocal   bool
    NoWait    bool
    Args      amqp.Table
}
ConsumeSettings holds the settings that will be used when consumption on a specified queue is started.
type ExchangeDeclareSettings ¶
type ExchangeDeclareSettings struct {
    Durable    bool
    AutoDelete bool
    Internal   bool
    NoWait     bool
    Args       amqp.Table
}
ExchangeDeclareSettings holds the settings that will be used when a handler is mapped to a fanout exchange and an exchange is declared.
type HandlerBinding ¶
type HandlerBinding struct {
    QueueName    string
    ExchangeName string
    ExchangeType string
    RoutingKey   string
    BindHeaders  amqp.Table
    Handler      HandlerFunc
}
HandlerBinding holds information about how an exchange and a queue should be declared and bound. If the ExchangeName is not defined (an empty string), the queue will not be bound to the exchange but assumed to use the default match.
func DirectBinding ¶
func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding
DirectBinding returns a HandlerBinding to use for direct exchanges where each routing key will be mapped to one handler.
func FanoutBinding ¶
func FanoutBinding(exchangeName string, handler HandlerFunc) HandlerBinding
FanoutBinding returns a HandlerBinding to use for fanout exchanges. These exchanges do not use the routing key. We do not use the default exchange (amq.fanout) since this would broadcast all messages everywhere.
func HeadersBinding ¶
func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding
HeadersBinding returns a HandlerBinding to use for header exchanges that will match on specific headers. The headers are specified as an amqp.Table. The default exchange amq.match will be used.
func TopicBinding ¶
func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding
TopicBinding returns a HandlerBinding to use for topic exchanges. The default exchange (amq.topic) will be used. The topic is matched on the routing key.
type HandlerFunc ¶
type HandlerFunc func(context.Context, *ResponseWriter, amqp.Delivery)
HandlerFunc is the function that handles all requests based on the routing key.
func ServerMiddlewareChain ¶
func ServerMiddlewareChain(next HandlerFunc, m ...ServerMiddlewareFunc) HandlerFunc
ServerMiddlewareChain will attach all given middlewares to your HandlerFunc. The middlewares will be executed in the same order as your input.
For example:
s := NewServer("url")

s.Bind(DirectBinding(
    "foobar",
    ServerMiddlewareChain(
        myHandler,
        middlewareOne,
        middlewareTwo,
        middlewareThree,
    ),
))
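To make the ordering guarantee concrete, here is a minimal, library-independent sketch of one way such a chain can be built. The handler and middleware types are simplified stand-ins for HandlerFunc and ServerMiddlewareFunc, and chain and tag are hypothetical names; the package's own implementation may differ.

```go
package main

import "fmt"

// handler and middleware are simplified stand-ins for HandlerFunc and
// ServerMiddlewareFunc. The handler records what ran in trace.
type handler func(trace *[]string)

type middleware func(next handler) handler

// chain wraps next with every middleware. Wrapping starts from the last
// middleware so that the first one ends up outermost and therefore runs
// first, matching the input order.
func chain(next handler, m ...middleware) handler {
	for i := len(m) - 1; i >= 0; i-- {
		next = m[i](next)
	}

	return next
}

// tag returns a middleware that records its name before calling next.
func tag(name string) middleware {
	return func(next handler) handler {
		return func(trace *[]string) {
			*trace = append(*trace, name)
			next(trace)
		}
	}
}

func main() {
	final := func(trace *[]string) { *trace = append(*trace, "handler") }

	var trace []string
	chain(final, tag("one"), tag("two"), tag("three"))(&trace)

	fmt.Println(trace) // prints [one two three handler]
}
```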
type LogFunc ¶
type LogFunc func(format string, args ...interface{})
LogFunc is used for logging in amqp-rpc. It makes it possible to define your own logging.
Here is an example where the logger from the log package is used:
debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags)
errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags)

server := NewServer(url)
server.WithErrorLogger(errorLogger.Printf)
server.WithDebugLogger(debugLogger.Printf)
It can also be used with, for example, a Logrus logger:
logger := logrus.New()
logger.SetLevel(logrus.DebugLevel)
logger.Formatter = &logrus.JSONFormatter{}

s.WithErrorLogger(logger.Warnf)
s.WithDebugLogger(logger.Debugf)

client := NewClient(url)
client.WithErrorLogger(logger.Errorf)
client.WithDebugLogger(logger.Debugf)
type OnStartedFunc ¶
type OnStartedFunc func(*amqp.Connection, *amqp.Connection, *amqp.Channel, *amqp.Channel)
OnStartedFunc is the function that can be passed to Server.OnStarted().
type PublishSettings ¶
PublishSettings holds the settings that will be used when a message is about to be published to the message bus.
type QueueDeclareSettings ¶
type QueueDeclareSettings struct {
    Durable          bool
    DeleteWhenUnused bool
    Exclusive        bool
    NoWait           bool
    Args             amqp.Table
}
QueueDeclareSettings holds the settings that will be used when any kind of queue is declared. See the documentation for amqp.QueueDeclare for more information about these settings.
type Request ¶
type Request struct {
    // Exchange is the exchange to which the request will be published when
    // passing it to the client's send function.
    Exchange string

    // RoutingKey is the routing key that will be used in the amqp.Publishing
    // request.
    RoutingKey string

    // Reply is a boolean value telling if the request should wait for a reply
    // or just send the request without waiting.
    Reply bool

    // Timeout is the time we should wait after a request is sent before
    // we assume the request got lost.
    Timeout time.Duration

    // Publishing is the publishing that is going to be sent.
    Publishing amqp.Publishing

    // Context is a context which you can use to pass data from where the
    // request is created to middlewares. By default this will be a
    // context.Background().
    Context context.Context
    // contains filtered or unexported fields
}
Request is a request to perform with the client.
func NewRequest ¶
func NewRequest() *Request
NewRequest will generate a new request to be published. The default request will use the content type "text/plain" and always wait for reply.
func (*Request) AddMiddleware ¶
func (r *Request) AddMiddleware(m ClientMiddlewareFunc) *Request
AddMiddleware will add a middleware which will be executed when the request is sent.
func (*Request) WithBody ¶
WithBody will convert a string to a byte slice and set it as the body of the request.
func (*Request) WithContentType ¶
WithContentType will update the content type passed in the header of the request. This value will be set as the ContentType in the amqp.Publishing type but also preserved as a header value.
func (*Request) WithContext ¶
WithContext will set the context on the request.
func (*Request) WithCorrelationID ¶
WithCorrelationID will add/overwrite the correlation ID used for the request and set it on the Publishing.
func (*Request) WithExchange ¶
WithExchange will set the exchange to which the request will be published.
func (*Request) WithHeaders ¶
WithHeaders will set the full amqp.Table as the headers for the request. Note that this will overwrite anything previously set on the headers.
func (*Request) WithResponse ¶
WithResponse sets the value determining whether the request should wait for a response or not. A request that does not require a response will only catch errors occurring before the request has been published.
func (*Request) WithRoutingKey ¶
WithRoutingKey will set the routing key for the request.
func (*Request) WithTimeout ¶
WithTimeout will set the client timeout used when publishing messages. t will be rounded using the duration's Round function to the nearest multiple of a millisecond. Rounding will be away from zero.
func (*Request) Write ¶
Write will write to the Body of the request's amqp.Publishing. It is safe to call Write multiple times.
func (*Request) WriteHeader ¶
WriteHeader will write a header for the specified key.
type ResponseWriter ¶
type ResponseWriter struct {
// contains filtered or unexported fields
}
ResponseWriter is used by a handler to construct an RPC response. The ResponseWriter may NOT be used after the handler has returned.
Because the ResponseWriter implements io.Writer, you can for example use it to write JSON:
encoder := json.NewEncoder(responseWriter)
encoder.Encode(dataObject)
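The same pattern works with any io.Writer, which makes it easy to try outside a handler. The sketch below uses a hypothetical bodyWriter in place of ResponseWriter and assumes that each Write appends to the body; the reply type and encodeReply helper are likewise illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
)

// bodyWriter is a hypothetical stand-in for ResponseWriter. It assumes that
// every call to Write appends to the body.
type bodyWriter struct {
	body []byte
}

// Write implements io.Writer by appending p to the body.
func (w *bodyWriter) Write(p []byte) (int, error) {
	w.body = append(w.body, p...)
	return len(p), nil
}

// reply is an example payload.
type reply struct {
	Status string `json:"status"`
	Count  int    `json:"count"`
}

// encodeReply writes v as JSON to any io.Writer.
func encodeReply(w io.Writer, v interface{}) error {
	return json.NewEncoder(w).Encode(v)
}

func main() {
	w := &bodyWriter{}

	if err := encodeReply(w, reply{Status: "ok", Count: 3}); err != nil {
		fmt.Println("encode failed:", err)
		return
	}

	// The encoder terminates each value with a newline.
	fmt.Printf("%q\n", w.body)
}
```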
func NewResponseWriter ¶
func NewResponseWriter(p *amqp.Publishing) *ResponseWriter
NewResponseWriter will create a new response writer with given amqp.Publishing.
func (*ResponseWriter) Immediate ¶
func (rw *ResponseWriter) Immediate(i bool)
Immediate sets the immediate flag on the later amqp.Publish.
func (*ResponseWriter) Mandatory ¶
func (rw *ResponseWriter) Mandatory(m bool)
Mandatory sets the mandatory flag on the later amqp.Publish.
func (*ResponseWriter) Publishing ¶
func (rw *ResponseWriter) Publishing() *amqp.Publishing
Publishing returns the internal amqp.Publishing that is used for the response, which is useful for modification.
func (*ResponseWriter) Write ¶
func (rw *ResponseWriter) Write(p []byte) (int, error)
Write will write the response Body of the amqp.Publishing. It is safe to call Write multiple times.
func (*ResponseWriter) WriteHeader ¶
func (rw *ResponseWriter) WriteHeader(header string, value interface{})
WriteHeader will write a header for the specified key.
type SendFunc ¶
SendFunc represents the function that Send does. It takes a Request as input and returns a delivery and an error.
func ClientMiddlewareChain ¶
func ClientMiddlewareChain(next SendFunc, m ...ClientMiddlewareFunc) SendFunc
ClientMiddlewareChain will attach all given middlewares to your SendFunc. The middlewares will be executed in the same order as your input.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server represents an AMQP server used within the RPC framework. The server uses bindings to keep a list of handler functions.
func (*Server) AddMiddleware ¶
func (s *Server) AddMiddleware(m ServerMiddlewareFunc) *Server
AddMiddleware will add a ServerMiddleware to the list of middlewares to be triggered before the handle func for each request.
func (*Server) Bind ¶
func (s *Server) Bind(binding HandlerBinding)
Bind will add a HandlerBinding to the list of bindings to serve.
func (*Server) ListenAndServe ¶
func (s *Server) ListenAndServe()
ListenAndServe will dial the RabbitMQ message bus, set up all the channels, consume from all RPC server queues and monitor the connection to ensure the server is always connected.
func (*Server) OnStarted ¶
func (s *Server) OnStarted(f OnStartedFunc)
OnStarted can be used to hook into the connections/channels that the server is using. This can be useful if you want more control over amqp directly.
server := NewServer(url)
server.OnStarted(func(inConn, outConn *amqp.Connection, inChan, outChan *amqp.Channel) {
    // Do something with amqp connections/channels.
})
func (*Server) Stop ¶
func (s *Server) Stop()
Stop will gracefully disconnect from AMQP after draining first incoming then outgoing messages. This method won't wait for server shutdown to complete; you should instead wait for ListenAndServe to exit.
func (*Server) WithDebugLogger ¶
WithDebugLogger sets the logger to use for debug logging.
func (*Server) WithDialConfig ¶
WithDialConfig sets the dial config used for the server.
func (*Server) WithErrorLogger ¶
WithErrorLogger sets the logger to use for error logging.
type ServerMiddlewareFunc ¶
type ServerMiddlewareFunc func(next HandlerFunc) HandlerFunc
ServerMiddlewareFunc represents a function that can be used as a middleware.
For example:
func myMiddle(next HandlerFunc) HandlerFunc {
    // Preinitialization of middleware here.
    return func(ctx context.Context, rw *ResponseWriter, d amqp.Delivery) {
        // Before handler execution here.

        // Execute the handler.
        next(ctx, rw, d)

        // After execution here.
    }
}

s := NewServer("url")

// Add middleware to a specific handler.
s.Bind(DirectBinding("foobar", myMiddle(myHandler)))

// Add middleware to all handlers on the server.
s.AddMiddleware(myMiddle)