sse

package module
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 30, 2024 License: MIT Imports: 21 Imported by: 8

README

go-sse

Go Reference CI codecov Go Report Card

Lightweight, fully spec-compliant HTML5 server-sent events library.

Table of contents

Installation and usage

Install the package using go get:

go get -u github.com/tmaxmax/go-sse

It is strongly recommended to use tagged versions of go-sse in your projects. The master branch has tested but unreleased and maybe undocumented changes, which may break backwards compatibility - use with caution.

The library provides both server-side and client-side implementations of the protocol. The implementations are completely decoupled and unopinionated: you can connect to a server created using go-sse from the browser and you can connect to any server that emits events using the client!

If you are not familiar with the protocol or not sure how it works, read MDN's guide for using server-sent events. The spec is also useful read!

go-sse promises to support the Go versions supported by the Go team – that is, the 2 most recent major releases.

Implementing a server

Providers and why they are vital

First, a server instance has to be created:

import "github.com/tmaxmax/go-sse"

s := &sse.Server{} // zero value ready to use!

The sse.Server type also implements the http.Handler interface, but a server is framework-agnostic: See the ServeHTTP implementation to learn how to implement your own custom logic. It also has some additional configuration options:

s := &sse.Server{
    Provider: /* what goes here? find out next! */,
    OnSession: /* see Go docs for this one */,
    Logger: /* see Go docs for this one, too */,
}

What is this "provider"? A provider is an implementation of the publish-subscribe messaging system:

type Provider interface {
    // Publish a message to all subscribers of the given topics.
    Publish(msg *Message, topics []string) error
    // Add a new subscriber that is unsubscribed when the context is done.
    Subscribe(ctx context.Context, sub Subscription) error
    // Cleanup all resources and stop publishing messages or accepting subscriptions.
    Shutdown(ctx context.Context) error
}

The provider is what dispatches events to clients. When you publish a message (an event), the provider distributes it to all connections (subscribers). It is the central piece of the server: it determines the maximum number of clients your server can handle, the latency between broadcasting events and receiving them client-side and the maximum message throughput supported by your server. As different use cases have different needs, go-sse allows to plug in your own system. Some examples of such external systems are:

If an external system is required, an adapter that satisfies the Provider interface must be created so it can then be used with go-sse. To implement such an adapter, read the Provider documentation for implementation requirements! And maybe share them with others: go-sse is built with reusability in mind!

But in most cases the power and scalability that these external systems bring is not necessary, so go-sse comes with a default provider builtin. Read further!

Meet Joe, the default provider

The server still works by default, without a provider. go-sse brings you Joe: the trusty, pure Go pub-sub implementation, who handles all your events by default! Befriend Joe as following:

import "github.com/tmaxmax/go-sse"

joe := &sse.Joe{} // the zero value is ready to use!

and he'll dispatch events all day! By default, he has no memory of what events he has received, but you can help him remember and replay older messages to new clients using a ReplayProvider:

type ReplayProvider interface {
    // Put a new event in the provider's buffer.
    // If the provider automatically adds IDs aswell,
    // the returned message will also have the ID set,
    // otherwise the input value is returned.
    Put(msg *Message, topics []string) *Message
    // Replay valid events to a subscriber.
    Replay(sub Subscription)
}

go-sse provides two replay providers by default, which both hold the events in-memory: the ValidReplayProvider and FiniteReplayProvider. The first replays events that are valid, not expired, the second replays a finite number of the most recent events. For example:

joe = &sse.Joe{
    ReplayProvider: &sse.ValidReplayProvider{TTL: time.Minute * 5}, // let's have events expire after 5 minutes 
}

will tell Joe to replay all valid events! Replay providers can do so much more (for example, add IDs to events automatically): read the docs on how to use the existing ones and how to implement yours.

You can also implement your own replay providers: maybe you need persistent storage for your events? Or event validity is determined based on other criterias than expiry time? And if you think your replay provider may be useful to others, you are encouraged to share it!

go-sse created the ReplayProvider interface mainly for Joe, but it encourages you to integrate it with your own Provider implementations, where suitable.

Publish your first event

To publish events from the server, we use the sse.Message struct:

import "github.com/tmaxmax/go-sse"

m := &sse.Message{}
m.AppendData("Hello world!", "Nice\nto see you.")

Now let's send it to our clients:

var s *sse.Server

s.Publish(m)

This is how clients will receive our event:

data: Hello world!
data: Nice
data: to see you.

You can also see that go-sse takes care of splitting input by lines into new fields, as required by the specification.

Keep in mind that providers, such as the ValidReplayProvider used above, will panic if they receive events without IDs. To have our event expire, as configured, we must set an ID for the event:

m.ID = sse.ID("unique")

This is how the event will look:

id: unique
data: Hello world!
data: Nice
data: to see you.

Now that it has an ID, the event will be considered expired 5 minutes after it's been published – it won't be replayed to clients after the expiry!

sse.ID is a function that returns an EventID – a special type that denotes an event's ID. An ID must not have newlines, so we must use special functions which validate the value beforehand. The ID constructor function we've used above panics (it is useful when creating IDs from static strings), but there's also NewID, which returns an error indicating whether the value was successfully converted to an ID or not:

id, err := sse.NewID("invalid\nID")

Here, err will be non-nil and id will be an unset value: no id field will be sent to clients if you set an event's ID using that value!

Setting the event's type (the event field) is equally easy:

m.Type = sse.Type("The event's name")

Like IDs, types cannot have newlines. You are provided with constructors that follow the same convention: Type panics, NewType returns an error. Read the docs to find out more about messages and how to use them!

The server-side "Hello world"

Now, let's put everything that we've learned together! We'll create a server that sends a "Hello world!" message every second to all its clients, with Joe's help:

package main

import (
    "log"
    "net/http"
    "time"

    "github.com/tmaxmax/go-sse"
)

func main() {
    s := &sse.Server{}

    go func() {
        m := &sse.Message{}
        m.AppendData("Hello world")

        for range time.Tick(time.Second) {
            _ = s.Publish(m)
        }
    }()

    if err := http.ListenAndServe(":8000", s); err != nil {
        log.Fatalln(err)
    }
}

Joe is our default provider here, as no provider is given to the server constructor. The server is already an http.Handler so we can use it directly with http.ListenAndServe.

Also see a more complex example!

This is by far a complete presentation, make sure to read the docs in order to use go-sse to its full potential!

Using the client

Creating a client

We will use the sse.Client type for connecting to event streams:

type Client struct {
    HTTPClient              *http.Client
    OnRetry                 backoff.Notify
    ResponseValidator       ResponseValidator
    MaxRetries              int
    DefaultReconnectionTime time.Duration
}

As you can see, it uses a net/http client. It also uses the cenkalti/backoff library for implementing auto-reconnect when a connection to a server is lost. Read the client docs and the Backoff library's docs to find out how to configure the client. We'll use the default client the package provides for further examples.

Initiating a connection

We must first create an http.Request - yup, a fully customizable request:

req, err := http.NewRequestWithContext(ctx, http.MethodGet, "host", http.NoBody)

Any kind of request is valid as long as your server handler supports it: you can do a GET, a POST, send a body; do whatever! The context is used as always for cancellation - to stop receiving events you will have to cancel the context. Let's initiate a connection with this request:

import "github.com/tmaxmax/go-sse"

conn := sse.DefaultClient.NewConnection(req)
// you can also do sse.NewConnection(req)
// it is an utility function that calls the
// NewConnection method on the default client
Subscribing to events

Great! Let's imagine the event stream looks as following:

data: some unnamed event

event: I have a name
data: some data

event: Another name
data: some data

To receive the unnamed events, we subscribe to them as following:

unsubscribe := conn.SubscribeMessages(func (event sse.Event) {
    // do something with the event
})

To receive the events named "I have a name":

unsubscribe := conn.SubscribeEvent("I have a name", func (event sse.Event) {
    // do something with the event
})

If you want to subscribe to all events, regardless of their name:

unsubscribe := conn.SubscribeToAll(func (event sse.Event) {
    // do something with the event
})

All Susbcribe methods return a function that when called tells the connection to stop calling the corresponding callback.

In order to work with events, the sse.Event type has some fields and methods exposed:

type Event struct {
    LastEventID string
    Name        string
    Data        string
}

Pretty self-explanatory, but make sure to read the docs!

Now, with this knowledge, let's subscribe to all unnamed events and, when the connection is established, print their data:

unsubscribe := conn.SubscribeMessages(func(event sse.Event) {
    fmt.Printf("Received an unnamed event: %s\n", event.Data)
})
Establishing the connection

Great, we are subscribed now! Let's start receiving events:

err := conn.Connect()

By calling Connect, the request created above will be sent to the server, and if successful, the subscribed callbacks will be called when new events are received. Connect returns only after all callbacks have finished executing. To stop calling a certain callback, call the unsubscribe function returned when subscribing. You can also subscribe new callbacks after calling Connect from a different goroutine. When using a context.Context to stop the connection, the error returned will be the context error – be it context.Canceled, context.DeadlineExceeded or a custom cause (when using context.WithCancelCause). In other words, a successfully closed Connection will always return an error – if the context error is not relevant, you can ignore it. For example:

if err := conn.Connect(); !errors.Is(err, context.Canceled) {
    // handle error
}

A context created with context.WithCancel, or one with context.WithCancelCause and cancelled with the error context.Canceled is assumed above.

There may be situations where the connection does not have to live for indeterminately long – for example when using the OpenAI API. In those situations, configure the client to not retry the connection and ignore io.EOF on return:

client := sse.Client{
    Backoff: sse.Backoff{
        MaxRetries: -1,
    },
    // other settings...
}

req, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/...", body)
conn := client.NewConnection(req)

conn.SubscribeMessages(/* callback */)

if err := conn.Connect(); !errors.Is(err, io.EOF) {
    // handle error
}
Connection lost?

Either way, after receiving so many events, something went wrong and the server is temporarily down. Oh no! As a last hope, it has sent us the following event:

retry: 60000
: that's a minute in milliseconds and this
: is a comment which is ignored by the client

Not a sweat, though! The connection will automatically be reattempted after a minute, when we'll hope the server's back up again. Canceling the request's context will cancel any reconnection attempt, too.

If the server doesn't set a retry time, the client's DefaultReconnectionTime is used.

The "Hello world" server's client

Let's use what we know to create a client for the previous server example:

package main

import (
    "fmt"
    "net/http"
    "os"

    "github.com/tmaxmax/go-sse"
)

func main() {
    r, _ := http.NewRequest(http.MethodGet, "http://localhost:8000", nil)
    conn := sse.NewConnection(r)

    conn.SubscribeMessages(func(ev sse.Event) {
        fmt.Printf("%s\n\n", ev.Data)
    })

    if err := conn.Connect(); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}

Yup, this is it! We are using the default client to receive all the unnamed events from the server. The output will look like this, when both programs are run in parallel:

Hello world!

Hello world!

Hello world!

Hello world!

...

See the complex example's client too!

License

This project is licensed under the MIT license.

Contributing

The library's in its early stages, so contributions are vital - I'm so glad you wish to improve go-sse! Maybe start by opening an issue first, to describe the intended modifications and further discuss how to integrate them. Open PRs to the master branch and wait for CI to complete. If all is clear, your changes will soon be merged! Also, make sure your changes come with an extensive set of tests and the code is formatted.

Thank you for contributing!

Documentation

Overview

Package sse provides utilities for creating and consuming fully spec-compliant HTML5 server-sent events streams.

The central piece of a server's implementation is the Provider interface. A Provider describes a publish-subscribe system that can be used to implement messaging for the SSE protocol. This package already has an implementation, called Joe, that is the default provider for any server. Abstracting the messaging system implementation away allows servers to use any arbitrary provider under the same interface. The default provider will work for simple use-cases, but where scalability is required, one will look at a more suitable solution. Adapters that satisfy the Provider interface can easily be created, and then plugged into the server instance. Events themselves are represented using the Message type.

On the client-side, we use the Client struct to create connections to event streams. Using an `http.Request` we instantiate a Connection. Then we subscribe to incoming events using callback functions, and then we establish the connection by calling the Connection's Connect method.

Example (MessageWriter)
e := Message{
	Type: Type("test"),
	ID:   ID("1"),
}
w := &strings.Builder{}

bw := base64.NewEncoder(base64.StdEncoding, w)
binary.Write(bw, binary.BigEndian, []byte{6, 9, 4, 2, 0})
binary.Write(bw, binary.BigEndian, []byte("data from sensor"))
bw.Close()
w.WriteByte('\n') // Ensures that the data written above will be a distinct `data` field.

enc := json.NewEncoder(w)
enc.SetIndent("", "  ")
enc.Encode(map[string]string{"hello": "world"})
// Not necessary to add a newline here – json.Encoder.Encode adds a newline at the end.

// io.CopyN(hex.NewEncoder(w), rand.Reader, 8)
io.Copy(hex.NewEncoder(w), bytes.NewReader([]byte{5, 1, 6, 34, 234, 12, 143, 91}))

mw := io.MultiWriter(os.Stdout, w)
// The first newline adds the data written above as a `data field`.
io.WriteString(mw, "\nYou'll see me both in console and in event\n\n")

// Add the data to the event. It will be split into fields here,
// according to the newlines present in the input.
e.AppendData(w.String())
e.WriteTo(os.Stdout)
Output:

You'll see me both in console and in event

id: 1
event: test
data: BgkEAgBkYXRhIGZyb20gc2Vuc29y
data: {
data:   "hello": "world"
data: }
data: 05010622ea0c8f5b
data: You'll see me both in console and in event
data:

Index

Examples

Constants

View Source
const DefaultTopic = ""

DefaultTopic is the identifier for the topic that is implied when no topics are specified for a Subscription or a Message.

Variables

View Source
var DefaultClient = &Client{
	HTTPClient:        http.DefaultClient,
	ResponseValidator: DefaultValidator,
	Backoff: Backoff{
		InitialInterval: time.Millisecond * 500,
		Multiplier:      1.5,
		Jitter:          0.5,
	},
}

DefaultClient is the client that is used when creating a new connection using the NewConnection function. Unset properties on new clients are replaced with the ones set for the default client.

View Source
var ErrNoGetBody = errors.New("the GetBody function doesn't exist on the request")

ErrNoGetBody is a sentinel error returned when the connection cannot be reattempted due to GetBody not existing on the original request.

View Source
var ErrNoTopic = errors.New("go-sse.server: no topics specified")

ErrNoTopic is a sentinel error returned by Providers when a Message is published without any topics. It is not an issue to call Server.Publish without topics, because the Server will add the DefaultTopic; it is an error to call Provider.Publish without any topics, though.

View Source
var ErrProviderClosed = errors.New("go-sse.server: provider is closed")

ErrProviderClosed is a sentinel error returned by providers when any operation is attempted after the provider is closed.

View Source
var ErrUnexpectedEOF = parser.ErrUnexpectedEOF

ErrUnexpectedEOF is returned when unmarshaling a Message from an input that doesn't end in a newline.

If it returned from a Connection, it means that the data from the server has reached EOF in the middle of an incomplete event and retries are disabled (normally the client retries the connection in this situation).

View Source
var ErrUpgradeUnsupported = errors.New("go-sse.server: upgrade unsupported")

ErrUpgradeUnsupported is returned when a request can't be upgraded to support server-sent events.

Functions

This section is empty.

Types

type Backoff added in v0.8.0

type Backoff struct {
	// The initial wait time before a reconnection is attempted.
	// Must be >0. Defaults to 500ms.
	InitialInterval time.Duration
	// How much should the reconnection time grow on subsequent attempts.
	// Must be >=1; 1 = constant interval. Defaults to 1.5.
	Multiplier float64
	// How much does the reconnection time vary relative to the base value.
	// This is useful to prevent multiple clients to reconnect at the exact
	// same time, as it makes the wait times distinct.
	// Must be in range (0, 1); -1 = no randomization. Defaults to 0.5.
	Jitter float64
	// How much can the wait time grow.
	// If <=0 = the wait time can infinitely grow. Defaults to infinite growth.
	MaxInterval time.Duration
	// How much time can retries be attempted.
	// For example, if this is 5 seconds, after 5 seconds the client
	// will stop retrying.
	// If <=0 = no limit. Defaults to no limit.
	MaxElapsedTime time.Duration
	// How many retries are allowed.
	// <0 = no retries, 0 = infinite. Defaults to infinite retries.
	MaxRetries int
}

Backoff configures the reconnection strategy of a Connection.

type Client

type Client struct {
	// The HTTP client to be used. Defaults to http.DefaultClient.
	HTTPClient *http.Client
	// A callback that's executed whenever a reconnection attempt starts.
	// It receives the error that caused the retry and the reconnection time.
	OnRetry func(error, time.Duration)
	// A function to check if the response from the server is valid.
	// Defaults to a function that checks the response's status code is 200
	// and the content type is text/event-stream.
	//
	// If the error type returned has a Temporary or a Timeout method,
	// they will be used to determine whether to reattempt the connection.
	// Otherwise, the error will be considered permanent and no reconnections
	// will be attempted.
	ResponseValidator ResponseValidator
	// Backoff configures the backoff strategy. See the documentation of
	// each field for more information.
	Backoff Backoff
}

The Client struct is used to initialize new connections to different servers. It is safe for concurrent use.

After connections are created, the Connect method must be called to start receiving events.

func (*Client) NewConnection

func (c *Client) NewConnection(r *http.Request) *Connection

NewConnection initializes and configures a connection. On connect, the given request is sent and if successful the connection starts receiving messages. Use the request's context to stop the connection.

If the request has a body, it is necessary to provide a GetBody function in order for the connection to be reattempted, in case of an error. Using readers such as bytes.Reader, strings.Reader or bytes.Buffer when creating a request using http.NewRequestWithContext will ensure this function is present on the request.

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a connection to an events stream. Created using the Client struct, a Connection processes the incoming events and calls the subscribed event callbacks. If the connection to the server temporarily fails, the connection will be reattempted. Retry values received from servers will be taken into account.

Connections must not be copied after they are created.

func NewConnection

func NewConnection(r *http.Request) *Connection

NewConnection creates a connection using the default client.

func (*Connection) Connect

func (c *Connection) Connect() error

Connect sends the request the connection was created with to the server and, if successful, it starts receiving events. The caller goroutine is blocked until the request's context is done or an error occurs.

If the request's context is cancelled, Connect returns its error. Otherwise, if the maximum number or retries is made, the last error that occurred is returned. Connect never returns otherwise – either the context is cancelled, or it's done retrying.

All errors returned other than the context errors will be wrapped inside a *ConnectionError.

func (*Connection) SubscribeEvent

func (c *Connection) SubscribeEvent(typ string, cb EventCallback) EventCallbackRemover

SubscribeEvent subscribes the given callback to all the events with the provided type (the `event` field has the value given here). Remove the callback by calling the returned function.

func (*Connection) SubscribeMessages

func (c *Connection) SubscribeMessages(cb EventCallback) EventCallbackRemover

SubscribeMessages subscribes the given callback to all events without type (without or with empty `event` field). Remove the callback by calling the returned function.

func (*Connection) SubscribeToAll

func (c *Connection) SubscribeToAll(cb EventCallback) EventCallbackRemover

SubscribeToAll subscribes the given callback to all events, with or without type. Remove the callback by calling the returned function.

type ConnectionError

type ConnectionError struct {
	// The request for which the connection failed.
	Req *http.Request
	// The reason the operation failed.
	Err error
	// The reason why the request failed.
	Reason string
}

ConnectionError is the type that wraps all the connection errors that occur.

func (*ConnectionError) Error

func (e *ConnectionError) Error() string

func (*ConnectionError) Unwrap

func (e *ConnectionError) Unwrap() error

type Event

type Event struct {
	// The last non-empty ID of all the events received. This may not be
	// the ID of the latest event!
	LastEventID string
	// The event's type. It is empty if the event is unnamed.
	Type string
	// The event's payload.
	Data string
}

The Event struct represents an event sent to the client by a server.

type EventCallback added in v0.3.0

type EventCallback func(Event)

EventCallback is a function that is used to receive events from a Connection.

type EventCallbackRemover added in v0.3.0

type EventCallbackRemover func()

EventCallbackRemover is a function that removes an already registered callback from a connection. Calling it multiple times is a no-op.

type EventID

type EventID struct {
	// contains filtered or unexported fields
}

EventID is a value of the "id" field. It must have a single line.

func ID added in v0.5.0

func ID(value string) EventID

ID creates an event ID and assumes it is valid. If it is not valid, it panics.

func NewID added in v0.5.0

func NewID(value string) (EventID, error)

NewID creates an event ID value. A valid ID must not have any newlines. If the input is not valid, an unset (invalid) ID is returned.

func (EventID) IsSet

func (i EventID) IsSet() bool

IsSet returns true if the receiver is a valid (set) value.

func (*EventID) MarshalJSON

func (i *EventID) MarshalJSON() ([]byte, error)

MarshalJSON returns a JSON representation of the underlying value if it is set. It otherwise returns the representation of the JSON null value.

func (*EventID) MarshalText

func (i *EventID) MarshalText() ([]byte, error)

MarshalText returns a copy of the underlying value if it is set. It returns an error when trying to marshal an unset value.

func (*EventID) Scan

func (i *EventID) Scan(src interface{}) error

Scan implements the sql.Scanner interface. Values can be scanned from:

  • nil interfaces (result: unset value)
  • byte slice
  • string

func (EventID) String

func (i EventID) String() string

String returns the underlying value. The value may be an empty string, make sure to check if the value is set before using it.

func (*EventID) UnmarshalJSON

func (i *EventID) UnmarshalJSON(data []byte) error

UnmarshalJSON sets the underlying value to the given JSON value if the value is a string. The previous value is discarded if the operation fails.

func (*EventID) UnmarshalText

func (i *EventID) UnmarshalText(data []byte) error

UnmarshalText sets the underlying value to the given string, if valid. If the input is invalid, no changes are made to the receiver.

func (EventID) Value

func (i EventID) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

type EventType added in v0.5.0

type EventType struct {
	// contains filtered or unexported fields
}

EventType is a value of the "event" field. It must have a single line.

func NewType added in v0.5.0

func NewType(value string) (EventType, error)

NewType creates a value for the "event" field. It is valid if it does not have any newlines. If the input is not valid, an unset (invalid) ID is returned.

func Type added in v0.5.0

func Type(value string) EventType

Type creates an EventType and assumes it is valid. If it is not valid, it panics.

func (EventType) IsSet added in v0.5.0

func (i EventType) IsSet() bool

IsSet returns true if the receiver is a valid (set) value.

func (*EventType) MarshalJSON added in v0.5.0

func (i *EventType) MarshalJSON() ([]byte, error)

MarshalJSON returns a JSON representation of the underlying value if it is set. It otherwise returns the representation of the JSON null value.

func (*EventType) MarshalText added in v0.5.0

func (i *EventType) MarshalText() ([]byte, error)

MarshalText returns a copy of the underlying value if it is set. It returns an error when trying to marshal an unset value.

func (*EventType) Scan added in v0.5.0

func (i *EventType) Scan(src interface{}) error

Scan implements the sql.Scanner interface. Values can be scanned from:

  • nil interfaces (result: unset value)
  • byte slice
  • string

func (EventType) String added in v0.5.0

func (i EventType) String() string

String returns the underlying value. The value may be an empty string, make sure to check if the value is set before using it.

func (*EventType) UnmarshalJSON added in v0.5.0

func (i *EventType) UnmarshalJSON(data []byte) error

UnmarshalJSON sets the underlying value to the given JSON value if the value is a string. The previous value is discarded if the operation fails.

func (*EventType) UnmarshalText added in v0.5.0

func (i *EventType) UnmarshalText(data []byte) error

UnmarshalText sets the underlying value to the given string, if valid. If the input is invalid, no changes are made to the receiver.

func (EventType) Value added in v0.5.0

func (i EventType) Value() (driver.Value, error)

Value implements the driver.Valuer interface.

type FiniteReplayProvider

type FiniteReplayProvider struct {

	// Count is the maximum number of events FiniteReplayProvider should hold as valid.
	// It must be a positive integer, or the code will panic.
	Count int
	// AutoIDs configures FiniteReplayProvider to automatically set the IDs of events.
	AutoIDs bool
	// contains filtered or unexported fields
}

FiniteReplayProvider is a replay provider that replays at maximum a certain number of events. The events must have an ID unless the AutoIDs flag is toggled.

func (*FiniteReplayProvider) Put

func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message

Put puts a message into the provider's buffer. If there are more messages than the maximum number, the oldest message is removed.

func (*FiniteReplayProvider) Replay

func (f *FiniteReplayProvider) Replay(subscription Subscription) error

Replay replays the messages in the buffer to the listener. It doesn't take into account the messages' expiry times.

type Joe

type Joe struct {

	// An optional replay provider that Joe uses to resend older messages to new subscribers.
	ReplayProvider ReplayProvider
	// contains filtered or unexported fields
}

Joe is a basic server provider that synchronously executes operations by queueing them in channels. Events are also sent synchronously to subscribers, so if a subscriber's callback blocks, the others have to wait.

Joe optionally supports event replaying with the help of a replay provider.

If the replay provider panics, the subscription for which it panicked is considered failed and an error is returned, and thereafter the replay provider is not used anymore – no replays will be attempted for future subscriptions. If due to some other unexpected scenario something panics internally, Joe will remove all subscribers and close itself, so subscribers don't end up blocked.

He serves simple use-cases well, as he's light on resources, and does not require any external services. Also, he is the default provider for Servers.

func (*Joe) Publish

func (j *Joe) Publish(msg *Message, topics []string) error

Publish tells Joe to send the given message to the subscribers. When a message is published to multiple topics, Joe makes sure to not send the Message multiple times to clients that are subscribed to more than one topic that receive the given Message. Every client receives each unique message once, regardless of how many topics it is subscribed to or to how many topics the message is published.

func (*Joe) Shutdown added in v0.6.0

func (j *Joe) Shutdown(ctx context.Context) (err error)

Stop signals Joe to close all subscribers and stop receiving messages. It returns when all the subscribers are closed.

Further calls to Stop will return ErrProviderClosed.

func (*Joe) Subscribe

func (j *Joe) Subscribe(ctx context.Context, sub Subscription) error

Subscribe tells Joe to send new messages to this subscriber. The subscription is automatically removed when the context is done, a callback error occurs or Joe is stopped.

type LogLevel added in v0.8.0

type LogLevel int

LogLevel are the supported log levels of the Server's Logger.

const (
	LogLevelInfo LogLevel = iota
	LogLevelWarn
	LogLevelError
)

All the available log levels.

type Logger added in v0.4.1

type Logger interface {
	// Log is called by the Server to log an event. The http.Request context
	// is passed. The message string is useful for display and the data contains
	// additional information about the event.
	//
	// When the log level is Error, the data map will contain an "err" key
	// with a value of type error. This is the error that triggered the log
	// event.
	//
	// If the data map contains the "lastEventID" key, then it means that
	// a client is being subscribed. The value corresponding to "lastEventID"
	// is of type sse.EventID; there will also be a "topics" key, with a value of
	// type []string, which contains all the topics the client is being
	// subscribed to.
	Log(ctx context.Context, level LogLevel, msg string, data map[string]any)
}

The Logger interface which the Server expects. Adapt your loggers to this interface in order to use it with the Server.

type Message

type Message struct {
	ID    EventID
	Type  EventType
	Retry time.Duration
	// contains filtered or unexported fields
}

Message is the representation of an event sent from the server to its clients.

func (*Message) AppendComment added in v0.5.0

func (e *Message) AppendComment(comments ...string)

AppendComment adds comment fields to the message's event. If the comments span multiple lines, they are broken into multiple comment fields.

func (*Message) AppendData

func (e *Message) AppendData(chunks ...string)

AppendData adds multiple data fields on the message's event from the given strings. Each string will be a distinct data field, and if the strings themselves span multiple lines they will be broken into multiple fields.

Server-sent events are not suited for binary data: the event fields are delimited by newlines, where a newline can be a LF, CR or CRLF sequence. When the client interprets the fields, it joins multiple data fields using LF, so information is altered. Here's an example:

initial payload: This is a\r\nmultiline\rtext.\nIt has multiple\nnewline\r\nvariations.
data sent over the wire:
	data: This is a
	data: multiline
	data: text.
	data: It has multiple
	data: newline
	data: variations
data received by client: This is a\nmultiline\ntext.\nIt has multiple\nnewline\nvariations.

Each line prepended with "data:" is a field; multiple data fields are joined together using LF as the delimiter. If you attempted to send the same payload without prepending the "data:" prefix, like so:

data: This is a
multiline
text.
It has multiple
newline
variations

there would be only one data field (the first one). The rest would be different fields, named "multiline", "text.", "It has multiple" etc., which are invalid fields according to the protocol.

Besides, the protocol explicitly states that event streams must always be UTF-8 encoded: https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream.

If you need to send binary data, you can use a Base64 encoder or any other encoder that does not output any newline characters (\r or \n) and then append the resulted data.

Given that clients treat all newlines the same and replace the original newlines with LF, for internal code simplicity AppendData replaces them aswell.

func (*Message) Clone

func (e *Message) Clone() *Message

Clone returns a copy of the message.

func (*Message) MarshalText

func (e *Message) MarshalText() ([]byte, error)

MarshalText writes the standard textual representation of the message's event. Marshalling and unmarshalling will result in a message with an event that has the same fields; topic will be lost.

If you want to preserve everything, create your own custom marshalling logic. For an example using encoding/json, see the top-level MessageCustomJSONMarshal example.

Use the WriteTo method if you don't need the byte representation.

The representation is written to a bytes.Buffer, which means the error is always nil. If the buffer grows to a size bigger than the maximum allowed, MarshalText will panic. See the bytes.Buffer documentation for more info.

func (*Message) String

func (e *Message) String() string

String writes the message's event standard textual representation to a strings.Builder and returns the resulted string. It may panic if the representation is too long to be buffered.

Use the WriteTo method if you don't actually need the string representation.

func (*Message) UnmarshalText

func (e *Message) UnmarshalText(p []byte) error

UnmarshalText extracts the first event found in the given byte slice into the receiver. The input is expected to be a wire format event, as defined by the spec. Therefore, previous fields present on the Message will be overwritten (i.e. event, ID, comments, data, retry).

Unmarshaling ignores fields with invalid names. If no valid fields are found, an error is returned. For a field to be valid it must end in a newline - if the last field of the event doesn't end in one, an error is returned.

All returned errors are of type UnmarshalError.

func (*Message) WriteTo

func (e *Message) WriteTo(w io.Writer) (int64, error)

WriteTo writes the standard textual representation of the message's event to an io.Writer. This operation is heavily optimized, so it is strongly preferred over MarshalText or String.

type MessageWriter added in v0.5.2

type MessageWriter interface {
	// Sens sends the message to the client.
	// To make sure it is sent, call Flush.
	Send(m *Message) error
	// Flush sends any buffered messages to the client.
	Flush() error
}

MessageWriter is a special kind of response writer used by providers to send Messages to clients.

type Provider

