fluent

package module
v0.0.1
Published: Aug 10, 2018 License: MIT Imports: 19 Imported by: 0

README

Fluent AMQP

A library that provides a fluent, easy-to-use wrapper over the streadway/amqp API. It adds features such as:

  • Reconnecting. All declared infrastructure is restored after a reconnect
  • Non-blocking processing of messages
  • Optional auto-requeue (with delay)
  • Signing and verifying messages with a public/private key pair

API documentation

Signing and verification

The x509 signature algorithm is SHA512 with RSA; messages are signed using a SHA512 digest.

The signer (producer) should use a private key to sign the message body and the message ID.

The validator (consumer) should use the public certificate to validate the message body and message ID against the signature, and should drop invalid or duplicated messages.

The signature should be placed in a header (X-Signature by default; see the DefaultSignatureHeader constant in godoc) as a binary object (not hex- or base64-encoded).

DATA = BYTES(ID) ... BYTES(BODY)
# SIGN via PKCS#1 v1.5
SIGN_HEADER_BODY = SIGN_SHA512(PRIVATE_KEY, DATA)
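
The scheme above can be sketched with the Go standard library alone. This is a minimal illustration, not the library's own implementation: it assumes `...` in the pseudocode means plain byte concatenation of the ID and the body, and the helper names `signMessage`, `verifyMessage` and `roundTrip` are hypothetical.

```go
package main

import (
	"crypto"
	"crypto/rand"
	"crypto/rsa"
	"crypto/sha512"
	"errors"
	"fmt"
)

// signMessage builds DATA = BYTES(ID) + BYTES(BODY) (assumed concatenation),
// hashes it with SHA-512 and signs the digest with RSA PKCS#1 v1.5.
// The result is raw bytes, i.e. not hex- or base64-encoded.
func signMessage(key *rsa.PrivateKey, id string, body []byte) ([]byte, error) {
	data := append([]byte(id), body...)
	digest := sha512.Sum512(data)
	return rsa.SignPKCS1v15(rand.Reader, key, crypto.SHA512, digest[:])
}

// verifyMessage recomputes the digest and checks the signature with the public key.
func verifyMessage(pub *rsa.PublicKey, id string, body, signature []byte) error {
	data := append([]byte(id), body...)
	digest := sha512.Sum512(data)
	return rsa.VerifyPKCS1v15(pub, crypto.SHA512, digest[:], signature)
}

// roundTrip signs a sample message with a throwaway key, verifies it,
// and confirms that a tampered body is rejected.
func roundTrip() error {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return err
	}
	sig, err := signMessage(key, "msg-1", []byte("hello"))
	if err != nil {
		return err
	}
	if err := verifyMessage(&key.PublicKey, "msg-1", []byte("hello"), sig); err != nil {
		return err
	}
	if verifyMessage(&key.PublicKey, "msg-1", []byte("HELLO"), sig) == nil {
		return errors.New("tampered body was accepted")
	}
	return nil
}

func main() {
	if err := roundTrip(); err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println("signature verified; tampered body rejected")
}
```

Because the message ID is part of the signed data, a consumer can detect a replayed message only if it also tracks which IDs it has already seen.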

Documentation

Constants

const DefaultSignatureHeader = "X-Signature"

Variables

This section is empty.

Functions

func SignalContext

func SignalContext(parent context.Context) context.Context

Types

type BrokerConfig

type BrokerConfig struct {
	// contains filtered or unexported fields
}

func Broker

func Broker(urls ...string) *BrokerConfig

func (*BrokerConfig) Context

func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig

func (*BrokerConfig) Interval

func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig

func (*BrokerConfig) Logger

func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig

func (*BrokerConfig) Start

func (bc *BrokerConfig) Start() *Server

func (*BrokerConfig) Timeout

func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

func (*Exchange) Attr

func (exc *Exchange) Attr(name string, value interface{}) *Exchange

func (*Exchange) Direct

func (exc *Exchange) Direct(name string) *Exchange

func (*Exchange) Fanout

func (exc *Exchange) Fanout(name string) *Exchange

func (*Exchange) HandlerFunc

func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server

func (*Exchange) Key

func (exc *Exchange) Key(routingKey string) *Exchange

func (*Exchange) Topic

func (exc *Exchange) Topic(name string) *Exchange

func (*Exchange) Transact

func (exc *Exchange) Transact(fn TransactionHandler) *Server

func (*Exchange) TransactFunc

func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server

type Logger

type Logger interface {
	Println(items ...interface{})
}

type Message

type Message struct {
	// contains filtered or unexported fields
}

func (*Message) Bytes

func (msg *Message) Bytes(content []byte) *Message

func (*Message) Exchange

func (msg *Message) Exchange(name string) *Message

func (*Message) Header

func (msg *Message) Header(name string, data interface{}) *Message

func (*Message) ID

func (msg *Message) ID(id string) *Message

func (*Message) JSON

func (msg *Message) JSON(obj interface{}) *Message

func (*Message) Key

func (msg *Message) Key(name string) *Message

func (*Message) Reply

func (msg *Message) Reply(correlationId, queueName string) *Message

func (*Message) Send

func (ms *Message) Send()

func (*Message) String

func (msg *Message) String(content string) *Message

func (*Message) TTL

func (msg *Message) TTL(tm time.Duration) *Message

func (*Message) TrySend

func (ms *Message) TrySend() error

func (*Message) Type

func (msg *Message) Type(contentType string) *Message

type ReQueueConfig

type ReQueueConfig struct {
	// contains filtered or unexported fields
}

func (*ReQueueConfig) Create

func (rq *ReQueueConfig) Create() Requeue

func (*ReQueueConfig) Queue

func (rq *ReQueueConfig) Queue(name string) *ReQueueConfig

func (*ReQueueConfig) Timeout

func (rq *ReQueueConfig) Timeout(tm time.Duration) *ReQueueConfig

type ReceiverHandler

type ReceiverHandler interface {
	Handle(msg *amqp.Delivery) bool
}

func NewCertValidator

func NewCertValidator(cert []byte, header string, log Logger) (ReceiverHandler, error)

Creates a new handler that validates messages against the signature header. Important: the application MUST drop duplicated messages (by message ID) itself; otherwise the same signed message can simply be resent multiple times.
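
One way an application might drop duplicates is to keep a set of message IDs it has already processed. The sketch below is an assumption-laden illustration: the `seenIDs` type is hypothetical, it lives only in memory, and it never expires entries, which a long-running consumer would probably need.

```go
package main

import (
	"fmt"
	"sync"
)

// seenIDs is a hypothetical in-memory set of already processed message IDs.
type seenIDs struct {
	mu  sync.Mutex
	ids map[string]struct{}
}

func newSeenIDs() *seenIDs {
	return &seenIDs{ids: make(map[string]struct{})}
}

// firstTime records id and reports whether it had not been seen before.
// It is safe for concurrent use by several consumer goroutines.
func (s *seenIDs) firstTime(id string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, dup := s.ids[id]; dup {
		return false
	}
	s.ids[id] = struct{}{}
	return true
}

func main() {
	seen := newSeenIDs()
	for _, id := range []string{"a", "b", "a"} {
		if seen.firstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("drop duplicate", id)
		}
	}
}
```

In a real consumer the check would run after signature validation, so that an unsigned message cannot poison the set of seen IDs.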

func NewCertValidatorFromFile

func NewCertValidatorFromFile(certFile string, header string, log Logger) (ReceiverHandler, error)

Creates a new handler (see NewCertValidator) with the key taken from a public ASN.1 DER certificate. The certificate file should contain a --CERTIFICATE-- section.
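
For orientation, here is a standard-library sketch of reading an RSA public key out of a PEM-wrapped certificate. It assumes the --CERTIFICATE-- section is a PEM block of type `CERTIFICATE`; the helpers `publicKeyFromCertPEM` and `selfSignedCertPEM` are hypothetical and only show the general technique, not what the library does internally.

```go
package main

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"errors"
	"fmt"
	"math/big"
	"time"
)

// publicKeyFromCertPEM decodes a PEM "CERTIFICATE" block, parses the DER
// certificate inside it and returns its RSA public key.
func publicKeyFromCertPEM(data []byte) (*rsa.PublicKey, error) {
	block, _ := pem.Decode(data)
	if block == nil || block.Type != "CERTIFICATE" {
		return nil, errors.New("no CERTIFICATE block found")
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		return nil, err
	}
	pub, ok := cert.PublicKey.(*rsa.PublicKey)
	if !ok {
		return nil, errors.New("certificate does not carry an RSA public key")
	}
	return pub, nil
}

// selfSignedCertPEM builds a throwaway self-signed certificate so that the
// example runs without any files on disk.
func selfSignedCertPEM() ([]byte, *rsa.PrivateKey, error) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "example"},
		NotBefore:    time.Now(),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}), key, nil
}

func main() {
	certPEM, key, err := selfSignedCertPEM()
	if err != nil {
		fmt.Println("cannot build certificate:", err)
		return
	}
	pub, err := publicKeyFromCertPEM(certPEM)
	if err != nil {
		fmt.Println("cannot read certificate:", err)
		return
	}
	fmt.Println("public key matches:", pub.N.Cmp(key.N) == 0)
}
```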

type Requeue

type Requeue interface {
	Requeue(original *amqp.Delivery) error
}

type SenderHandler

type SenderHandler interface {
	Handle(msg *amqp.Publishing) bool
}

func NewSigner

func NewSigner(privateKey []byte, header string) (SenderHandler, error)

Creates a new PKCS#1 v1.5 SHA512 signer handler.

func NewSignerFromFile

func NewSignerFromFile(privateKeyFile string, header string) (SenderHandler, error)

Loads a private key from a PKCS#8 file and creates a new PKCS#1 v1.5 SHA512 signer handler. The file should contain a --PRIVATE KEY-- section.
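
As a rough illustration of the expected key format, the following standard-library sketch parses an RSA key from a PEM block of type `PRIVATE KEY` holding PKCS#8 data. The helper names `privateKeyFromPKCS8PEM` and `examplePKCS8PEM` are hypothetical, and the key is generated in memory purely so the example is self-contained.

```go
package main

import (
	"crypto/rand"
	"crypto/rsa"
	"crypto/x509"
	"encoding/pem"
	"errors"
	"fmt"
)

// privateKeyFromPKCS8PEM decodes a PEM "PRIVATE KEY" block and parses the
// PKCS#8 structure inside it, accepting only RSA keys.
func privateKeyFromPKCS8PEM(data []byte) (*rsa.PrivateKey, error) {
	block, _ := pem.Decode(data)
	if block == nil || block.Type != "PRIVATE KEY" {
		return nil, errors.New("no PRIVATE KEY block found")
	}
	parsed, err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if err != nil {
		return nil, err
	}
	key, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		return nil, errors.New("not an RSA private key")
	}
	return key, nil
}

// examplePKCS8PEM generates a throwaway RSA key and encodes it as PKCS#8 PEM.
func examplePKCS8PEM() ([]byte, error) {
	key, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		return nil, err
	}
	der, err := x509.MarshalPKCS8PrivateKey(key)
	if err != nil {
		return nil, err
	}
	return pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: der}), nil
}

func main() {
	data, err := examplePKCS8PEM()
	if err != nil {
		fmt.Println("cannot generate key:", err)
		return
	}
	key, err := privateKeyFromPKCS8PEM(data)
	if err != nil {
		fmt.Println("cannot parse key:", err)
		return
	}
	fmt.Println("parsed RSA key, modulus bits:", key.N.BitLen())
}
```

A key stored in the older PKCS#1 form (PEM type `RSA PRIVATE KEY`) would be rejected by this sketch, which matches the documented requirement for a PKCS#8 file.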

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server keeps the broker configuration and all declared objects (queues, exchanges, and so on) so that they can be re-declared after a restart.

func (*Server) Publisher

func (brk *Server) Publisher() *WriterConfig

Publisher creates a new AMQP producer.

func (*Server) Requeue

func (brk *Server) Requeue(originalQueue string) *ReQueueConfig

Requeue creates a new queue for requeueing. By default the new name is constructed as [originalQueue]/requeue. For example, if the original queue is SAMPLE, then the requeue queue is SAMPLE/requeue.
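
The default naming rule can be written as a one-line helper. `requeueName` is a hypothetical name used only for illustration; it is not part of the package API.

```go
package main

import "fmt"

// requeueName mirrors the documented default: [originalQueue]/requeue.
func requeueName(originalQueue string) string {
	return originalQueue + "/requeue"
}

func main() {
	fmt.Println(requeueName("SAMPLE")) // prints "SAMPLE/requeue"
}
```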

func (*Server) Sink

func (brk *Server) Sink(queueName string) *SinkConfig

Sink creates a new AMQP consumer with an optional queue name. If the queue name is empty, an autogenerated one is used without persistence.

func (*Server) WaitToFinish

func (brk *Server) WaitToFinish()

WaitToFinish blocks the calling thread until all allocated resources are freed.

type SinkConfig

type SinkConfig struct {
	// contains filtered or unexported fields
}

func (*SinkConfig) Attr

func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig

func (*SinkConfig) DeadLetter

func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig

func (*SinkConfig) Direct

func (snk *SinkConfig) Direct(name string) *Exchange

func (*SinkConfig) Fanout

func (snk *SinkConfig) Fanout(name string) *Exchange

func (*SinkConfig) Lazy

func (snk *SinkConfig) Lazy() *SinkConfig

func (*SinkConfig) ManualAck

func (snk *SinkConfig) ManualAck() *SinkConfig

func (*SinkConfig) Retries

func (snk *SinkConfig) Retries(count int) *SinkConfig

func (*SinkConfig) Topic

func (snk *SinkConfig) Topic(name string) *Exchange

func (*SinkConfig) Use

func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig

func (*SinkConfig) Validate

func (snk *SinkConfig) Validate(certFile string) *SinkConfig

type SinkHandlerFunc

type SinkHandlerFunc func(ctx context.Context, msg amqp.Delivery)

type StateHandler

type StateHandler interface {
	ChannelReady(ctx context.Context, ch *amqp.Channel) error
}

type TransactionHandler

type TransactionHandler interface {
	Handle(ctx context.Context, msg amqp.Delivery) error
}

type TransactionHandlerFunc

type TransactionHandlerFunc func(ctx context.Context, msg amqp.Delivery) error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func (*Writer) Prepare

func (writer *Writer) Prepare() *Message

func (*Writer) Reply

func (writer *Writer) Reply(msg *amqp.Delivery) *Message

type WriterConfig

type WriterConfig struct {
	// contains filtered or unexported fields
}

func (*WriterConfig) Create

func (wc *WriterConfig) Create() *Writer

func (*WriterConfig) DefaultDirect

func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig

func (*WriterConfig) DefaultFanout

func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig

func (*WriterConfig) DefaultKey

func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig

func (*WriterConfig) DefaultTopic

func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig

func (*WriterConfig) Sign

func (wc *WriterConfig) Sign(privateFile string) *WriterConfig

Sign signs the body and adds the signature to the DefaultSignatureHeader header. It panics if the private key cannot be read.

func (*WriterConfig) Use

func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig

Directories

Path Synopsis
cmd
example
