sseread

v1.0.9
Published: Mar 20, 2024 License: Apache-2.0 Imports: 7 Imported by: 1

README

Server Sent Events Reader

This is a small library that shows how to read a Server-Sent Events (SSE) stream from an HTTP Response.Body in Go.

Usage

Install the library with: go get -u github.com/mojocn/sseread@latest

The repository includes simple examples of how to use the library:

  1. read SSE by callback
  2. read SSE by channel
  3. cloudflare AI text generation example

Testing

# git clone https://github.com/mojocn/sseread.git && cd sseread
go test -v

Documentation

Functions

func Read

func Read(responseBody io.Reader, callback func(event *Event)) (err error)

Read reads from an io.Reader, parses the data as Server-Sent Events, and invokes the provided callback function for each event. It returns an error if any occurs during reading or parsing the events.
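To make the callback style concrete, here is a minimal, standard-library-only sketch of how an SSE stream can be parsed line by line. It is not the implementation of sseread.Read; the names sketchEvent, readSketch, and collect are hypothetical, and the handling of unknown fields and of a final event without a trailing blank line are assumptions made for the illustration.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// sketchEvent is a hypothetical stand-in for an SSE event; it is not sseread.Event.
type sketchEvent struct {
	ID, Event, Data string
}

// readSketch scans r line by line, collects "field: value" lines, and calls
// callback each time a blank line ends an event.
func readSketch(r io.Reader, callback func(ev *sketchEvent)) error {
	scanner := bufio.NewScanner(r)
	ev := &sketchEvent{}
	var data []string // data lines of the pending event
	seen := false     // true once the pending event has a known field
	dispatch := func() {
		if seen {
			ev.Data = strings.Join(data, "\n")
			callback(ev)
		}
		ev, data, seen = &sketchEvent{}, nil, false
	}
	for scanner.Scan() {
		line := scanner.Text()
		if line == "" { // a blank line terminates the pending event
			dispatch()
			continue
		}
		if strings.HasPrefix(line, ":") { // comment line, e.g. a keep-alive
			continue
		}
		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch field {
		case "id":
			ev.ID = value
		case "event":
			ev.Event = value
		case "data":
			data = append(data, value)
		default:
			continue // unknown field: ignored in this sketch
		}
		seen = true
	}
	dispatch() // lenient choice: also emit an event that lacks a final blank line
	return scanner.Err()
}

// collect runs readSketch over an in-memory stream and gathers the events.
func collect(stream string) ([]sketchEvent, error) {
	var events []sketchEvent
	err := readSketch(strings.NewReader(stream), func(ev *sketchEvent) {
		events = append(events, *ev)
	})
	return events, err
}

func main() {
	stream := "id: 1\nevent: greeting\ndata: hello\n\n: keep-alive\n\ndata: line one\ndata: line two\n\n"
	events, err := collect(stream)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, ev := range events {
		fmt.Printf("id=%q event=%q data=%q\n", ev.ID, ev.Event, ev.Data)
	}
}
```

Because the sketch accepts any io.Reader, it can be exercised with strings.NewReader instead of a live HTTP response, which is also a convenient way to test code that consumes a callback-based reader.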

Example

ExampleRead is a function that demonstrates how to read Server-Sent Events (SSE) from a specific URL.

package main

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/mojocn/sseread"
)

func main() {
	// Send a GET request to the specified URL.
	response, err := http.Get("https://mojotv.cn/api/sse") //API source code from https://github.com/mojocn/gptchat/blob/main/app/api/sse/route.ts
	// If an error occurs during the GET request, print the error and return from the function.
	if err != nil {
		fmt.Println(err)
		return
	}

	// Get the Content-Type header from the response and convert it to lowercase.
	ct := strings.ToLower(response.Header.Get("Content-Type"))
	// If the Content-Type is not "text/event-stream", print a message and continue.
	if !strings.Contains(ct, "text/event-stream") {
		fmt.Println("expect content-type: text/event-stream, but actual", ct)
	}

	// Ensure the response body is closed when the function returns.
	defer response.Body.Close() //don't forget to close the response body

	// Declare a slice to store the SSE messages.
	var messages []sseread.Event
	// Read the SSE messages from the response body.
	err = sseread.Read(response.Body, func(msg *sseread.Event) {
		// Skip nil messages so the field accesses below cannot panic.
		if msg == nil {
			return
		}
		// Append the message to the messages slice.
		messages = append(messages, *msg)
		// Print the ID, event type, and data of the message.
		fmt.Println(msg.ID, msg.Event, string(msg.Data))
	})
	// If an error occurs while reading the SSE messages, print the error and return from the function.
	if err != nil {
		fmt.Println(err)
		return
	}
	// Print the number of messages read.
	fmt.Printf("length of messages is %d", len(messages))
}
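The example checks the Content-Type header with strings.Contains. As an alternative, the sketch below parses the header with the standard mime package, so parameters such as charset are separated from the media type before the comparison. The helper name isEventStream is hypothetical and is not part of sseread.

```go
package main

import (
	"fmt"
	"mime"
)

// isEventStream reports whether a Content-Type header value names the
// text/event-stream media type. mime.ParseMediaType lowercases the media
// type and splits off parameters such as charset.
func isEventStream(contentType string) bool {
	mediaType, _, err := mime.ParseMediaType(contentType)
	return err == nil && mediaType == "text/event-stream"
}

func main() {
	for _, ct := range []string{
		"text/event-stream",
		"Text/Event-Stream; charset=utf-8",
		"application/json",
		"",
	} {
		fmt.Printf("%q -> %v\n", ct, isEventStream(ct))
	}
}
```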

func ReadCh

func ReadCh(responseBody io.Reader) (messages <-chan *Event, err error)

ReadCh reads from an io.Reader, parses the data as Server-Sent Events, and sends each event on a channel. It returns the channel of events and an error if any occurs during reading or parsing the events.
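The channel style can be pictured as a goroutine that parses the stream and sends results to the consumer. The sketch below shows that general pattern with the standard library only; it is not the implementation of sseread.ReadCh. The names dataChSketch and drain are hypothetical, and for brevity the sketch sends only the payload of each data line rather than a full event.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// dataChSketch is a hypothetical helper, not part of sseread. It starts a
// goroutine that scans r and sends the payload of every "data:" line on the
// returned channel, closing the channel when the reader is exhausted.
func dataChSketch(r io.Reader) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch) // closing the channel ends the consumer's range loop
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			line := scanner.Text()
			if strings.HasPrefix(line, "data:") {
				payload := strings.TrimPrefix(line, "data:")
				ch <- strings.TrimPrefix(payload, " ")
			}
		}
	}()
	return ch
}

// drain collects every payload from an in-memory stream.
func drain(stream string) []string {
	var out []string
	for payload := range dataChSketch(strings.NewReader(stream)) {
		out = append(out, payload)
	}
	return out
}

func main() {
	for _, payload := range drain("data: first\n\ndata: second\n\n") {
		fmt.Println(payload)
	}
}
```

In this sketch the channel is unbuffered, so the producing goroutine blocks until the consumer receives each value; a consumer that stops early should keep draining the channel or close the underlying reader so the goroutine can finish.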

Example

ExampleReadCh is a function that demonstrates how to read Server-Sent Events (SSE) from a specific URL using channels.

package main

import (
	"fmt"
	"net/http"
	"strings"

	"github.com/mojocn/sseread"
)

func main() {
	// Send a GET request to the specified URL.
	response, err := http.Get("https://mojotv.cn/api/sse") //API source code from https://github.com/mojocn/gptchat/blob/main/app/api/sse/route.ts
	// If an error occurs during the GET request, print the error and return from the function.
	if err != nil {
		fmt.Println(err)
		return
	}

	// Get the Content-Type header from the response and convert it to lowercase.
	ct := strings.ToLower(response.Header.Get("Content-Type"))
	// If the Content-Type is not "text/event-stream", print a message and continue.
	if !strings.Contains(ct, "text/event-stream") {
		fmt.Println("expect content-type: text/event-stream, but actual", ct)
	}

	// Ensure the response body is closed when the function returns.
	defer response.Body.Close() //don't forget to close the response body

	// Read the SSE messages from the response body into a channel.
	channel, err := sseread.ReadCh(response.Body)
	// If an error occurs while reading the SSE messages, print the error and return from the function.
	if err != nil {
		fmt.Println(err)
		return
	}
	// Declare a slice to store the SSE messages.
	var messages []sseread.Event
	// Loop over the channel to receive the SSE messages.
	for msg := range channel {
		// Skip nil messages so the field accesses below cannot panic.
		if msg == nil {
			continue
		}
		// Append the message to the messages slice.
		messages = append(messages, *msg)
		// Print the ID, event type, and data of the message.
		fmt.Println(msg.ID, msg.Event, string(msg.Data))
	}
	// Print the number of messages read.
	fmt.Printf("length of messages is %d", len(messages))
}

Types

type CfTextGenerationArg added in v1.0.7

type CfTextGenerationArg struct {
	Stream   bool                  `json:"stream,omitempty"`
	Messages []CfTextGenerationMsg `json:"messages,omitempty"`
}

CfTextGenerationArg represents the arguments for text generation in Cloudflare AI.

type CfTextGenerationMsg added in v1.0.7

type CfTextGenerationMsg struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

CfTextGenerationMsg represents a message for text generation in Cloudflare AI.
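To show what the struct tags above produce on the wire, the following sketch marshals local mirror types with the same fields and tags. The type names textGenArg and textGenMsg and the helper encodeArg are hypothetical copies made so the example compiles without the library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// textGenMsg mirrors the fields and tags of CfTextGenerationMsg (local copy).
type textGenMsg struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

// textGenArg mirrors the fields and tags of CfTextGenerationArg (local copy).
type textGenArg struct {
	Stream   bool         `json:"stream,omitempty"`
	Messages []textGenMsg `json:"messages,omitempty"`
}

// encodeArg returns the JSON encoding of arg as a string.
func encodeArg(arg textGenArg) (string, error) {
	b, err := json.Marshal(arg)
	return string(b), err
}

func main() {
	body, err := encodeArg(textGenArg{
		Stream:   true,
		Messages: []textGenMsg{{Role: "user", Content: "hi"}},
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(body) // {"stream":true,"messages":[{"role":"user","content":"hi"}]}

	// With omitempty, a false Stream and a nil Messages slice are both omitted.
	empty, _ := encodeArg(textGenArg{})
	fmt.Println(empty) // {}
}
```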

type CfTextGenerationResponse added in v1.0.7

type CfTextGenerationResponse struct {
	Response string `json:"response"`
	P        string `json:"p"`
}

CfTextGenerationResponse represents the response structure for text generation from the Cloudflare AI API. See https://developers.cloudflare.com/workers-ai/models/zephyr-7b-beta-awq/#using-streaming
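As an illustration of how streamed chunks with this shape can be combined, the sketch below decodes each chunk into a local mirror of the struct and concatenates the response fields. The names textGenResponse and joinChunks are hypothetical, and the sample payloads are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// textGenResponse mirrors the fields and tags of CfTextGenerationResponse (local copy).
type textGenResponse struct {
	Response string `json:"response"`
	P        string `json:"p"`
}

// joinChunks decodes each JSON chunk and concatenates the response fields.
// It returns an error for the first chunk that is not valid JSON.
func joinChunks(chunks []string) (string, error) {
	var sb strings.Builder
	for _, chunk := range chunks {
		var r textGenResponse
		if err := json.Unmarshal([]byte(chunk), &r); err != nil {
			return "", fmt.Errorf("decode chunk %q: %w", chunk, err)
		}
		sb.WriteString(r.Response)
	}
	return sb.String(), nil
}

func main() {
	text, err := joinChunks([]string{`{"response":"Hello"}`, `{"response":", world","p":"abc"}`})
	fmt.Println(text, err)

	// A terminator such as [DONE] is not valid JSON, so it has to be
	// filtered out before decoding.
	_, err = joinChunks([]string{"[DONE]"})
	fmt.Println(err != nil)
}
```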

type CloudflareAI added in v1.0.7

type CloudflareAI struct {
	AccountID string // AccountID is the identifier for the Cloudflare account.
	APIToken  string // APIToken is the authentication token for accessing the Cloudflare AI service.
}

CloudflareAI represents the configuration for accessing the Cloudflare AI service.

func (*CloudflareAI) Do added in v1.0.7

func (c *CloudflareAI) Do(model string, arg *CfTextGenerationArg) (*http.Response, error)

Do runs the specified Cloudflare AI model with the given arguments. It returns the HTTP response and an error, if any.
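For orientation, here is a sketch of how such a request could be built with net/http. It is not the body of Do: the endpoint pattern and the Bearer authorization header follow Cloudflare's public REST API for Workers AI and are assumptions about what the method sends, and the names runURL and newRunRequest are hypothetical. The sketch only constructs the request and does not contact the network.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// runURL builds the model endpoint. ASSUMPTION: this URL pattern is taken from
// Cloudflare's REST API documentation, not from the sseread source.
func runURL(accountID, model string) string {
	return fmt.Sprintf("https://api.cloudflare.com/client/v4/accounts/%s/ai/run/%s", accountID, model)
}

// newRunRequest prepares a POST request carrying a JSON body and a Bearer token.
func newRunRequest(accountID, apiToken, model string, body []byte) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, runURL(accountID, model), bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+apiToken)
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	body := []byte(`{"stream":true,"messages":[{"role":"user","content":"hi"}]}`)
	req, err := newRunRequest("my-account", "my-token", "@cf/meta/llama-2-7b-chat-fp8b", body)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Print the request line only; sending it would require real credentials.
	fmt.Println(req.Method, req.URL)
}
```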

Example

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"github.com/mojocn/sseread"
)

func main() {
	// Retrieve the account ID and API token from the environment variables
	accountID := os.Getenv("CF_ACCOUNT_ID")
	apiToken := os.Getenv("CF_API_TOKEN")

	cf := &sseread.CloudflareAI{
		AccountID: accountID,
		APIToken:  apiToken,
	}

	// Send the POST request
	response, err := cf.Do("@cf/meta/llama-2-7b-chat-fp8b", &sseread.CfTextGenerationArg{
		Stream: true,
		Messages: []sseread.CfTextGenerationMsg{
			{Role: "system", Content: "You are a chatbot."},
			{Role: "user", Content: "What is your name?"},
		}})
	if err != nil {
		fmt.Println(err)
		return
	}

	// Ensure the response body is closed after the function returns
	defer response.Body.Close()

	// Check the response status code
	if response.StatusCode != http.StatusOK {
		all, err := io.ReadAll(response.Body)
		if err != nil {
			fmt.Println(err)
			return
		}
		log.Println(string(all)) // log.Fatal would exit before the deferred Close runs
		return
	}

	// Read the response body as Server-Sent Events
	channel, err := sseread.ReadCh(response.Body)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Initialize an empty string to store the full text of the responses
	fulltext := ""

	// Iterate over the events from the channel
	for event := range channel {
		if event == nil || event.IsSkip() {
			continue
		}

		// Parse the JSON object from the event data
		e := new(sseread.CfTextGenerationResponse)
		err := json.Unmarshal(event.Data, e)
		if err != nil {
			log.Fatal(err, string(event.Data))
		} else {
			// Append the response to the fulltext string
			fulltext += e.Response
		}
	}

	// Print the full text of the responses
	fmt.Println(fulltext)
}

type Event

type Event struct {
	ID    string          // ID is the unique identifier for the event.
	Retry uint            // Retry is the value of the event's retry field; in the SSE specification this is the reconnection time in milliseconds.
	Event string          // Event is the type of the event.
	Data  json.RawMessage // Data is the raw data associated with the event, typically JSON; convert it to a string if it is plain text.
}

Event represents a Server-Sent Event. It contains fields for the event ID, retry value, event type, and event data.

func (*Event) IsSkip added in v1.0.6

func (e *Event) IsSkip() bool

IsSkip reports whether the event should be skipped. It returns true if the Data field of the event is empty, null, undefined, or "[DONE]"; otherwise it returns false.
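The rule described above can be sketched as a small standalone function. This is an illustration of the documented behavior, not the library's code: the name isSkipSketch is hypothetical, and trimming surrounding whitespace before comparing is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// isSkipSketch reports whether an event payload carries no useful content:
// empty, the literals null or undefined, or the [DONE] terminator.
// ASSUMPTION: surrounding whitespace is trimmed before the comparison.
func isSkipSketch(data []byte) bool {
	switch string(bytes.TrimSpace(data)) {
	case "", "null", "undefined", "[DONE]":
		return true
	}
	return false
}

func main() {
	for _, payload := range []string{"", "null", "undefined", "[DONE]", `{"response":"hi"}`} {
		fmt.Printf("%q -> %v\n", payload, isSkipSketch([]byte(payload)))
	}
}
```

Checking for the terminator before calling json.Unmarshal matters because a payload such as [DONE] is not valid JSON and would otherwise produce a decoding error.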

func (*Event) ParseEventLine

func (e *Event) ParseEventLine(lineType string, lineData []byte)

ParseEventLine parses a single line of an event. It takes the type of the line (lineType) and the line's data (lineData) as input, and updates the corresponding field in the Event struct.

type EventLineParser

type EventLineParser interface {
	// ParseEventLine is a method that takes an event type and event data as input and parses the event field.
	ParseEventLine(lineType string, lineData []byte)
}

EventLineParser is an interface that defines a method for parsing event fields.
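Any type with a matching ParseEventLine method satisfies this interface. The sketch below declares a local copy of the interface and a hypothetical type, fieldLog, that records every line it receives. The helper feed and the assumption that lineType carries the SSE field name (such as data or event) are illustrative and are not taken from the library.

```go
package main

import (
	"fmt"
	"strings"
)

// lineParser is a local copy of the EventLineParser method set.
type lineParser interface {
	ParseEventLine(lineType string, lineData []byte)
}

// fieldLog is a hypothetical implementation that records values per line type.
type fieldLog struct {
	fields map[string][]string
}

func newFieldLog() *fieldLog {
	return &fieldLog{fields: make(map[string][]string)}
}

// ParseEventLine appends the line data under its line type.
func (f *fieldLog) ParseEventLine(lineType string, lineData []byte) {
	f.fields[lineType] = append(f.fields[lineType], string(lineData))
}

// Compile-time check that *fieldLog satisfies the interface.
var _ lineParser = (*fieldLog)(nil)

// feed splits a raw "field: value" line and hands it to the parser.
// ASSUMPTION: lineType is the text before the first colon.
func feed(p lineParser, line string) {
	field, value, found := strings.Cut(line, ":")
	if !found || field == "" {
		return // not a field line (for example a comment); ignore it
	}
	p.ParseEventLine(field, []byte(strings.TrimPrefix(value, " ")))
}

func main() {
	log := newFieldLog()
	for _, line := range []string{"event: ping", "data: a", "data: b", ": comment"} {
		feed(log, line)
	}
	fmt.Println(log.fields["event"], log.fields["data"])
}
```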
