Documentation
¶
Overview ¶
Package mercure helps implement the Mercure protocol (https://mercure.rocks) in Go projects. It provides an implementation of a Mercure hub as an HTTP handler.
Example ¶
package main import ( "log" "net/http" "github.com/dunglas/mercure" ) func main() { h, err := mercure.NewHub( mercure.WithPublisherJWT([]byte("!ChangeMe!"), "HS256"), mercure.WithSubscriberJWT([]byte("!ChangeMe!"), "HS256"), ) if err != nil { log.Fatal(err) } defer h.Stop() http.Handle("/.well-known/mercure", h) log.Panic(http.ListenAndServe(":8080", nil)) }
Output:
Index ¶
- Constants
- Variables
- func AssignUUID(u *Update)
- func InitConfig(v *viper.Viper)deprecated
- func RegisterTransportFactory(scheme string, factory TransportFactory)
- func SetConfigDefaults(v *viper.Viper)deprecated
- func SetFlags(fs *pflag.FlagSet, v *viper.Viper)deprecated
- func Start()deprecated
- func ValidateConfig(v *viper.Viper) errordeprecated
- type BoltTransport
- type CheckedEntry
- type Event
- type Hub
- func (h *Hub) Demo(w http.ResponseWriter, r *http.Request)
- func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) Serve()deprecated
- func (h *Hub) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (h *Hub) Stop() error
- func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)
- func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)
- type Level
- type LocalTransport
- type LogField
- type Logger
- type Metrics
- type NopMetrics
- type Option
- func WithAllowedHosts(hosts []string) Option
- func WithAnonymous() Option
- func WithCORSOrigins(origins []string) Option
- func WithCookieName(cookieName string) Option
- func WithDebug() Option
- func WithDemo() Option
- func WithDispatchTimeout(timeout time.Duration) Option
- func WithHeartbeat(interval time.Duration) Option
- func WithLogger(logger Logger) Option
- func WithMetrics(m Metrics) Option
- func WithPublishOrigins(origins []string) Option
- func WithPublisherJWT(key []byte, alg string) Option
- func WithSubscriberJWT(key []byte, alg string) Option
- func WithSubscriptions() Option
- func WithTopicSelectorStore(tss *TopicSelectorStore) Option
- func WithTransport(t Transport) Option
- func WithUI() Option
- func WithWriteTimeout(timeout time.Duration) Option
- type PostgresTransport
- func (t *PostgresTransport) AddSubscriber(s *Subscriber) error
- func (t *PostgresTransport) Close() (err error)
- func (t *PostgresTransport) Dispatch(update *Update) error
- func (t *PostgresTransport) GetSubscribers() (string, []*Subscriber, error)
- func (t *PostgresTransport) RemoveSubscriber(s *Subscriber) error
- type PrometheusMetrics
- type RedisTransport
- type Subscriber
- func (s *Subscriber) Disconnect()
- func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool
- func (s *Subscriber) HistoryDispatched(responseLastEventID string)
- func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error
- func (s *Subscriber) Match(u *Update) bool
- func (s *Subscriber) MatchTopic(topic string, private bool) (match bool)
- func (s *Subscriber) Ready() int
- func (s *Subscriber) Receive() <-chan *Update
- func (s *Subscriber) SetTopics(topics, privateTopics []string)
- type SubscriberList
- type TopicSelectorStore
- type TopicSelectorStoreCache
- type Transport
- func NewBoltTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- func NewLocalTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- func NewPostgresTransport(iu *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- func NewRedisTCPTransport(iu *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- func NewRedisUnixTransport(iu *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- func NewTransport(u *url.URL, l Logger, tss *TopicSelectorStore) (Transport, error)
- type TransportError
- type TransportFactory
- type TransportSubscribers
- type Update
Examples ¶
Constants ¶
const ( DefaultTopicSelectorStoreLRUMaxEntriesPerShard = int64(1e4) DefaultTopicSelectorStoreLRUShardCount = int64(256) // 2.5 million entries. )
Gather stats to find the best default values.
const ( TopicSelectorStoreRistrettoDefaultCacheNumCounters = int64(6e7) TopicSelectorStoreRistrettoCacheMaxCost = int64(1e8) // 100 MB )
Gather stats to find the best default values.
const BoltDefaultCleanupFrequency = 0.3
const EarliestLastEventID = "earliest"
EarliestLastEventID is the reserved value representing the earliest available event id.
Variables ¶
var ( // ErrInvalidAuthorizationHeader is returned when the Authorization header is invalid. ErrInvalidAuthorizationHeader = errors.New(`invalid "Authorization" HTTP header`) // ErrNoOrigin is returned when the cookie authorization mechanism is used and no Origin nor Referer headers are presents. ErrNoOrigin = errors.New(`an "Origin" or a "Referer" HTTP header must be present to use the cookie-based authorization mechanism`) // ErrOriginNotAllowed is returned when the Origin is not allowed to post updates. ErrOriginNotAllowed = errors.New("origin not allowed to post updates") // ErrUnexpectedSigningMethod is returned when the signing JWT method is not supported. ErrUnexpectedSigningMethod = errors.New("unexpected signing method") // ErrInvalidJWT is returned when the JWT is invalid. ErrInvalidJWT = errors.New("invalid JWT") // ErrPublicKey is returned when there is an error with the public key. ErrPublicKey = errors.New("public key error") )
var ErrClosedTransport = errors.New("hub: read/write on closed Transport")
ErrClosedTransport is returned by the Transport's Dispatch and AddSubscriber methods after a call to Close.
var ErrInvalidConfig = errors.New("invalid config")
ErrInvalidConfig is returned when the configuration is invalid.
Deprecated: use the Caddy server module or the standalone library instead.
Functions ¶
func AssignUUID ¶
func AssignUUID(u *Update)
AssignUUID generates a new UUID and assigns it to the given update if no ID is already set.
func InitConfig
deprecated
func RegisterTransportFactory ¶
func RegisterTransportFactory(scheme string, factory TransportFactory)
func SetConfigDefaults
deprecated
func Start
deprecated
func Start()
Start is a helper method to start the Mercure Hub.
Deprecated: use the Caddy server module or the standalone library instead.
func ValidateConfig
deprecated
Types ¶
type BoltTransport ¶
BoltTransport implements the TransportInterface using the Bolt database.
func (*BoltTransport) AddSubscriber ¶
func (t *BoltTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*BoltTransport) Close ¶
func (t *BoltTransport) Close() (err error)
Close closes the Transport.
func (*BoltTransport) Dispatch ¶
func (t *BoltTransport) Dispatch(update *Update) error
Dispatch dispatches an update to all subscribers and persists it in Bolt DB.
func (*BoltTransport) GetSubscribers ¶
func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
func (*BoltTransport) RemoveSubscriber ¶
func (t *BoltTransport) RemoveSubscriber(s *Subscriber) error
RemoveSubscriber removes a subscriber from the transport.
type CheckedEntry ¶
type CheckedEntry = zapcore.CheckedEntry
CheckedEntry is an alias of zapcore.CheckedEntry, it could be replaced by a custom contract when Go will support generics.
type Event ¶
type Event struct { // The updates' data, encoded in the sever-sent event format: every line starts with the string "data: " // https://www.w3.org/TR/eventsource/#dispatchMessage Data string // The globally unique identifier corresponding to update ID string // The event type, will be attached to the "event" field Type string // The reconnection time Retry uint64 }
Event is the actual Server Sent Event that will be dispatched.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub stores channels with clients currently subscribed and allows to dispatch updates.
func NewHubFromViper
deprecated
func (*Hub) Demo ¶
func (h *Hub) Demo(w http.ResponseWriter, r *http.Request)
Demo exposes INSECURE Demo endpoints to test discovery and authorization mechanisms. Add a query parameter named "body" to define the content to return in the response's body. Add a query parameter named "jwt" to set a "mercureAuthorization" cookie containing this token. The Content-Type header will automatically be set according to the URL's extension.
func (*Hub) PublishHandler ¶
func (h *Hub) PublishHandler(w http.ResponseWriter, r *http.Request)
PublishHandler allows publisher to broadcast updates to all subscribers.
func (*Hub) SubscribeHandler ¶
func (h *Hub) SubscribeHandler(w http.ResponseWriter, r *http.Request)
SubscribeHandler creates a keep alive connection and sends the events to the subscribers.
func (*Hub) SubscriptionHandler ¶
func (h *Hub) SubscriptionHandler(w http.ResponseWriter, r *http.Request)
func (*Hub) SubscriptionsHandler ¶
func (h *Hub) SubscriptionsHandler(w http.ResponseWriter, r *http.Request)
type Level ¶
Level is an alias of zapcore.Level, it could be replaced by a custom contract when Go will support generics.
type LocalTransport ¶
LocalTransport implements the TransportInterface without a database and simply broadcasts the live Updates.
func (*LocalTransport) AddSubscriber ¶
func (t *LocalTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*LocalTransport) Close ¶
func (t *LocalTransport) Close() (err error)
Close closes the Transport.
func (*LocalTransport) Dispatch ¶
func (t *LocalTransport) Dispatch(update *Update) error
Dispatch dispatches an update to all subscribers.
func (*LocalTransport) GetSubscribers ¶
func (t *LocalTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
func (*LocalTransport) RemoveSubscriber ¶
func (t *LocalTransport) RemoveSubscriber(s *Subscriber) error
RemoveSubscriber removes a subscriber from the transport.
type LogField ¶
LogField is an alias of zapcore.Field, it could be replaced by a custom contract when Go will support generics.
type Logger ¶
type Logger interface { Info(msg string, fields ...LogField) Error(msg string, fields ...LogField) Check(Level, string) *CheckedEntry }
Logger defines the Mercure logger.
type Metrics ¶
type Metrics interface { // SubscriberConnected collects metrics about about subscriber connections. SubscriberConnected(s *Subscriber) // SubscriberDisconnected collects metrics about subscriber disconnections. SubscriberDisconnected(s *Subscriber) // UpdatePublished collects metrics about update publications. UpdatePublished(u *Update) }
type NopMetrics ¶
type NopMetrics struct{}
func (NopMetrics) SubscriberConnected ¶
func (NopMetrics) SubscriberConnected(s *Subscriber)
func (NopMetrics) SubscriberDisconnected ¶
func (NopMetrics) SubscriberDisconnected(s *Subscriber)
func (NopMetrics) UpdatePublished ¶
func (NopMetrics) UpdatePublished(s *Update)
type Option ¶
type Option func(h *opt) error
Option instances allow configuring the library.
func WithAllowedHosts ¶
WithAllowedHosts sets the allowed hosts.
func WithAnonymous ¶
func WithAnonymous() Option
WithAnonymous allows subscribers with no valid JWT.
func WithCORSOrigins ¶
WithCORSOrigins sets the allowed CORS origins.
func WithCookieName ¶
WithCookieName sets the name of the authorization cookie (defaults to "mercureAuthorization").
func WithDispatchTimeout ¶
WithDispatchTimeout sets maximum dispatch duration of an update.
func WithHeartbeat ¶
WithHeartbeat sets the frequency of the heartbeat, disabled by default.
func WithMetrics ¶
WithMetrics enables collection of Prometheus metrics.
func WithPublishOrigins ¶
WithPublishOrigins sets the origins allowed to publish updates.
func WithPublisherJWT ¶
WithPublisherJWT sets the JWT key and the signing algorithm to use for publishers.
func WithSubscriberJWT ¶
WithSubscriberJWT sets the JWT key and the signing algorithm to use for subscribers.
func WithSubscriptions ¶
func WithSubscriptions() Option
WithSubscriptions allows dispatching updates when subscriptions are created or terminated.
func WithTopicSelectorStore ¶
func WithTopicSelectorStore(tss *TopicSelectorStore) Option
WithTopicSelectorStore sets the TopicSelectorStore instance to use.
func WithTransport ¶
WithTransport sets the transport to use.
func WithWriteTimeout ¶
WithWriteTimeout sets maximum duration before closing the connection, defaults to 600s, set to 0 to disable.
type PostgresTransport ¶
PostgresTransport implements the TransportInterface for PostgreSQL databases.
func (*PostgresTransport) AddSubscriber ¶
func (t *PostgresTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*PostgresTransport) Close ¶
func (t *PostgresTransport) Close() (err error)
Close closes the Transport.
func (*PostgresTransport) Dispatch ¶
func (t *PostgresTransport) Dispatch(update *Update) error
func (*PostgresTransport) GetSubscribers ¶
func (t *PostgresTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
func (*PostgresTransport) RemoveSubscriber ¶
func (t *PostgresTransport) RemoveSubscriber(s *Subscriber) error
RemoveSubscriber removes a subscriber from the transport.
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics stores the metrics collected by the Hub.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics
NewPrometheusMetrics creates a Prometheus metrics collector. This method must be called only one time or it will panic.
func (*PrometheusMetrics) Register
deprecated
func (m *PrometheusMetrics) Register(r *mux.Router)
Register configures the Prometheus registry with all collected metrics.
Deprecated: use the Caddy server module or the standalone library instead.
func (*PrometheusMetrics) SubscriberConnected ¶
func (m *PrometheusMetrics) SubscriberConnected(s *Subscriber)
func (*PrometheusMetrics) SubscriberDisconnected ¶
func (m *PrometheusMetrics) SubscriberDisconnected(s *Subscriber)
func (*PrometheusMetrics) UpdatePublished ¶
func (m *PrometheusMetrics) UpdatePublished(u *Update)
type RedisTransport ¶
RedisTransport implements the TransportInterface for Redis databases.
func (*RedisTransport) AddSubscriber ¶
func (t *RedisTransport) AddSubscriber(s *Subscriber) error
AddSubscriber adds a new subscriber to the transport.
func (*RedisTransport) Close ¶
func (t *RedisTransport) Close() (err error)
Close closes the Transport.
func (*RedisTransport) Dispatch ¶
func (t *RedisTransport) Dispatch(update *Update) error
func (*RedisTransport) GetSubscribers ¶
func (t *RedisTransport) GetSubscribers() (string, []*Subscriber, error)
GetSubscribers gets the list of active subscribers.
func (*RedisTransport) RemoveSubscriber ¶
func (t *RedisTransport) RemoveSubscriber(s *Subscriber) error
RemoveSubscriber removes a subscriber from the transport.
type Subscriber ¶
type Subscriber struct { ID string EscapedID string Claims *claims EscapedTopics []string RequestLastEventID string RemoteAddr string Topics []string TopicRegexps []*regexp.Regexp PrivateTopics []string PrivateRegexps []*regexp.Regexp Debug bool // contains filtered or unexported fields }
Subscriber represents a client subscribed to a list of topics.
func NewSubscriber ¶
func NewSubscriber(lastEventID string, logger Logger) *Subscriber
NewSubscriber creates a new subscriber.
func (*Subscriber) Disconnect ¶
func (s *Subscriber) Disconnect()
Disconnect disconnects the subscriber.
func (*Subscriber) Dispatch ¶
func (s *Subscriber) Dispatch(u *Update, fromHistory bool) bool
Dispatch dispatches an update to the subscriber. Security checks (topics matching) must be done before calling Dispatch, for instance by calling Match.
func (*Subscriber) HistoryDispatched ¶
func (s *Subscriber) HistoryDispatched(responseLastEventID string)
HistoryDispatched must be called when all messages coming from the history have been dispatched.
func (*Subscriber) MarshalLogObject ¶
func (s *Subscriber) MarshalLogObject(enc zapcore.ObjectEncoder) error
func (*Subscriber) Match ¶
func (s *Subscriber) Match(u *Update) bool
Match checks if the current subscriber can receive the given update.
func (*Subscriber) MatchTopic ¶
func (s *Subscriber) MatchTopic(topic string, private bool) (match bool)
MatchTopic checks if the current subscriber can access the given topic.
func (*Subscriber) Ready ¶
func (s *Subscriber) Ready() int
Ready flips the ready flag to true and flushes queued live updates, returning the number of events flushed.
func (*Subscriber) Receive ¶
func (s *Subscriber) Receive() <-chan *Update
Receive returns a chan where incoming updates are dispatched.
func (*Subscriber) SetTopics ¶
func (s *Subscriber) SetTopics(topics, privateTopics []string)
SetTopics compiles topic selector regexps.
type SubscriberList ¶
type SubscriberList struct {
// contains filtered or unexported fields
}
func NewSubscriberList ¶
func NewSubscriberList(size int) *SubscriberList
func (*SubscriberList) Add ¶
func (sc *SubscriberList) Add(s *Subscriber)
func (*SubscriberList) Len ¶
func (sc *SubscriberList) Len() int
func (*SubscriberList) MatchAny ¶
func (sc *SubscriberList) MatchAny(u *Update) (res []*Subscriber)
func (*SubscriberList) Remove ¶
func (sc *SubscriberList) Remove(s *Subscriber)
func (*SubscriberList) Walk ¶
func (sc *SubscriberList) Walk(start uint64, callback func(s *Subscriber) bool) uint64
type TopicSelectorStore ¶
type TopicSelectorStore struct {
// contains filtered or unexported fields
}
TopicSelectorStore caches compiled templates to improve memory and CPU usage.
func NewTopicSelectorStoreLRU ¶
func NewTopicSelectorStoreLRU(maxEntriesPerShard, shardCount int64) (*TopicSelectorStore, error)
NewTopicSelectorStoreLRU creates a TopicSelectorStore with an LRU cache.
func NewTopicSelectorStoreRistretto
deprecated
func NewTopicSelectorStoreRistretto(cacheNumCounters, cacheMaxCost int64) (*TopicSelectorStore, error)
NewTopicSelectorStoreRistretto creates a TopicSelectorStore instance with a ristretto cache. See https://github.com/dgraph-io/ristretto, set values to 0 to disable.
Deprecated: use NewTopicSelectorStoreLRU instead.
type TopicSelectorStoreCache ¶
type Transport ¶
type Transport interface { // Dispatch dispatches an update to all subscribers. Dispatch(update *Update) error // AddSubscriber adds a new subscriber to the transport. AddSubscriber(s *Subscriber) error // RemoveSubscriber removes a subscriber from the transport. RemoveSubscriber(s *Subscriber) error // Close closes the Transport. Close() error }
Transport provides methods to dispatch and persist updates.
func NewBoltTransport ¶
NewBoltTransport creates a new BoltTransport.
func NewLocalTransport ¶
NewLocalTransport creates a new LocalTransport.
func NewPostgresTransport ¶
func NewRedisTCPTransport ¶
Create a TCP-based transport. Input URL is one of
redis://[user[:pass]@]HOST:[PORT][/DBNUM][?[stream=NAME][cleanup_interval=D[&event_ttl=D]][&REDIS_PARAM]] Creates a plaintext TCP transport rediss://[user[:pass]@]HOST:[PORT][/DBNUM][?[stream=NAME][cleanup_interval=D[&event_ttl=D]][&REDIS_PARAM]] Creates TLS-encrypted TCP transport
HOST is the hostname or IP address of the redis server.
PORT is the port number it is listening on. It defaults to 6379.
DBNUM is the number of the redis database to use.
STREAM sets the redis stream name (defaults to "mercure")
The parameters cleanup_interval and event_ttl control periodic database cleanups. Both take as their argument a duration specification suitable as input to time.ParseDuration. The cleanup_interval parameter sets the duration between two successive database cleanups. It must be set in order for the cleanup routine to be enabled. The optional event_ttl parameter sets the time-to-live of an event in the database. It defaults to 24 hours.
REDIS_PARAM are redis-specific parameters as described in
https://pkg.go.dev/github.com/go-redis/redis/v8#ParseURL.
func NewRedisUnixTransport ¶
Create a UNIX-based (socket) redis transport. Input URL is:
redis+unix://[user[:pass]@]SOCKET_PATH[?[stream=NAME][cleanup_interval=N[&event_ttl=N]][&db=DBNUM][REDIS_PARAM]]
SOCKET_PATH is the pathname of the unix socket the server is listening on. It must be an absolute pathname, i.e. it must begin with a /.
DBNUM is the number of the redis database to use.
See above for the description of stream, cleanup_interval and event_ttl.
REDIS_PARAM are redis-specific parameters as described in
https://pkg.go.dev/github.com/go-redis/redis/v8#ParseURL.
func NewTransport ¶
type TransportError ¶
type TransportError struct {
// contains filtered or unexported fields
}
TransportError is returned when the Transport's DSN is invalid.
func (*TransportError) Error ¶
func (e *TransportError) Error() string
func (*TransportError) Unwrap ¶
func (e *TransportError) Unwrap() error
type TransportFactory ¶
TransportFactory is the factory to initialize a new transport.
type TransportSubscribers ¶
type TransportSubscribers interface { // GetSubscribers gets the last event ID and the list of active subscribers at this time. GetSubscribers() (string, []*Subscriber, error) }
TransportSubscribers provides a method to retrieve the list of active subscribers.
type Update ¶
type Update struct { // The topics' Internationalized Resource Identifier (RFC3987) (will most likely be URLs). // The first one is the canonical IRI, while next ones are alternate IRIs. Topics []string // Private updates can only be dispatched to subscribers authorized to receive them. Private bool // To print debug informations Debug bool // The Server-Sent Event to send. Event }
Update represents an update to send to subscribers.
func (*Update) MarshalLogObject ¶
func (u *Update) MarshalLogObject(enc zapcore.ObjectEncoder) error
Source Files
¶
- authorization.go
- bolt_transport.go
- config.go
- demo.go
- event.go
- handler.go
- hub.go
- local_transport.go
- log.go
- metrics.go
- postgres_transport.go
- publish.go
- redis_transport.go
- subscribe.go
- subscriber.go
- subscriber_list.go
- subscription.go
- topic_selector.go
- topic_selector_lru.go
- topic_selector_ristretto.go
- transport.go
- update.go