signalflow

package module
v2.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 20 Imported by: 1

README

SignalFx SignalFlow Go Client

This is a client for SignalFlow that lets you stream and analyze metric data in real-time for your organization.

[!WARNING]
github.com/signalfx/signalfx-go/signalflow/v2 package is deprecated. Use github.com/signalfx/signalflow-client-go/v2/signalflow instead.

Installation

You must use Go 1.19+ for the v2 of this client.

go get github.com/signalfx/signalfx-go/signalflow/v2@latest

If you do not want to upgrade Go you can use the v1 version of the old client that supports older Go versions:

go get github.com/signalfx/signalfx-go@v1.30.0

Usage

The package must be imported like so:

import (
  "github.com/signalfx/signalfx-go/signalflow/v2"
)

See ./example/main.go for an example of how to use the client.

SignalFlow itself is documented at https://dev.splunk.com/observability/docs/signalflow/messages.

Migration from v1

If you previously used v1 of this module, you can migrate to v2 by doing the following:

  • You must use Go 1.19+.

  • Remove any uses of the MetadataTimeout client option. This has been replaced by the addition of a ctx argument on all of the metadata getters (see below).

  • Add a context.Context as the first argument to any of the Computation metadata getter methods. You can control how long to wait for metadata by using a context with a timeout or cancel. See the example.

  • Add a context.Context as the first argument to each of the Client SignalFlow method calls. This context can be used to cancel the calls in case of connection trouble. Previously these calls could hang indefinitely.

  • Remove references to the Computation.Done() method, which returned a channel that would be closed when the computation was finished. You can know if the computation is finished based on when the Data channel is closed.

  • Computation.Events was removed entirely as it wasn't implemented correctly. Reach out if you have a desire for it.

Documentation

Overview

Package signalflow contains a SignalFx SignalFlow client, which can be used to execute analytics jobs against the SignalFx backend.

Not all SignalFlow messages are handled at this time, and some will be silently dropped. All of the most important and useful ones are supported though.

The client will automatically attempt to reconnect to the backend if the connection is broken after a short delay.

SignalFlow is documented at https://dev.splunk.com/observability/docs/signalflow/messages.

Deprecated: Use github.com/signalfx/signalflow-client-go/v2/signalflow instead.

Index

Constants

This section is empty.

Variables

View Source
var ErrMetadataTimeout = errors.New("metadata value did not come in time")

Functions

This section is empty.

Types

type AuthRequest

type AuthRequest struct {
	// This should not be set manually.
	Type AuthType `json:"type"`
	// The Auth token for the org
	Token     string `json:"token"`
	UserAgent string `json:"userAgent,omitempty"`
}

type AuthType

type AuthType string

func (AuthType) MarshalJSON

func (at AuthType) MarshalJSON() ([]byte, error)

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client for SignalFlow via websockets (SSE is not currently supported).

func NewClient

func NewClient(options ...ClientParam) (*Client, error)

NewClient makes a new SignalFlow client that will immediately try and connect to the SignalFlow backend.

func (*Client) Close

func (c *Client) Close()

Close the client and shutdown any ongoing connections and goroutines. The client cannot be reused after Close. Calling any of the client methods after Close() is undefined and will likely result in a panic.

func (*Client) Execute

func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*Computation, error)

Execute a SignalFlow job and return a channel upon which informational messages and data will flow. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-a-computation

func (*Client) Stop

func (c *Client) Stop(ctx context.Context, req *StopRequest) error

Stop sends a job stop request message to the backend. It does not wait for jobs to actually be stopped. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Stop-a-computation

type ClientParam

type ClientParam func(*Client) error

ClientParam is the common type of configuration functions for the SignalFlow client

func AccessToken

func AccessToken(token string) ClientParam

AccessToken can be used to provide a SignalFx organization access token or user access token to the SignalFlow client.

func OnError

func OnError(f OnErrorFunc) ClientParam

func ReadTimeout

func ReadTimeout(timeout time.Duration) ClientParam

ReadTimeout sets the duration to wait between messages that come on the websocket. If the resolution of the job is very low, this should be increased.

func StreamURL

func StreamURL(streamEndpoint string) ClientParam

StreamURL lets you set the full URL to the stream endpoint, including the path.

func StreamURLForRealm

func StreamURLForRealm(realm string) ClientParam

StreamURLForRealm can be used to configure the websocket url for a specific SignalFx realm.

func UserAgent

func UserAgent(userAgent string) ClientParam

UserAgent allows setting the `userAgent` field when authenticating to SignalFlow. This can be useful for accounting how many jobs are started from each client.

func WriteTimeout

func WriteTimeout(timeout time.Duration) ClientParam

WriteTimeout sets the maximum duration to wait to send a single message when writing messages to the SignalFlow server over the WebSocket connection.

type Computation

type Computation struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Computation is a single running SignalFlow job

func (*Computation) Data

func (c *Computation) Data() <-chan *messages.DataMessage

Data returns the channel on which data messages come. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.

func (*Computation) Detach

func (c *Computation) Detach(ctx context.Context) error

Detach the computation on the backend

func (*Computation) DetachWithReason

func (c *Computation) DetachWithReason(ctx context.Context, reason string) error

DetachWithReason detaches the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel

func (*Computation) Err

func (c *Computation) Err() error

Err returns the last fatal error that caused the computation to stop, if any. Will be nil if the computation stopped in an expected manner.

func (*Computation) Events added in v2.1.0

func (c *Computation) Events() <-chan *messages.EventMessage

Events returns a channel that receives event/alert messages from the signalflow computation.

func (*Computation) Expirations

func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage

Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series that are no longer valid for this computation. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.

func (*Computation) GroupByMissingProperties

func (c *Computation) GroupByMissingProperties(ctx context.Context) ([]string, error)

GroupByMissingProperties are timeseries that don't contain the required dimensions. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Handle

func (c *Computation) Handle(ctx context.Context) (string, error)

Handle of the computation. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Info added in v2.2.0

func (c *Computation) Info() <-chan *messages.InfoMessage

Info returns a channel that receives info messages from the signalflow computation.

func (*Computation) Lag

func (c *Computation) Lag(ctx context.Context) (time.Duration, error)

Lag detected for the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) LimitSize

func (c *Computation) LimitSize(ctx context.Context) (int, error)

LimitSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MatchedNoTimeseriesQuery

func (c *Computation) MatchedNoTimeseriesQuery(ctx context.Context) (string, error)

MatchedNoTimeseriesQuery if it matched no active timeseries. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MatchedSize

func (c *Computation) MatchedSize(ctx context.Context) (int, error)

MatchedSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MaxDelay

func (c *Computation) MaxDelay(ctx context.Context) (time.Duration, error)

MaxDelay detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Resolution

func (c *Computation) Resolution(ctx context.Context) (time.Duration, error)

Resolution of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Stop

func (c *Computation) Stop(ctx context.Context) error

Stop the computation on the backend.

func (*Computation) StopWithReason

func (c *Computation) StopWithReason(ctx context.Context, reason string) error

StopWithReason stops the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.

func (*Computation) TSIDMetadata

func (c *Computation) TSIDMetadata(ctx context.Context, tsid idtool.ID) (*messages.MetadataProperties, error)

TSIDMetadata for a particular tsid. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

type ComputationError

type ComputationError struct {
	Code      int
	Message   string
	ErrorType string
}

ComputationError exposes the underlying metadata of a computation error

func (*ComputationError) Error

func (e *ComputationError) Error() string

type DetachRequest

type DetachRequest struct {
	// This should not be set manually
	Type    DetachType `json:"type"`
	Channel string     `json:"channel"`
	Reason  string     `json:"reason"`
}

type DetachType

type DetachType string

func (DetachType) MarshalJSON

func (DetachType) MarshalJSON() ([]byte, error)

type ExecuteRequest

type ExecuteRequest struct {
	// This should not be set manually
	Type         ExecuteType   `json:"type"`
	Program      string        `json:"program"`
	Channel      string        `json:"channel"`
	Start        time.Time     `json:"-"`
	Stop         time.Time     `json:"-"`
	Resolution   time.Duration `json:"-"`
	MaxDelay     time.Duration `json:"-"`
	StartMs      int64         `json:"start"`
	StopMs       int64         `json:"stop"`
	ResolutionMs int64         `json:"resolution"`
	MaxDelayMs   int64         `json:"maxDelay"`
	Immediate    bool          `json:"immediate"`
	Timezone     string        `json:"timezone"`
}

See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-message-properties for details on the fields.

func (ExecuteRequest) MarshalJSON

func (er ExecuteRequest) MarshalJSON() ([]byte, error)

MarshalJSON does some assignments to allow using more native Go types for time/duration.

type ExecuteType

type ExecuteType string

func (ExecuteType) MarshalJSON

func (ExecuteType) MarshalJSON() ([]byte, error)

type FakeBackend

type FakeBackend struct {
	sync.Mutex

	AccessToken string
	// contains filtered or unexported fields
}

FakeBackend is useful for testing, both internal to this package and externally. It supports basic messages and allows for the specification of metadata and data messages that map to a particular program.

func NewRunningFakeBackend

func NewRunningFakeBackend() *FakeBackend

func (*FakeBackend) AddProgramError

func (f *FakeBackend) AddProgramError(program string, errorMsg string)

func (*FakeBackend) AddProgramTSIDs

func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)

func (*FakeBackend) AddTSIDMetadata

func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)

func (*FakeBackend) Client

func (f *FakeBackend) Client() (*Client, error)

func (*FakeBackend) KillExistingConnections

func (f *FakeBackend) KillExistingConnections()

func (*FakeBackend) RemoveTSIDData

func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)

func (*FakeBackend) Restart

func (f *FakeBackend) Restart()

func (*FakeBackend) RunningJobsForProgram

func (f *FakeBackend) RunningJobsForProgram(program string) int

RunningJobsForProgram returns how many currently executing jobs there are for a particular program text.

func (*FakeBackend) ServeHTTP

func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*FakeBackend) SetTSIDFloatData

func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)

func (*FakeBackend) Start

func (f *FakeBackend) Start()

func (*FakeBackend) Stop

func (f *FakeBackend) Stop()

func (*FakeBackend) URL

func (f *FakeBackend) URL() string

type OnErrorFunc

type OnErrorFunc func(err error)

type StopRequest

type StopRequest struct {
	// This should not be set manually
	Type   StopType `json:"type"`
	Handle string   `json:"handle"`
	Reason string   `json:"reason"`
}

type StopType

type StopType string

func (StopType) MarshalJSON

func (StopType) MarshalJSON() ([]byte, error)

Directories

Path Synopsis
package main shows a basic usage pattern of the SiganlFlow client.
package main shows a basic usage pattern of the SiganlFlow client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL