eventsource

package module
v0.0.0-...-220e99a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2013 License: MIT Imports: 11 Imported by: 14

README

eventsource

eventsource provides the building blocks for consuming and building EventSource services.

Installing

$ go get github.com/bernerdschaefer/eventsource

Importing

import "github.com/bernerdschaefer/eventsource"

Docs

See godoc for pretty documentation or:

# in the eventsource package directory
$ go doc

Documentation

Overview

Package eventsource provides the building blocks for consuming and building EventSource services.

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrClosed signals that the event source has been closed and will not be
	// reopened.
	ErrClosed = errors.New("closed")

	// ErrInvalidEncoding is returned by Encoder and Decoder when invalid UTF-8
	// event data is encountered.
	ErrInvalidEncoding = errors.New("invalid UTF-8 sequence")
)

Functions

This section is empty.

Types

type Decoder

type Decoder struct {
	// contains filtered or unexported fields
}

A Decoder reads and decodes EventSource events from an input stream.

Example
package main

import (
	"fmt"
	"github.com/bernerdschaefer/eventsource"
	"io"
	"log"
	"strings"
)

func main() {
	stream := strings.NewReader(`id: 1
event: add
data: 123

id: 2
event: remove
data: 321

id: 3
event: add
data: 123

`)
	dec := eventsource.NewDecoder(stream)

	for {
		var event eventsource.Event
		err := dec.Decode(&event)

		if err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("%s. %s %s\n", event.ID, event.Type, event.Data)
	}

}
Output:

1. add 123
2. remove 321
3. add 123

func NewDecoder

func NewDecoder(r io.Reader) *Decoder

NewDecoder returns a new decoder that reads from r.

func (*Decoder) Decode

func (d *Decoder) Decode(e *Event) error

Decode reads the next event from its input and stores it in the provided Event pointer.

func (*Decoder) ReadField

func (d *Decoder) ReadField() (field string, value []byte, err error)

ReadField reads a single line from the stream and parses it as a field. A complete event is signalled by an empty key and value. The returned error may either be an error from the stream, or an ErrInvalidEncoding if the value is not valid UTF-8.

type Encoder

type Encoder struct {
	// contains filtered or unexported fields
}

Encoder writes EventSource events to an output stream.

Example
package main

import (
	"github.com/bernerdschaefer/eventsource"
	"log"
	"os"
)

func main() {
	enc := eventsource.NewEncoder(os.Stdout)

	events := []eventsource.Event{
		{ID: "1", Data: []byte("data")},
		{ResetID: true, Data: []byte("id reset")},
		{Type: "add", Data: []byte("1")},
	}

	for _, event := range events {
		if err := enc.Encode(event); err != nil {
			log.Fatal(err)
		}
	}

	if err := enc.WriteField("", []byte("heartbeat")); err != nil {
		log.Fatal(err)
	}

	if err := enc.Flush(); err != nil {
		log.Fatal(err)
	}

}
Output:

id: 1
data: data

id
data: id reset

event: add
data: 1

: heartbeat

func NewEncoder

func NewEncoder(w io.Writer) *Encoder

NewEncoder returns a new encoder that writes to w.

func (*Encoder) Encode

func (e *Encoder) Encode(event Event) error

Encode writes an event to the connection.

func (*Encoder) Flush

func (e *Encoder) Flush() error

Flush sends an empty line to signal event is complete, and flushes the writer.

func (*Encoder) WriteField

func (e *Encoder) WriteField(field string, value []byte) error

WriteField writes an event field to the connection. If the provided value contains newlines, multiple fields will be emitted. If the returned error is not nil, it will be either ErrInvalidEncoding or an error from the connection.

type Event

type Event struct {
	Type    string
	ID      string
	Retry   string
	Data    []byte
	ResetID bool
}

An Event is a message can be written to an event stream and read from an event source.

type EventSource

type EventSource struct {
	// contains filtered or unexported fields
}

An EventSource consumes server sent events over HTTP with automatic recovery.

func New

func New(req *http.Request, retry time.Duration) *EventSource

New prepares an EventSource. The connection is automatically managed, using req to connect, and retrying from recoverable errors after waiting the provided retry duration.

Example
package main

import (
	"github.com/bernerdschaefer/eventsource"
	"log"
	"net/http"
	"time"
)

func main() {
	req, _ := http.NewRequest("GET", "http://localhost:9090/events", nil)
	req.SetBasicAuth("user", "pass")

	es := eventsource.New(req, 3*time.Second)

	for {
		event, err := es.Read()

		if err != nil {
			log.Fatal(err)
		}

		log.Printf("%s. %s %s\n", event.ID, event.Type, event.Data)
	}
}
Output:

func (*EventSource) Close

func (es *EventSource) Close()

Close the source. Any further calls to Read() will return ErrClosed.

func (*EventSource) Read

func (es *EventSource) Read() (Event, error)

Read an event from EventSource. If an error is returned, the EventSource will not reconnect, and any further call to Read() will return the same error.

type FlushWriter

type FlushWriter interface {
	io.Writer
	Flush()
}

The FlushWriter interface groups basic Write and Flush methods.

type Handler

type Handler func(lastId string, encoder *Encoder, stop <-chan bool)

Handler is an adapter for ordinary functions to act as an HTTP handler for event sources. It receives the ID of the last event processed by the client, and Encoder to deliver messages, and a channel to be notified if the client connection is closed.

Example
package main

import (
	"github.com/bernerdschaefer/eventsource"
	"net/http"
	"time"
)

func main() {
	http.Handle("/events", eventsource.Handler(func(lastID string, e *eventsource.Encoder, stop <-chan bool) {
		for {
			select {
			case <-time.After(200 * time.Millisecond):
				e.Encode(eventsource.Event{Data: []byte("tick")})
			case <-stop:
				return
			}
		}
	}))
}
Output:

func (Handler) ServeHTTP

func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP calls h with an Encoder and a close notification channel. It performs Content-Type negotiation.

Example
package main

import (
	"github.com/bernerdschaefer/eventsource"
	"net/http"
	"time"
)

func main() {
	es := eventsource.Handler(func(lastID string, e *eventsource.Encoder, stop <-chan bool) {
		for {
			select {
			case <-time.After(200 * time.Millisecond):
				e.Encode(eventsource.Event{Data: []byte("tick")})
			case <-stop:
				return
			}
		}
	})

	http.HandleFunc("/events", func(w http.ResponseWriter, r *http.Request) {
		if r.Header.Get("Authorization") == "" {
			w.WriteHeader(http.StatusUnauthorized)
			return
		}

		es.ServeHTTP(w, r)
	})
}
Output:

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL