Documentation ¶
Overview ¶
Package signalflow contains a SignalFx SignalFlow client, which can be used to execute analytics jobs against the SignalFx backend.
The client currently only supports the execute request. Not all SignalFlow messages are handled at this time, and some will be silently dropped.
The client automatically attempts to reconnect to the backend if the connection is broken. By default it waits 5 seconds between retries (see ReconnectDelay).
Example ¶
c, err := NewClient(
	StreamURLForRealm("us1"),
	AccessToken("MY_ORG_ACCESS_TOKEN"))
if err != nil {
	log.Printf("Error creating client: %v", err)
	return
}

comp, err := c.Execute(&ExecuteRequest{
	Program: "data('cpu.utilization').publish()",
})
if err != nil {
	log.Printf("Could not send execute request: %v", err)
	return
}

fmt.Printf("Resolution: %v\n", comp.Resolution())
fmt.Printf("Max Delay: %v\n", comp.MaxDelay())
fmt.Printf("Detected Lag: %v\n", comp.Lag())

for msg := range comp.Data() {
	// This will run as long as there is data, or until the websocket gets
	// disconnected. If a websocket error occurs, the job will NOT be
	// automatically restarted.
	if len(msg.Payloads) == 0 {
		fmt.Printf("\rNo data available")
		continue
	}
	for _, pl := range msg.Payloads {
		meta := comp.TSIDMetadata(pl.TSID)
		fmt.Printf("%s %v: %v\n", meta.OriginatingMetric, meta.CustomProperties, pl.Value())
	}
	fmt.Println("")
}

err = comp.Err()
if err != nil {
	log.Printf("Error: %v", comp.Err())
} else {
	log.Printf("Job completed")
}
Index ¶
- Variables
- type AuthRequest
- type AuthType
- type Channel
- type Client
- type ClientParam
- func AccessToken(token string) ClientParam
- func MetadataTimeout(timeout time.Duration) ClientParam
- func ReadTimeout(timeout time.Duration) ClientParam
- func StreamURL(streamEndpoint string) ClientParam
- func StreamURLForRealm(realm string) ClientParam
- func UserAgent(userAgent string) ClientParam
- func WriteTimeout(timeout time.Duration) ClientParam
- type Computation
- func (c *Computation) Channel() *Channel
- func (c *Computation) Data() <-chan *messages.DataMessage
- func (c *Computation) Done() <-chan struct{}
- func (c *Computation) Err() error
- func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage
- func (c *Computation) Handle() string
- func (c *Computation) IsFinished() bool
- func (c *Computation) Lag() time.Duration
- func (c *Computation) MaxDelay() time.Duration
- func (c *Computation) Resolution() time.Duration
- func (c *Computation) Stop() error
- func (c *Computation) StopWithReason(reason string) error
- func (c *Computation) TSIDMetadata(tsid idtool.ID) *messages.MetadataProperties
- type DetachRequest
- type DetachType
- type ExecuteRequest
- type ExecuteType
- type FakeBackend
- func (f *FakeBackend) AddProgramError(program string, errorMsg string)
- func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)
- func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)
- func (f *FakeBackend) Client() (*Client, error)
- func (f *FakeBackend) KillExistingConnections()
- func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)
- func (f *FakeBackend) Restart()
- func (f *FakeBackend) RunningJobsForProgram(program string) int
- func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)
- func (f *FakeBackend) Start()
- func (f *FakeBackend) Stop()
- func (f *FakeBackend) URL() string
- type StopRequest
- type StopType
Constants ¶
This section is empty.
Variables ¶
var ReconnectDelay = 5 * time.Second
ReconnectDelay is how long the client waits between connection attempts after a connection fails.
Functions ¶
This section is empty.
Types ¶
type AuthRequest ¶
type Channel ¶
Channel is a queue of messages that all pertain to the same computation.
func (*Channel) AcceptMessage ¶
AcceptMessage accepts a message from a websocket. It may block if nothing is reading from the channel, but a computation should generally always be doing so.
type Client ¶
Client for SignalFlow via websockets (SSE is not currently supported).
func NewClient ¶
func NewClient(options ...ClientParam) (*Client, error)
NewClient makes a new SignalFlow client that will immediately try to connect to the SignalFlow backend.
func (*Client) Close ¶
func (c *Client) Close()
Close the client and shut down any ongoing connections and goroutines. The client cannot be reused after Close.
func (*Client) Execute ¶
func (c *Client) Execute(req *ExecuteRequest) (*Computation, error)
Execute a SignalFlow job and return a Computation through which informational messages and data will flow.
func (*Client) Stop ¶
func (c *Client) Stop(req *StopRequest) error
Stop sends a job stop request message to the backend. It does not wait for jobs to actually be stopped.
type ClientParam ¶
ClientParam is the common type of configuration functions for the SignalFlow client.
func AccessToken ¶
func AccessToken(token string) ClientParam
AccessToken can be used to provide a SignalFx organization access token or user access token to the SignalFlow client.
func MetadataTimeout ¶
func MetadataTimeout(timeout time.Duration) ClientParam
MetadataTimeout is the default amount of time that calls to metadata accessors on a SignalFlow Computation instance will wait to receive the metadata from the backend before failing and returning a zero value. Usually metadata comes in very quickly from the stream after the job starts.
func ReadTimeout ¶
func ReadTimeout(timeout time.Duration) ClientParam
ReadTimeout sets the maximum duration to wait between messages arriving on the websocket. If the job's resolution is very coarse, so that messages arrive infrequently, this should be increased.
func StreamURL ¶
func StreamURL(streamEndpoint string) ClientParam
StreamURL lets you set the full URL to the stream endpoint, including the path.
func StreamURLForRealm ¶
func StreamURLForRealm(realm string) ClientParam
StreamURLForRealm can be used to configure the websocket URL for a specific SignalFx realm.
func UserAgent ¶
func UserAgent(userAgent string) ClientParam
UserAgent allows setting the `userAgent` field when authenticating to SignalFlow. This can be useful for tracking how many jobs are started from each client.
func WriteTimeout ¶
func WriteTimeout(timeout time.Duration) ClientParam
WriteTimeout sets the maximum duration to wait to send a single message when writing messages to the SignalFlow server over the WebSocket connection.
type Computation ¶
type Computation struct {
	// The timeout to wait for metadata when a metadata access function is
	// called. This will default to what is set on the client, but can be
	// overridden by changing this field directly.
	MetadataTimeout time.Duration
	// contains filtered or unexported fields
}
Computation is a single running SignalFlow job.
func (*Computation) Channel ¶
func (c *Computation) Channel() *Channel
Channel returns the underlying Channel instance used by this computation.
func (*Computation) Data ¶
func (c *Computation) Data() <-chan *messages.DataMessage
Data returns the channel on which data messages come.
func (*Computation) Done ¶
func (c *Computation) Done() <-chan struct{}
Done passes through the computation context's Done channel for use in select statements to know when the computation is finished or an error occurred.
func (*Computation) Err ¶
func (c *Computation) Err() error
Err returns the last fatal error that caused the computation to stop, if any. Will be nil if the computation stopped in an expected manner.
func (*Computation) Expirations ¶ added in v1.6.2
func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage
Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series that are no longer valid for this computation.
func (*Computation) IsFinished ¶
func (c *Computation) IsFinished() bool
IsFinished returns true if the computation is done and no more data should be expected from it.
func (*Computation) Lag ¶
func (c *Computation) Lag() time.Duration
Lag detected for the job. This will wait for a short while for the lag message to come on the websocket, but will return 0 after a timeout if it does not come.
func (*Computation) MaxDelay ¶
func (c *Computation) MaxDelay() time.Duration
MaxDelay detected for the job. This will wait for a short while for the max delay message to come on the websocket, but will return 0 after a timeout if it does not come.
func (*Computation) Resolution ¶
func (c *Computation) Resolution() time.Duration
Resolution of the job. This will wait for a short while for the resolution message to come on the websocket, but will return 0 after a timeout if it does not come.
func (*Computation) StopWithReason ¶
func (c *Computation) StopWithReason(reason string) error
StopWithReason stops the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.
func (*Computation) TSIDMetadata ¶
func (c *Computation) TSIDMetadata(tsid idtool.ID) *messages.MetadataProperties
TSIDMetadata for a particular tsid. This will wait for a short while for the tsid metadata message to come on the websocket, but will return nil after a timeout if it does not come.
type DetachRequest ¶
type DetachRequest struct {
	Type    DetachType `json:"type"`
	Channel string     `json:"channel"`
	Reason  string     `json:"reason"`
}
type DetachType ¶
type DetachType string
func (DetachType) MarshalJSON ¶
func (DetachType) MarshalJSON() ([]byte, error)
type ExecuteRequest ¶
type ExecuteRequest struct {
	Type         ExecuteType   `json:"type"`
	Program      string        `json:"program"`
	Channel      string        `json:"channel"`
	Start        time.Time     `json:"-"`
	Stop         time.Time     `json:"-"`
	Resolution   time.Duration `json:"-"`
	MaxDelay     time.Duration `json:"-"`
	StartMs      int64         `json:"start"`
	StopMs       int64         `json:"stop"`
	ResolutionMs int64         `json:"resolution"`
	MaxDelayMs   int64         `json:"maxDelay"`
	Immediate    bool          `json:"immediate"`
	Timezone     string        `json:"timezone"`
}
func (ExecuteRequest) MarshalJSON ¶
func (er ExecuteRequest) MarshalJSON() ([]byte, error)
MarshalJSON does some assignments to allow using more native Go types for time/duration.
type ExecuteType ¶
type ExecuteType string
func (ExecuteType) MarshalJSON ¶
func (ExecuteType) MarshalJSON() ([]byte, error)
type FakeBackend ¶ added in v1.6.2
FakeBackend is useful for testing, both internal to this package and externally. It supports basic messages and allows for the specification of metadata and data messages that map to a particular program.
func NewRunningFakeBackend ¶ added in v1.6.2
func NewRunningFakeBackend() *FakeBackend
func (*FakeBackend) AddProgramError ¶ added in v1.6.2
func (f *FakeBackend) AddProgramError(program string, errorMsg string)
func (*FakeBackend) AddProgramTSIDs ¶ added in v1.6.2
func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)
func (*FakeBackend) AddTSIDMetadata ¶ added in v1.6.2
func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)
func (*FakeBackend) Client ¶ added in v1.6.2
func (f *FakeBackend) Client() (*Client, error)
func (*FakeBackend) KillExistingConnections ¶ added in v1.6.2
func (f *FakeBackend) KillExistingConnections()
func (*FakeBackend) RemoveTSIDData ¶ added in v1.6.2
func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)
func (*FakeBackend) Restart ¶ added in v1.6.2
func (f *FakeBackend) Restart()
func (*FakeBackend) RunningJobsForProgram ¶ added in v1.6.2
func (f *FakeBackend) RunningJobsForProgram(program string) int
RunningJobsForProgram returns how many currently executing jobs there are for a particular program text.
func (*FakeBackend) ServeHTTP ¶ added in v1.6.2
func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*FakeBackend) SetTSIDFloatData ¶ added in v1.6.2
func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)
func (*FakeBackend) Start ¶ added in v1.6.2
func (f *FakeBackend) Start()
func (*FakeBackend) Stop ¶ added in v1.6.2
func (f *FakeBackend) Stop()
func (*FakeBackend) URL ¶ added in v1.6.2
func (f *FakeBackend) URL() string