Package broker defines an interface used for asynchronous messaging.

    Package http provides an HTTP-based message broker.



    This section is empty.


    View Source
    var (
    	DefaultPath    = "/"
    	DefaultAddress = ""


    func Connect

    func Connect() error

    func Disconnect

    func Disconnect() error

    func Init

    func Init(opts ...Option) error

    func Publish

    func Publish(topic string, msg *Message, opts ...PublishOption) error

    func String

    func String() string


    type Broker

    type Broker interface {
    	Init(...Option) error
    	Options() Options
    	Address() string
    	Connect() error
    	Disconnect() error
    	Publish(topic string, m *Message, opts ...PublishOption) error
    	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
    	String() string

      Broker is an interface used for asynchronous messaging.

      var (
      	DefaultBroker Broker = NewBroker()

      func NewBroker

      func NewBroker(opts ...Option) Broker

        NewBroker returns a new HTTP broker.

        type Event

        type Event interface {
        	Topic() string
        	Message() *Message
        	Ack() error
        	Error() error

          Event is given to a subscription handler for processing

          type Handler

          type Handler func(Event) error

            Handler is used to process messages via a subscription to a topic. The handler is passed an Event, which contains the message and an optional Ack method to acknowledge receipt of the message.

            type Message

            type Message struct {
            	Header map[string]string
            	Body   []byte

            type Option

            type Option func(*Options)

            func Addrs

            func Addrs(addrs ...string) Option

              Addrs sets the host addresses to be used by the broker

              func Codec

              func Codec(c codec.Marshaler) Option

                Codec sets the codec used for encoding/decoding where a broker does not support headers.

                func ErrorHandler

                func ErrorHandler(h Handler) Option

                  ErrorHandler will catch all broker errors that can't be handled in the normal way, for example Codec errors.

                  func Registry

                  func Registry(r registry.Registry) Option

                  func Secure

                  func Secure(b bool) Option

                    Secure communication with the broker

                    func TLSConfig

                    func TLSConfig(t *tls.Config) Option

                      Specify TLS Config

                      type Options

                      type Options struct {
                      	Addrs  []string
                      	Secure bool
                      	Codec  codec.Marshaler
                      	// Handler executed when an error happens during broker message
                      	// processing
                      	ErrorHandler Handler
                      	TLSConfig *tls.Config
                      	// Registry used for clustering
                      	Registry registry.Registry
                      	// Other options for implementations of the interface
                      	// can be stored in a context
                      	Context context.Context

                      type PublishOption

                      type PublishOption func(*PublishOptions)

                      func PublishContext

                      func PublishContext(ctx context.Context) PublishOption

                        PublishContext sets the context.

                        type PublishOptions

                        type PublishOptions struct {
                        	// Other options for implementations of the interface
                        	// can be stored in a context
                        	Context context.Context

                        type SubscribeOption

                        type SubscribeOption func(*SubscribeOptions)

                        func DisableAutoAck

                        func DisableAutoAck() SubscribeOption

                          DisableAutoAck will disable auto acking of messages after they have been handled.

                          func Queue

                          func Queue(name string) SubscribeOption

                            Queue sets the name of the queue to share messages on

                            func SubscribeContext

                            func SubscribeContext(ctx context.Context) SubscribeOption

                              SubscribeContext sets the context.

                              type SubscribeOptions

                              type SubscribeOptions struct {
                              	// AutoAck defaults to true. When a handler returns
                              	// with a nil error the message is acked.
                              	AutoAck bool
                              	// Subscribers with the same queue name
                              	// will create a shared subscription where each
                              	// receives a subset of messages.
                              	Queue string
                              	// Other options for implementations of the interface
                              	// can be stored in a context
                              	Context context.Context

                              func NewSubscribeOptions

                              func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

                              type Subscriber

                              type Subscriber interface {
                              	Options() SubscribeOptions
                              	Topic() string
                              	Unsubscribe() error

                                Subscriber is a convenience return type for the Subscribe method

                                func Subscribe

                                func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error)