type Provider interface {
	// Subscribe to the provider. The context is used to remove the subscriber automatically
	// when it is done. Errors returned by the subscription's callback function must be returned
	// by Subscribe.
	//
	// Providers can assume that the topics list for a subscription has at least one topic.
	Subscribe(ctx context.Context, subscription Subscription) error
	// Publish a message to all the subscribers that are subscribed to the given topics.
	// The topics slice must be non-empty, or ErrNoTopic will be raised.
	Publish(message *Message, topics []string) error
	// Shutdown stops the provider. Calling Shutdown will clean up all the provider's resources
	// and make Subscribe and Publish fail with an error. All the listener channels will be
	// closed and any ongoing publishes will be aborted.
	//
	// If the given context times out before the provider is shut down – shutting it down takes
	// longer, the context error is returned.
	//
	// Calling Shutdown multiple times after it successfully returned the first time
	// does nothing but return ErrProviderClosed.
	Shutdown(ctx context.Context) error
}

A Provider is a publish-subscribe system that can be used to implement a HTML5 server-sent events protocol. A standard interface is required so HTTP request handlers are agnostic to the provider's implementation.

Providers are required to be thread-safe.

After Shutdown is called, trying to call any method of the provider must return ErrProviderClosed. The providers may return other implementation-specific errors too, but the close error is guaranteed to be the same across providers.

type ReplayProvider

type ReplayProvider interface {
	// Put adds a new event to the replay buffer. The Message that is returned may not have the
	// same address, if the replay provider automatically sets IDs.
	//
	// Put panics if the message couldn't be queued – if no topics are provided, or
	// a message without an ID is put into a ReplayProvider which does not
	// automatically set IDs.
	//
	// The Put operation may be executed by the replay provider in another goroutine only if
	// it can ensure that any Replay operation called after the Put goroutine is started
	// can replay the new received message. This also requires the replay provider implementation
	// to be thread-safe.
	//
	// Replay providers are not required to guarantee that after Put returns the new events
	// can be replayed. If an error occurs internally when putting the new message
	// and retrying the operation would block for too long, it can be aborted.
	// The errors aren't returned as the server providers won't be able to handle them in a useful manner.
	Put(message *Message, topics []string) *Message
	// Replay sends to a new subscriber all the valid events received by the provider
	// since the event with the listener's ID. If the ID the listener provides
	// is invalid, the provider should not replay any events.
	//
	// Replay operations must be executed in the same goroutine as the one it is called in.
	// Other goroutines may be launched from inside the Replay method, but the events must
	// be sent to the listener in the same goroutine that Replay is called in.
	//
	// If an error is returned, then at least some messages weren't successfully replayed.
	// The error is nil if there were no messages to replay for the particular subscription
	// or if all messages were replayed successfully.
	Replay(subscription Subscription) error
}

A ReplayProvider is a type that can replay older published events to new subscribers. Replay providers use event IDs, the topics the events were published to and optionally the events' expiration times or any other criteria to determine which are valid for replay.

While providers can require events to have IDs beforehand, they can also set the IDs themselves, automatically - it's up to the implementation. Providers should ignore events without IDs, if they require IDs to be set.

Replay providers are not required to be thread-safe - server providers are required to ensure only one operation is executed on the replay provider at any given time. Server providers may not execute replay operation concurrently with other operations, so make sure any action on the replay provider blocks for as little as possible. If a replay provider is thread-safe, some operations may be run in a separate goroutine - see the interface's method documentation.

Executing actions that require waiting for a long time on I/O, such as HTTP requests or database calls must be handled with great care, so the server provider is not blocked. Reducing them to the minimum by using techniques such as caching or by executing them in separate goroutines is recommended, as long as the implementation fulfills the requirements.

If not specified otherwise, the errors returned are implementation-specific.

type ResponseValidator

type ResponseValidator func(*http.Response) error

The ResponseValidator type defines the type of the function that checks whether server responses are valid, before starting to read events from them. See the Client's documentation for more info.

These errors are considered permanent and thus if the client is configured to retry on error no retry is attempted and the error is returned.

var DefaultValidator ResponseValidator = func(r *http.Response) error {
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("expected status code %d %s, received %d %s", http.StatusOK, http.StatusText(http.StatusOK), r.StatusCode, http.StatusText(r.StatusCode))
	}
	cts := r.Header.Get("Content-Type")
	ct := contentType(cts)
	if expected := "text/event-stream"; ct != expected {
		return fmt.Errorf("expected content type to have %q, received %q", expected, cts)
	}
	return nil
}

DefaultValidator is the default client response validation function. As per the spec, It checks the content type to be text/event-stream and the response status code to be 200 OK.

If this validator fails, errors are considered permanent. No retry attempts are made.

See https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model.

var NoopValidator ResponseValidator = func(_ *http.Response) error {
	return nil
}

NoopValidator is a client response validator function that treats all responses as valid.

type ResponseWriter added in v0.6.0

type ResponseWriter interface {
	http.ResponseWriter
	Flush() error
}

ResponseWriter is a http.ResponseWriter augmented with a Flush method.

type Server

type Server struct {
	// The provider used to publish and subscribe clients to events.
	// Defaults to Joe.
	Provider Provider
	// A callback that's called when an SSE session is started.
	// You can use this to authorize the session, set the topics
	// the client should be subscribed to and so on. Using the
	// Res field of the Session you can write an error response
	// to the client.
	//
	// The boolean returned indicates whether the returned subscription
	// is valid or not. If it is valid, the Provider will receive it
	// and events will be sent to this client, otherwise the request
	// will be ended.
	//
	// If this is not set, the client will be subscribed to the provider
	// using the DefaultTopic.
	OnSession func(*Session) (Subscription, bool)
	// If Logger is not nil, the Server will log various information about
	// the request lifecycle. See the documentation of Logger for more info.
	Logger Logger
	// contains filtered or unexported fields
}

A Server is mostly a convenience wrapper around a provider. It implements the http.Handler interface and has some methods for calling the underlying provider's methods.

When creating a server, if no provider is specified using the WithProvider option, the Joe provider found in this package with no replay provider is used.

func (*Server) Publish

func (s *Server) Publish(e *Message, topics ...string) error

Publish sends the event to all subscribes that are subscribed to the topic the event is published to. The topics are optional - if none are specified, the event is published to the DefaultTopic.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements a default HTTP handler for a server.

This handler upgrades the request, subscribes it to the server's provider and starts sending incoming events to the client, while logging any errors. It also sends the Last-Event-ID header's value, if present.

If the request isn't upgradeable, it writes a message to the client along with an 500 Internal Server ConnectionError response code. If on subscribe the provider returns an error, it writes the error message to the client and a 500 Internal Server ConnectionError response code.

To customize behavior, use the OnSession callback or create your custom handler.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown closes all the connections and stops the server. Publish operations will fail with the error sent by the underlying provider. NewServer requests will be ignored.

Call this method when shutting down the HTTP server using http.Server's RegisterOnShutdown method. Not doing this will result in the server never shutting down or connections being abruptly stopped.

See the Provider.Shutdown documentation for information on context usage and errors.

type Session added in v0.6.0

type Session struct {
	// The response writer for the request. Can be used to write an error response
	// back to the client. Must not be used after the Session was subscribed!
	Res ResponseWriter
	// The initial HTTP request. Can be used to retrieve authentication data,
	// topics, or data from context – a logger, for example.
	Req *http.Request
	// Last event ID of the client. It is unset if no ID was provided in the Last-Event-Id
	// request header.
	LastEventID EventID
	// contains filtered or unexported fields
}

A Session is an HTTP request from an SSE client. Create one using the Upgrade function.

Using a Session you can also access the initial HTTP request, get the last event ID, or write data to the client.

func Upgrade

func Upgrade(w http.ResponseWriter, r *http.Request) (*Session, error)

Upgrade upgrades an HTTP request to support server-sent events. It returns a Session that's used to send events to the client, or an error if the upgrade failed.

The headers required by the SSE protocol are only sent when calling the Send method for the first time. If other operations are done before sending messages, other headers and status codes can safely be set.

func (*Session) Flush added in v0.6.0

func (s *Session) Flush() error

Flush sends any buffered messages to the client.

func (*Session) Send added in v0.6.0

func (s *Session) Send(e *Message) error

Send sends the given event to the client. It returns any errors that occurred while writing the event.

type Subscription

type Subscription struct {
	// The client to which messages are sent. The implementation of the interface does not have to be
	// thread-safe – providers will not call methods on it concurrently.
	Client MessageWriter
	// An optional last event ID indicating the event to resume the stream from.
	// The events will replay starting from the first valid event sent after the one with the given ID.
	// If the ID is invalid replaying events will be omitted and new events will be sent as normal.
	LastEventID EventID
	// The topics to receive message from. Must be a non-empty list.
	// Topics are orthogonal to event types. They are used to filter what the server sends to each client.
	Topics []string
}

The Subscription struct is used to subscribe to a given provider.

type UnmarshalError

type UnmarshalError struct {
	Reason    error
	FieldName string
	// The value of the invalid field.
	FieldValue string
}

UnmarshalError is the error returned by the Message's UnmarshalText method. If the error is related to a specific field, FieldName will be a non-empty string. If no fields were found in the target text or any other errors occurred, only a Reason will be provided. Reason is always present.

func (*UnmarshalError) Error

func (u *UnmarshalError) Error() string

func (*UnmarshalError) Unwrap

func (u *UnmarshalError) Unwrap() error

type ValidReplayProvider

type ValidReplayProvider struct {
	// The function used to retrieve the current time. Defaults to time.Now.
	// Useful when testing.
	Now func() time.Time

	// TTL is for how long a message is valid, since it was added.
	TTL time.Duration
	// After how long the ReplayProvider should attempt to clean up expired events.
	// By default cleanup is done after a fourth of the TTL has passed; this means
	// that messages may be stored for a duration equal to 5/4*TTL. If this is not
	// desired, set the GC interval to a value sensible for your use case or set
	// it to -1 – this disables automatic cleanup, enabling you to do it manually
	// using the GC method.
	GCInterval time.Duration
	// AutoIDs configures ValidReplayProvider to automatically set the IDs of events.
	AutoIDs bool
	// contains filtered or unexported fields
}

ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events. You can use this provider for replaying an infinite number of events, if the events never expire. The provider removes any expired events when a new event is put and after at least a GCInterval period passed. The events must have an ID unless the AutoIDs flag is toggled.

func (*ValidReplayProvider) GC

func (v *ValidReplayProvider) GC()

GC removes all the expired messages from the provider's buffer.

func (*ValidReplayProvider) Put

func (v *ValidReplayProvider) Put(message *Message, topics []string) *Message

Put puts the message into the provider's buffer.

func (*ValidReplayProvider) Replay

func (v *ValidReplayProvider) Replay(subscription Subscription) error

Replay replays all the valid messages to the listener.

Directories

Path Synopsis
cmd
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL