package datastore

v0.0.0-...-9714d35

Published: Apr 19, 2023 License: AGPL-3.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

const TS_MERGE_N_RECORDS_MAX = 10000000

maximum number of big-record input records per merge; affects memory usage

const TS_N_WORKERS_MAX = 10

maximum number of workers per observatory

const TS_ROWLENGTH_MAX = 2000
const TS_ROWLENGTH_MIN = 1000

minimum and maximum number of data points in a full timeseries row. Changing these values requires rebuilding the timeseries table!

const TS_UPDATE_SINCE_MAX = 600 * time.Second

max duration between updateTimeseries calls

Variables

var MSGCACHE_FIELDS = [2]string{"status", "weather"}
Message fields kept in the cache. A fixed-size array is used instead of a map to support concurrent access to the cache.
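
To illustrate the design, a minimal sketch of a field lookup, assuming the index of a field in MSGCACHE_FIELDS doubles as its slot in a MsgCache (the helper is hypothetical; the package's actual lookup is unexported):

// msgIndex returns the MsgCache slot for a field name, or -1 if the
// field is not cached (hypothetical helper, for illustration only).
func msgIndex(field string) int {
	for i, f := range MSGCACHE_FIELDS {
		if f == field {
			return i
		}
	}
	return -1
}

Reading the cached "weather" message then reduces to a fixed array access, cache[msgIndex("weather")], with no map involved.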

Functions

func GetRawsPG

func GetRawsPG(db *sql.DB, cancel <-chan struct{}, observatories []string, startTime, endTime *time.Time, idFrom uint64) (chan *RecordIn, int)

GetRawsPG selects records from the raw table with (id > idFrom) or (tdata in a time window), unmarshals the contained records, and returns a result channel from which they can be received by a concurrent goroutine.
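
A usage sketch, assuming an open *sql.DB and that the int return value is the number of records to expect; the observatory name is a placeholder:

func exampleGetRaws(db *sql.DB, start, end time.Time) {
	cancel := make(chan struct{})
	defer close(cancel)
	// idFrom 0 plus a time window selects on tdata
	recs, n := GetRawsPG(db, cancel, []string{"obs1"}, &start, &end, 0)
	log.Printf("expecting %d records", n)
	for rec := range recs { // the producer closes the channel when done
		fmt.Println(rec.Id, rec.Header.RecvTime)
	}
}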

func GetTimeseriesPG

func GetTimeseriesPG(db *sql.DB, observatories []string, fields []string, x0, x1 int64) (map[string]map[string]Timeseries, error)

GetTimeseriesPG retrieves the timeseries for a list of observatories and fields.
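
For example (a sketch; reading x0 and x1 as Unix-time bounds on the x axis is an assumption, and the observatory name is a placeholder):

func exampleGetTimeseries(db *sql.DB) {
	x1 := time.Now().Unix()
	x0 := x1 - 3600 // last hour, assuming Unix-seconds x coordinates
	tss, err := GetTimeseriesPG(db, []string{"obs1"}, []string{"status", "weather"}, x0, x1)
	if err != nil {
		log.Fatal(err)
	}
	for obs, byField := range tss {
		for field := range byField {
			fmt.Println(obs, field) // keyed by observatory, then by field
		}
	}
}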

func PutRawsBulkPG

func PutRawsBulkPG(db *sql.DB, cancel chan struct{}, recs chan *RecordIn, nrecs int)

PutRawsBulkPG receives records from the recs channel and inserts them into the rawdata table after doing some cleanup (see the pipeline sketch after this list):

  1. It translates varnames from the old ":" delimiter to the new "_" delimiter
  2. It skips data arrays that contain the UTF-8 "replacement character", as this is not recognized as valid UTF-8 by postgres
  3. It skips empty (heartbeat) records
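
Together with GetRawsPG this suggests a simple copy pipeline. A sketch, assuming nil time bounds mean "no time window" and that the int returned by GetRawsPG is the nrecs expected here:

func copyRaws(src, dst *sql.DB, observatories []string) {
	cancel := make(chan struct{})
	defer close(cancel)
	// everything with id > 0; nil time bounds assumed to disable the window
	recs, n := GetRawsPG(src, cancel, observatories, nil, nil, 0)
	// consumes recs until the channel is closed, cleaning records as listed above
	PutRawsBulkPG(dst, cancel, recs, n)
}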

Types

type Cache

type Cache struct {
	// contains filtered or unexported fields
}

type Chain

type Chain struct {
	// contains filtered or unexported fields
}

type ClientStat

type ClientStat struct {
	RecvTime int64  `json:"recvTime" bson:"updateTime"` // last seen time
	Version  string `json:"version,omitempty" bson:"version"`
}

type DataArr

type DataArr [][2]interface{}

func (DataArr) Compress

func (da DataArr) Compress(npoints int) (DataArr, error)

Compress is a wrapper around the Rebin and Deduplicate methods.
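
A sketch of downsampling a long series, assuming npoints is the target point count and an int64/float64 point layout (the element type is [2]interface{}):

func exampleCompress() {
	var da DataArr
	for i := 0; i < 100000; i++ {
		da = append(da, [2]interface{}{int64(i), float64(i % 7)})
	}
	small, err := da.Compress(TS_ROWLENGTH_MIN) // target one full row of points
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(small.Len())
}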

func (DataArr) Deduplicate

func (da DataArr) Deduplicate(npoints int) (DataArr, error)

Deduplicate drops some of the repeating values.

func (DataArr) Len

func (da DataArr) Len() int

Len, together with Less and Swap, implements sort.Interface.

func (DataArr) Less

func (da DataArr) Less(i, j int) bool

func (DataArr) MergeFromRight

func (a DataArr) MergeFromRight(b DataArr) DataArr

MergeFromRight merges b into a, growing a to the required capacity if necessary. This function is the DataArr analog of Timeseries.MergeFromRight.

func (DataArr) Rebin

func (da DataArr) Rebin(npoints int) (DataArr, error)

Rebin rebins the data onto a regular grid.

func (DataArr) Swap

func (da DataArr) Swap(i, j int)

func (DataArr) ToTimeseries

func (da DataArr) ToTimeseries() (ts Timeseries)

ToTimeseries converts the DataArr to a Timeseries.

type DataStore

type DataStore struct {
	// contains filtered or unexported fields
}

func NewDataStore

func NewDataStore(pgSession *sql.DB, schemafile string, observatories []string) *DataStore
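
The handler methods below match httprouter.Handle, so a server can be wired up directly. A sketch; the routes, schema file, and observatory names are placeholders:

func serve(db *sql.DB, uc *userapi.UserCache) {
	ds := NewDataStore(db, "schema.sql", []string{"obs1", "obs2"})
	router := httprouter.New()
	router.GET("/raw/:id", ds.RawGetHandle)
	router.POST("/raw", ds.RawPostHandle(uc))
	router.GET("/snapshot", ds.SnapshotGetHandle)
	router.GET("/timeseries", ds.TimeseriesHandle)
	router.GET("/timeseries/download", ds.TimeseriesDownloadHandle)
	router.PATCH("/timeseries/:id", ds.TimeseriesPatchHandle)
	log.Fatal(http.ListenAndServe(":8080", router))
}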

func (DataStore) GetDataHistories

func (ds DataStore) GetDataHistories(observatories []string, fields []string, startTime *time.Time, endTime *time.Time, nDatapoints int) (map[string]map[string]DataArr, error)

func (DataStore) PutRecord

func (ds DataStore) PutRecord(facility, clientname, clientip, version string, trecv int64, data map[string]DataArr, body []byte) (string, error)

func (*DataStore) RawGetHandle

func (ds *DataStore) RawGetHandle(w http.ResponseWriter, r *http.Request, ps httprouter.Params)

func (*DataStore) RawPostHandle

func (ds *DataStore) RawPostHandle(uc *userapi.UserCache) httprouter.Handle

func (DataStore) Replayer

func (ds DataStore) Replayer(replay replayapi.Job)

Replayer executes a replay job.

func (*DataStore) SnapshotGetHandle

func (ds *DataStore) SnapshotGetHandle(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

func (DataStore) StartWatching

func (ds DataStore) StartWatching(dir string, nstart int)

StartWatching uses inotify to detect new files appearing and disappearing, and schedules each file for processing after a short delay to allow file I/O operations to finish. The event handler catches inotify errors and restarts the watcher if necessary.
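
Typical use would be a single call at startup, given a *DataStore ds (a sketch; the directory and nstart value are placeholders, and running it in a goroutine assumes the call blocks):

// watch a spool directory for incoming record files
go ds.StartWatching("/var/spool/datastore", 100)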

func (*DataStore) TimeseriesDownloadHandle

func (ds *DataStore) TimeseriesDownloadHandle(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

func (*DataStore) TimeseriesHandle

func (ds *DataStore) TimeseriesHandle(w http.ResponseWriter, r *http.Request, _ httprouter.Params)

func (*DataStore) TimeseriesPatchHandle

func (ds *DataStore) TimeseriesPatchHandle(w http.ResponseWriter, r *http.Request, ps httprouter.Params)

TimeseriesPatchHandle triggers an update of the timeseries table.

type Dataflat

type Dataflat map[string][2]interface{}

Dataflat maps each field name to a single data point; see RecordIn.FlatData.

type Header struct {
	RecvTime      time.Time `json:"recvTime,omitempty" bson:"recvTime,omitempty"`
	Username      string    `json:"username,omitempty" bson:",omitempty"`
	Facility      string    `json:"facility,omitempty" bson:",omitempty"`
	ClientVersion string    `json:"clientVersion,omitempty" bson:"clientVersion,omitempty"`
	// contains filtered or unexported fields
}

type HeaderArr

type HeaderArr [3]Header

HeaderArr is a fixed-size array of Headers.

type MessagePoint

type MessagePoint struct {
	RecvTime time.Time `json:"recvTime,omitempty"`
	Username string    `json:"username,omitempty"`
	Content  string    `json:"content,omitempty"`
}

type MsgCache

type MsgCache [2]MessagePoint // see MSGCACHE_FIELDS

type Node

type Node struct {
	// contains filtered or unexported fields
}

type RecordIn

type RecordIn struct {
	Id     uint64             `json:"-" bson:",omitempty"`
	Header Header             `json:"header"`
	Data   map[string]DataArr `json:"data,omitempty" bson:",omitempty"`
}

func (*RecordIn) FlatData

func (rec *RecordIn) FlatData() Dataflat

FlatData returns a Dataflat containing only the last element of each data array.
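
For example (hypothetical field name; the [x, y] point layout follows DataArr):

rec := &RecordIn{
	Data: map[string]DataArr{
		"temp": {{int64(1), 19.8}, {int64(2), 20.3}},
	},
}
flat := rec.FlatData()
fmt.Println(flat["temp"]) // [2 20.3]: only the last point survives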

type StatusRecord

type StatusRecord struct {
	Clients  map[string]ClientStat   `json:"clients,omitempty" bson:"lastUpdate"`
	Data     Dataflat                `json:"data,omitempty"`
	Messages map[string]MessagePoint `json:"messages,omitempty"`
}

type Timeseries

type Timeseries struct {
	// contains filtered or unexported fields
}

Timeseries contains the x and y coordinates of the data points in a timeseries in two separate arrays.

func (Timeseries) MergeFromRight

func (a Timeseries) MergeFromRight(b Timeseries) (ts Timeseries)

MergeFromRight merges b into a, growing a to the required capacity if necessary. This function is the Timeseries analog of DataArr.MergeFromRight.

type VexfileInfo

type VexfileInfo struct {
	Filename string `json:"filename"`
	Checksum uint32 `json:"checksum"`
	Startvex string `json:"startvex"`
	Stopvex  string `json:"stopvex"`
	ReadTime int64  `json:"readtime"` // support incremental updates
}
