mercure

package module
v0.12.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2021 License: AGPL-3.0 Imports: 36 Imported by: 1

README

Protocol and Reference Implementation

Mercure is a protocol allowing to push data updates to web browsers and other HTTP clients in a convenient, fast, reliable and battery-efficient way. It is especially useful to publish async and real-time updates of resources served through web APIs, to reactive web and mobile apps.

Awesome Artifact HUB PkgGoDev CI Coverage Status Go Report Card

Subscriptions Schema

The protocol has been published as an Internet Draft that is maintained in this repository.

A reference, production-grade, implementation of a Mercure hub (the server) is also available in this repository. It's a free software (AGPL) written in Go. It is provided along with a library that can be used in any Go application to implement the Mercure protocol directly (without a hub) and an official Docker image.

In addition, a managed and high-scalability version of the Mercure.rocks hub is available on Mercure.rocks.

Contributing

See CONTRIBUTING.md.

See https://mercure.rocks/docs/hub/license.

Credits

Created by Kévin Dunglas. Graphic design by Laury Sorriaux. Sponsored by Les-Tilleuls.coop.

Documentation

Overview

Package mercure helps implementing the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as a HTTP handler.

Example
package main

import (
	"log"
	"net/http"

	"github.com/dunglas/mercure"
)

func main() {
	h, err := mercure.NewHub(
		mercure.WithPublisherJWT([]byte("!ChangeMe!"), "HS256"),
		mercure.WithSubscriberJWT([]byte("!ChangeMe!"), "HS256"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer h.Stop()

	http.Handle("/.well-known/mercure", h)
	log.Panic(http.ListenAndServe(":8080", nil))
}
Output:

Index

Examples

Constants

View Source
const (
	TopicSelectorStoreDefaultCacheNumCounters = int64(6e7)
	TopicSelectorStoreCacheMaxCost            = int64(1e8) // 100 MB
)

Gather stats to find the best default values.

View Source
const BoltDefaultCleanupFrequency = 0.3
View Source
const EarliestLastEventID = "earliest"

EarliestLastEventID is the reserved value representing the earliest available event id.

Variables

View Source
var (
	// ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid.
	ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`)
	// ErrNoOrigin is returned when the cookie authorization mechanism is used and no Origin nor Referer headers are presents.
	ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`)
	// ErrOriginNotAllowed is returned when the Origin is not allowed to post updates.
	ErrOriginNotAllowed = errors.New("origin not allowed to post updates")
	// ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported.
	ErrUnexpectedSigningMethod = errors.New("unexpected signing method")
	// ErrInvalidJWT is returned when the JWT is invalid.
	ErrInvalidJWT = errors.New("invalid JWT")
	// ErrPublicKey is returned when there is an error with the public key.
	ErrPublicKey = errors.New("public key error")
)
View Source
var ErrClosedTransport = errors.New("hub: read/write on closed Transport")

ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.

View Source
var ErrInvalidConfig = errors.New("invalid config")

ErrInvalidConfig is returned when the configuration is invalid.

Deprecated: use the Caddy server module or the standalone library instead.

Functions

func AssignUUID

func AssignUUID(u *Update)

AssignUUID generates a new UUID an assign it to the given update if no ID is already set.

func Demo

func Demo(w http.ResponseWriter, r *http.Request)

Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.

func InitConfig deprecated

func InitConfig(v *viper.Viper)

InitConfig reads in config file and ENV variables if set.

Deprecated: use the Caddy server module or the standalone library instead.

func RegisterTransportFactory

func RegisterTransportFactory(scheme string, factory TransportFactory)

func SetConfigDefaults deprecated

func SetConfigDefaults(v *viper.Viper)

SetConfigDefaults sets defaults on a Viper instance.

Deprecated: use the Caddy server module or the standalone library instead.

func SetFlags deprecated

func SetFlags(fs *pflag.FlagSet, v *viper.Viper)

SetFlags creates flags and bind them to Viper.

Deprecated: use the Caddy server module or the standalone library instead.

func Start deprecated

func Start()

Start is an helper method to start the Mercure Hub.

Deprecated: use the Caddy server module or the standalone library instead.

func ValidateConfig deprecated

func ValidateConfig(v *viper.Viper) error

ValidateConfig validates a Viper instance.

Deprecated: use the Caddy server module or the standalone library instead.

Types

type BoltTransport

type BoltTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

BoltTransport implements the TransportInterface using the Bolt database.

func (*BoltTransport) AddSubscriber

func (t *BoltTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*BoltTransport) Close

func (t *BoltTransport) Close() (err error)

Close closes the Transport.

func (*BoltTransport) Dispatch

func (t *BoltTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers and persists it in Bolt DB.

func (*BoltTransport) GetSubscribers

func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error)

GetSubscribers get the list of active subscribers.

type Event

type Event struct {
	// The updates' data, encoded in the sever-sent event format: every line starts with the string "data: "
	// https://www.w3.org/TR/eventsource/#dispatchMessage
	Data string

	// The globally unique identifier corresponding to update
	ID string

	// The event type, will be attached to the "event" field
	Type string

	// The reconnection time
	Retry uint64
}

Event is the actual Server Sent Event that will be dispatched.

func (*Event) String

func (e *Event) String() string

String serializes the event in a "text/event-stream" representation.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub stores channels with clients currently subscribed and allows to dispatch updates.

func NewHub

func NewHub(options ...Option) (*Hub, error)

NewHub creates a new Hub instance.

func NewHubFromViper deprecated

func NewHubFromViper(v *viper.Viper) (*Hub, error)

NewHubFromViper creates a new Hub from the Viper config.

Deprecated: use the Caddy server module or the standalone library instead.

func (*Hub) PublishHandler

func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)

