signalflow

package
v2.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

Package signalflow contains a SignalFx SignalFlow client, which can be used to execute analytics jobs against the SignalFx backend.

Not all SignalFlow messages are handled at this time, and some will be silently dropped. All of the most important and useful ones are supported though.

The client will automatically attempt to reconnect to the backend if the connection is broken after a short delay.

SignalFlow is documented at https://dev.splunk.com/observability/docs/signalflow/messages.

Example
package main

import (
	"context"
	"flag"
	"log"
	"os"
	"time"

	"github.com/signalfx/signalflow-client-go/v2/signalflow"
)

func main() {
	var (
		realm       string
		accessToken string
		program     string
		duration    time.Duration
	)

	flag.StringVar(&realm, "realm", "", "SignalFx Realm")
	flag.StringVar(&accessToken, "access-token", "", "SignalFx Org Access Token")
	flag.StringVar(&program, "program", "data('cpu.utilization').count().publish()", "The SignalFlow program to execute")
	flag.DurationVar(&duration, "duration", 30*time.Second, "How long to run the job before sending Stop message")
	flag.Parse()

	if realm == "" || accessToken == "" {
		flag.Usage()
		os.Exit(1)
	}

	timer := time.After(duration)

	c, err := signalflow.NewClient(
		signalflow.StreamURLForRealm(realm),
		signalflow.AccessToken(accessToken),
		signalflow.OnError(func(err error) {
			log.Printf("Error in SignalFlow client: %v\n", err)
		}))
	if err != nil {
		log.Printf("Error creating client: %v\n", err)
		return
	}
	defer c.Close()

	log.Printf("Executing program for %v: %s\n", duration, program)
	comp, err := c.Execute(context.Background(), &signalflow.ExecuteRequest{
		Program: program,
	})
	if err != nil {
		log.Printf("Could not send execute request: %v\n", err)
		return
	}

	go func() {
		<-timer
		if err := comp.Stop(context.Background()); err != nil {
			log.Printf("Failed to stop computation: %v\n", err)
		}
	}()

	// If you want to limit how long to wait for the metadata to come in
	// you can use a timeout context.
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()
	resolution, err := comp.Resolution(ctx)
	log.Printf("Resolution: %v (err: %v)\n", resolution, err)
	maxDelay, err := comp.MaxDelay(ctx)
	log.Printf("Max Delay: %v (err: %v)\n", maxDelay, err)
	lag, err := comp.Lag(ctx)
	log.Printf("Detected Lag: %v (err: %v)\n", lag, err)

	go func() {
		for msg := range comp.Expirations() {
			log.Printf("Got expiration notice for TSID %s\n", msg.TSID)
		}
	}()

	go func() {
		for msg := range comp.Info() {
			log.Printf("Got info message %s\n", msg.MessageBlock.ContentsRaw)
		}
	}()

	for msg := range comp.Data() {
		// This will run as long as there is data, or until the websocket gets
		// disconnected.
		if len(msg.Payloads) == 0 {
			log.Println("No data available")
			continue
		}
		for _, pl := range msg.Payloads {
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			meta, err := comp.TSIDMetadata(ctx, pl.TSID)
			cancel()
			if err != nil {
				log.Printf("Failed to get metadata for tsid %s: %v\n", pl.TSID, err)
				continue
			}
			log.Printf("%s (%s) %v %v: %v\n", meta.OriginatingMetric, meta.Metric, meta.CustomProperties, meta.InternalProperties, pl.Value())
		}
	}

	err = comp.Err()
	if err != nil {
		log.Printf("Job error: %v", comp.Err())
		return
	}
	log.Println("Job completed")
}
Output:

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrMetadataTimeout = errors.New("metadata value did not come in time")

Functions

This section is empty.

Types

type AuthRequest

type AuthRequest struct {
	// This should not be set manually.
	Type AuthType `json:"type"`
	// The Auth token for the org
	Token     string `json:"token"`
	UserAgent string `json:"userAgent,omitempty"`
}

type AuthType

type AuthType string

func (AuthType) MarshalJSON

func (at AuthType) MarshalJSON() ([]byte, error)

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client for SignalFlow via websockets (SSE is not currently supported).

func NewClient

func NewClient(options ...ClientParam) (*Client, error)

NewClient makes a new SignalFlow client that will immediately try and connect to the SignalFlow backend.

func (*Client) Close

func (c *Client) Close()

Close the client and shutdown any ongoing connections and goroutines. The client cannot be reused after Close. Calling any of the client methods after Close() is undefined and will likely result in a panic.

func (*Client) Execute

func (c *Client) Execute(ctx context.Context, req *ExecuteRequest) (*Computation, error)

Execute a SignalFlow job and return a channel upon which informational messages and data will flow. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-a-computation

func (*Client) Stop

func (c *Client) Stop(ctx context.Context, req *StopRequest) error

Stop sends a job stop request message to the backend. It does not wait for jobs to actually be stopped. See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Stop-a-computation

type ClientParam

type ClientParam func(*Client) error

ClientParam is the common type of configuration functions for the SignalFlow client

func AccessToken

func AccessToken(token string) ClientParam

AccessToken can be used to provide a SignalFx organization access token or user access token to the SignalFlow client.

func OnError

func OnError(f OnErrorFunc) ClientParam

func ReadTimeout

func ReadTimeout(timeout time.Duration) ClientParam

ReadTimeout sets the duration to wait between messages that come on the websocket. If the resolution of the job is very low, this should be increased.

func StreamURL

func StreamURL(streamEndpoint string) ClientParam

StreamURL lets you set the full URL to the stream endpoint, including the path.

func StreamURLForRealm

func StreamURLForRealm(realm string) ClientParam

StreamURLForRealm can be used to configure the websocket url for a specific SignalFx realm.

func UserAgent

func UserAgent(userAgent string) ClientParam

UserAgent allows setting the `userAgent` field when authenticating to SignalFlow. This can be useful for accounting how many jobs are started from each client.

func WriteTimeout

func WriteTimeout(timeout time.Duration) ClientParam

WriteTimeout sets the maximum duration to wait to send a single message when writing messages to the SignalFlow server over the WebSocket connection.

type Computation

type Computation struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Computation is a single running SignalFlow job

func (*Computation) Data

func (c *Computation) Data() <-chan *messages.DataMessage

Data returns the channel on which data messages come. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.

func (*Computation) Detach

func (c *Computation) Detach(ctx context.Context) error

Detach the computation on the backend

func (*Computation) DetachWithReason

func (c *Computation) DetachWithReason(ctx context.Context, reason string) error

DetachWithReason detaches the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel

func (*Computation) Err

func (c *Computation) Err() error

Err returns the last fatal error that caused the computation to stop, if any. Will be nil if the computation stopped in an expected manner.

func (*Computation) Events

func (c *Computation) Events() <-chan *messages.EventMessage

Events returns a channel that receives event/alert messages from the signalflow computation.

func (*Computation) Expirations

func (c *Computation) Expirations() <-chan *messages.ExpiredTSIDMessage

Expirations returns a channel that will be sent messages about expired TSIDs, i.e. time series that are no longer valid for this computation. This channel will be closed when the computation is finished. To prevent goroutine leaks, you should read all messages from this channel until it is closed.

func (*Computation) GroupByMissingProperties

func (c *Computation) GroupByMissingProperties(ctx context.Context) ([]string, error)

GroupByMissingProperties are timeseries that don't contain the required dimensions. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Handle

func (c *Computation) Handle(ctx context.Context) (string, error)

Handle of the computation. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Info

func (c *Computation) Info() <-chan *messages.InfoMessage

Info returns a channel that receives info messages from the signalflow computation.

func (*Computation) Lag

func (c *Computation) Lag(ctx context.Context) (time.Duration, error)

Lag detected for the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) LimitSize

func (c *Computation) LimitSize(ctx context.Context) (int, error)

LimitSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MatchedNoTimeseriesQuery

func (c *Computation) MatchedNoTimeseriesQuery(ctx context.Context) (string, error)

MatchedNoTimeseriesQuery if it matched no active timeseries. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MatchedSize

func (c *Computation) MatchedSize(ctx context.Context) (int, error)

MatchedSize detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) MaxDelay

func (c *Computation) MaxDelay(ctx context.Context) (time.Duration, error)

MaxDelay detected of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Resolution

func (c *Computation) Resolution(ctx context.Context) (time.Duration, error)

Resolution of the job. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

func (*Computation) Stop

func (c *Computation) Stop(ctx context.Context) error

Stop the computation on the backend.

func (*Computation) StopWithReason

func (c *Computation) StopWithReason(ctx context.Context, reason string) error

StopWithReason stops the computation with a given reason. This reason will be reflected in the control message that signals the end of the job/channel.

func (*Computation) TSIDMetadata

func (c *Computation) TSIDMetadata(ctx context.Context, tsid idtool.ID) (*messages.MetadataProperties, error)

TSIDMetadata for a particular tsid. Will wait as long as the given ctx is not closed. If ctx is closed an error will be returned.

type ComputationError

type ComputationError struct {
	Code      int
	Message   string
	ErrorType string
}

ComputationError exposes the underlying metadata of a computation error

func (*ComputationError) Error

func (e *ComputationError) Error() string

type DetachRequest

type DetachRequest struct {
	// This should not be set manually
	Type    DetachType `json:"type"`
	Channel string     `json:"channel"`
	Reason  string     `json:"reason"`
}

type DetachType

type DetachType string

func (DetachType) MarshalJSON

func (DetachType) MarshalJSON() ([]byte, error)

type ExecuteRequest

type ExecuteRequest struct {
	// This should not be set manually
	Type         ExecuteType   `json:"type"`
	Program      string        `json:"program"`
	Channel      string        `json:"channel"`
	Start        time.Time     `json:"-"`
	Stop         time.Time     `json:"-"`
	Resolution   time.Duration `json:"-"`
	MaxDelay     time.Duration `json:"-"`
	StartMs      int64         `json:"start"`
	StopMs       int64         `json:"stop"`
	ResolutionMs int64         `json:"resolution"`
	MaxDelayMs   int64         `json:"maxDelay"`
	Immediate    bool          `json:"immediate"`
	Timezone     string        `json:"timezone"`
}

See https://dev.splunk.com/observability/docs/signalflow/messages/websocket_request_messages#Execute-message-properties for details on the fields.

func (ExecuteRequest) MarshalJSON

func (er ExecuteRequest) MarshalJSON() ([]byte, error)

MarshalJSON does some assignments to allow using more native Go types for time/duration.

type ExecuteType

type ExecuteType string

func (ExecuteType) MarshalJSON

func (ExecuteType) MarshalJSON() ([]byte, error)

type FakeBackend

type FakeBackend struct {
	sync.Mutex

	AccessToken string
	// contains filtered or unexported fields
}

FakeBackend is useful for testing, both internal to this package and externally. It supports basic messages and allows for the specification of metadata and data messages that map to a particular program.

func NewRunningFakeBackend

func NewRunningFakeBackend() *FakeBackend

func (*FakeBackend) AddProgramError

func (f *FakeBackend) AddProgramError(program string, errorMsg string)

func (*FakeBackend) AddProgramTSIDs

func (f *FakeBackend) AddProgramTSIDs(program string, tsids []idtool.ID)

func (*FakeBackend) AddTSIDMetadata

func (f *FakeBackend) AddTSIDMetadata(tsid idtool.ID, props *messages.MetadataProperties)

func (*FakeBackend) Client

func (f *FakeBackend) Client() (*Client, error)

func (*FakeBackend) KillExistingConnections

func (f *FakeBackend) KillExistingConnections()

func (*FakeBackend) RemoveTSIDData

func (f *FakeBackend) RemoveTSIDData(tsid idtool.ID)

func (*FakeBackend) Restart

func (f *FakeBackend) Restart()

func (*FakeBackend) RunningJobsForProgram

func (f *FakeBackend) RunningJobsForProgram(program string) int

RunningJobsForProgram returns how many currently executing jobs there are for a particular program text.

func (*FakeBackend) ServeHTTP

func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*FakeBackend) SetLogger

func (f *FakeBackend) SetLogger(logger *log.Logger)

SetLogger sets the internal logger.

func (*FakeBackend) SetTSIDFloatData

func (f *FakeBackend) SetTSIDFloatData(tsid idtool.ID, val float64)

func (*FakeBackend) Start

func (f *FakeBackend) Start()

func (*FakeBackend) Stop

func (f *FakeBackend) Stop()

func (*FakeBackend) URL

func (f *FakeBackend) URL() string

type OnErrorFunc

type OnErrorFunc func(err error)

type StopRequest

type StopRequest struct {
	// This should not be set manually
	Type   StopType `json:"type"`
	Handle string   `json:"handle"`
	Reason string   `json:"reason"`
}

type StopType

type StopType string

func (StopType) MarshalJSON

func (StopType) MarshalJSON() ([]byte, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL