mercure

package module
v0.15.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2024 License: AGPL-3.0 Imports: 43 Imported by: 3

README

Protocol and Reference Implementation

Mercure is a protocol for pushing data updates to web browsers and other HTTP clients in a convenient, fast, reliable, and battery-efficient way. It is especially useful to publish async and real-time updates of resources served through web APIs, to reactive web and mobile apps.

Awesome Artifact HUB PkgGoDev CI Coverage Status Go Report Card

Subscriptions Schema

The protocol is maintained in this repository and is also available as an Internet-Draft.

A reference, production-grade, implementation of a Mercure hub (the server) is also available in this repository. It's free software (AGPL) written in Go. It is provided along with a library that can be used in any Go application to implement the Mercure protocol directly (without a hub) and an official Docker image.

In addition, a managed and high-scalability version of the Mercure.rocks hub is available on Mercure.rocks.

Contributing

See CONTRIBUTING.md.

See license information.

Credits

Created by Kévin Dunglas. Graphic design by Laury Sorriaux. Sponsored by Les-Tilleuls.coop.

Documentation

Overview

Package mercure helps implementing the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as a HTTP handler.

Example
package main

import (
	"log"
	"net/http"

	"github.com/dunglas/mercure"
)

func main() {
	h, err := mercure.NewHub(
		mercure.WithPublisherJWT([]byte("!ChangeMe!"), "HS256"),
		mercure.WithSubscriberJWT([]byte("!ChangeMe!"), "HS256"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer h.Stop()

	http.Handle("/.well-known/mercure", h)
	log.Panic(http.ListenAndServe(":8080", nil))
}
Output:

Index

Examples

Constants

View Source
const (
	DefaultWriteTimeout    = 600 * time.Second
	DefaultDispatchTimeout = 5 * time.Second
	DefaultHeartbeat       = 40 * time.Second
)
View Source
const (
	DefaultTopicSelectorStoreLRUMaxEntriesPerShard = int64(1e4)
	DefaultTopicSelectorStoreLRUShardCount         = int64(256) // 2.5 million entries.
)

Gather stats to find the best default values.

View Source
const (
	TopicSelectorStoreRistrettoDefaultCacheNumCounters = int64(6e7)
	TopicSelectorStoreRistrettoCacheMaxCost            = int64(1e8) // 100 MB
)

Gather stats to find the best default values.

View Source
const BoltDefaultCleanupFrequency = 0.3
View Source
const EarliestLastEventID = "earliest"

EarliestLastEventID is the reserved value representing the earliest available event id.

Variables

View Source
var (
	// ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid.
	ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`)
	// ErrInvalidAuthorizationQuery is returned when the authorization query parameter is invalid.
	ErrInvalidAuthorizationQuery = errors.New(`invalid "authorization" Query parameter`)
	// ErrNoOrigin is returned when the cookie authorization mechanism is used and no Origin nor Referer headers are presents.
	ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`)
	// ErrOriginNotAllowed is returned when the Origin is not allowed to post updates.
	ErrOriginNotAllowed = errors.New("origin not allowed to post updates")
	// ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported.
	ErrUnexpectedSigningMethod = errors.New("unexpected signing method")
	// ErrInvalidJWT is returned when the JWT is invalid.
	ErrInvalidJWT = errors.New("invalid JWT")
	// ErrPublicKey is returned when there is an error with the public key.
	ErrPublicKey = errors.New("public key error")
)
View Source
var ErrClosedTransport = errors.New("hub: read/write on closed Transport")

ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.

View Source
var ErrInvalidConfig = errors.New("invalid config")

ErrInvalidConfig is returned when the configuration is invalid.

Deprecated: use the Caddy server module or the standalone library instead.

View Source
var ErrUnsupportedProtocolVersion = errors.New("compatibility mode only supports protocol version 7")

ErrUnsupportedProtocolVersion is returned when the version passed is unsupported.

Functions

func AssignUUID

func AssignUUID(u *Update)

AssignUUID generates a new UUID an assign it to the given update if no ID is already set.

func InitConfig deprecated

func InitConfig(v *viper.Viper)

InitConfig reads in config file and ENV variables if set.

Deprecated: use the Caddy server module or the standalone library instead.

func RegisterTransportFactory

func RegisterTransportFactory(scheme string, factory TransportFactory)

func SetConfigDefaults deprecated

func SetConfigDefaults(v *viper.Viper)

SetConfigDefaults sets defaults on a Viper instance.

Deprecated: use the Caddy server module or the standalone library instead.

func SetFlags deprecated

func SetFlags(fs *pflag.FlagSet, v *viper.Viper)

SetFlags creates flags and bind them to Viper.

Deprecated: use the Caddy server module or the standalone library instead.

func Start deprecated

func Start()

Start is an helper method to start the Mercure Hub.

Deprecated: use the Caddy server module or the standalone library instead.

func ValidateConfig deprecated

func ValidateConfig(v *viper.Viper) error

ValidateConfig validates a Viper instance.

Deprecated: use the Caddy server module or the standalone library instead.

Types

type BoltTransport

type BoltTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

BoltTransport implements the TransportInterface using the Bolt database.

func (*BoltTransport) AddSubscriber

func (t *BoltTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*BoltTransport) Close

func (t *BoltTransport) Close() (err error)

Close closes the Transport.

func (*BoltTransport) Dispatch

func (t *BoltTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers and persists it in Bolt DB.

func (*BoltTransport) GetSubscribers

func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error)

GetSubscribers get the list of active subscribers.

func (*BoltTransport) RemoveSubscriber added in v0.14.0

func (t *BoltTransport) RemoveSubscriber(s *Subscriber) error

RemoveSubscriber removes a new subscriber from the transport.

type CheckedEntry added in v0.13.0

type CheckedEntry = zapcore.CheckedEntry

CheckedEntry is an alias of zapcore.CheckedEntry, it could be replaced by a custom contract when Go will support generics.

type Event

type Event struct {
	// The updates' data, encoded in the sever-sent event format: every line starts with the string "data: "
	// https://www.w3.org/TR/eventsource/#dispatchMessage
	Data string

	// The globally unique identifier corresponding to update
	ID string

	// The event type, will be attached to the "event" field
	Type string

	// The reconnection time
	Retry uint64
}

Event is the actual Server Sent Event that will be dispatched.

func (*Event) String

func (e *Event) String() string

String serializes the event in a "text/event-stream" representation.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores channels with clients currently subscribed and allows to dispatch updates.

func NewHub

func NewHub(options ...Option) (*Hub, error)

NewHub creates a new Hub instance.

func NewHubFromViper deprecated

func NewHubFromViper(v *viper.Viper) (*Hub, error)

NewHubFromViper creates a new Hub from the Viper config.

Deprecated: use the Caddy server module or the standalone library instead.

func (*Hub) Demo added in v0.14.0

func (h *Hub) Demo(w http.ResponseWriter, r *http.Request)

Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.

func (*Hub) PublishHandler

func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)

PublishHandler allows publisher to broadcast updates to all subscribers.

func (*Hub) Serve deprecated

func (h *Hub) Serve()

Serve starts the HTTP server.

Deprecated: use the Caddy server module or the standalone library instead.

func (*Hub) ServeHTTP

func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Hub) Stop added in v0.11.1

func (h *Hub) Stop() error

Stop stops the hub.

func (*Hub) SubscribeHandler

func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)

SubscribeHandler creates a keep alive connection and sends the events to the subscribers.

func (*Hub) SubscriptionHandler

func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)

func (*Hub) SubscriptionsHandler

func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)

type Level added in v0.13.0

type Level = zapcore.Level

Level is an alias of zapcore.Level, it could be replaced by a custom contract when Go will support generics.

type LocalTransport

type LocalTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalTransport implements the TransportInterface without database and simply broadcast the live Updates.

func (*LocalTransport) AddSubscriber

func (t *LocalTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*LocalTransport) Close

func (t *LocalTransport) Close() (err error)

Close closes the Transport.

func (*LocalTransport) Dispatch

func (t *LocalTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers.

func (*LocalTransport) GetSubscribers

func (t *LocalTransport) GetSubscribers() (string, []*Subscriber, error)

GetSubscribers gets the list of active subscribers.

func (*LocalTransport) RemoveSubscriber added in v0.14.0

func (t *LocalTransport) RemoveSubscriber(s *Subscriber) error

RemoveSubscriber removes a subscriber from the transport.

type LogField

type LogField = zapcore.Field

LogField is an alias of zapcore.Field, it could be replaced by a custom contract when Go will support generics.

type Logger

type Logger interface {
	Info(msg string, fields ...LogField)
	Error(msg string, fields ...LogField)
	Check(level Level, msg string) *CheckedEntry
	Level() Level
}

Logger defines the Mercure logger.

type Metrics

type Metrics interface {
	// SubscriberConnected collects metrics about subscriber connections.
	SubscriberConnected(s *Subscriber)
	// SubscriberDisconnected collects metrics about subscriber disconnections.
	SubscriberDisconnected(s *Subscriber)
	// UpdatePublished collects metrics about update publications.
	UpdatePublished(u *Update)
}

type NopMetrics

type NopMetrics struct{}

func (NopMetrics) SubscriberConnected

func (NopMetrics) SubscriberConnected(_ *Subscriber)

func (NopMetrics) SubscriberDisconnected

func (NopMetrics) SubscriberDisconnected(_ *Subscriber)

func (NopMetrics) UpdatePublished

func (NopMetrics) UpdatePublished(_ *Update)

type Option

type Option func(h *opt) error

Option instances allow to configure the library.

func WithAllowedHosts

func WithAllowedHosts(hosts []string) Option

WithAllowedHosts sets the allowed hosts.

func WithAnonymous

func WithAnonymous() Option

WithAnonymous allows subscribers with no valid JWT.

func WithCORSOrigins

func WithCORSOrigins(origins []string) Option

WithCORSOrigins sets the allowed CORS origins.

func WithCookieName added in v0.14.0

func WithCookieName(cookieName string) Option

WithCookieName sets the name of the authorization cookie (defaults to "mercureAuthorization").

func WithDebug

func WithDebug() Option

WithDebug enables the debug mode.

func WithDemo

func WithDemo() Option

WithDemo enables the demo.

func WithDispatchTimeout

func WithDispatchTimeout(timeout time.Duration) Option

WithDispatchTimeout sets maximum dispatch duration of an update.

func WithHeartbeat

func WithHeartbeat(interval time.Duration) Option

WithHeartbeat sets the frequency of the heartbeat, disabled by default.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger to use.

func WithMetrics

func WithMetrics(m Metrics) Option

WithMetrics enables collection of Prometheus metrics.

func WithProtocolVersionCompatibility added in v0.14.0

func WithProtocolVersionCompatibility(protocolVersionCompatibility int) Option

WithProtocolVersionCompatibility sets the version of the Mercure protocol to be backward compatible with (only version 7 is supported).

func WithPublishOrigins

func WithPublishOrigins(origins []string) Option

WithPublishOrigins sets the origins allowed to publish updates.

func WithPublisherJWT

func WithPublisherJWT(key []byte, alg string) Option

WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.

func WithSubscriberJWT

func WithSubscriberJWT(key []byte, alg string) Option

WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.

func WithSubscriptions

func WithSubscriptions() Option

WithSubscriptions allows to dispatch updates when subscriptions are created or terminated.

func WithTopicSelectorStore

func WithTopicSelectorStore(tss *TopicSelectorStore) Option

WithTopicSelectorStore sets the TopicSelectorStore instance to use.

func WithTransport

func WithTransport(t Transport) Option

WithTransport sets the transport to use.

func WithUI added in v0.12.0

func WithUI() Option

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets maximum duration before closing the connection, defaults to 600s, set to 0 to disable.

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics store Hub collected metrics.

func NewPrometheusMetrics

func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics

NewPrometheusMetrics creates a Prometheus metrics collector. This method must be called only one time or it will panic.

func (*PrometheusMetrics) Register deprecated

func (m *PrometheusMetrics) Register(r *mux.Router)

Register configures the Prometheus registry with all collected metrics.

Deprecated: use the Caddy server module or the standalone library instead.

func (*PrometheusMetrics) SubscriberConnected

func (m *PrometheusMetrics) SubscriberConnected(_ *Subscriber)

func (*PrometheusMetrics) SubscriberDisconnected

func (m *PrometheusMetrics) SubscriberDisconnected(_ *Subscriber)

func (*PrometheusMetrics) UpdatePublished

func (m *PrometheusMetrics) UpdatePublished(_ *Update)

type Subscriber

type Subscriber struct {
	ID                     string
	EscapedID              string
	Claims                 *claims
	EscapedTopics          []string
	RequestLastEventID     string
	RemoteAddr             string
	SubscribedTopics       []string
	SubscribedTopicRegexps []*regexp.Regexp
	AllowedPrivateTopics   []string
	AllowedPrivateRegexps  []*regexp.Regexp
	Debug                  bool
	// contains filtered or unexported fields
}

Subscriber represents a client subscribed to a list of topics.

func NewSubscriber

func NewSubscriber(lastEventID string, logger Logger) *Subscriber

NewSubscriber creates a new subscriber.

func (*Subscriber) Disconnect

func (s *Subscriber) Disconnect()

Disconnect disconnects the subscriber.

func (*Subscriber) Dispatch

func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool

Dispatch an update to the subscriber. Security checks must (topics matching) be done before calling Dispatch, for instance by calling Match.

func (*Subscriber) HistoryDispatched

func (s *Subscriber) HistoryDispatched(responseLastEventID string)

HistoryDispatched must be called when all messages coming from the history have been dispatched.

func (*Subscriber) MarshalLogObject

func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error

func (*Subscriber) Match added in v0.14.0

func (s *Subscriber) Match(u *Update) bool

Match checks if the current subscriber can receive the given update.

func (*Subscriber) MatchTopics added in v0.14.0

func (s *Subscriber) MatchTopics(topics []string, private bool) bool

MatchTopic checks if the current subscriber can access to the given topic.

func (*Subscriber) Ready added in v0.14.0

func (s *Subscriber) Ready() (n int)

Ready flips the ready flag to true and flushes queued live updates returning number of events flushed.

func (*Subscriber) Receive

func (s *Subscriber) Receive() <-chan *Update

Receive returns a chan when incoming updates are dispatched.

func (*Subscriber) SetTopics added in v0.14.0

func (s *Subscriber) SetTopics(subscribedTopics, allowedPrivateTopics []string)

SetTopics compiles topic selector regexps.

type SubscriberList added in v0.14.0

type SubscriberList struct {
	// contains filtered or unexported fields
}

func NewSubscriberList added in v0.14.0

func NewSubscriberList(size int) *SubscriberList

func (*SubscriberList) Add added in v0.14.0

func (sl *SubscriberList) Add(s *Subscriber)

func (*SubscriberList) Len added in v0.14.0

func (sl *SubscriberList) Len() int

func (*SubscriberList) MatchAny added in v0.14.0

func (sl *SubscriberList) MatchAny(u *Update) (res []*Subscriber)

func (*SubscriberList) Remove added in v0.14.0

func (sl *SubscriberList) Remove(s *Subscriber)

func (*SubscriberList) Walk added in v0.14.0

func (sl *SubscriberList) Walk(start uint64, callback func(s *Subscriber) bool) uint64

type TopicSelectorStore

type TopicSelectorStore struct {
	// contains filtered or unexported fields
}

TopicSelectorStore caches compiled templates to improve memory and CPU usage.

func NewTopicSelectorStoreLRU added in v0.14.0

func NewTopicSelectorStoreLRU(maxEntriesPerShard, shardCount int64) (*TopicSelectorStore, error)

NewTopicSelectorStoreLRU creates a TopicSelectorStore with an LRU cache.

func NewTopicSelectorStoreRistretto deprecated added in v0.14.0

func NewTopicSelectorStoreRistretto(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error)

NewTopicSelectorStoreRistretto creates a TopicSelectorStore instance with a ristretto cache. See https://github.com/dgraph-io/ristretto, set values to 0 to disable.

Deprecated: use NewTopicSelectorStoreLRU instead.

type TopicSelectorStoreCache added in v0.14.0

type TopicSelectorStoreCache interface {
	Get(key interface{}) (interface{}, bool)
	Set(key interface{}, value interface{}, n int64) bool
}

type Transport

type Transport interface {
	// Dispatch dispatches an update to all subscribers.
	Dispatch(update *Update) error

	// AddSubscriber adds a new subscriber to the transport.
	AddSubscriber(s *Subscriber) error

	// RemoveSubscriber removes a subscriber from the transport.
	RemoveSubscriber(s *Subscriber) error

	// Close closes the Transport.
	Close() error
}

Transport provides methods to dispatch and persist updates.

func NewBoltTransport

func NewBoltTransport(u *url.URL, l Logger) (Transport, error)

NewBoltTransport create a new boltTransport.

func NewLocalTransport

func NewLocalTransport(_ *url.URL, _ Logger) (Transport, error)

NewLocalTransport create a new LocalTransport.

func NewTransport

func NewTransport(u *url.URL, l Logger) (Transport, error)

type TransportError added in v0.12.2

type TransportError struct {
	// contains filtered or unexported fields
}

TransportError is returned when the Transport's DSN is invalid.

func (*TransportError) Error added in v0.12.2

func (e *TransportError) Error() string

func (*TransportError) Unwrap added in v0.12.2

func (e *TransportError) Unwrap() error

type TransportFactory

type TransportFactory = func(u *url.URL, l Logger) (Transport, error)

TransportFactory is the factory to initialize a new transport.

type TransportSubscribers

type TransportSubscribers interface {
	// GetSubscribers gets the last event ID and the list of active subscribers at this time.
	GetSubscribers() (string, []*Subscriber, error)
}

TransportSubscribers provide a method to retrieve the list of active subscribers.

type Update

type Update struct {
	// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
	// The first one is the canonical IRI, while next ones are alternate IRIs.
	Topics []string

	// Private updates can only be dispatched to subscribers authorized to receive them.
	Private bool

	// To print debug informations
	Debug bool

	// The Server-Sent Event to send.
	Event
}

Update represents an update to send to subscribers.

func (*Update) MarshalLogObject

func (u *Update) MarshalLogObject(enc zapcore.ObjectEncoder) error

Directories

Path Synopsis
caddy module
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL