Documentation
¶
Index ¶
- Constants
- func BoolHeader(msg *amqp.Delivery, param string, def bool) bool
- func FloatHeader(msg *amqp.Delivery, param string, def float64) float64
- func IntHeader(msg *amqp.Delivery, param string, def int64) int64
- func SignalContext(parent context.Context) context.Context
- func StringHeader(msg *amqp.Delivery, param string, def string) string
- type BrokerConfig
- func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig
- func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig
- func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig
- func (bc *BrokerConfig) Start() *Server
- func (bc *BrokerConfig) StdLogger(prefix string) *BrokerConfig
- func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig
- type Exchange
- func (exc *Exchange) Attr(name string, value interface{}) *Exchange
- func (exc *Exchange) Direct(name string) *Exchange
- func (exc *Exchange) Fanout(name string) *Exchange
- func (exc *Exchange) Handler(obj SimpleHandler) *Server
- func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
- func (exc *Exchange) Key(routingKeys ...string) *Exchange
- func (exc *Exchange) Topic(name string) *Exchange
- func (exc *Exchange) Transact(fn TransactionHandler) *Server
- func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
- type Logger
- type Message
- func (msg *Message) Bytes(content []byte) *Message
- func (msg *Message) Exchange(name string) *Message
- func (msg *Message) Header(name string, data interface{}) *Message
- func (msg *Message) ID(id string) *Message
- func (msg *Message) JSON(obj interface{}) *Message
- func (msg *Message) Key(name string) *Message
- func (ms *Message) Publish(ctx context.Context) (<-chan struct{}, error)
- func (ms *Message) PublishWait(ctx context.Context) error
- func (msg *Message) Raw() *amqp.Publishing
- func (msg *Message) Reply(correlationId, queueName string) *Message
- func (msg *Message) ReplyTo(correlationId, queueName string) *Message
- func (ms *Message) Send() <-chan struct{}
- func (ms *Message) SendContext(ctx context.Context) error
- func (msg *Message) String(content string) *Message
- func (msg *Message) TTL(tm time.Duration) *Message
- func (msg *Message) Time(stamp time.Time) *Message
- func (ms *Message) TrySend() error
- func (msg *Message) Type(contentType string) *Message
- type ReQueueConfig
- type ReceiverHandler
- type Requeue
- type SenderHandler
- type Server
- type SimpleHandler
- type SinkConfig
- func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
- func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
- func (snk *SinkConfig) Direct(name string) *Exchange
- func (snk *SinkConfig) Fanout(name string) *Exchange
- func (snk *SinkConfig) Handler(obj SimpleHandler) *Server
- func (snk *SinkConfig) HandlerFunc(fn SinkHandlerFunc) *Server
- func (snk *SinkConfig) Lazy() *SinkConfig
- func (snk *SinkConfig) ManualAck() *SinkConfig
- func (snk *SinkConfig) Requeue(interval time.Duration) *SinkConfig
- func (snk *SinkConfig) Retries(count int) *SinkConfig
- func (snk *SinkConfig) Topic(name string) *Exchange
- func (snk *SinkConfig) Transact(fn TransactionHandler) *Server
- func (snk *SinkConfig) TransactFunc(fn TransactionHandlerFunc) *Server
- func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
- func (snk *SinkConfig) Validate(certFile string) *SinkConfig
- type SinkHandlerFunc
- type StateHandler
- type TransactionHandler
- type TransactionHandlerFunc
- type Writer
- type WriterConfig
- func (wc *WriterConfig) Create() *Writer
- func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
- func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
- func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
- func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
- func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
- func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig
Constants ¶
const DefaultSignatureHeader = "X-Signature"
Variables ¶
This section is empty.
Functions ¶
func FloatHeader ¶ added in v0.0.11
Types ¶
type BrokerConfig ¶
type BrokerConfig struct {
// contains filtered or unexported fields
}
func Broker ¶
func Broker(urls ...string) *BrokerConfig
func (*BrokerConfig) Context ¶
func (bc *BrokerConfig) Context(ctx context.Context) *BrokerConfig
func (*BrokerConfig) Interval ¶
func (bc *BrokerConfig) Interval(tm time.Duration) *BrokerConfig
func (*BrokerConfig) Logger ¶
func (bc *BrokerConfig) Logger(logger Logger) *BrokerConfig
func (*BrokerConfig) Start ¶
func (bc *BrokerConfig) Start() *Server
func (*BrokerConfig) StdLogger ¶ added in v0.0.2
func (bc *BrokerConfig) StdLogger(prefix string) *BrokerConfig
func (*BrokerConfig) Timeout ¶
func (bc *BrokerConfig) Timeout(tm time.Duration) *BrokerConfig
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
func (*Exchange) Handler ¶ added in v0.0.10
func (exc *Exchange) Handler(obj SimpleHandler) *Server
func (*Exchange) HandlerFunc ¶
func (exc *Exchange) HandlerFunc(fn SinkHandlerFunc) *Server
func (*Exchange) Transact ¶
func (exc *Exchange) Transact(fn TransactionHandler) *Server
func (*Exchange) TransactFunc ¶
func (exc *Exchange) TransactFunc(fn TransactionHandlerFunc) *Server
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func (*Message) PublishWait ¶ added in v0.0.10
func (*Message) Raw ¶ added in v0.0.11
func (msg *Message) Raw() *amqp.Publishing
func (*Message) SendContext ¶ added in v0.0.9
type ReQueueConfig ¶
type ReQueueConfig struct {
// contains filtered or unexported fields
}
func (*ReQueueConfig) Create ¶
func (rq *ReQueueConfig) Create() Requeue
func (*ReQueueConfig) Queue ¶
func (rq *ReQueueConfig) Queue(name string) *ReQueueConfig
func (*ReQueueConfig) Timeout ¶
func (rq *ReQueueConfig) Timeout(tm time.Duration) *ReQueueConfig
type ReceiverHandler ¶
func NewCertValidator ¶
func NewCertValidator(cert []byte, header string, log Logger) (ReceiverHandler, error)
Creates a new handler that validates messages against the signature header. Important: the application MUST drop duplicated messages (by message ID) by itself; otherwise the same signed message can simply be resent multiple times and will still pass validation.
func NewCertValidatorFromFile ¶
func NewCertValidatorFromFile(certFile string, header string, log Logger) (ReceiverHandler, error)
Creates a new handler (see NewCertValidator) with the key taken from a public ASN.1 DER certificate. The certificate should contain a --CERTIFICATE-- section.
type SenderHandler ¶
type SenderHandler interface {
Handle(msg *amqp.Publishing) bool
}
func NewSigner ¶
func NewSigner(privateKey []byte, header string) (SenderHandler, error)
Creates a new PKCS#1 v1.5 SHA512 signer handler.
func NewSignerFromFile ¶
func NewSignerFromFile(privateKeyFile string, header string) (SenderHandler, error)
Loads a private key from a PKCS#8 file and creates a new PKCS#1 v1.5 SHA512 signer handler. The file should contain a --PRIVATE KEY-- section.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server keeps the broker configuration and all declared objects (queues, exchanges, etc.) so that they can be re-declared after a restart.
func (*Server) Publisher ¶
func (brk *Server) Publisher() *WriterConfig
Publisher creates a new AMQP producer.
func (*Server) Requeue ¶
func (brk *Server) Requeue(originalQueue string) *ReQueueConfig
Requeue creates a new queue for requeueing. By default the new name is constructed as [originalQueue]/requeue. For example, if the original queue is SAMPLE, then the requeue queue is SAMPLE/requeue.
func (*Server) Sink ¶
func (brk *Server) Sink(queueName string) *SinkConfig
Sink creates a new AMQP consumer with an optional queue name. If the queue name is empty, an autogenerated one will be used without persistence.
func (*Server) WaitToFinish ¶
func (brk *Server) WaitToFinish()
WaitToFinish blocks the calling thread until all allocated resources are freed.
type SimpleHandler ¶ added in v0.0.10
type SinkConfig ¶
type SinkConfig struct {
// contains filtered or unexported fields
}
func (*SinkConfig) Attr ¶
func (snk *SinkConfig) Attr(name string, value interface{}) *SinkConfig
func (*SinkConfig) DeadLetter ¶
func (snk *SinkConfig) DeadLetter(exchange, routingKey string) *SinkConfig
func (*SinkConfig) Direct ¶
func (snk *SinkConfig) Direct(name string) *Exchange
func (*SinkConfig) Fanout ¶
func (snk *SinkConfig) Fanout(name string) *Exchange
func (*SinkConfig) Handler ¶ added in v0.0.10
func (snk *SinkConfig) Handler(obj SimpleHandler) *Server
func (*SinkConfig) HandlerFunc ¶ added in v0.0.2
func (snk *SinkConfig) HandlerFunc(fn SinkHandlerFunc) *Server
func (*SinkConfig) Lazy ¶
func (snk *SinkConfig) Lazy() *SinkConfig
func (*SinkConfig) ManualAck ¶
func (snk *SinkConfig) ManualAck() *SinkConfig
func (*SinkConfig) Requeue ¶ added in v0.0.5
func (snk *SinkConfig) Requeue(interval time.Duration) *SinkConfig
func (*SinkConfig) Retries ¶
func (snk *SinkConfig) Retries(count int) *SinkConfig
func (*SinkConfig) Topic ¶
func (snk *SinkConfig) Topic(name string) *Exchange
func (*SinkConfig) Transact ¶ added in v0.0.2
func (snk *SinkConfig) Transact(fn TransactionHandler) *Server
func (*SinkConfig) TransactFunc ¶ added in v0.0.2
func (snk *SinkConfig) TransactFunc(fn TransactionHandlerFunc) *Server
func (*SinkConfig) Use ¶
func (snk *SinkConfig) Use(handler ReceiverHandler) *SinkConfig
func (*SinkConfig) Validate ¶
func (snk *SinkConfig) Validate(certFile string) *SinkConfig
type StateHandler ¶
type TransactionHandler ¶
type TransactionHandlerFunc ¶
type WriterConfig ¶
type WriterConfig struct {
// contains filtered or unexported fields
}
func (*WriterConfig) Create ¶
func (wc *WriterConfig) Create() *Writer
func (*WriterConfig) DefaultDirect ¶
func (wc *WriterConfig) DefaultDirect(name string) *WriterConfig
func (*WriterConfig) DefaultFanout ¶
func (wc *WriterConfig) DefaultFanout(name string) *WriterConfig
func (*WriterConfig) DefaultKey ¶
func (wc *WriterConfig) DefaultKey(routingKey string) *WriterConfig
func (*WriterConfig) DefaultTopic ¶
func (wc *WriterConfig) DefaultTopic(name string) *WriterConfig
func (*WriterConfig) Sign ¶
func (wc *WriterConfig) Sign(privateFile string) *WriterConfig
Sign signs the message body and adds the signature to the DefaultSignatureHeader header. It panics if the private key cannot be read.
func (*WriterConfig) Use ¶
func (wc *WriterConfig) Use(handler SenderHandler) *WriterConfig
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
amqp-recv
command
|
|
|
amqp-send
command
|
|
|
example
|
|
|
cmd/fluent-sample
command
|
|