PublishHandler allows publisher to broadcast updates to all subscribers.

func (*Hub) Serve deprecated

func (h *Hub) Serve()

Serve starts the HTTP server.

Deprecated: use the Caddy server module or the standalone library instead.

func (*Hub) ServeHTTP

func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Hub) Stop

func (h *Hub) Stop() error

Stop stops the hub.

func (*Hub) SubscribeHandler

func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)

SubscribeHandler creates a keep alive connection and sends the events to the subscribers.

func (*Hub) SubscriptionHandler

func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)

func (*Hub) SubscriptionsHandler

func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)

type LocalTransport

type LocalTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

LocalTransport implements the TransportInterface without database and simply broadcast the live Updates.

func (*LocalTransport) AddSubscriber

func (t *LocalTransport) AddSubscriber(s *Subscriber) error

AddSubscriber adds a new subscriber to the transport.

func (*LocalTransport) Close

func (t *LocalTransport) Close() (err error)

Close closes the Transport.

func (*LocalTransport) Dispatch

func (t *LocalTransport) Dispatch(update *Update) error

Dispatch dispatches an update to all subscribers.

func (*LocalTransport) GetSubscribers

func (t *LocalTransport) GetSubscribers() (string, []*Subscriber, error)

GetSubscribers get the list of active subscribers.

type LogField

type LogField = zapcore.Field

LogField is an alias of zapcore.Field, it could be replaced by a custom contract when Go will support generics.

type Logger

type Logger interface {
	Debug(msg string, fields ...LogField)
	Info(msg string, fields ...LogField)
	Warn(msg string, fields ...LogField)
	Error(msg string, fields ...LogField)
}

Logger defines the Mercure logger.

type Metrics

type Metrics interface {
	// SubscriberConnected collects metrics about about subscriber connections.
	SubscriberConnected(s *Subscriber)
	// SubscriberDisconnected collects metrics about subscriber disconnections.
	SubscriberDisconnected(s *Subscriber)
	// UpdatePublished collects metrics about update publications.
	UpdatePublished(u *Update)
}

type NopMetrics

type NopMetrics struct{}

func (NopMetrics) SubscriberConnected

func (NopMetrics) SubscriberConnected(s *Subscriber)

func (NopMetrics) SubscriberDisconnected

func (NopMetrics) SubscriberDisconnected(s *Subscriber)

func (NopMetrics) UpdatePublished

func (NopMetrics) UpdatePublished(s *Update)

type Option

type Option func(h *opt) error

Option instances allow to configure the library.

func WithAllowedHosts

func WithAllowedHosts(hosts []string) Option

WithAllowedHosts sets the allowed hosts.

func WithAnonymous

func WithAnonymous() Option

WithAnonymous allows subscribers with no valid JWT.

func WithCORSOrigins

func WithCORSOrigins(origins []string) Option

WithCORSOrigins sets the allowed CORS origins.

func WithDebug

func WithDebug() Option

WithDebug enables the debug mode.

func WithDemo

func WithDemo() Option

WithDemo enables the demo.

func WithDispatchTimeout

func WithDispatchTimeout(timeout time.Duration) Option

WithDispatchTimeout sets maximum dispatch duration of an update.

func WithHeartbeat

func WithHeartbeat(interval time.Duration) Option

WithHeartbeat sets the frequency of the heartbeat, disabled by default.

func WithPublishOrigins

func WithPublishOrigins(origins []string) Option

WithPublishOrigins sets the origins allowed to publish updates.

func WithPublisherJWT

func WithPublisherJWT(key []byte, alg string) Option

WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.

func WithSubscriberJWT

func WithSubscriberJWT(key []byte, alg string) Option

WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.

func WithSubscriptions

func WithSubscriptions() Option

WithSubscriptions allows to dispatch updates when subscriptions are created or terminated.

func WithTopicSelectorStore

func WithTopicSelectorStore(tss *TopicSelectorStore) Option

WithTopicSelectorStore sets the TopicSelectorStore instance to use.

func WithTransport

func WithTransport(t Transport) Option

WithTransport sets the transport to use.

func WithUI

func WithUI() Option

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets maximum duration before closing the connection, defaults to 600s, set to 0 to disable.

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics store Hub collected metrics.

func NewPrometheusMetrics

func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics

NewPrometheusMetrics creates a Prometheus metrics collector. This method must be called only one time or it will panic.

func (*PrometheusMetrics) Register deprecated

func (m *PrometheusMetrics) Register(r *mux.Router)

Register configures the Prometheus registry with all collected metrics.

Deprecated: use the Caddy server module or the standalone library instead.

func (*PrometheusMetrics) SubscriberConnected

func (m *PrometheusMetrics) SubscriberConnected(s *Subscriber)

func (*PrometheusMetrics) SubscriberDisconnected

func (m *PrometheusMetrics) SubscriberDisconnected(s *Subscriber)

func (*PrometheusMetrics) UpdatePublished

func (m *PrometheusMetrics) UpdatePublished(u *Update)

type Subscriber

type Subscriber struct {
	ID                 string
	EscapedID          string
	Claims             *claims
	Topics             []string
	EscapedTopics      []string
	RequestLastEventID string
	RemoteAddr         string
	TopicSelectors     []string
	Debug              bool
	// contains filtered or unexported fields
}

Subscriber represents a client subscribed to a list of topics.

func NewSubscriber

func NewSubscriber(lastEventID string, tss *TopicSelectorStore) *Subscriber

NewSubscriber creates a new subscriber.

func (*Subscriber) CanDispatch

func (s *Subscriber) CanDispatch(u *Update) bool

CanDispatch checks if an update can be dispatched to this subsriber.

func (*Subscriber) Disconnect

func (s *Subscriber) Disconnect()

Disconnect disconnects the subscriber.

func (*Subscriber) Dispatch

func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool

Dispatch an update to the subscriber.

func (*Subscriber) HistoryDispatched

func (s *Subscriber) HistoryDispatched(responseLastEventID string)

HistoryDispatched must be called when all messages coming from the history have been dispatched.

func (*Subscriber) Receive

func (s *Subscriber) Receive() <-chan *Update

Receive returns a chan when incoming updates are dispatched.

type TopicSelectorStore

type TopicSelectorStore struct {
	// contains filtered or unexported fields
}

TopicSelectorStore caches compiled templates to improve memory and CPU usage.

func NewTopicSelectorStore

func NewTopicSelectorStore(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error)

NewTopicSelectorStore creates a TopicSelectorStore instance. See https://github.com/dgraph-io/ristretto, set values to 0 to disable.

type Transport

type Transport interface {
	// Dispatch dispatches an update to all subscribers.
	Dispatch(update *Update) error

	// AddSubscriber adds a new subscriber to the transport.
	AddSubscriber(s *Subscriber) error

	// Close closes the Transport.
	Close() error
}

Transport provides methods to dispatch and persist updates.

func NewBoltTransport

func NewBoltTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)

NewBoltTransport create a new boltTransport.

func NewLocalTransport

func NewLocalTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)

NewLocalTransport create a new LocalTransport.

func NewTransport

func NewTransport(u *url.URL, tss *TopicSelectorStore) (Transport, error)

type TransportError

type TransportError struct {
	// contains filtered or unexported fields
}

TransportError is returned when the Transport's DSN is invalid.

func (*TransportError) Error

func (e *TransportError) Error() string

func (*TransportError) Unwrap

func (e *TransportError) Unwrap() error

type TransportFactory

type TransportFactory = func(u *url.URL, tss *TopicSelectorStore) (Transport, error)

TransportFactory is the factory to initialize a new transport.

type TransportSubscribers

type TransportSubscribers interface {
	// GetSubscribers gets the last event ID and the list of active subscribers at this time.
	GetSubscribers() (string, []*Subscriber, error)
}

TransportSubscribers provide a method to retrieve the list of active subscribers.

type Update

type Update struct {
	// The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs).
	// The first one is the canonical IRI, while next ones are alternate IRIs.
	Topics []string

	// Private updates can only be dispatched to subscribers authorized to receive them.
	Private bool

	// To print debug informations
	Debug bool

	// The Server-Sent Event to send.
	Event
}

Update represents an update to send to subscribers.

Directories

Path Synopsis
caddy module
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL