realtime

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2023 License: AGPL-3.0 Imports: 16 Imported by: 1

README

sense/realtime

This is kind of implemented as its own API and is even less stable than [sense](..). The [asyncapi.yaml] is an untested but nearly complete description of the Sense real-time API. I would have preferred to use a code generator, similar to how I did in the internal/client, but the code generators for AsyncAPI are not mature and I couldn't get them to produce anything useful. If you're reading this from a timeline in which they are useful, consider splitting this package into an internal/realtime (or integrate it with internal/client) and possibly merge this package or a higher-level version of it with sense.

Documentation

Overview

Package realtime implements the unofficial and unsupported Sense real-time API.

The Sense real-time API is a WebSocket API that provides real-time updates about the state of your Sense monitor. It is used by the Sense mobile app and the Sense web app.

WARNING: Sense does not provide a supported API. This package may stop working without notice.

The current implementation of this appears to be reasonably complete, however there may be a few fields with interface{} types that could be better investsigated.

The first incoming messages appear to follow this pattern:

1. Hello 2. MonitorInfo 3. DataChange 4. DeviceStates 5. RealtimeUpdate

After this, you'll get RealtimeUpdate messages once a second, with an occasional DataChange message.

Example
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"

	"github.com/dnesting/sense/internal/senseutil"
	"github.com/dnesting/sense/realtime"
	"github.com/dnesting/sense/senseauth"
	"nhooyr.io/websocket"
)

type msg = senseutil.WSMsg

type mockTransport struct {
	roundTrip func(req *http.Request) (*http.Response, error)
}

func (t *mockTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	return t.roundTrip(req)
}

func mockForExample(cl *realtime.Client, auth *senseauth.Config) {
	realtime.SetDebug(log.Default())
	ch := make(chan msg, 1)
	dialer := &senseutil.MockWSDialer{Ch: ch}
	cl.Dialer = dialer
	go func() {
		ch <- msg{T: websocket.MessageText, D: `{"type":"hello"}`, E: nil}
		ch <- msg{T: websocket.MessageText, D: `{"type":"realtime_update","payload":{"W": 590.4}}`, E: nil}
		ch <- msg{T: websocket.MessageText, D: `{"type":"realtime_update","payload":{"W": 591.4}}`, E: nil}
		ch <- msg{T: websocket.MessageText, D: `{"type":"realtime_update","payload":{"W": 592.4}}`, E: nil}
		ch <- msg{T: websocket.MessageText, D: `{"type":"realtime_update","payload":{"W": 593.4}}`, E: nil}
		close(ch)
	}()

	auth.HttpClient = &http.Client{
		Transport: &mockTransport{
			roundTrip: func(req *http.Request) (*http.Response, error) {
				return &http.Response{
					StatusCode: http.StatusOK,
					Header:     http.Header{"Content-Type": []string{"application/json"}},
					Body:       io.NopCloser(strings.NewReader(`{"access_token":"fake-token"}`)),
				}, nil
			},
		},
	}

}

