README
¶
Hertz-SSE
(This is a community driven project)
English | 中文
Server-Sent events is a specification for implementing server-side-push for web frontend applications, through plain-old HTTP. The Server-Sent Events EventSource API is standardized as part of HTML5[1] by the W3C. This repository is a fork of manucorporat/sse and r3labs/sse for Hertz.
Install
go get github.com/hertz-contrib/sse
Example
Server
see: examples/server/quickstart/main.go
package main
import (
"context"
"net/http"
"time"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/app/server"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/hertz-contrib/sse"
)
// main starts a Hertz server whose GET /sse endpoint streams one "timestamp"
// event per second and, after sendCountLimit events, a final "end" event.
func main() {
	h := server.Default()

	h.GET("/sse", func(ctx context.Context, c *app.RequestContext) {
		// client can tell server last event it received with Last-Event-ID header
		lastEventID := sse.GetLastEventID(c)
		hlog.CtxInfof(ctx, "last event ID: %s", lastEventID)

		// you must set status code and response headers before first render call
		c.SetStatusCode(http.StatusOK)
		s := sse.NewStream(c)

		// Keep a handle on the ticker so it is stopped on every exit path of
		// the handler; ranging over time.NewTicker(...).C directly would leave
		// one running ticker behind per request.
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()

		const sendCountLimit = 10
		count := 0
		for t := range ticker.C {
			event := &sse.Event{
				Event: "timestamp",
				Data:  []byte(t.Format(time.RFC3339)),
			}
			// A publish error ends the handler; no further events are sent.
			if err := s.Publish(event); err != nil {
				return
			}

			count++
			if count >= sendCountLimit {
				// send end flag to client
				if err := s.Publish(&sse.Event{
					Event: "end",
					Data:  []byte("end flag"),
				}); err != nil {
					return
				}
				break
			}
		}
	})

	h.Spin()
}
Client
see: examples/client/quickstart/main.go
package main
import (
"context"
"sync"
"time"
"github.com/cloudwego/hertz/pkg/app/client"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/cloudwego/hertz/pkg/protocol"
"github.com/hertz-contrib/sse"
)
// wg lets main wait until both example subscribers have finished.
var wg sync.WaitGroup

// main runs two SSE subscribers concurrently against
// http://127.0.0.1:8888/sse and waits for both to finish:
//   - client1 subscribes with a cancellable context and cancels after 5s;
//   - client2 subscribes without a context and reads until it sees the end
//     event, the stream closes and the buffer is empty, or an error occurs.
func main() {
	wg.Add(2)

	go func() {
		// Deferred so that every exit path, including the early returns on
		// client-creation errors, releases wg.Wait() in main.
		defer wg.Done()

		// create Hertz client
		hCli, err := client.NewClient()
		if err != nil {
			hlog.Errorf("create Hertz client failed, err: %v", err)
			return
		}
		// inject Hertz client to create SSE client
		c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
		if err != nil {
			hlog.Errorf("create SSE client failed, err: %v", err)
			return
		}

		// touch off when connected to the server
		c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
			hlog.Infof("client1 connect to server success")
		})

		// touch off when the connection is shutdown
		c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
			hlog.Infof("client1 disconnect to server success")
		})

		events := make(chan *sse.Event)
		errChan := make(chan error)
		ctx, cancel := context.WithCancel(context.Background())
		// Release the context even if the subscription ends before the timed
		// cancel below fires; calling cancel more than once is harmless.
		defer cancel()

		go func() {
			// build the req sent with each SSE request
			req := &protocol.Request{}
			req.SetRequestURI("http://127.0.0.1:8888/sse")
			cErr := c.SubscribeWithContext(ctx, func(msg *sse.Event) {
				if msg.Data != nil {
					events <- msg
					return
				}
			}, sse.WithRequest(req))
			errChan <- cErr
		}()

		go func() {
			time.Sleep(5 * time.Second)
			cancel()
			hlog.Info("client1 subscribe cancel")
		}()

		for {
			select {
			case e := <-events:
				hlog.Infof("client1, %+v", e)
			case err := <-errChan:
				if err == nil {
					hlog.Info("client1, ctx done, read stop")
				} else {
					hlog.CtxErrorf(ctx, "client1, err = %s", err.Error())
				}
				return
			}
		}
	}()

	go func() {
		// Deferred for the same reason as in client1: error returns must not
		// leave main blocked in wg.Wait().
		defer wg.Done()

		// create Hertz client
		hCli, err := client.NewClient()
		if err != nil {
			hlog.Errorf("create Hertz client failed, err: %v", err)
			return
		}
		// inject Hertz client to create SSE client
		c, err := sse.NewClientWithOptions(sse.WithHertzClient(hCli))
		if err != nil {
			hlog.Errorf("create SSE client failed, err: %v", err)
			return
		}

		// touch off when connected to the server
		c.SetOnConnectCallback(func(ctx context.Context, client *sse.Client) {
			hlog.Infof("client2 connect to server success")
		})

		// touch off when the connection is shutdown
		c.SetDisconnectCallback(func(ctx context.Context, client *sse.Client) {
			hlog.Infof("client2 disconnect to server success")
		})

		events := make(chan *sse.Event, 10)
		errChan := make(chan error)

		go func() {
			// build the req sent with each SSE request
			req := &protocol.Request{}
			req.SetRequestURI("http://127.0.0.1:8888/sse")
			cErr := c.Subscribe(func(msg *sse.Event) {
				if msg.Data != nil {
					events <- msg
					return
				}
			}, sse.WithRequest(req))
			errChan <- cErr
		}()

		// handle processes one event and reports whether it was the end event.
		handle := func(e *sse.Event) bool {
			hlog.Infof("client2, %+v", e)
			time.Sleep(2 * time.Second) // do something blocked
			// When the event ends, you should break out of the loop.
			return checkEventEnd(e)
		}

		streamClosed := false
		for {
			if streamClosed {
				// The stream is closed: drain whatever is still buffered
				// without blocking, and stop once the buffer is empty.
				select {
				case e := <-events:
					if handle(e) {
						return
					}
				default:
					hlog.Info("client2, events is empty and stream closed")
					return
				}
				continue
			}

			// The stream is still open: block until something happens rather
			// than spinning through an empty default branch.
			select {
			case e := <-events:
				if handle(e) {
					return
				}
			case err := <-errChan:
				if err != nil {
					hlog.CtxErrorf(context.Background(), "client2, err = %s", err.Error())
					return
				}
				// err is nil means read io.EOF, stream is closed
				streamClosed = true
				hlog.Info("client2, stream closed")
				// continue read channel events
			}
		}
	}()

	wg.Wait()
}
// checkEventEnd reports whether e marks the end of the stream: either its
// Event field is "end" or its Data equals "end flag". Which of the two fields
// carries the marker depends on how the server defines it.
func checkEventEnd(e *sse.Event) bool {
	if e.Event == "end" {
		return true
	}
	return string(e.Data) == "end flag"
}
Real-world examples
This repository comes with two server examples that demonstrate how to build real-time applications with server-sent events.
Stock Price (examples/server/stockprice)
A web server that periodically pushes (randomly generated) stock prices.
- Run
examples/server/stockprice/main.go
to start server. - Send a GET request to
/price
curl -N --location 'localhost:8888/price'
#id:1681141432283
#event:AAPL
#data:92.607347
#
#id:1681141432283
#event:AMZN
#data:73.540894
#
#id:1681141433283
#event:AAPL
#data:23.536702
#
#id:1681141433283
#event:AMZN
#data:63.156229
#
Chat Server (examples/server/chat)
A chat server that pushes new messages to clients using server-sent events. It supports both direct and broadcast messaging.
- Run
examples/server/chat/main.go
to start server. - Send a GET request to
/chat/sse
.
# receive message on behalf of user hertz
curl -N --location 'http://localhost:8888/chat/sse?username=hertz'
- Open a new terminal and send messages to hertz.
# send a broadcast message
curl --location --request POST 'http://localhost:8888/chat/broadcast?from=kitex&message=cloudwego'
# send a direct message
curl --location --request POST 'http://localhost:8888/chat/direct?from=kitex&message=hello%20hertz&to=hertz'
On the first terminal, you should see 2 messages.
curl -N --location 'http://localhost:8888/chat/sse?username=hertz'
#event:broadcast
#data:{"Type":"broadcast","From":"kitex","To":"","Message":"cloudwego","Timestamp":"2023-04-10T23:48:55.019742+08:00"}
#
#event:direct
#data:{"Type":"direct","From":"kitex","To":"hertz","Message":"hello hertz","Timestamp":"2023-04-10T23:48:56.212855+08:00"}
Benchmark Results
All benchmarks are stored for each commit, they can be viewed here:
Documentation
¶
Index ¶
- Constants
- func Encode(w io.Writer, e *Event) (err error)
- func GetLastEventID(c *app.RequestContext) string
- type Client
- func (c *Client) GetBody() []byte
- func (c *Client) GetHeaders() map[string]string
- func (c *Client) GetHertzClient() do.Doer
- func (c *Client) GetLastEventID() []byte
- func (c *Client) GetMethod() string
- func (c *Client) GetURL() string
- func (c *Client) SetBody(body []byte)
- func (c *Client) SetDisconnectCallback(fn ConnCallback)
- func (c *Client) SetEncodingBase64(encodingBase64 bool)
- func (c *Client) SetHeaders(headers map[string]string)
- func (c *Client) SetHertzClient(hertzClient do.Doer)
- func (c *Client) SetMaxBufferSize(size int)
- func (c *Client) SetMethod(method string)
- func (c *Client) SetOnConnectCallback(fn ConnCallback)
- func (c *Client) SetResponseCallback(responseCallback ResponseCallback)
- func (c *Client) SetURL(url string)
- func (c *Client) Subscribe(handler func(msg *Event), opts ...SubscribeOption) error
- func (c *Client) SubscribeWithContext(ctx context.Context, handler func(msg *Event), opts ...SubscribeOption) (err error)
- type ClientOption
- type ClientOptions
- type ConnCallback
- type Event
- type EventStreamReader
- type ResponseCallback
- type Stream
- type SubscribeOption
- type SubscribeOptions
Constants ¶
const ( ContentType = "text/event-stream" LastEventID = "Last-Event-ID" )
Variables ¶
This section is empty.
Functions ¶
func GetLastEventID ¶
func GetLastEventID(c *app.RequestContext) string
GetLastEventID retrieve Last-Event-ID header if present.
Types ¶
type Client ¶ added in v0.0.2
type Client struct {
// contains filtered or unexported fields
}
Client handles an incoming server stream
func NewClient ¶ added in v0.0.2
NewClient creates a new client. Deprecated: use NewClientWithOptions instead.
func NewClientWithOptions ¶ added in v0.1.0
func NewClientWithOptions(opts ...ClientOption) (*Client, error)
NewClientWithOptions creates a new Client with specified ClientOption
func (*Client) GetHeaders ¶ added in v0.0.2
GetHeaders get sse client headers Deprecated
func (*Client) GetHertzClient ¶ added in v0.0.2
GetHertzClient get sse client Deprecated
func (*Client) GetLastEventID ¶ added in v0.0.2
GetLastEventID gets the SSE client's lastEventID. Deprecated: if you use one Client to initiate multiple SSE requests, the value returned by GetLastEventID does not identify which SSE request it belongs to.
func (*Client) SetBody ¶ added in v0.0.3
SetBody set sse client request body Deprecated, set in protocol.Request and use WithRequest instead
func (*Client) SetDisconnectCallback ¶ added in v0.0.2
func (c *Client) SetDisconnectCallback(fn ConnCallback)
SetDisconnectCallback specifies the function to run when the connection disconnects
func (*Client) SetEncodingBase64 ¶ added in v0.0.2
SetEncodingBase64 set sse client whether use the base64
func (*Client) SetHeaders ¶ added in v0.0.2
SetHeaders set sse client headers Deprecated, set in protocol.Request and use WithRequest instead
func (*Client) SetHertzClient ¶ added in v0.0.2
SetHertzClient set sse client Deprecated, set in NewClientWithOptions(WithHertzClient(hCli))
func (*Client) SetMaxBufferSize ¶ added in v0.0.2
SetMaxBufferSize set sse client MaxBufferSize
func (*Client) SetMethod ¶ added in v0.0.2
SetMethod set sse client request method Deprecated, set in protocol.Request and use WithRequest instead
func (*Client) SetOnConnectCallback ¶ added in v0.0.2
func (c *Client) SetOnConnectCallback(fn ConnCallback)
SetOnConnectCallback specifies the function to run when the connection is successful
func (*Client) SetResponseCallback ¶ added in v0.0.2
func (c *Client) SetResponseCallback(responseCallback ResponseCallback)
SetResponseCallback set sse client responseCallback
func (*Client) SetURL ¶ added in v0.0.2
SetURL set sse client url Deprecated, set in protocol.Request and use WithRequest instead
func (*Client) Subscribe ¶ added in v0.0.2
func (c *Client) Subscribe(handler func(msg *Event), opts ...SubscribeOption) error
Subscribe to a data stream
func (*Client) SubscribeWithContext ¶ added in v0.0.2
func (c *Client) SubscribeWithContext(ctx context.Context, handler func(msg *Event), opts ...SubscribeOption) (err error)
SubscribeWithContext to a data stream with context
type ClientOption ¶ added in v0.1.0
type ClientOption func(*ClientOptions)
func WithHertzClient ¶ added in v0.1.0
func WithHertzClient(cli do.Doer) ClientOption
WithHertzClient specifies the underlying Hertz Client
type ClientOptions ¶ added in v0.1.0
type ClientOptions struct {
// contains filtered or unexported fields
}
type ConnCallback ¶ added in v0.0.2
ConnCallback defines a function to be called on a particular connection event
type EventStreamReader ¶ added in v0.0.2
type EventStreamReader struct {
// contains filtered or unexported fields
}
EventStreamReader scans an io.Reader looking for EventStream messages.
func NewEventStreamReader ¶ added in v0.0.2
func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader
NewEventStreamReader creates an instance of EventStreamReader.
type ResponseCallback ¶ added in v0.0.2
type ResponseCallback func(ctx context.Context, req *protocol.Request, resp *protocol.Response) error
ResponseCallback validates a response
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
func NewStream ¶
func NewStream(c *app.RequestContext) *Stream
NewStream creates a new stream for publishing Event.
func NewStreamWithWriter ¶ added in v0.0.6
func NewStreamWithWriter(c *app.RequestContext, writer network.ExtWriter) *Stream
NewStreamWithWriter creates a new stream with customize network.ExtWriter for publishing Event.
type SubscribeOption ¶ added in v0.1.0
type SubscribeOption func(*SubscribeOptions)
func WithRequest ¶ added in v0.1.0
func WithRequest(req *protocol.Request) SubscribeOption
WithRequest specifies the request sent by the Hertz Client each time Subscribe or SubscribeWithContext is performed. Request-related configuration items set via client.SetXX will not take effect when WithRequest is specified: (client.SetURL, client.SetBody, client.SetMethod, client.SetHeaders).
type SubscribeOptions ¶ added in v0.1.0
type SubscribeOptions struct {
// contains filtered or unexported fields
}
Directories
¶
Path | Synopsis |
---|---|
examples
|
|
client/quickstart
* Copyright 2024 CloudWeGo Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
|
* Copyright 2024 CloudWeGo Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. |
server/quickstart
* Copyright 2024 CloudWeGo Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
|
* Copyright 2024 CloudWeGo Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. |