README

Protocol and Reference Implementation

Mercure is a protocol for pushing data updates to web browsers and other HTTP clients in a convenient, fast, reliable and battery-efficient way. It is especially useful for publishing asynchronous and real-time updates of resources served through web APIs to reactive web and mobile apps.

The protocol has been published as an Internet Draft that is maintained in this repository.

A production-grade reference implementation of a Mercure hub (the server) is also available in this repository. It is free software (AGPL) written in Go. It is provided along with a library that can be used in any Go application to implement the Mercure protocol directly (without a hub) and an official Docker image.

In addition, a managed, highly scalable version of the Mercure.rocks hub is available on Mercure.rocks.

Contributing

See CONTRIBUTING.md.

For license details, see https://mercure.rocks/docs/hub/license.

Credits

Created by Kévin Dunglas. Graphic design by Laury Sorriaux. Sponsored by Les-Tilleuls.coop.

Documentation

Overview

    Package mercure helps implement the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as an HTTP handler.



    Constants

    const (
    	TopicSelectorStoreDefaultCacheNumCounters = int64(6e7)
    	TopicSelectorStoreCacheMaxCost            = int64(1e8) // 100 MB
    )

      Gather stats to find the best default values.

      const BoltDefaultCleanupFrequency = 0.3
      const EarliestLastEventID = "earliest"

        EarliestLastEventID is the reserved value representing the earliest available event id.
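As a rough illustration of how a subscriber might ask for all the history a hub still holds, the sketch below builds a subscription request that carries the reserved value in the `Last-Event-ID` header. The hub URL, the topic and the `newReplayRequest` helper are assumptions made for this example, and the constant is redeclared locally so the snippet compiles without the package.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// earliestLastEventID mirrors the documented reserved value.
const earliestLastEventID = "earliest"

// newReplayRequest is a hypothetical helper: it builds a subscription request
// asking the hub to replay every update it still has for the given topic.
func newReplayRequest(hubURL, topic string) (*http.Request, error) {
	u, err := url.Parse(hubURL)
	if err != nil {
		return nil, err
	}
	q := u.Query()
	q.Set("topic", topic)
	u.RawQuery = q.Encode()

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Last-Event-ID", earliestLastEventID)
	return req, nil
}

func main() {
	// Both URLs are placeholders.
	req, err := newReplayRequest("https://example.com/.well-known/mercure", "https://example.com/books/1")
	if err != nil {
		panic(err)
	}
	fmt.Println(req.URL.String(), req.Header.Get("Last-Event-ID"))
}
```

Whether any history is actually replayed depends on the transport in use: a transport that does not persist updates has nothing to send back.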

        Variables

        var (
        	// ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid.
        	ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`)
        	// ErrNoOrigin is returned when the cookie authorization mechanism is used and neither an Origin nor a Referer header is present.
        	ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`)
        	// ErrOriginNotAllowed is returned when the Origin is not allowed to post updates.
        	ErrOriginNotAllowed = errors.New("origin not allowed to post updates")
        	// ErrUnexpectedSigningMethod is returned when the JWT signing method is not supported.
        	ErrUnexpectedSigningMethod = errors.New("unexpected signing method")
        	// ErrInvalidJWT is returned when the JWT is invalid.
        	ErrInvalidJWT = errors.New("invalid JWT")
        	// ErrPublicKey is returned when there is an error with the public key.
        	ErrPublicKey = errors.New("public key error")
        )
        var ErrClosedTransport = errors.New("hub: read/write on closed Transport")

          ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.

          var ErrInvalidConfig = errors.New("invalid config")

            ErrInvalidConfig is returned when the configuration is invalid.

            Deprecated: use the Caddy server module or the standalone library instead.

            Functions

            func AssignUUID

            func AssignUUID(u *Update)

              AssignUUID generates a new UUID and assigns it to the given update if no ID is already set.
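The "only if no ID is already set" behaviour can be sketched with the standard library alone. The `update` type, the `assignID` and `newUUIDv4` helpers, and the `urn:uuid:` prefix are assumptions of this sketch, not the package's actual code.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// update is a hypothetical stand-in for the package's Update type.
type update struct {
	ID string
}

// newUUIDv4 returns a random (version 4) UUID in its canonical text form.
func newUUIDv4() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

// assignID sets an ID only when the update does not already carry one.
func assignID(u *update) error {
	if u.ID != "" {
		return nil // keep the ID chosen by the publisher
	}
	id, err := newUUIDv4()
	if err != nil {
		return err
	}
	u.ID = "urn:uuid:" + id // the URN prefix is an assumption of this sketch
	return nil
}

func main() {
	u := &update{}
	if err := assignID(u); err != nil {
		panic(err)
	}
	fmt.Println(u.ID)
}
```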

              func Demo

              func Demo(w http.ResponseWriter, r *http.Request)

                Demo exposes INSECURE demo endpoints to test the discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" to set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.

                func InitConfig

                func InitConfig(v *viper.Viper)

                  InitConfig reads in config file and ENV variables if set.

                  Deprecated: use the Caddy server module or the standalone library instead.

                  func RegisterTransportFactory

                  func RegisterTransportFactory(scheme string, factory TransportFactory)

                  func SetConfigDefaults

                  func SetConfigDefaults(v *viper.Viper)

                    SetConfigDefaults sets defaults on a Viper instance.

                    Deprecated: use the Caddy server module or the standalone library instead.

                    func SetFlags

                    func SetFlags(fs *pflag.FlagSet, v *viper.Viper)

                      SetFlags creates flags and binds them to Viper.

                      Deprecated: use the Caddy server module or the standalone library instead.

                      func Start

                      func Start()

                        Start is a helper method to start the Mercure Hub.

                        Deprecated: use the Caddy server module or the standalone library instead.

                        func ValidateConfig

                        func ValidateConfig(v *viper.Viper) error

                          ValidateConfig validates a Viper instance.

                          Deprecated: use the Caddy server module or the standalone library instead.

                          Types

                          type BoltTransport

                          type BoltTransport struct {
                          	sync.RWMutex
                          	// contains filtered or unexported fields
                          }

                            BoltTransport implements the TransportInterface using the Bolt database.

                            func (*BoltTransport) AddSubscriber

                            func (t *BoltTransport) AddSubscriber(s *Subscriber) error

                              AddSubscriber adds a new subscriber to the transport.

                              func (*BoltTransport) Close

                              func (t *BoltTransport) Close() (err error)

                                Close closes the Transport.

                                func (*BoltTransport) Dispatch

                                func (t *BoltTransport) Dispatch(update *Update) error

                                  Dispatch dispatches an update to all subscribers and persists it in Bolt DB.

                                  func (*BoltTransport) GetSubscribers

                                  func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error)

                                    GetSubscribers gets the list of active subscribers.

                                    type ErrTransport

                                    type ErrTransport struct {
                                    	// contains filtered or unexported fields
                                    }

                                      ErrTransport is returned when the Transport's DSN is invalid.

                                      func (*ErrTransport) Error

                                      func (e *ErrTransport) Error() string

                                      func (*ErrTransport) Unwrap

                                      func (e *ErrTransport) Unwrap() error

                                      type Event

                                      type Event struct {
                                      	// The update's data, encoded in the server-sent event format: every line starts with the string "data: "
                                      	// https://www.w3.org/TR/eventsource/#dispatchMessage
                                      	Data string
                                      
                                      	// The globally unique identifier corresponding to the update
                                      	ID string
                                      
                                      	// The event type, will be attached to the "event" field
                                      	Type string
                                      
                                      	// The reconnection time
                                      	Retry uint64
                                      }

                                        Event is the actual Server-Sent Event that will be dispatched.

                                        func (*Event) String

                                        func (e *Event) String() string

                                          String serializes the event in a "text/event-stream" representation.
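To make the "text/event-stream" representation concrete, here is a minimal sketch of such a serialization. The local `event` type mirrors the documented fields, while `encodeData`, `serialize` and the order in which the fields are written are assumptions of this example rather than the package's implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// event is a local stand-in mirroring the documented Event fields.
type event struct {
	Data  string // already encoded: every line starts with "data: "
	ID    string
	Type  string
	Retry uint64
}

// encodeData prefixes every line of a raw payload with "data: ".
func encodeData(raw string) string {
	lines := strings.Split(raw, "\n")
	for i, l := range lines {
		lines[i] = "data: " + l
	}
	return strings.Join(lines, "\n")
}

// serialize renders the event as one "text/event-stream" block,
// terminated by the blank line that marks the end of an event.
func serialize(e event) string {
	var b strings.Builder
	if e.Type != "" {
		fmt.Fprintf(&b, "event: %s\n", e.Type)
	}
	if e.Retry != 0 {
		fmt.Fprintf(&b, "retry: %d\n", e.Retry)
	}
	if e.ID != "" {
		fmt.Fprintf(&b, "id: %s\n", e.ID)
	}
	b.WriteString(e.Data)
	b.WriteString("\n\n")
	return b.String()
}

func main() {
	e := event{Data: encodeData("hello\nworld"), ID: "urn:uuid:1", Type: "message", Retry: 3000}
	fmt.Print(serialize(e))
}
```

Encoding the payload once, before the event is serialized, means a multi-line payload is split into one "data: " line per source line, which is what the event stream format expects.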

                                          type Hub

                                          type Hub struct {
                                          	// contains filtered or unexported fields
                                          }

                                            Hub stores channels with clients currently subscribed and allows dispatching updates.

                                            func NewHub

                                            func NewHub(options ...Option) (*Hub, error)

                                              NewHub creates a new Hub instance.

                                              func NewHubFromViper

                                              func NewHubFromViper(v *viper.Viper) (*Hub, error)

                                                NewHubFromViper creates a new Hub from the Viper config.

                                                Deprecated: use the Caddy server module or the standalone library instead.

                                                func (*Hub) PublishHandler

                                                func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)

                                                  PublishHandler allows publishers to broadcast updates to all subscribers.

                                                  func (*Hub) Serve

                                                  func (h *Hub) Serve()

                                                    Serve starts the HTTP server.

                                                    Deprecated: use the Caddy server module or the standalone library instead.

                                                    func (*Hub) ServeHTTP

                                                    func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)

                                                    func (*Hub) Stop

                                                    func (h *Hub) Stop() error

                                                      Stop stops the hub.

                                                      func (*Hub) SubscribeHandler

                                                      func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)

                                                        SubscribeHandler creates a keep-alive connection and sends the events to the subscribers.

                                                        func (*Hub) SubscriptionHandler

                                                        func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)

                                                        func (*Hub) SubscriptionsHandler

                                                        func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)

                                                        type LocalTransport

                                                        type LocalTransport struct {
                                                        	sync.RWMutex
                                                        	// contains filtered or unexported fields
                                                        }

                                                          LocalTransport implements the TransportInterface without a database and simply broadcasts the live Updates.

                                                          func (*LocalTransport) AddSubscriber

                                                          func (t *LocalTransport) AddSubscriber(s *Subscriber) error

                                                            AddSubscriber adds a new subscriber to the transport.

                                                            func (*LocalTransport) Close

                                                            func (t *LocalTransport) Close() (err error)

                                                              Close closes the Transport.

                                                              func (*LocalTransport) Dispatch

                                                              func (t *LocalTransport) Dispatch(update *Update) error

                                                                Dispatch dispatches an update to all subscribers.

                                                                func (*LocalTransport) GetSubscribers

                                                                func (t *LocalTransport) GetSubscribers() (string, []*Subscriber, error)

                                                                  GetSubscribers gets the list of active subscribers.

                                                                  type LogField

                                                                  type LogField = zapcore.Field

                                                                    LogField is an alias of zapcore.Field; it could be replaced by a custom contract once Go supports generics.

                                                                    type Logger

                                                                    type Logger interface {
                                                                    	Debug(msg string, fields ...LogField)
                                                                    	Info(msg string, fields ...LogField)
                                                                    	Warn(msg string, fields ...LogField)
                                                                    	Error(msg string, fields ...LogField)
                                                                    }

                                                                      Logger defines the Mercure logger.

                                                                      type Metrics

                                                                      type Metrics interface {
                                                                      	// SubscriberConnected collects metrics about subscriber connections.
                                                                      	SubscriberConnected(s *Subscriber)
                                                                      	// SubscriberDisconnected collects metrics about subscriber disconnections.
                                                                      	SubscriberDisconnected(s *Subscriber)
                                                                      	// UpdatePublished collects metrics about update publications.
                                                                      	UpdatePublished(u *Update)
                                                                      }
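A custom collector only has to provide these three methods. The sketch below counts events in memory. It redeclares simplified `subscriber`, `update` and `metrics` types so that it compiles on its own; these stand-ins and the `counterMetrics` type are assumptions of this example, not part of the package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscriber and update are simplified stand-ins for the package's types.
type subscriber struct{ ID string }
type update struct{ ID string }

// metrics mirrors the shape of the documented Metrics interface.
type metrics interface {
	SubscriberConnected(s *subscriber)
	SubscriberDisconnected(s *subscriber)
	UpdatePublished(u *update)
}

// counterMetrics keeps in-memory counters that are safe for concurrent use.
type counterMetrics struct {
	active    atomic.Int64 // currently connected subscribers
	published atomic.Int64 // total number of published updates
}

func (m *counterMetrics) SubscriberConnected(*subscriber)    { m.active.Add(1) }
func (m *counterMetrics) SubscriberDisconnected(*subscriber) { m.active.Add(-1) }
func (m *counterMetrics) UpdatePublished(*update)            { m.published.Add(1) }

// Compile-time check that counterMetrics satisfies the interface.
var _ metrics = (*counterMetrics)(nil)

func main() {
	m := &counterMetrics{}
	m.SubscriberConnected(&subscriber{ID: "a"})
	m.UpdatePublished(&update{ID: "1"})
	fmt.Println(m.active.Load(), m.published.Load())
}
```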

                                                                      type NopMetrics

                                                                      type NopMetrics struct{}

                                                                      func (NopMetrics) SubscriberConnected

                                                                      func (NopMetrics) SubscriberConnected(s *Subscriber)

                                                                      func (NopMetrics) SubscriberDisconnected

                                                                      func (NopMetrics) SubscriberDisconnected(s *Subscriber)

                                                                      func (NopMetrics) UpdatePublished

                                                                      func (NopMetrics) UpdatePublished(s *Update)

                                                                      type Option

                                                                      type Option func(h *opt) error

                                                                        Option instances allow configuring the library.

                                                                        func WithAllowedHosts

                                                                        func WithAllowedHosts(hosts []string) Option

                                                                          WithAllowedHosts sets the allowed hosts.

                                                                          func WithAnonymous

                                                                          func WithAnonymous() Option

                                                                            WithAnonymous allows subscribers with no valid JWT.

                                                                            func WithCORSOrigins

                                                                            func WithCORSOrigins(origins []string) Option

                                                                              WithCORSOrigins sets the allowed CORS origins.

                                                                              func WithDebug

                                                                              func WithDebug() Option

                                                                                WithDebug enables the debug mode.

                                                                                func WithDemo

                                                                                func WithDemo(uiPath string) Option

                                                                                  WithDemo enables the demo.

                                                                                  func WithDispatchTimeout

                                                                                  func WithDispatchTimeout(timeout time.Duration) Option

                                                                                    WithDispatchTimeout sets the maximum dispatch duration of an update.

                                                                                    func WithHeartbeat

                                                                                    func WithHeartbeat(interval time.Duration) Option

                                                                                      WithHeartbeat sets the frequency of the heartbeat, disabled by default.

                                                                                      func WithLogger

                                                                                      func WithLogger(logger Logger) Option

                                                                                        WithLogger sets the logger to use.

                                                                                        func WithMetrics

                                                                                        func WithMetrics(m Metrics) Option

                                                                                          WithMetrics enables collection of Prometheus metrics.

                                                                                          func WithPublishOrigins

                                                                                          func WithPublishOrigins(origins []string) Option

                                                                                            WithPublishOrigins sets the origins allowed to publish updates.

                                                                                            func WithPublisherJWT

                                                                                            func WithPublisherJWT(key []byte, alg string) Option

                                                                                              WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.

                                                                                              func WithSubscriberJWT

                                                                                              func WithSubscriberJWT(key []byte, alg string) Option

                                                                                                WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.

                                                                                                func WithSubscriptions

                                                                                                func WithSubscriptions() Option

                                                                                                  WithSubscriptions enables dispatching updates when subscriptions are created or terminated.

                                                                                                  func WithTopicSelectorStore

                                                                                                  func WithTopicSelectorStore(tss *TopicSelectorStore) Option

                                                                                                    WithTopicSelectorStore sets the TopicSelectorStore instance to use.

                                                                                                    func WithTransport

                                                                                                    func WithTransport(t Transport) Option

                                                                                                      WithTransport sets the transport to use.

                                                                                                      func WithWriteTimeout

                                                                                                      func WithWriteTimeout(timeout time.Duration) Option

                                                                                                        WithWriteTimeout sets the maximum duration before closing the connection. It defaults to 600s; set it to 0 to disable.

                                                                                                        type PrometheusMetrics

                                                                                                        type PrometheusMetrics struct {
                                                                                                        	// contains filtered or unexported fields
                                                                                                        }

                                                                                                          PrometheusMetrics stores the metrics collected by the Hub.

                                                                                                          func NewPrometheusMetrics

                                                                                                          func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics

                                                                                                            NewPrometheusMetrics creates a Prometheus metrics collector. This function must be called only once; otherwise it will panic.

                                                                                                            func (*PrometheusMetrics) Register

                                                                                                            func (m *PrometheusMetrics) Register(r *mux.Router)

                                                                                                              Register configures the Prometheus registry with all collected metrics.

                                                                                                              Deprecated: use the Caddy server module or the standalone library instead.

                                                                                                              func (*PrometheusMetrics) SubscriberConnected

                                                                                                              func (m *PrometheusMetrics) SubscriberConnected(s *Subscriber)

                                                                                                              func (*PrometheusMetrics) SubscriberDisconnected

                                                                                                              func (m *PrometheusMetrics) SubscriberDisconnected(s *Subscriber)

                                                                                                              func (*PrometheusMetrics) UpdatePublished

                                                                                                              func (m *PrometheusMetrics) UpdatePublished(u *Update)

                                                                                                              type Subscriber

                                                                                                              type Subscriber struct {
                                                                                                              	ID                 string
                                                                                                              	EscapedID          string
                                                                                                              	Claims             *claims
                                                                                                              	Topics             []string
                                                                                                              	EscapedTopics      []string
                                                                                                              	RequestLastEventID string
                                                                                                              	RemoteAddr         string
                                                                                                              	TopicSelectors     []string
                                                                                                              	Debug              bool
                                                                                                              	// contains filtered or unexported fields
                                                                                                              }

                                                                                                                Subscriber represents a client subscribed to a list of topics.

                                                                                                                func NewSubscriber

                                                                                                                func NewSubscriber(lastEventID string, logger Logger, tss *TopicSelectorStore) *Subscriber

                                                                                                                  NewSubscriber creates a new subscriber.

                                                                                                                  func (*Subscriber) CanDispatch

                                                                                                                  func (s *Subscriber) CanDispatch(u *Update) bool

                                                                                                                    CanDispatch checks if an update can be dispatched to this subscriber.

                                                                                                                    func (*Subscriber) Disconnect

                                                                                                                    func (s *Subscriber) Disconnect()

                                                                                                                      Disconnect disconnects the subscriber.

                                                                                                                      func (*Subscriber) Dispatch

                                                                                                                      func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool

                                                                                                                        Dispatch dispatches an update to the subscriber.

                                                                                                                        func (*Subscriber) HistoryDispatched

                                                                                                                        func (s *Subscriber) HistoryDispatched(responseLastEventID string)

                                                                                                                          HistoryDispatched must be called when all messages coming from the history have been dispatched.

                                                                                                                          func (*Subscriber) MarshalLogObject

                                                                                                                          func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error

                                                                                                                          func (*Subscriber) Receive

                                                                                                                          func (s *Subscriber) Receive() <-chan *Update

                                                                                                                            Receive returns a channel on which incoming updates are dispatched.

                                                                                                                            type TopicSelectorStore

                                                                                                                            type TopicSelectorStore struct {
                                                                                                                            	// contains filtered or unexported fields
                                                                                                                            }

                                                                                                                              TopicSelectorStore caches compiled templates to improve memory and CPU usage.

                                                                                                                              func NewTopicSelectorStore

                                                                                                                              func NewTopicSelectorStore(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error)

                                                                                                                                NewTopicSelectorStore creates a TopicSelectorStore instance. See https://github.com/dgraph-io/ristretto, set values to 0 to disable.

                                                                                                                                type Transport

                                                                                                                                type Transport interface {
                                                                                                                                	// Dispatch dispatches an update to all subscribers.
                                                                                                                                	Dispatch(update *Update) error
                                                                                                                                
                                                                                                                                	// AddSubscriber adds a new subscriber to the transport.
                                                                                                                                	AddSubscriber(s *Subscriber) error
                                                                                                                                
                                                                                                                                	// Close closes the Transport.
                                                                                                                                	Close() error
                                                                                                                                }

                                                                                                                                  Transport provides methods to dispatch and persist updates.

                                                                                                                                  func NewBoltTransport

                                                                                                                                  func NewBoltTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)

                                                                                                                                    NewBoltTransport creates a new boltTransport.

                                                                                                                                    func NewLocalTransport

                                                                                                                                    func NewLocalTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)

                                                                                                                                      NewLocalTransport creates a new LocalTransport.

                                                                                                                                      func NewTransport

                                                                                                                                      func NewTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)

                                                                                                                                      type TransportFactory

                                                                                                                                      type TransportFactory = func(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)

                                                                                                                                        TransportFactory is the factory to initialize a new transport.

                                                                                                                                        type TransportSubscribers

                                                                                                                                        type TransportSubscribers interface {
                                                                                                                                        	// GetSubscribers gets the last event ID and the list of active subscribers at this time.
                                                                                                                                        	GetSubscribers() (string, []*Subscriber, error)
                                                                                                                                        }

                                                                                                                                          TransportSubscribers provides a method to retrieve the list of active subscribers.

                                                                                                                                          type Update

                                                                                                                                          type Update struct {
                                                                                                                                          	// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
                                                                                                                                          	// The first one is the canonical IRI, while next ones are alternate IRIs.
                                                                                                                                          	Topics []string
                                                                                                                                          
                                                                                                                                          	// Private updates can only be dispatched to subscribers authorized to receive them.
                                                                                                                                          	Private bool
                                                                                                                                          
                                                                                                                                          	// To print debug information.
                                                                                                                                          	Debug bool
                                                                                                                                          
                                                                                                                                          	// The Server-Sent Event to send.
                                                                                                                                          	Event
                                                                                                                                          }

                                                                                                                                            Update represents an update to send to subscribers.

                                                                                                                                            func (*Update) MarshalLogObject

                                                                                                                                            func (u *Update) MarshalLogObject(enc zapcore.ObjectEncoder) error

                                                                                                                                            Directories

                                                                                                                                            Path Synopsis
                                                                                                                                            caddy module
                                                                                                                                            cmd