Documentation
¶
Index ¶
- Constants
- func SignalContext(parent context.Context) context.Context
- type BrokerConfig
- type Exchange
- func (exc *Exchange) Attr(name string, value interface{}) *Exchange
- func (exc *Exchange) Direct(name string) *Exchange
- func (exc *Exchange) Fanout(name string) *Exchange
- func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
- func (exc *Exchange) Key(routingKey string) *Exchange
- func (exc *Exchange) Topic(name string) *Exchange
- func (exc *Exchange) Transact(fn TransactionHandler) *Server
- func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
- type Logger
- type Message
- func (msg *Message) Bytes(content []byte) *Message
- func (msg *Message) Exchange(name string) *Message
- func (msg *Message) Header(name string, data interface{}) *Message
- func (msg *Message) ID(id string) *Message
- func (msg *Message) JSON(obj interface{}) *Message
- func (msg *Message) Key(name string) *Message
- func (msg *Message) Reply(correlationId, queueName string) *Message
- func (ms *Message) Send()
- func (msg *Message) String(content string) *Message
- func (msg *Message) TTL(tm time.Duration) *Message
- func (ms *Message) TrySend() error
- func (msg *Message) Type(contentType string) *Message
- type ReQueueConfig
- type ReceiverHandler
- type Requeue
- type SenderHandler
- type Server
- type SinkConfig
- func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
- func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
- func (snk *SinkConfig) Direct(name string) *Exchange
- func (snk *SinkConfig) Fanout(name string) *Exchange
- func (snk *SinkConfig) Lazy() *SinkConfig
- func (snk *SinkConfig) ManualAck() *SinkConfig
- func (snk *SinkConfig) Retries(count int) *SinkConfig
- func (snk *SinkConfig) Topic(name string) *Exchange
- func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
- func (snk *SinkConfig) Validate(certFile string) *SinkConfig
- type SinkHandlerFunc
- type StateHandler
- type TransactionHandler
- type TransactionHandlerFunc
- type Writer
- type WriterConfig
- func (wc *WriterConfig) Create() *Writer
- func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
- func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
- func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
- func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
- func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
- func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig
Constants ¶
const DefaultSignatureHeader = "X-Signature"
Variables ¶
This section is empty.
Functions ¶
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
// contains filtered or unexported fields
}
func Broker ¶
func Broker(urls ...string) *BrokerConfig
func (*BrokerConfig) Context ¶
func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig
func (*BrokerConfig) Interval ¶
func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig
func (*BrokerConfig) Logger ¶
func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig
func (*BrokerConfig) Start ¶
func (bc *BrokerConfig) Start() *Server
func (*BrokerConfig) Timeout ¶
func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
func (*Exchange) HandlerFunc ¶
func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
func (*Exchange) Transact ¶
func (exc *Exchange) Transact(fn TransactionHandler) *Server
func (*Exchange) TransactFunc ¶
func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
type ReQueueConfig ¶
type ReQueueConfig struct {
// contains filtered or unexported fields
}
func (*ReQueueConfig) Create ¶
func (rq *ReQueueConfig) Create() Requeue
func (*ReQueueConfig) Queue ¶
func (rq *ReQueueConfig) Queue(name string) *ReQueueConfig
func (*ReQueueConfig) Timeout ¶
func (rq *ReQueueConfig) Timeout(tm time.Duration) *ReQueueConfig
type ReceiverHandler ¶
func NewCertValidator ¶
func NewCertValidator(cert []byte, header string, log Logger) (ReceiverHandler, error)
Creates a new handler that validates messages against the signature header. Important: the application MUST drop duplicated messages (by message ID) by itself; otherwise it is possible to simply resend the same message multiple times.
func NewCertValidatorFromFile ¶
func NewCertValidatorFromFile(certFile string, header string, log Logger) (ReceiverHandler, error)
Creates a new handler (see NewCertValidator) with the key taken from a public ASN.1 DER certificate. The certificate should contain a --CERTIFICATE-- section.
type SenderHandler ¶
type SenderHandler interface {
Handle(msg *amqp.Publishing) bool
}
func NewSigner ¶
func NewSigner(privateKey []byte, header string) (SenderHandler, error)
Creates a new PKCS#1 v1.5 SHA512 signer handler.
func NewSignerFromFile ¶
func NewSignerFromFile(privateKeyFile string, header string) (SenderHandler, error)
Loads a private key from a PKCS#8 file and creates a new PKCS#1 v1.5 SHA512 signer handler. The file should contain a --PRIVATE KEY-- section.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server keeps the broker configuration and all declared objects (queues, exchanges, etc.) so that they can be re-declared after a restart.
func (*Server) Publisher ¶
func (brk *Server) Publisher() *WriterConfig
Publisher creates new AMQP producer
func (*Server) Requeue ¶
func (brk *Server) Requeue(originalQueue string) *ReQueueConfig
Requeue creates a new queue for requeueing. By default the new name is constructed as [originalQueue]/requeue. For example, if the original queue is SAMPLE, then the requeue queue is SAMPLE/requeue.
func (*Server) Sink ¶
func (brk *Server) Sink(queueName string) *SinkConfig
Sink creates a new AMQP consumer with an optional queue name. If the queue name is empty, an autogenerated one will be used, without persistence.
func (*Server) WaitToFinish ¶
func (brk *Server) WaitToFinish()
WaitToFinish blocks the thread until all allocated resources have been freed.
type SinkConfig ¶
type SinkConfig struct {
// contains filtered or unexported fields
}
func (*SinkConfig) Attr ¶
func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
func (*SinkConfig) DeadLetter ¶
func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
func (*SinkConfig) Direct ¶
func (snk *SinkConfig) Direct(name string) *Exchange
func (*SinkConfig) Fanout ¶
func (snk *SinkConfig) Fanout(name string) *Exchange
func (*SinkConfig) Lazy ¶
func (snk *SinkConfig) Lazy() *SinkConfig
func (*SinkConfig) ManualAck ¶
func (snk *SinkConfig) ManualAck() *SinkConfig
func (*SinkConfig) Retries ¶
func (snk *SinkConfig) Retries(count int) *SinkConfig
func (*SinkConfig) Topic ¶
func (snk *SinkConfig) Topic(name string) *Exchange
func (*SinkConfig) Use ¶
func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
func (*SinkConfig) Validate ¶
func (snk *SinkConfig) Validate(certFile string) *SinkConfig
type StateHandler ¶
type TransactionHandler ¶
type TransactionHandlerFunc ¶
type WriterConfig ¶
type WriterConfig struct {
// contains filtered or unexported fields
}
func (*WriterConfig) Create ¶
func (wc *WriterConfig) Create() *Writer
func (*WriterConfig) DefaultDirect ¶
func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
func (*WriterConfig) DefaultFanout ¶
func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
func (*WriterConfig) DefaultKey ¶
func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
func (*WriterConfig) DefaultTopic ¶
func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
func (*WriterConfig) Sign ¶
func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
Sign signs the body and adds the signature to the DefaultSignatureHeader header. Panics if the private key cannot be read.
func (*WriterConfig) Use ¶
func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig