kapacitor

package module
v0.10.1 (not the latest version of this module)
Published: Feb 8, 2016 License: MIT Imports: 36 Imported by: 0

README

Kapacitor

Open source framework for processing, monitoring, and alerting on time series data

Installation

Kapacitor has two binaries:

  • kapacitor – a CLI program for calling the Kapacitor API.
  • kapacitord – the Kapacitor server daemon.

You can either download the binaries directly from the downloads page or go get them:

go get github.com/influxdata/kapacitor/cmd/kapacitor
go get github.com/influxdata/kapacitor/cmd/kapacitord

Configuration

An example configuration file can be found here.

Kapacitor can also provide an example config for you using this command:

kapacitord config

Getting Started

This README gives you a high-level overview of what Kapacitor is and what it's like to use it, as well as some details of how it works. To get started using Kapacitor, see this guide.

Basic Example

Kapacitor uses a DSL named TICKscript to define tasks.

A simple TICKscript that alerts on high cpu usage looks like this:

stream
    .from().measurement('cpu_usage_idle')
    .groupBy('host')
    .window()
        .period(1m)
        .every(1m)
    .mapReduce(influxql.mean('value'))
    .eval(lambda: 100.0 - "mean")
        .as('used')
    .alert()
        .message('{{ .Level}}: {{ .Name }}/{{ index .Tags "host" }} has high cpu usage: {{ index .Fields "used" }}')
        .warn(lambda: "used" > 70.0)
        .crit(lambda: "used" > 85.0)

        // Send alert to handler of choice.

        // Slack
        .slack()
        .channel('#alerts')

        // VictorOps
        .victorOps()
        .routingKey('team_rocket')

        // PagerDuty
        .pagerDuty()

Place the above script into a file named cpu_alert.tick, then run these commands to start the task:

# Define the task (assumes cpu data is in db 'telegraf')
kapacitor define \
    -name cpu_alert \
    -type stream \
    -dbrp telegraf.default \
    -tick ./cpu_alert.tick
# Start the task
kapacitor enable cpu_alert

For more complete examples, see the documentation.

Documentation

Overview

A data pipeline processing engine.

See the README for more complete examples and guides.

Code Organization:

The pipeline package provides an API for how nodes can be connected to form a pipeline. The individual implementations of each node exist in this kapacitor package. The reason for the separation is to keep the exported API from the pipeline package clean as it is consumed via the TICKscripts (a DSL for Kapacitor).
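
For example, each chaining method in a TICKscript constructs a node in the pipeline package, and this kapacitor package provides the matching executing node. The pairing below is illustrative only:

	stream    -> pipeline.StreamNode -> kapacitor.StreamNode
	.window() -> pipeline.WindowNode -> kapacitor.WindowNode
	.alert()  -> pipeline.AlertNode  -> kapacitor.AlertNode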

Other Concepts:

Stream vs Batch -- Use of the word 'stream' indicates data arrives one data point at a time. Use of the word 'batch' indicates data arrives in sets, or batches, of data points.

Task -- A task represents a concrete workload to perform. It consists of a pipeline and an identifying name. Basic CRUD operations can be performed on tasks.

Task Master -- Responsible for executing a task in a specific environment.

Replay -- Replays static datasets against tasks.

Index

Constants

const (
	// List of names for top-level exported vars
	ClusterIDVarName = "cluster_id"
	ServerIDVarName  = "server_id"
	HostVarName      = "host"
	ProductVarName   = "product"
	VersionVarName   = "version"

	NumTasksVarName         = "num_tasks"
	NumEnabledTasksVarName  = "num_enabled_tasks"
	NumSubscriptionsVarName = "num_subscriptions"

	UptimeVarName = "uptime"

	// The name of the product
	Product = "kapacitor"
)

Variables

var (
	// Global expvars
	NumTasks         = &expvar.Int{}
	NumEnabledTasks  = &expvar.Int{}
	NumSubscriptions = &expvar.Int{}
)
var ErrAborted = errors.New("edged aborted")
var ErrTaskMasterClosed = errors.New("TaskMaster is closed")
var ErrTaskMasterOpen = errors.New("TaskMaster is open")
var ErrUDFProcessStopped = errors.New("process already stopped")
var ErrWrongTaskType = errors.New("wrong task type")

Functions

func ConvertResultTimes added in v0.10.1

func ConvertResultTimes(r *Result)

func CreateDBRPMap

func CreateDBRPMap(dbrps []DBRP) map[DBRP]bool

func EvalPredicate

func EvalPredicate(se *tick.StatefulExpr, now time.Time, fields models.Fields, tags models.Tags) (bool, error)

Evaluate a given expression as a boolean predicate against a set of fields and tags

func GetFloatVar

func GetFloatVar(name string) float64

Gets an exported var and returns its float value

func GetIntVar

func GetIntVar(name string) int64

Gets an exported var and returns its int value

func GetStringVar

func GetStringVar(name string) string

Gets an exported var and returns its unquoted string contents
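
For example, a short sketch reading two of the exported vars named in the constants above (assuming they have been published):

n := kapacitor.GetIntVar(kapacitor.NumTasksVarName)   // current number of tasks
host := kapacitor.GetStringVar(kapacitor.HostVarName) // unquoted host string
fmt.Println(n, host)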

func NewStatistics

func NewStatistics(name string, tags map[string]string) *expvar.Map

NewStatistics creates an expvar-based map. Within it, "name" is the measurement name, "tags" are the tags, and values are placed under the key "values". The "values" map is returned so that statistics can be set.
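
A minimal usage sketch, assuming the returned map follows the standard library's expvar.Map API:

values := kapacitor.NewStatistics("edges", map[string]string{"task": "cpu_alert"})
emitted := &expvar.Int{}
emitted.Add(1)
values.Set("points_emitted", emitted) // appears under the "values" key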

func Uptime added in v0.2.0

func Uptime() time.Duration

func WriteBatchForRecording

func WriteBatchForRecording(w io.Writer, b models.Batch) error

func WritePointForRecording

func WritePointForRecording(w io.Writer, p models.Point, precision string) error

Types

type AlertData

type AlertData struct {
	ID      string          `json:"id"`
	Message string          `json:"message"`
	Details string          `json:"details"`
	Time    time.Time       `json:"time"`
	Level   AlertLevel      `json:"level"`
	Data    influxql.Result `json:"data"`
}

type AlertHandler

type AlertHandler func(ad *AlertData)
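
A minimal sketch of a custom handler; the logging behavior is illustrative:

var logAlerts kapacitor.AlertHandler = func(ad *kapacitor.AlertData) {
	// AlertLevel implements String (see below), so %v renders the level name.
	log.Printf("%v %s: %s", ad.Level, ad.ID, ad.Message)
}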

type AlertLevel

type AlertLevel int
const (
	OKAlert AlertLevel = iota
	InfoAlert
	WarnAlert
	CritAlert
)

func (AlertLevel) MarshalText

func (l AlertLevel) MarshalText() ([]byte, error)

func (AlertLevel) String

func (l AlertLevel) String() string

func (*AlertLevel) UnmarshalText added in v0.10.1

func (l *AlertLevel) UnmarshalText(text []byte) error

type AlertNode

type AlertNode struct {
	// contains filtered or unexported fields
}

func (*AlertNode) Err

func (n *AlertNode) Err() error

type BatchCollector

type BatchCollector interface {
	CollectBatch(models.Batch) error
	Close()
}

type BatchNode

type BatchNode struct {
	// contains filtered or unexported fields
}

func (*BatchNode) Abort added in v0.10.1

func (b *BatchNode) Abort()

func (*BatchNode) DBRPs

func (b *BatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*BatchNode) Err

func (n *BatchNode) Err() error

func (*BatchNode) Queries

func (b *BatchNode) Queries(start, stop time.Time) []string

func (*BatchNode) Start

func (b *BatchNode) Start()

type DBRP

type DBRP struct {
	Database        string `json:"db"`
	RetentionPolicy string `json:"rp"`
}

func (DBRP) String

func (d DBRP) String() string
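
For example, pairing DBRP with CreateDBRPMap from the Functions section above:

dbrps := []kapacitor.DBRP{{Database: "telegraf", RetentionPolicy: "default"}}
set := kapacitor.CreateDBRPMap(dbrps)
fmt.Println(set[dbrps[0]]) // true
fmt.Println(dbrps[0])      // rendered via DBRP.String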

type DerivativeNode

type DerivativeNode struct {
	// contains filtered or unexported fields
}

func (*DerivativeNode) Err

func (n *DerivativeNode) Err() error

type Edge

type Edge struct {
	// contains filtered or unexported fields
}

func (*Edge) Abort added in v0.2.1

func (e *Edge) Abort()

Abort all next and collect calls. Items in flight may or may not be processed.

func (*Edge) Close

func (e *Edge) Close()

Close the edge. This can only be called after all collect calls to the edge have finished.

func (*Edge) CollectBatch

func (e *Edge) CollectBatch(b models.Batch) error

func (*Edge) CollectMaps

func (e *Edge) CollectMaps(m *MapResult) error

func (*Edge) CollectPoint

func (e *Edge) CollectPoint(p models.Point) error

func (*Edge) Next

func (e *Edge) Next() (p models.PointInterface, ok bool)

func (*Edge) NextBatch

func (e *Edge) NextBatch() (b models.Batch, ok bool)

func (*Edge) NextMaps

func (e *Edge) NextMaps() (m *MapResult, ok bool)

func (*Edge) NextPoint

func (e *Edge) NextPoint() (p models.Point, ok bool)

type EvalNode

type EvalNode struct {
	// contains filtered or unexported fields
}

func (*EvalNode) Err

func (n *EvalNode) Err() error

type ExecutingTask

type ExecutingTask struct {
	Task *Task
	// contains filtered or unexported fields
}

A task that is ready for execution.

func NewExecutingTask

func NewExecutingTask(tm *TaskMaster, t *Task) (*ExecutingTask, error)

Create a new executing task from a defined task.

func (*ExecutingTask) BatchCount

func (et *ExecutingTask) BatchCount() (int, error)

func (*ExecutingTask) BatchQueries

func (et *ExecutingTask) BatchQueries(start, stop time.Time) ([][]string, error)

Get the batch queries that the batcher will run between times `start` and `stop`.

func (*ExecutingTask) EDot added in v0.2.3

func (et *ExecutingTask) EDot() []byte

Return a graphviz .dot formatted byte array. Label edges with relevant execution information.

func (*ExecutingTask) Err

func (et *ExecutingTask) Err() error

Wait until the task finishes and return any error.

func (*ExecutingTask) GetOutput

func (et *ExecutingTask) GetOutput(name string) (Output, error)

Get a named output.

func (*ExecutingTask) Snapshot added in v0.10.0

func (et *ExecutingTask) Snapshot() (*TaskSnapshot, error)

func (*ExecutingTask) StartBatching

func (et *ExecutingTask) StartBatching() error

Instruct the source batch node to start querying and sending batches of data.
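
A sketch of inspecting an executing task with the methods above; "cpu_out" is a hypothetical output name:

out, err := et.GetOutput("cpu_out") // a named output, if the pipeline defines one
if err == nil {
	fmt.Println(out.Endpoint())
}
fmt.Println(string(et.EDot())) // graphviz dot labeled with execution info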

type GroupByNode

type GroupByNode struct {
	// contains filtered or unexported fields
}

func (*GroupByNode) Err

func (n *GroupByNode) Err() error

type HTTPOutNode

type HTTPOutNode struct {
	// contains filtered or unexported fields
}

func (*HTTPOutNode) Endpoint

func (h *HTTPOutNode) Endpoint() string

func (*HTTPOutNode) Err

func (n *HTTPOutNode) Err() error

type InfluxDBOutNode

type InfluxDBOutNode struct {
	// contains filtered or unexported fields
}

func (*InfluxDBOutNode) Err

func (n *InfluxDBOutNode) Err() error

type JoinNode

type JoinNode struct {
	// contains filtered or unexported fields
}

func (*JoinNode) Err

func (n *JoinNode) Err() error

type LogService

type LogService interface {
	NewLogger(prefix string, flag int) *log.Logger
}

type MapFunc

type MapFunc func(in *tsdb.MapInput) interface{}

type MapInfo

type MapInfo struct {
	Field string
	Func  MapFunc
}

type MapNode

type MapNode struct {
	// contains filtered or unexported fields
}

func (*MapNode) Err

func (n *MapNode) Err() error

type MapResult

type MapResult struct {
	Name  string
	Group models.GroupID
	Dims  []string
	Tags  map[string]string
	TMax  time.Time
	Outs  []interface{}
}

type Node

type Node interface {
	pipeline.Node

	// wait for the node to finish processing and return any errors
	Err() error
	// contains filtered or unexported methods
}

A node that can be in an executor.

type Output

type Output interface {
	Endpoint() string
}

An output of a pipeline. Still need to improve this interface to expose different types of outputs.

type Query

type Query struct {
	// contains filtered or unexported fields
}

func NewQuery

func NewQuery(q string) (*Query, error)

func (*Query) DBRPs

func (q *Query) DBRPs() ([]DBRP, error)

Return the database and retention policy pairs of the query.

func (*Query) Dimensions

func (q *Query) Dimensions(dims []interface{}) error

Set the dimensions on the query

func (*Query) Fill

func (q *Query) Fill(option influxql.FillOption, value interface{})

func (*Query) Start

func (q *Query) Start(s time.Time)

Set the start time of the query

func (*Query) Stop

func (q *Query) Stop(s time.Time)

Set the stop time of the query

func (*Query) String

func (q *Query) String() string
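
A usage sketch; the query text and time bounds are illustrative:

q, err := kapacitor.NewQuery(`SELECT mean("usage_idle") FROM "telegraf"."default"."cpu"`)
if err != nil {
	log.Fatal(err)
}
now := time.Now()
q.Start(now.Add(-time.Minute)) // set the start time of the query
q.Stop(now)                    // set the stop time of the query
fmt.Println(q.String())        // the query with time bounds applied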

type ReduceFunc

type ReduceFunc func(in []interface{}, tmax time.Time, useTMax bool, as string) interface{}

type ReduceNode

type ReduceNode struct {
	// contains filtered or unexported fields
}

func (*ReduceNode) Err

func (n *ReduceNode) Err() error

type Replay

type Replay struct {
	Setter clock.Setter
	// contains filtered or unexported fields
}

Replay engine that can replay static data sets against a specific executor and its tasks.

func NewReplay

func NewReplay(c clock.Clock) *Replay

Create a new replay engine.

func (*Replay) ReplayBatch

func (r *Replay) ReplayBatch(data []io.ReadCloser, batches []BatchCollector, recTime bool) <-chan error

Replay a data set against an executor. If recTime is true, the replay will use the times stored in the recording instead of the clock time.

func (*Replay) ReplayStream

func (r *Replay) ReplayStream(data io.ReadCloser, stream StreamCollector, recTime bool, precision string) <-chan error

Replay a data set against an executor.
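
A sketch of replaying a recorded stream, assuming c is a clock.Clock, f is an io.ReadCloser over a recording, and stream came from TaskMaster.Stream:

r := kapacitor.NewReplay(c)
errC := r.ReplayStream(f, stream, true, "s") // true: use the recorded times
if err := <-errC; err != nil {
	log.Fatal(err)
}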

type Result

type Result influxql.Result

The result from an output.

func ResultFromJSON

func ResultFromJSON(in io.Reader) (r Result)

Unmarshal a Result object from JSON.
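
For example, decoding a task's output from an HTTP response body (resp is assumed):

r := kapacitor.ResultFromJSON(resp.Body)
kapacitor.ConvertResultTimes(&r) // see ConvertResultTimes above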

type SampleNode

type SampleNode struct {
	// contains filtered or unexported fields
}

func (*SampleNode) Err

func (n *SampleNode) Err() error

type SourceBatchNode

type SourceBatchNode struct {
	// contains filtered or unexported fields
}

func (*SourceBatchNode) Abort added in v0.10.1

func (s *SourceBatchNode) Abort()

func (*SourceBatchNode) Count

func (s *SourceBatchNode) Count() int

func (*SourceBatchNode) DBRPs

func (s *SourceBatchNode) DBRPs() ([]DBRP, error)

Return list of databases and retention policies the batcher will query.

func (*SourceBatchNode) Err

func (s *SourceBatchNode) Err() error

func (*SourceBatchNode) Queries

func (s *SourceBatchNode) Queries(start, stop time.Time) [][]string

func (*SourceBatchNode) Start

func (s *SourceBatchNode) Start()

type StatsData added in v0.2.1

type StatsData struct {
	Name   string                 `json:"name"`
	Tags   map[string]string      `json:"tags"`
	Values map[string]interface{} `json:"values"`
}

func GetStatsData

func GetStatsData() ([]StatsData, error)

Return all stats data from the expvars.

type StatsNode added in v0.10.0

type StatsNode struct {
	// contains filtered or unexported fields
}

func (*StatsNode) Err added in v0.10.0

func (n *StatsNode) Err() error

type StreamCollector

type StreamCollector interface {
	CollectPoint(models.Point) error
	Close()
}

type StreamNode

type StreamNode struct {
	// contains filtered or unexported fields
}

func (*StreamNode) Err

func (n *StreamNode) Err() error

type Task

type Task struct {
	Name             string
	Pipeline         *pipeline.Pipeline
	Type             TaskType
	DBRPs            []DBRP
	SnapshotInterval time.Duration
}

The complete definition of a task, its name, pipeline and type.

func (*Task) Dot

func (t *Task) Dot() []byte

type TaskMaster

type TaskMaster struct {
	HTTPDService interface {
		AddRoutes([]httpd.Route) error
		DelRoutes([]httpd.Route)
		URL() string
	}
	TaskStore interface {
		SaveSnapshot(name string, snapshot *TaskSnapshot) error
		HasSnapshot(name string) bool
		LoadSnapshot(name string) (*TaskSnapshot, error)
	}
	DeadmanService pipeline.DeadmanService

	UDFService UDFService

	InfluxDBService interface {
		NewClient() (*client.Client, error)
	}
	SMTPService interface {
		Global() bool
		SendMail(to []string, subject string, msg string) error
	}
	OpsGenieService interface {
		Global() bool
		Alert(teams []string, recipients []string, messageType, message, entityID string, t time.Time, details interface{}) error
	}
	VictorOpsService interface {
		Global() bool
		Alert(routingKey, messageType, message, entityID string, t time.Time, extra interface{}) error
	}
	PagerDutyService interface {
		Global() bool
		Alert(incidentKey, desc string, details interface{}) error
	}
	SlackService interface {
		Global() bool
		Alert(channel, message string, level AlertLevel) error
	}
	HipChatService interface {
		Global() bool
		Alert(room, token, message string, level AlertLevel) error
	}
	AlertaService interface {
		Alert(token, resource, event, environment, severity, status, group, value, message, origin string, data interface{}) error
	}
	SensuService interface {
		Alert(name, output string, level AlertLevel) error
	}
	TalkService interface {
		Alert(title, text string) error
	}
	LogService LogService
	// contains filtered or unexported fields
}

An execution framework for a set of tasks.

func NewTaskMaster

func NewTaskMaster(l LogService) *TaskMaster

Create a new TaskMaster with the given LogService.

func (*TaskMaster) BatchCollectors

func (tm *TaskMaster) BatchCollectors(name string) []BatchCollector

func (*TaskMaster) Close

func (tm *TaskMaster) Close() error

func (*TaskMaster) CreateTICKScope added in v0.10.0

func (tm *TaskMaster) CreateTICKScope() *tick.Scope

func (*TaskMaster) DelFork

func (tm *TaskMaster) DelFork(name string)

func (*TaskMaster) Drain added in v0.2.1

func (tm *TaskMaster) Drain()

func (*TaskMaster) ExecutingDot added in v0.2.3

func (tm *TaskMaster) ExecutingDot(name string) string

func (*TaskMaster) IsExecuting added in v0.2.1

func (tm *TaskMaster) IsExecuting(name string) bool

func (*TaskMaster) New

func (tm *TaskMaster) New() *TaskMaster

Returns a new TaskMaster instance with the same services as the current one.

func (*TaskMaster) NewFork

func (tm *TaskMaster) NewFork(taskName string, dbrps []DBRP) (*Edge, error)

func (*TaskMaster) NewTask added in v0.10.0

func (tm *TaskMaster) NewTask(
	name,
	script string,
	tt TaskType,
	dbrps []DBRP,
	snapshotInterval time.Duration,
) (*Task, error)

Create a new task in the context of a TaskMaster
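
A minimal lifecycle sketch using only the methods documented here; logService and script are assumed to exist, and the TaskMaster's service fields are left unset:

tm := kapacitor.NewTaskMaster(logService)
if err := tm.Open(); err != nil {
	log.Fatal(err)
}
defer tm.Close()

task, err := tm.NewTask(
	"cpu_alert",
	script, // TICKscript source
	kapacitor.StreamTask,
	[]kapacitor.DBRP{{Database: "telegraf", RetentionPolicy: "default"}},
	0, // no snapshotting
)
if err != nil {
	log.Fatal(err)
}

et, err := tm.StartTask(task)
if err != nil {
	log.Fatal(err)
}
_ = et // see ExecutingTask for GetOutput, EDot, Err, etc.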

func (*TaskMaster) Open

func (tm *TaskMaster) Open() (err error)

func (*TaskMaster) SnapshotTask added in v0.10.0

func (tm *TaskMaster) SnapshotTask(name string) (*TaskSnapshot, error)

func (*TaskMaster) StartTask

func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error)

func (*TaskMaster) StopTask

func (tm *TaskMaster) StopTask(name string) error

func (*TaskMaster) StopTasks added in v0.10.1

func (tm *TaskMaster) StopTasks()

func (*TaskMaster) Stream

func (tm *TaskMaster) Stream(name string) (StreamCollector, error)

func (*TaskMaster) WritePoints added in v0.2.1

func (tm *TaskMaster) WritePoints(pts *cluster.WritePointsRequest) error
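
A sketch of feeding data to an executing stream task, assuming pt is a models.Point and the name matches a defined task:

sc, err := tm.Stream("cpu_alert")
if err != nil {
	log.Fatal(err)
}
if err := sc.CollectPoint(pt); err != nil {
	log.Fatal(err)
}
sc.Close() // only after all CollectPoint calls have finished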

type TaskSnapshot added in v0.10.0

type TaskSnapshot struct {
	NodeSnapshots map[string][]byte
}

type TaskType

type TaskType int

The type of a task

const (
	StreamTask TaskType = iota
	BatchTask
)

func (TaskType) String

func (t TaskType) String() string

type UDFNode added in v0.10.0

type UDFNode struct {
	// contains filtered or unexported fields
}

User-defined function node.

func (*UDFNode) Err added in v0.10.0

func (n *UDFNode) Err() error

type UDFProcess added in v0.10.0

type UDFProcess struct {
	PointIn chan<- models.Point

	BatchIn chan<- models.Batch

	PointOut <-chan models.Point

	BatchOut <-chan models.Batch
	// contains filtered or unexported fields
}

Wraps an external process and sends and receives data over STDIN and STDOUT. Lines received over STDERR are logged via normal Kapacitor logging.

Once a UDFProcess is created and started, the owner can send points or batches to the subprocess by writing them to the PointIn or BatchIn channel, according to the type of process created.

The UDFProcess may be aborted at any time for various reasons. It is the owner's responsibility, via the abortCallback, to stop writing to the *In channels, since no more selects on the channels will be performed.

Calling Stop on the process should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.

Calling Info returns information about the options the process has available.

Calling Init is required to process data. The behavior is undefined if you send points/batches to the process without calling Init.
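
A lifecycle sketch for a stream-type process, assuming cmdr is a command.Commander, logger is a *log.Logger, and pt is a models.Point:

p := kapacitor.NewUDFProcess(cmdr, logger, 10*time.Second, func() {
	// abortCallback: the owner must stop writing to the *In channels here.
})
if err := p.Start(); err != nil {
	log.Fatal(err)
}
if err := p.Init(nil); err != nil { // Init is required; nil means no options
	log.Fatal(err)
}
p.PointIn <- pt // send data; read results from p.PointOut
// ... once done writing to PointIn:
if err := p.Stop(); err != nil {
	log.Fatal(err)
}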

func NewUDFProcess added in v0.10.0

func NewUDFProcess(
	commander command.Commander,
	l *log.Logger,
	timeout time.Duration,
	abortCallback func(),
) *UDFProcess

func (*UDFProcess) Abort added in v0.10.0

func (p *UDFProcess) Abort(err error)

Abort the process. Data in-flight will not be processed.

func (*UDFProcess) Info added in v0.10.0

func (p *UDFProcess) Info() (UDFProcessInfo, error)

Get information about the process, its available options, etc. Info need not be called every time a process is started.

func (*UDFProcess) Init added in v0.10.0

func (p *UDFProcess) Init(options []*udf.Option) error

Initialize the process with a set of Options. Calling Init is required even if you do not have any specific Options; just pass nil.

func (*UDFProcess) Restore added in v0.10.0

func (p *UDFProcess) Restore(snapshot []byte) error

Request to restore a snapshot.

func (*UDFProcess) Snapshot added in v0.10.0

func (p *UDFProcess) Snapshot() ([]byte, error)

Request a snapshot from the process.

func (*UDFProcess) Start added in v0.10.0

func (p *UDFProcess) Start() error

Start the UDFProcess

func (*UDFProcess) Stop added in v0.10.0

func (p *UDFProcess) Stop() error

Stop the UDFProcess cleanly.

Calling Stop should only be done once the owner has stopped writing to the *In channel, at which point the remaining data will be processed and the subprocess will be allowed to exit cleanly.

type UDFProcessInfo added in v0.10.0

type UDFProcessInfo struct {
	Commander command.Commander
	Timeout   time.Duration
	Wants     pipeline.EdgeType
	Provides  pipeline.EdgeType
	Options   map[string]*udf.OptionInfo
}

type UDFService added in v0.10.0

type UDFService interface {
	FunctionList() []string
	FunctionInfo(name string) (UDFProcessInfo, bool)
}

type UnionNode

type UnionNode struct {
	// contains filtered or unexported fields
}

func (*UnionNode) Err

func (n *UnionNode) Err() error

type WhereNode

type WhereNode struct {
	// contains filtered or unexported fields
}

func (*WhereNode) Err

func (n *WhereNode) Err() error

type WindowNode

type WindowNode struct {
	// contains filtered or unexported fields
}

func (*WindowNode) Err

func (n *WindowNode) Err() error

Directories

Path	Synopsis
clock	A clock that provides blocking calls that wait until absolute times have occurred.
cmd
integrations	Contains integration and end-to-end tests
models	Provides a set of structures for passing data around Kapacitor.
pipeline	Provides an API for constructing data processing pipelines.
services
    httpd	Provides an HTTP API exposing many components of Kapacitor.
    reporting	Sends anonymous reports to InfluxData
    stats	The stats service collects the exported stats and submits them to the Kapacitor stream under the configured database and retention policy.
    udf
    udp
tick	TICKscript is a simple invocation chaining DSL.
    cmd/tickdoc	Tickdoc is a simple utility similar to godoc that generates documentation from comments.
udf	Package udf is a generated protocol buffer package.
wlog	Provides an io.Writer that filters log messages based on a log level.