func main() {
	var client realtime.Client
	ctx := context.Background()

	// authenticate
	auth := senseauth.DefaultConfig
	mockForExample(&client, &auth)

	tok, _, err := auth.PasswordCredentialsToken(ctx,
		senseauth.PasswordCredentials{
			Email:    "user@example.com",
			Password: "pass",
		})
	if err != nil {
		log.Fatal(err)
	}
	client.TokenSrc = auth.TokenSource(tok)

	// start a stream and collect 3 data points
	stopAfter := 3
	err = client.Stream(ctx, 123, func(_ context.Context, msg realtime.Message) error {
		switch msg := msg.(type) {
		case *realtime.Hello:
			fmt.Println("We're online!")
		case *realtime.RealtimeUpdate:
			fmt.Printf("Power consumption is now: %.1f W\n", msg.W)
			stopAfter--
			if stopAfter == 0 {
				return realtime.Stop
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}

}
Output:

We're online!
Power consumption is now: 590.4 W
Power consumption is now: 591.4 W
Power consumption is now: 592.4 W

Index

Examples

Constants

This section is empty.

Variables

View Source
var Stop = errors.New("stop sentinel")

Stop is a sentinel error that can be returned from a callback to stop the stream.

Functions

func SetDebug

func SetDebug(l *log.Logger)

SetDebug enables debug logging using the given logger. Set to nil to disable.

Types

type Callback

type Callback func(context.Context, Message) error

Callback is called for each message received during a Stream call. Use [Message.GetType] or a type assertion to determine the type of the message.

type Client

type Client struct {
	BaseUrl    string
	Origin     string
	HttpClient *http.Client
	DeviceID   string
	TokenSrc   oauth2.TokenSource
	Dialer     Dialer
}

Client is a real-time data client for the Sense API. All fields are optional, and will be populated with default values if not provided.

The [sense.Client] has a [sense.Client.Stream] method that uses this client behind the scenes; you shouldn't need to instantiate one directly.

If you do want to use it directly, you'll need a oauth2.TokenSource, such as the one generated by the github.com/dnesting/sense/senseauth package.

func (*Client) Stream

func (c *Client) Stream(ctx context.Context, monitorID int, callback Callback) error

Stream opens a websocket connection to the given monitor and calls the callback for each message received. The callback can return the sentinel error Stop to stop the stream.

type Conn

type Conn interface {
	Read(ctx context.Context) (websocket.MessageType, []byte, error)
	Close(websocket.StatusCode, string) error
}

Conn is the method set we use to interact with a websocket. It is used for testing.

type DataChange

type DataChange struct {
	DeviceDataChecksum      string `json:"device_data_checksum"`
	MonitorOverviewChecksum string `json:"monitor_overview_checksum"`
	PartnerChecksum         string `json:"partner_checksum"`
	PendingEvents           struct {
		Type string `json:"type"`
		Goal struct {
			Guid           string      `json:"guid"`
			NotificationID interface{} `json:"notification_id"`
			Timestamp      interface{} `json:"timestamp"`
		} `json:"goal"`
		NewDeviceFound struct {
			DeviceID  interface{} `json:"device_id"`
			Guid      string      `json:"guid"`
			Timestamp interface{} `json:"timestamp"`
		} `json:"new_device_found"`
	}
	SettingsVersion int `json:"settings_version"`
	UserVersion     int `json:"user_version"`
}

DataChange is a message sent by the server when some specific pieces of data have changed. These are likely used to signal web or mobile clients of the need to refresh their data.

func (DataChange) GetType

func (d DataChange) GetType() string

type Delta

type Delta struct {
	Frame      int     `json:"frame"`
	Channel    int     `json:"channel"`
	StartFrame int     `json:"start_frame"`
	W          float32 `json:"w"`
}

type Device

type Device struct {
	Attrs interface{}            `json:"attrs"`
	Icon  string                 `json:"icon"`
	ID    string                 `json:"id"`
	Name  string                 `json:"name"`
	Tags  map[string]interface{} `json:"tags"`
	W     float32                `json:"w"`
}

type DeviceState

type DeviceState struct {
	DeviceID string `json:"device_id"`
	Mode     string `json:"mode"`
	State    string `json:"state"`
}

type DeviceStates

type DeviceStates struct {
	States     []DeviceState `json:"states"`
	UpdateType string        `json:"update_type"`
}

DeviceStates is sent by the server shortly after connecting and provides the current "online" state of all devices.

func (DeviceStates) GetType

func (d DeviceStates) GetType() string

type Dialer

type Dialer interface {
	Dial(ctx context.Context, url string, opts *websocket.DialOptions) (Conn, *http.Response, error)
}

Dialer is the method set we use to connect to the websockets API. It is used for testing.

type Hello

type Hello struct {
	Online bool `json:"online"`
}

Hello is the first message sent by the server after connecting.

func (Hello) GetType

func (h Hello) GetType() string

type Message

type Message interface {
	GetType() string
}

Message is a generic interface for all messages sent by the server. The GetType method will return a string indicating which type of message it is.

type MonitorInfo

type MonitorInfo struct {
	Features string `json:"features"`
}

MonitorInfo is sent by the server after connecting and returns a CSV string.

func (MonitorInfo) GetType

func (m MonitorInfo) GetType() string

type NewTimelineEvent

type NewTimelineEvent struct {
	ItemsAdded   []TimelineEvent `json:"items_added"`
	ItemsRemoved []TimelineEvent `json:"items_removed"`
	ItemsUpdated []TimelineEvent `json:"items_updated"`
	UserID       int             `json:"user_id"`
}

func (NewTimelineEvent) GetType

func (n NewTimelineEvent) GetType() string

type RealtimeUpdate

type RealtimeUpdate struct {
	C int `json:"c"`

	// Appears to be the wattage reading for each of the monitor's sensors.
	Channels []float32 `json:"channels"`
	// This appears to be the same as W but as an integer.
	DW          int     `json:"d_w"`
	DefaultCost float32 `json:"default_cost"`
	// Deltas are usually missing, but when they are present they appear to
	// contain the difference in the W value between this update and the previous.
	Deltas []Delta `json:"deltas"`
	// Devices contains details about the current consumption of known devices.
	Devices []Device `json:"devices"`
	// Epoch appears to be the Unix time_t of the start of the stream.
	Epoch int `json:"epoch"`
	// Frame appears to be a counter that increases between updates, seemingly 30 per update.
	Frame int     `json:"frame"`
	GridW float32 `json:"grid_w"`
	// This appears to be the AC frequency in Hz.
	Hz        float32 `json:"hz"`
	PowerFlow struct {
		Grid []string `json:"grid"`
	} `json:"power_flow"`
	// Appears to be the AC voltage reading on each of the monitor's sensors.
	Voltage []float32 `json:"voltage"`
	// W appears to be the total wattage observed being consumed by the monitor.
	W float32 `json:"w"`
	// These appear to be Unix time timestamps, with subsecond precision.  Possibly
	// used to gauge the latency of the stream servers.
	Stats struct {
		Brcv float32
		Mrcv float32
		Msnd float32
	}
}

RealtimeUpdate is the message periodically sent by the server with the current state of the monitor and all known devices.

func (*RealtimeUpdate) GetType

func (r *RealtimeUpdate) GetType() string

type TimelineEvent

type TimelineEvent struct {
	AllowSticky               bool                     `json:"allow_sticky"`
	Body                      string                   `json:"body"`
	BodyArgs                  []map[string]interface{} `json:"body_args"`
	BodyKey                   string                   `json:"body_key"`
	Destination               string                   `json:"destination"`
	DeviceID                  string                   `json:"device_id"`
	DeviceState               string                   `json:"device_state"`
	DeviceTransitionFromState string                   `json:"device_transition_from_state"`
	GUID                      string                   `json:"guid"`
	Icon                      string                   `json:"icon"`
	MonitorID                 int                      `json:"monitor_id"`
	ShowAction                bool                     `json:"show_action"`
	Time                      *time.Time               `json:"time"`
	Type                      string                   `json:"type"`
	UserDeviceType            string                   `json:"user_device_type"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL