gripcontrol

Version: v1.2.0
Published: Dec 6, 2023 License: MIT Imports: 8 Imported by: 2

README

go-gripcontrol

Author: Konstantin Bokarius kon@fanout.io

A GRIP library for Go.

License

go-gripcontrol is offered under the MIT license. See the LICENSE file.

Installation

go get github.com/fanout/go-gripcontrol

go-gripcontrol requires jwt-go 2.2.0 and go-pubcontrol 1.0.1. To ensure that the correct versions of both dependencies are installed, use godep:

go get github.com/tools/godep
cd $GOPATH/src/github.com/fanout/go-gripcontrol
$GOPATH/bin/godep restore

Usage

Examples of how to publish HTTP response and HTTP stream messages to GRIP proxy endpoints via the GripPubControl struct.

package main

import "github.com/fanout/go-pubcontrol"
import "github.com/fanout/go-gripcontrol"
import "encoding/base64"

func main() {
    // GripPubControl can be initialized with or without an endpoint configuration.
    // Each endpoint can include optional JWT authentication info.
    // Multiple endpoints can be included in a single configuration.

    // Initialize GripPubControl with a single endpoint:
    decodedKey, err := base64.StdEncoding.DecodeString("<key>")
    if err != nil {
        panic("Failed to base64 decode the key")
    }
    pub := gripcontrol.NewGripPubControl([]map[string]interface{} {
            map[string]interface{} {
            "control_uri": "https://api.fanout.io/realm/<realm>",
            "control_iss": "<realm>", 
            "key": decodedKey}})

    // Add new endpoints by applying an endpoint configuration:
    pub.ApplyGripConfig([]map[string]interface{} {
            map[string]interface{} { "control_uri": "<myendpoint_uri_1>" },
            map[string]interface{} { "control_uri": "<myendpoint_uri_2>" }})

    // Remove all configured endpoints:
    pub.RemoveAllClients()

    // Explicitly add an endpoint as a PubControlClient instance:
    client := pubcontrol.NewPubControlClient("<myendpoint_uri>")
    // Optionally set JWT auth: client.SetAuthJwt(<claim>, "<key>")
    // Optionally set basic auth: client.SetAuthBasic("<user>", "<password>")
    pub.AddClient(client)

    // Publish across all configured endpoints:
    err = pub.PublishHttpResponse("<channel>", "Test publish!!", "", "")
    if err != nil {
        panic("Publish failed with: " + err.Error())
    }
    err = pub.PublishHttpStream("<channel>", "Test publish!!", "", "")
    if err != nil {
        panic("Publish failed with: " + err.Error())
    }
}

Validate the Grip-Sig request header from incoming GRIP messages. This ensures that the message was sent from a valid source and is not expired. Note that when using Fanout.io the key is the realm key, and when using Pushpin the key is configurable in Pushpin's settings.

isValid := gripcontrol.ValidateSig(request.Header.Get("Grip-Sig"), "<key>")
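To make the two checks described above concrete (valid source, not expired), the following is a minimal standard-library sketch of validating an HS256-signed JWT. It is not the library's implementation: the names signHS256 and checkSig are hypothetical, the sketch assumes the proxy signs with HMAC-SHA256, and it deliberately rejects tokens that carry no "exp" claim, which may be stricter than ValidateSig.

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/base64"
	"encoding/json"
	"strings"
)

// signHS256 builds a compact HS256 JWT for the given claims. It exists only
// so that the sketch can be exercised without a GRIP proxy (hypothetical helper).
func signHS256(claims map[string]interface{}, key []byte) string {
	enc := base64.RawURLEncoding
	header := enc.EncodeToString([]byte(`{"alg":"HS256","typ":"JWT"}`))
	payloadJSON, _ := json.Marshal(claims)
	signingInput := header + "." + enc.EncodeToString(payloadJSON)
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(signingInput))
	return signingInput + "." + enc.EncodeToString(mac.Sum(nil))
}

// checkSig reports whether token is a well-formed HS256 JWT signed with key
// whose "exp" claim lies after nowUnix (seconds since the Unix epoch).
func checkSig(token string, key []byte, nowUnix int64) bool {
	parts := strings.Split(token, ".")
	if len(parts) != 3 {
		return false
	}
	enc := base64.RawURLEncoding

	// Accept only the HS256 algorithm.
	headerJSON, err := enc.DecodeString(parts[0])
	if err != nil {
		return false
	}
	var header struct {
		Alg string `json:"alg"`
	}
	if json.Unmarshal(headerJSON, &header) != nil || header.Alg != "HS256" {
		return false
	}

	// Recompute the signature over "header.payload" and compare in constant time.
	sig, err := enc.DecodeString(parts[2])
	if err != nil {
		return false
	}
	mac := hmac.New(sha256.New, key)
	mac.Write([]byte(parts[0] + "." + parts[1]))
	if !hmac.Equal(sig, mac.Sum(nil)) {
		return false
	}

	// Reject tokens without an expiry or whose expiry has passed.
	payloadJSON, err := enc.DecodeString(parts[1])
	if err != nil {
		return false
	}
	var claims struct {
		Exp *int64 `json:"exp"`
	}
	if json.Unmarshal(payloadJSON, &claims) != nil || claims.Exp == nil {
		return false
	}
	return nowUnix < *claims.Exp
}
```

Passing the current time as a parameter keeps the check deterministic in tests; a handler would typically pass time.Now().Unix().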

Long polling example via response headers. The client connects to a GRIP proxy over HTTP and the proxy forwards the request to the origin. The origin subscribes the client to a channel and instructs it to long poll via the response headers. Note that with recent versions of Apache it is not possible to send a 304 response containing custom headers, in which case the response body should be used instead (see the next usage example).

package main

import "github.com/fanout/go-gripcontrol"
import "net/http"

func HandleRequest(writer http.ResponseWriter, request *http.Request) {
    // Validate the Grip-Sig header:
    if !gripcontrol.ValidateSig(request.Header.Get("Grip-Sig"), "<key>") {
        http.Error(writer, "GRIP authorization failed", http.StatusUnauthorized)
        return
    }

    // Create channel header containing channel information:
    channel := gripcontrol.CreateGripChannelHeader([]*gripcontrol.Channel {
            &gripcontrol.Channel{Name: "<channel>"}})

    // Instruct the client to long poll via the response headers:
    writer.Header().Set("Grip-Hold", "response")
    writer.Header().Set("Grip-Channel", channel)
    // To optionally set a timeout value in seconds:
    // writer.Header().Set("Grip-Timeout", "<timeout_value>")
}

func main() {
    http.HandleFunc("/", HandleRequest)
    http.ListenAndServe(":80", nil)
}

Long polling example via response body. The client connects to a GRIP proxy over HTTP and the proxy forwards the request to the origin. The origin subscribes the client to a channel and instructs it to long poll via the response body.

package main

import "github.com/fanout/go-gripcontrol"
import "net/http"
import "io"

func HandleRequest(writer http.ResponseWriter, request *http.Request) {
    // Validate the Grip-Sig header:
    if !gripcontrol.ValidateSig(request.Header.Get("Grip-Sig"), "<key>") {
        http.Error(writer, "GRIP authorization failed", http.StatusUnauthorized)
        return
    }

    // Create channel list containing channel information:
    channel := []*gripcontrol.Channel {&gripcontrol.Channel{Name: "<channel>"}}

    // Create hold response body:
    body, err := gripcontrol.CreateHoldResponse(channel, nil, nil)
    // Or to optionally set a timeout value in seconds:
    // timeout := <timeout_value>
    // body, err := gripcontrol.CreateHoldResponse(channel, nil, &timeout)
    if err != nil {
        panic("Failed to create hold response: " + err.Error())
    }

    // Instruct the client to long poll via the response body:
    writer.Header().Set("Content-Type", "application/grip-instruct")
    io.WriteString(writer, body)
}

func main() {
    http.HandleFunc("/", HandleRequest)
    http.ListenAndServe(":80", nil)
}

WebSocket example using github.com/gorilla/websocket. A client connects to a GRIP proxy via WebSockets and the proxy forwards the request to the origin. The origin accepts the connection over a WebSocket and responds with a control message indicating that the client should be subscribed to a channel. Note that in order for the GRIP proxy to properly interpret the control messages, the origin must provide a 'grip' extension in the 'Sec-WebSocket-Extensions' header.

package main

import "time"
import "net/http"
import "github.com/gorilla/websocket"
import "github.com/fanout/go-pubcontrol"
import "github.com/fanout/go-gripcontrol"

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool { return true },
}

func GripWebSocketHandler(writer http.ResponseWriter, request *http.Request) {
    // Create the WebSocket control message:
    wsControlMessage, err := gripcontrol.WebSocketControlMessage("subscribe",
            map[string]interface{} { "channel": "<channel>" })
    if err != nil {
        panic("Unable to create control message: " + err.Error())
    }

    // Ensure that the GRIP proxy processes control messages by upgrading
    // with the Sec-WebSocket-Extensions header:
    conn, err := upgrader.Upgrade(writer, request, http.Header {
            "Sec-WebSocket-Extensions": []string {"grip; message-prefix=\"\""}})
    if err != nil {
        // Upgrade has already replied to the client with an HTTP error.
        return
    }

    // Subscribe the WebSocket to a channel:
    conn.WriteMessage(websocket.TextMessage, []byte("c:" + wsControlMessage))

    // Wait 3 seconds and publish a message to the subscribed channel:
    time.Sleep(3 * time.Second)
    pub := gripcontrol.NewGripPubControl([]map[string]interface{} {
            map[string]interface{} { "control_uri": "<myendpoint_uri>" }})
    format := &gripcontrol.WebSocketMessageFormat {
            Content: []byte("Test WebSocket Publish!!") } 
    item := pubcontrol.NewItem([]pubcontrol.Formatter{format}, "", "")
    err = pub.Publish("<channel>", item)
    if err != nil {
        panic("Publish failed with: " + err.Error())
    }
}

func main() {
    http.HandleFunc("/", GripWebSocketHandler)
    http.ListenAndServe(":80", nil)
}

WebSocket over HTTP example. In this case, a client connects to a GRIP proxy via WebSockets and the GRIP proxy communicates with the origin via HTTP.

package main

import "github.com/fanout/go-gripcontrol"
import "github.com/fanout/go-pubcontrol"
import "io/ioutil"
import "net/http"
import "time"
import "io"

func HandleRequest(writer http.ResponseWriter, request *http.Request) {
    // Validate the Grip-Sig header:
    if !gripcontrol.ValidateSig(request.Header.Get("Grip-Sig"), "<key>") {
        http.Error(writer, "GRIP authorization failed", http.StatusUnauthorized)
        return
    }

    // Set the headers required by the GRIP proxy:
    writer.Header().Set("Sec-WebSocket-Extensions", "grip; message-prefix=\"\"")
    writer.Header().Set("Content-Type", "application/websocket-events")

    // Decode the incoming WebSocket events:
    body, err := ioutil.ReadAll(request.Body)
    if err != nil {
        http.Error(writer, "Failed to read request body", http.StatusBadRequest)
        return
    }
    inEvents, err := gripcontrol.DecodeWebSocketEvents(string(body))
    if err != nil {
        panic("Failed to decode WebSocket events: " + err.Error())
    }

    if len(inEvents) > 0 && inEvents[0].Type == "OPEN" {
        // Create the WebSocket control message:
        wsControlMessage, err := gripcontrol.WebSocketControlMessage("subscribe",
                map[string]interface{} { "channel": "<channel>" })
        if err != nil {
            panic("Unable to create control message: " + err.Error())
        }

        // Open the WebSocket and subscribe it to a channel:
        outEvents := []*gripcontrol.WebSocketEvent {
                &gripcontrol.WebSocketEvent { Type: "OPEN" },
                &gripcontrol.WebSocketEvent { Type: "TEXT",
                        Content: "c:" + wsControlMessage }}
        io.WriteString(writer, gripcontrol.EncodeWebSocketEvents(outEvents))

        go func() {
            // Wait 3 seconds and publish a message to the subscribed channel:
            time.Sleep(3 * time.Second)
            pub := gripcontrol.NewGripPubControl([]map[string]interface{} {
                    map[string]interface{} { "control_uri": "<myendpoint_uri>" }})
            format := &gripcontrol.WebSocketMessageFormat {
                    Content: []byte("Test WebSocket Publish!!") } 
            item := pubcontrol.NewItem([]pubcontrol.Formatter{format}, "", "")
            err = pub.Publish("<channel>", item)
            if err != nil {
                panic("Publish failed with: " + err.Error())
            }
        }()
    }
}

func main() {
    http.HandleFunc("/", HandleRequest)
    http.ListenAndServe(":80", nil)
}

Parse a GRIP URI to extract the URI, ISS, and key values. The values are returned in a map containing 'control_uri', 'control_iss', and 'key' keys, together with an error.

config, err := gripcontrol.ParseGripUri(
    "http://api.fanout.io/realm/<myrealm>?iss=<myrealm>" +
    "&key=base64:<myrealmkey>")
if err != nil {
    panic("Failed to parse GRIP URI: " + err.Error())
}

Documentation

Functions

func CreateGripChannelHeader

func CreateGripChannelHeader(channels []*Channel) string

Create a GRIP channel header for the specified channels. The channels parameter is a slice of pointers to Channel instances. The returned GRIP channel header is used when sending instructions to GRIP proxies via HTTP headers.
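As a rough illustration of what such a header value looks like, the sketch below joins channels into a single string. The wire format shown (names separated by ", " with an optional "; prev-id=" parameter) is an assumption based on the GRIP protocol and is not taken from this package's source; the channel type and channelHeader function are hypothetical stand-ins for Channel and CreateGripChannelHeader.

```go
package main

import "strings"

// channel mirrors the Name and PrevId fields of gripcontrol.Channel
// (hypothetical local copy so the sketch has no external dependencies).
type channel struct {
	Name   string
	PrevId string
}

// channelHeader joins the channels into one Grip-Channel header value,
// appending a prev-id parameter only for channels that set PrevId.
func channelHeader(channels []channel) string {
	parts := make([]string, 0, len(channels))
	for _, c := range channels {
		part := c.Name
		if c.PrevId != "" {
			part += "; prev-id=" + c.PrevId
		}
		parts = append(parts, part)
	}
	return strings.Join(parts, ", ")
}
```

For example, channelHeader([]channel{{Name: "a"}, {Name: "b", PrevId: "3"}}) yields "a, b; prev-id=3".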

func CreateHold

func CreateHold(mode string, channels []*Channel, response interface{},
	timeout *int) (string, error)

Create GRIP hold instructions for the specified mode, channels, response and optional timeout value. The response parameter can be specified as either a string / byte array representing the response body or a Response instance.
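For orientation, the sketch below builds a JSON hold instruction with encoding/json. The JSON field names ("hold", "mode", "channels", "name", "prev-id", "timeout") reflect the application/grip-instruct format as commonly documented and are an assumption here, as are the holdInstruct function and its helper types; the optional response part is left out for brevity.

```go
package main

import "encoding/json"

// holdChannel, hold, and instruct are hypothetical types that model the
// assumed JSON layout of a GRIP hold instruction.
type holdChannel struct {
	Name   string `json:"name"`
	PrevID string `json:"prev-id,omitempty"`
}

type hold struct {
	Mode     string        `json:"mode"`
	Channels []holdChannel `json:"channels"`
	Timeout  *int          `json:"timeout,omitempty"`
}

type instruct struct {
	Hold hold `json:"hold"`
}

// holdInstruct serializes a hold instruction for the given mode ("response"
// or "stream"), channel names, and optional timeout in seconds.
func holdInstruct(mode string, names []string, timeout *int) (string, error) {
	channels := make([]holdChannel, 0, len(names))
	for _, name := range names {
		channels = append(channels, holdChannel{Name: name})
	}
	out, err := json.Marshal(instruct{
		Hold: hold{Mode: mode, Channels: channels, Timeout: timeout},
	})
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

A nil timeout is omitted from the output, which mirrors the optional timeout pointer in the CreateHold signature.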

func CreateHoldResponse

func CreateHoldResponse(channels []*Channel, response interface{},
	timeout *int) (string, error)

A convenience function for creating GRIP hold response instructions for HTTP long-polling. This function simply passes the specified parameters to CreateHold with 'response' as the hold mode.

func CreateHoldStream

func CreateHoldStream(channels []*Channel,
	response interface{}) (string, error)

A convenience function for creating GRIP hold stream instructions for HTTP streaming. This function simply passes the specified parameters to CreateHold with 'stream' as the hold mode.

func EncodeWebSocketEvents

func EncodeWebSocketEvents(events []*WebSocketEvent) string

Encode the specified array of WebSocketEvent instances. The returned string value should then be passed to a GRIP proxy in the body of an HTTP response when using the WebSocket-over-HTTP protocol.
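The encoded form can be pictured with a small sketch. It assumes the WebSocket-over-HTTP framing in which each event is a type followed, when content is present, by the content length in hexadecimal, a CRLF, the content, and a final CRLF. The wsEvent type and encodeEvents function are hypothetical stand-ins, not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// wsEvent mirrors the Type and Content fields of gripcontrol.WebSocketEvent
// (hypothetical local copy).
type wsEvent struct {
	Type    string
	Content string
}

// encodeEvents serializes events using the assumed framing:
// "TYPE\r\n" for events without content and
// "TYPE <hex length>\r\n<content>\r\n" otherwise.
func encodeEvents(events []wsEvent) string {
	var b strings.Builder
	for _, e := range events {
		if e.Content == "" {
			b.WriteString(e.Type + "\r\n")
			continue
		}
		fmt.Fprintf(&b, "%s %x\r\n%s\r\n", e.Type, len(e.Content), e.Content)
	}
	return b.String()
}
```

Note that the length is the byte length of the content, so multi-byte UTF-8 characters count once per byte.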

func ParseGripUri

func ParseGripUri(rawUri string) (map[string]interface{}, error)

Parse the specified GRIP URI into a config object that can then be passed to the GripPubControl struct. The URI can include 'iss' and 'key' JWT authentication query parameters as well as any other required query string parameters. The JWT 'key' query parameter can be provided as-is or in base64 encoded format.
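The parsing described above can be approximated with net/url. This is a hedged sketch rather than the package's implementation: parseGripURI is a hypothetical name, and details such as restoring '+' characters that query decoding turns into spaces are assumptions about sensible behavior.

```go
package main

import (
	"encoding/base64"
	"net/url"
	"strings"
)

// parseGripURI extracts 'iss' and 'key' from the query string and returns a
// config map with 'control_uri', 'control_iss', and 'key' entries. Any other
// query parameters stay part of the control URI.
func parseGripURI(raw string) (map[string]interface{}, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}
	query := u.Query()
	config := map[string]interface{}{}

	if iss := query.Get("iss"); iss != "" {
		config["control_iss"] = iss
	}
	if key := query.Get("key"); key != "" {
		if strings.HasPrefix(key, "base64:") {
			// Query decoding maps '+' to a space, so restore it before decoding.
			encoded := strings.ReplaceAll(strings.TrimPrefix(key, "base64:"), " ", "+")
			decoded, err := base64.StdEncoding.DecodeString(encoded)
			if err != nil {
				return nil, err
			}
			config["key"] = decoded
		} else {
			config["key"] = []byte(key)
		}
	}

	// Strip the authentication parameters and keep everything else.
	query.Del("iss")
	query.Del("key")
	u.RawQuery = query.Encode()
	config["control_uri"] = u.String()
	return config, nil
}
```

The resulting map has the same shape as the entries passed to NewGripPubControl in the usage examples.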

func ValidateSig

func ValidateSig(token, key string) bool

Validate the specified JWT token against the specified key. This function is used to validate the Grip-Sig header coming from GRIP proxies such as Pushpin or Fanout.io. Note that the token expiration is also verified.

func WebSocketControlMessage

func WebSocketControlMessage(messageType string,
	args map[string]interface{}) (string, error)

Generate a WebSocket control message with the specified type and optional arguments. WebSocket control messages are passed to GRIP proxies and example usage includes subscribing/unsubscribing a WebSocket connection to/from a channel.
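As an illustration, a control message can be thought of as a JSON object that carries the message type alongside its arguments. The sketch below assumes that layout (a "type" field merged with the arguments); controlMessage is a hypothetical stand-in for WebSocketControlMessage.

```go
package main

import "encoding/json"

// controlMessage merges the message type into a copy of args and returns the
// JSON encoding. The caller's map is left unmodified.
func controlMessage(messageType string, args map[string]interface{}) (string, error) {
	msg := make(map[string]interface{}, len(args)+1)
	for k, v := range args {
		msg[k] = v
	}
	msg["type"] = messageType
	out, err := json.Marshal(msg)
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

In the usage examples above, the result is sent to the proxy as a text message with a "c:" prefix, for instance "c:" followed by {"channel":"<channel>","type":"subscribe"}.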

Types

type Channel

type Channel struct {
	Name   string
	PrevId string
}

The Channel struct represents a channel for a GRIP proxy and tracks the ID of the previous message.

type GripFormatError

type GripFormatError struct {
	// contains filtered or unexported fields
}

An error object used to represent a GRIP formatting error.

func (GripFormatError) Error

func (e GripFormatError) Error() string

The function used to retrieve the message associated with a GripFormatError.

type GripPubControl

type GripPubControl struct {
	*pubcontrol.PubControl
}

The GripPubControl struct allows consumers to easily publish HTTP response and HTTP stream format messages to GRIP proxies. Configuring GripPubControl is slightly different from configuring PubControl in that the 'uri' and 'iss' keys in each config entry should have a 'control_' prefix. GripPubControl embeds *pubcontrol.PubControl and therefore also provides all of its functionality.

func NewGripPubControl

func NewGripPubControl(config []map[string]interface{}) *GripPubControl

Initialize with or without a configuration. A configuration can be applied after initialization via the ApplyGripConfig method.

func (*GripPubControl) ApplyGripConfig

func (gpc *GripPubControl) ApplyGripConfig(config []map[string]interface{})

Apply the specified GRIP configuration to this GripPubControl instance. The configuration is a slice of maps where each map corresponds to a single PubControlClient instance. Each map will be parsed and a PubControlClient will be created either using just a URI or a URI and JWT authentication information.

func (*GripPubControl) PublishHttpResponse

func (gpc *GripPubControl) PublishHttpResponse(channel string,
	http_response interface{}, id, prevId string) error

Publish an HTTP response format message to all of the configured PubControlClients with a specified channel, message, and optional ID and previous ID. Note that the 'http_response' parameter can be provided as either an HttpResponseFormat instance or a string / byte array (in which case an HttpResponseFormat instance will automatically be created and have the 'Body' field set to the specified value).

func (*GripPubControl) PublishHttpStream

func (gpc *GripPubControl) PublishHttpStream(channel string,
	http_stream interface{}, id, prevId string) error

Publish an HTTP stream format message to all of the configured PubControlClients with a specified channel, message, and optional ID and previous ID. Note that the 'http_stream' parameter can be provided as either an HttpStreamFormat instance or a string / byte array (in which case an HttpStreamFormat instance will automatically be created and have the 'Content' field set to the specified value).

type GripPublishError

type GripPublishError struct {
	// contains filtered or unexported fields
}

An error object representing an error encountered during publishing.

func (GripPublishError) Error

func (e GripPublishError) Error() string

The function used to retrieve the message associated with a GripPublishError.

type HttpResponseFormat

type HttpResponseFormat struct {
	Code    int
	Reason  string
	Headers map[string]string
	Body    []byte
}

The HttpResponseFormat struct is the format used to publish messages to HTTP response clients connected to a GRIP proxy.

func (*HttpResponseFormat) Export

func (format *HttpResponseFormat) Export() interface{}

Export the message into the required format and include only the fields that are set. The body is exported as base64 as 'body-bin' (as opposed to 'body') if the value is a buffer.

func (*HttpResponseFormat) Name

func (format *HttpResponseFormat) Name() string

The name used when publishing this format.

type HttpStreamFormat

type HttpStreamFormat struct {
	Content []byte
	Close   bool
}

The HttpStreamFormat struct is the format used to publish messages to HTTP stream clients connected to a GRIP proxy.

func (*HttpStreamFormat) Export

func (format *HttpStreamFormat) Export() interface{}

Exports the message in the required format depending on whether the message content is binary or not, or whether the connection should be closed.
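The three cases mentioned here can be sketched as follows. The output keys ("content", "content-bin", "action") follow the GRIP http-stream publish format as generally documented, and treating non-UTF-8 content as binary is an assumption of this sketch; exportStream is a hypothetical function, not the package's Export method.

```go
package main

import (
	"encoding/base64"
	"unicode/utf8"
)

// exportStream returns the assumed publish payload for an HTTP stream item:
// a close action, textual content, or base64-encoded binary content.
func exportStream(content []byte, closeConn bool) map[string]interface{} {
	if closeConn {
		return map[string]interface{}{"action": "close"}
	}
	if utf8.Valid(content) {
		return map[string]interface{}{"content": string(content)}
	}
	return map[string]interface{}{
		"content-bin": base64.StdEncoding.EncodeToString(content),
	}
}
```

Base64 is used for binary content because the payload is ultimately carried in JSON, which cannot hold arbitrary bytes in a string.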

func (*HttpStreamFormat) Name

func (format *HttpStreamFormat) Name() string

The name used when publishing this format.

type Response

type Response struct {
	Code    int
	Reason  string
	Headers map[string]string
	Body    []byte
}

The Response struct is used to represent a set of HTTP response data. Populated instances of this struct are serialized to JSON and passed to the GRIP proxy in the body. The GRIP proxy then parses the message and deserializes the JSON into an HTTP response that is passed back to the client.

type WebSocketEvent

type WebSocketEvent struct {
	Type    string
	Content string
}

The WebSocketEvent struct represents WebSocket event information that is used with the GRIP WebSocket-over-HTTP protocol. It includes information about the type of event as well as an optional content field.

func DecodeWebSocketEvents

func DecodeWebSocketEvents(body string) ([]*WebSocketEvent, error)

Decode the specified HTTP request body into a slice of WebSocketEvent instances when using the WebSocket-over-HTTP protocol. An error is returned if the format is invalid.
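A decoder for this kind of body might look like the sketch below. It assumes the WebSocket-over-HTTP framing where each event is "TYPE\r\n" or "TYPE <hex length>\r\n<content>\r\n"; the wsEvent type and decodeEvents function are hypothetical and are not the package's implementation.

```go
package main

import (
	"errors"
	"strconv"
	"strings"
)

// wsEvent mirrors the Type and Content fields of gripcontrol.WebSocketEvent
// (hypothetical local copy).
type wsEvent struct {
	Type    string
	Content string
}

// decodeEvents parses a request body into events, returning an error when
// a header line, length, or content terminator is malformed.
func decodeEvents(body string) ([]wsEvent, error) {
	var events []wsEvent
	for len(body) > 0 {
		// Each event starts with a header line terminated by CRLF.
		end := strings.Index(body, "\r\n")
		if end < 0 {
			return nil, errors.New("missing CRLF after event header")
		}
		fields := strings.SplitN(body[:end], " ", 2)
		body = body[end+2:]

		if len(fields) == 1 {
			// No length given, so the event carries no content.
			events = append(events, wsEvent{Type: fields[0]})
			continue
		}

		// The second field is the content length in hexadecimal bytes.
		size, err := strconv.ParseInt(fields[1], 16, 32)
		if err != nil || size < 0 {
			return nil, errors.New("invalid content length")
		}
		n := int(size)
		if len(body) < n+2 || body[n:n+2] != "\r\n" {
			return nil, errors.New("truncated or unterminated content")
		}
		events = append(events, wsEvent{Type: fields[0], Content: body[:n]})
		body = body[n+2:]
	}
	return events, nil
}
```

Because the length is read from the header, content that itself contains CRLF sequences is handled correctly.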

type WebSocketMessageFormat

type WebSocketMessageFormat struct {
	Content []byte
	Binary  bool
}

The WebSocketMessageFormat struct is the format used to publish data to WebSocket clients connected to GRIP proxies.

func (*WebSocketMessageFormat) Export

func (format *WebSocketMessageFormat) Export() interface{}

Exports the message in the required format depending on whether the binary field is set to true or false.

func (*WebSocketMessageFormat) Name

func (format *WebSocketMessageFormat) Name() string

The name used when publishing this format.
