agent

package
v1.7.4
Published: Apr 23, 2024 | License: MIT | Imports: 13 | Imported by: 165

README

UDF Agents and Servers

A UDF is a User Defined Function, meaning that you can write your own functions/algorithms and plug them into Kapacitor. Your custom function runs in its own process, and Kapacitor communicates with it via a defined protocol (see udf.proto). To facilitate working with the protocol, several agents have been written in various languages; each abstracts the protocol communication behind an interface in its language. You can find those agent implementations in this directory and its subdirectories, organized by language name.

Example uses of the agents can be found in the examples directory. These are working examples that are executed as part of the test suite, see server_test.go.

Child process vs Socket

There are two approaches for writing UDFs.

  • A child process based approach where Kapacitor spawns a child process and communicates over STDIN/STDOUT.
  • A socket based approach where you start the UDF process externally and Kapacitor connects to it over a socket.

For the socket based approach there will only ever be one instance of your UDF process running. Each use of the UDF in a TICKscript will be a new connection to the socket, whereas each use of a process based UDF means a new child process is spawned.

Design

The protocol for communicating with Kapacitor consists of Request and Response messages. The agents wrap the communication and serialization and expose an interface that needs to be implemented to handle each request/response. In addition to the request/response paradigm agents provide a way to stream data back to Kapacitor. Your UDF is in control of when new points or batches are sent back to Kapacitor.

Agents and Servers

There are two main objects provided in the current implementations, an Agent and a Server. The Agent is responsible for managing the communication over input and output streams. The Server is responsible for accepting new connections and creating new Agents to handle those new connections.

Both process based and socket based UDFs will need to use an Agent to handle the communication/serialization aspects of the protocol. Only socket based UDFs need to use the Server.

Writing an Agent for a new Language

The UDF protocol is designed to be simple and consists of reading and writing protocol buffer messages.

In order to write a UDF in the language of your choice your language must have protocol buffer support and be able to read and write to a socket.

The basic steps are:

  1. Add the language to the udf/io.go generate comment so the udf.proto code exists for your language.
  2. Implement a Varint encoder/decoder. This is trivial; see the python implementation.
  3. Implement a method for reading and writing streamed protobuf messages. See udf.proto for more details.
  4. Create an interface for handling each of the request/responses.
  5. Write a loop for reading from an input stream and calling the handler interface, and write responses to an output stream.
  6. Provide a thread-safe mechanism for writing points and batches to the output stream independent of the handler interface. This is easily accomplished with a synchronized write method; see the python implementation.
  7. Implement the examples using your new agent.
  8. Add your example to the test suite in cmd/kapacitord/run/server_test.go.

For process based UDFs it is expected that the process terminate after STDIN is closed and the remaining requests processed. After STDIN is closed, the agent process can continue to send Responses to Kapacitor as long as a keepalive timeout does not occur. Once a keepalive timeout is reached and after a 2*keepalive_time grace period, if the process has not terminated then it will be forcefully terminated.

Docker

It is expected that the example can run inside the test suite. Since generating different protocol buffer code requires different plugins and libraries to run we make use of Docker to provide the necessary environment. This makes testing the code easier as the developer does not have to install each supported language locally.

Documentation

Constants

This section is empty.

Variables

var (
	EdgeType_name = map[int32]string{
		0: "STREAM",
		1: "BATCH",
	}
	EdgeType_value = map[string]int32{
		"STREAM": 0,
		"BATCH":  1,
	}
)

Enum value maps for EdgeType.

var (
	ValueType_name = map[int32]string{
		0: "BOOL",
		1: "INT",
		2: "DOUBLE",
		3: "STRING",
		4: "DURATION",
	}
	ValueType_value = map[string]int32{
		"BOOL":     0,
		"INT":      1,
		"DOUBLE":   2,
		"STRING":   3,
		"DURATION": 4,
	}
)

Enum value maps for ValueType.
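
The maps translate between enum numbers and their names in both directions. The sketch below shows one way they might be used. It works on local copies of the ValueType maps so that it compiles without importing the package, and the helper names valueTypeFromString and valueTypeString are hypothetical.

```go
package main

import "fmt"

// Local copies of the ValueType maps listed above.
var (
	valueTypeName = map[int32]string{
		0: "BOOL", 1: "INT", 2: "DOUBLE", 3: "STRING", 4: "DURATION",
	}
	valueTypeValue = map[string]int32{
		"BOOL": 0, "INT": 1, "DOUBLE": 2, "STRING": 3, "DURATION": 4,
	}
)

// valueTypeFromString (hypothetical) converts a name into its enum number.
func valueTypeFromString(name string) (int32, error) {
	v, ok := valueTypeValue[name]
	if !ok {
		return 0, fmt.Errorf("unknown value type %q", name)
	}
	return v, nil
}

// valueTypeString (hypothetical) converts an enum number into its name and
// falls back to a numeric form for numbers that are not in the map.
func valueTypeString(v int32) string {
	if name, ok := valueTypeName[v]; ok {
		return name
	}
	return fmt.Sprintf("ValueType(%d)", v)
}

func main() {
	v, err := valueTypeFromString("DURATION")
	fmt.Println(v, err)
	fmt.Println(valueTypeString(2))
	fmt.Println(valueTypeString(9))
}
```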

Functions

func ReadMessage added in v1.3.0

func ReadMessage(buf *[]byte, r ByteReadReader, msg proto.Message) error

Read a message from io.ByteReader by first reading a varint size, and then reading and decoding the message object. If buf is not big enough a new buffer will be allocated to replace buf.

func WriteMessage added in v1.3.0

func WriteMessage(msg proto.Message, w io.Writer) error

Write the message to the io.Writer with a varint size header.

Types

type Accepter added in v0.13.0

type Accepter interface {
	// Accept new connections from the listener and handle them accordingly.
	// The typical action is to create a new Agent with the connection as both its in and out objects.
	Accept(net.Conn)
}

type Agent

type Agent struct {

	// A channel for writing Responses, specifically Batch and Point responses.
	Responses chan<- *Response

	// The handler for requests.
	Handler Handler
	// contains filtered or unexported fields
}

Go implementation of a Kapacitor UDF agent. This agent is responsible for reading and writing messages over a socket.

The Agent requires a Handler object in order to fulfill requests.

func New

func New(in io.ReadCloser, out io.WriteCloser) *Agent

Create a new Agent with the provided in/out objects. To create an Agent that reads from STDIN and writes to STDOUT of the process, use New(os.Stdin, os.Stdout).

func (*Agent) Start

func (a *Agent) Start() error

Start the Agent. You must set a Handler on the Agent before starting.

func (*Agent) Wait

func (a *Agent) Wait() error

Wait for the Agent to terminate. The Agent will not terminate until the Responses channel is closed. You will need to close this channel externally, typically in the Stop method of the Handler. The Agent will terminate if the In reader is closed or an error occurs.

type BeginBatch added in v1.3.0

type BeginBatch struct {
	Name   string            `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Group  string            `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"`
	Tags   map[string]string `` /* 149-byte string literal not displayed */
	Size   int64             `protobuf:"varint,4,opt,name=size,proto3" json:"size,omitempty"`
	ByName bool              `protobuf:"varint,5,opt,name=byName,proto3" json:"byName,omitempty"`
	// contains filtered or unexported fields
}

Indicates the beginning of a batch. All subsequent points should be considered part of the batch until EndBatch arrives. This includes grouping. Batches of differing groups may not be interleaved.

All the meta data but tmax is provided, since tmax may not be known at the beginning of a batch.

Size is the number of points in the batch. If size is 0 then the batch has an undetermined size.

func (*BeginBatch) Descriptor deprecated added in v1.3.0

func (*BeginBatch) Descriptor() ([]byte, []int)

Deprecated: Use BeginBatch.ProtoReflect.Descriptor instead.

func (*BeginBatch) GetByName added in v1.5.1

func (x *BeginBatch) GetByName() bool

func (*BeginBatch) GetGroup added in v1.5.1

func (x *BeginBatch) GetGroup() string

func (*BeginBatch) GetName added in v1.5.1

func (x *BeginBatch) GetName() string

func (*BeginBatch) GetSize added in v1.5.1

func (x *BeginBatch) GetSize() int64

func (*BeginBatch) GetTags added in v1.3.0

func (x *BeginBatch) GetTags() map[string]string

func (*BeginBatch) ProtoMessage added in v1.3.0

func (*BeginBatch) ProtoMessage()

func (*BeginBatch) ProtoReflect added in v1.5.8

func (x *BeginBatch) ProtoReflect() protoreflect.Message

func (*BeginBatch) Reset added in v1.3.0

func (x *BeginBatch) Reset()

func (*BeginBatch) String added in v1.3.0

func (x *BeginBatch) String() string

type ByteReadReader added in v1.3.0

type ByteReadReader interface {
	io.Reader
	io.ByteReader
}

Interface for reading messages. If you have an io.Reader, wrap it in a bufio.Reader to satisfy this interface.

Example: brr := bufio.NewReader(reader)

type EdgeType added in v1.3.0

type EdgeType int32
const (
	EdgeType_STREAM EdgeType = 0
	EdgeType_BATCH  EdgeType = 1
)

func (EdgeType) Descriptor added in v1.5.8

func (EdgeType) Descriptor() protoreflect.EnumDescriptor

func (EdgeType) Enum added in v1.5.8

func (x EdgeType) Enum() *EdgeType

func (EdgeType) EnumDescriptor deprecated added in v1.3.0

func (EdgeType) EnumDescriptor() ([]byte, []int)

Deprecated: Use EdgeType.Descriptor instead.

func (EdgeType) Number added in v1.5.8

func (x EdgeType) Number() protoreflect.EnumNumber

func (EdgeType) String added in v1.3.0

func (x EdgeType) String() string

func (EdgeType) Type added in v1.5.8

func (EdgeType) Type() protoreflect.EnumType

type EndBatch added in v1.3.0

type EndBatch struct {
	Name   string            `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Group  string            `protobuf:"bytes,2,opt,name=group,proto3" json:"group,omitempty"`
	Tmax   int64             `protobuf:"varint,3,opt,name=tmax,proto3" json:"tmax,omitempty"`
	Tags   map[string]string `` /* 149-byte string literal not displayed */
	ByName bool              `protobuf:"varint,5,opt,name=byName,proto3" json:"byName,omitempty"`
	// contains filtered or unexported fields
}

Indicates the end of a batch and contains all metadata associated with the batch. The same meta information is provided for ease of use, with the addition of tmax since it may not be known at BeginBatch.

func (*EndBatch) Descriptor deprecated added in v1.3.0

func (*EndBatch) Descriptor() ([]byte, []int)

Deprecated: Use EndBatch.ProtoReflect.Descriptor instead.

func (*EndBatch) GetByName added in v1.5.1

func (x *EndBatch) GetByName() bool

func (*EndBatch) GetGroup added in v1.5.1

func (x *EndBatch) GetGroup() string

func (*EndBatch) GetName added in v1.5.1

func (x *EndBatch) GetName() string

func (*EndBatch) GetTags added in v1.3.0

func (x *EndBatch) GetTags() map[string]string

func (*EndBatch) GetTmax added in v1.5.1

func (x *EndBatch) GetTmax() int64

func (*EndBatch) ProtoMessage added in v1.3.0

func (*EndBatch) ProtoMessage()

func (*EndBatch) ProtoReflect added in v1.5.8

func (x *EndBatch) ProtoReflect() protoreflect.Message

func (*EndBatch) Reset added in v1.3.0

func (x *EndBatch) Reset()

func (*EndBatch) String added in v1.3.0

func (x *EndBatch) String() string

type ErrorResponse added in v1.3.0

type ErrorResponse struct {
	Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

Sent from the process to Kapacitor indicating an error has occurred. If an ErrorResponse is received, Kapacitor will terminate the process.

func (*ErrorResponse) Descriptor deprecated added in v1.3.0

func (*ErrorResponse) Descriptor() ([]byte, []int)

Deprecated: Use ErrorResponse.ProtoReflect.Descriptor instead.

func (*ErrorResponse) GetError added in v1.5.1

func (x *ErrorResponse) GetError() string

func (*ErrorResponse) ProtoMessage added in v1.3.0

func (*ErrorResponse) ProtoMessage()

func (*ErrorResponse) ProtoReflect added in v1.5.8

func (x *ErrorResponse) ProtoReflect() protoreflect.Message

func (*ErrorResponse) Reset added in v1.3.0

func (x *ErrorResponse) Reset()

func (*ErrorResponse) String added in v1.3.0

func (x *ErrorResponse) String() string

type Handler

type Handler interface {
	// Return the InfoResponse. Describing the properties of this Handler
	Info() (*InfoResponse, error)
	// Initialize the Handler with the provided options.
	Init(*InitRequest) (*InitResponse, error)
	// Create a snapshot of the running state of the handler.
	Snapshot() (*SnapshotResponse, error)
	// Restore a previous snapshot.
	Restore(*RestoreRequest) (*RestoreResponse, error)

	// A batch has begun.
	BeginBatch(*BeginBatch) error
	// A point has arrived.
	Point(*Point) error
	// The batch is complete.
	EndBatch(*EndBatch) error

	// Gracefully stop the Handler.
	// No other methods will be called.
	Stop()
}

The Agent calls the appropriate methods on the Handler as it receives requests over a socket.

Returning an error from any method will cause the Agent to stop and an ErrorResponse to be sent. Some *Response objects (like SnapshotResponse) allow for returning their own error within the object itself. These types of errors will not stop the Agent and Kapacitor will deal with them appropriately.

The Handler is called from a single goroutine, meaning methods will not be called concurrently.

To write Points/Batches back to the Agent/Kapacitor use the Agent.Responses channel.

type InfoRequest added in v1.3.0

type InfoRequest struct {
	// contains filtered or unexported fields
}

Request that the process return information about available Options.

func (*InfoRequest) Descriptor deprecated added in v1.3.0

func (*InfoRequest) Descriptor() ([]byte, []int)

Deprecated: Use InfoRequest.ProtoReflect.Descriptor instead.

func (*InfoRequest) ProtoMessage added in v1.3.0

func (*InfoRequest) ProtoMessage()

func (*InfoRequest) ProtoReflect added in v1.5.8

func (x *InfoRequest) ProtoReflect() protoreflect.Message

func (*InfoRequest) Reset added in v1.3.0

func (x *InfoRequest) Reset()

func (*InfoRequest) String added in v1.3.0

func (x *InfoRequest) String() string

type InfoResponse added in v1.3.0

type InfoResponse struct {
	Wants    EdgeType               `protobuf:"varint,1,opt,name=wants,proto3,enum=agent.EdgeType" json:"wants,omitempty"`
	Provides EdgeType               `protobuf:"varint,2,opt,name=provides,proto3,enum=agent.EdgeType" json:"provides,omitempty"`
	Options  map[string]*OptionInfo `` /* 155-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (*InfoResponse) Descriptor deprecated added in v1.3.0

func (*InfoResponse) Descriptor() ([]byte, []int)

Deprecated: Use InfoResponse.ProtoReflect.Descriptor instead.

func (*InfoResponse) GetOptions added in v1.3.0

func (x *InfoResponse) GetOptions() map[string]*OptionInfo

func (*InfoResponse) GetProvides added in v1.5.1

func (x *InfoResponse) GetProvides() EdgeType

func (*InfoResponse) GetWants added in v1.5.1

func (x *InfoResponse) GetWants() EdgeType

func (*InfoResponse) ProtoMessage added in v1.3.0

func (*InfoResponse) ProtoMessage()

func (*InfoResponse) ProtoReflect added in v1.5.8

func (x *InfoResponse) ProtoReflect() protoreflect.Message

func (*InfoResponse) Reset added in v1.3.0

func (x *InfoResponse) Reset()

func (*InfoResponse) String added in v1.3.0

func (x *InfoResponse) String() string

type InitRequest added in v1.3.0

type InitRequest struct {
	Options []*Option `protobuf:"bytes,1,rep,name=options,proto3" json:"options,omitempty"`
	TaskID  string    `protobuf:"bytes,2,opt,name=taskID,proto3" json:"taskID,omitempty"`
	NodeID  string    `protobuf:"bytes,3,opt,name=nodeID,proto3" json:"nodeID,omitempty"`
	// contains filtered or unexported fields
}

Request that the process initialize itself with the provided options.

func (*InitRequest) Descriptor deprecated added in v1.3.0

func (*InitRequest) Descriptor() ([]byte, []int)

Deprecated: Use InitRequest.ProtoReflect.Descriptor instead.

func (*InitRequest) GetNodeID added in v1.5.1

func (x *InitRequest) GetNodeID() string

func (*InitRequest) GetOptions added in v1.3.0

func (x *InitRequest) GetOptions() []*Option

func (*InitRequest) GetTaskID added in v1.5.1

func (x *InitRequest) GetTaskID() string

func (*InitRequest) ProtoMessage added in v1.3.0

func (*InitRequest) ProtoMessage()

func (*InitRequest) ProtoReflect added in v1.5.8

func (x *InitRequest) ProtoReflect() protoreflect.Message

func (*InitRequest) Reset added in v1.3.0

func (x *InitRequest) Reset()

func (*InitRequest) String added in v1.3.0

func (x *InitRequest) String() string

type InitResponse added in v1.3.0

type InitResponse struct {
	Success bool   `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
	Error   string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

Respond to Kapacitor whether initialization was successful.

func (*InitResponse) Descriptor deprecated added in v1.3.0

func (*InitResponse) Descriptor() ([]byte, []int)

Deprecated: Use InitResponse.ProtoReflect.Descriptor instead.

func (*InitResponse) GetError added in v1.5.1

func (x *InitResponse) GetError() string

func (*InitResponse) GetSuccess added in v1.5.1

func (x *InitResponse) GetSuccess() bool

func (*InitResponse) ProtoMessage added in v1.3.0

func (*InitResponse) ProtoMessage()

func (*InitResponse) ProtoReflect added in v1.5.8

func (x *InitResponse) ProtoReflect() protoreflect.Message

func (*InitResponse) Reset added in v1.3.0

func (x *InitResponse) Reset()

func (*InitResponse) String added in v1.3.0

func (x *InitResponse) String() string

type KeepaliveRequest added in v1.3.0

type KeepaliveRequest struct {

	// The number of nanoseconds since the epoch.
	// Used only for debugging keepalive requests.
	Time int64 `protobuf:"varint,1,opt,name=time,proto3" json:"time,omitempty"`
	// contains filtered or unexported fields
}

Request that the process respond with a Keepalive to verify it is responding.

func (*KeepaliveRequest) Descriptor deprecated added in v1.3.0

func (*KeepaliveRequest) Descriptor() ([]byte, []int)

Deprecated: Use KeepaliveRequest.ProtoReflect.Descriptor instead.

func (*KeepaliveRequest) GetTime added in v1.5.1

func (x *KeepaliveRequest) GetTime() int64

func (*KeepaliveRequest) ProtoMessage added in v1.3.0

func (*KeepaliveRequest) ProtoMessage()

func (*KeepaliveRequest) ProtoReflect added in v1.5.8

func (x *KeepaliveRequest) ProtoReflect() protoreflect.Message

func (*KeepaliveRequest) Reset added in v1.3.0

func (x *KeepaliveRequest) Reset()

func (*KeepaliveRequest) String added in v1.3.0

func (x *KeepaliveRequest) String() string

type KeepaliveResponse added in v1.3.0

type KeepaliveResponse struct {

	// The number of nanoseconds since the epoch.
	// Used only for debugging keepalive requests.
	Time int64 `protobuf:"varint,1,opt,name=time,proto3" json:"time,omitempty"`
	// contains filtered or unexported fields
}

Respond to KeepaliveRequest

func (*KeepaliveResponse) Descriptor deprecated added in v1.3.0

func (*KeepaliveResponse) Descriptor() ([]byte, []int)

Deprecated: Use KeepaliveResponse.ProtoReflect.Descriptor instead.

func (*KeepaliveResponse) GetTime added in v1.5.1

func (x *KeepaliveResponse) GetTime() int64

func (*KeepaliveResponse) ProtoMessage added in v1.3.0

func (*KeepaliveResponse) ProtoMessage()

func (*KeepaliveResponse) ProtoReflect added in v1.5.8

func (x *KeepaliveResponse) ProtoReflect() protoreflect.Message

func (*KeepaliveResponse) Reset added in v1.3.0

func (x *KeepaliveResponse) Reset()

func (*KeepaliveResponse) String added in v1.3.0

func (x *KeepaliveResponse) String() string

type Option added in v1.3.0

type Option struct {
	Name   string         `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Values []*OptionValue `protobuf:"bytes,2,rep,name=values,proto3" json:"values,omitempty"`
	// contains filtered or unexported fields
}

func (*Option) Descriptor deprecated added in v1.3.0

func (*Option) Descriptor() ([]byte, []int)

Deprecated: Use Option.ProtoReflect.Descriptor instead.

func (*Option) GetName added in v1.5.1

func (x *Option) GetName() string

func (*Option) GetValues added in v1.3.0

func (x *Option) GetValues() []*OptionValue

func (*Option) ProtoMessage added in v1.3.0

func (*Option) ProtoMessage()

func (*Option) ProtoReflect added in v1.5.8

func (x *Option) ProtoReflect() protoreflect.Message

func (*Option) Reset added in v1.3.0

func (x *Option) Reset()

func (*Option) String added in v1.3.0

func (x *Option) String() string

type OptionInfo added in v1.3.0

type OptionInfo struct {
	ValueTypes []ValueType `protobuf:"varint,1,rep,packed,name=valueTypes,proto3,enum=agent.ValueType" json:"valueTypes,omitempty"`
	// contains filtered or unexported fields
}

func (*OptionInfo) Descriptor deprecated added in v1.3.0

func (*OptionInfo) Descriptor() ([]byte, []int)

Deprecated: Use OptionInfo.ProtoReflect.Descriptor instead.

func (*OptionInfo) GetValueTypes added in v1.5.1

func (x *OptionInfo) GetValueTypes() []ValueType

func (*OptionInfo) ProtoMessage added in v1.3.0

func (*OptionInfo) ProtoMessage()

func (*OptionInfo) ProtoReflect added in v1.5.8

func (x *OptionInfo) ProtoReflect() protoreflect.Message

func (*OptionInfo) Reset added in v1.3.0

func (x *OptionInfo) Reset()

func (*OptionInfo) String added in v1.3.0

func (x *OptionInfo) String() string

type OptionValue added in v1.3.0

type OptionValue struct {
	Type ValueType `protobuf:"varint,1,opt,name=type,proto3,enum=agent.ValueType" json:"type,omitempty"`
	// Types that are assignable to Value:
	//
	//	*OptionValue_BoolValue
	//	*OptionValue_IntValue
	//	*OptionValue_DoubleValue
	//	*OptionValue_StringValue
	//	*OptionValue_DurationValue
	Value isOptionValue_Value `protobuf_oneof:"value"`
	// contains filtered or unexported fields
}

func (*OptionValue) Descriptor deprecated added in v1.3.0

func (*OptionValue) Descriptor() ([]byte, []int)

Deprecated: Use OptionValue.ProtoReflect.Descriptor instead.

func (*OptionValue) GetBoolValue added in v1.3.0

func (x *OptionValue) GetBoolValue() bool

func (*OptionValue) GetDoubleValue added in v1.3.0

func (x *OptionValue) GetDoubleValue() float64

func (*OptionValue) GetDurationValue added in v1.3.0

func (x *OptionValue) GetDurationValue() int64

func (*OptionValue) GetIntValue added in v1.3.0

func (x *OptionValue) GetIntValue() int64

func (*OptionValue) GetStringValue added in v1.3.0

func (x *OptionValue) GetStringValue() string

func (*OptionValue) GetType added in v1.5.1

func (x *OptionValue) GetType() ValueType

func (*OptionValue) GetValue added in v1.3.0

func (m *OptionValue) GetValue() isOptionValue_Value

func (*OptionValue) ProtoMessage added in v1.3.0

func (*OptionValue) ProtoMessage()

func (*OptionValue) ProtoReflect added in v1.5.8

func (x *OptionValue) ProtoReflect() protoreflect.Message

func (*OptionValue) Reset added in v1.3.0

func (x *OptionValue) Reset()

func (*OptionValue) String added in v1.3.0

func (x *OptionValue) String() string

type OptionValue_BoolValue added in v1.3.0

type OptionValue_BoolValue struct {
	BoolValue bool `protobuf:"varint,2,opt,name=boolValue,proto3,oneof"`
}

type OptionValue_DoubleValue added in v1.3.0

type OptionValue_DoubleValue struct {
	DoubleValue float64 `protobuf:"fixed64,4,opt,name=doubleValue,proto3,oneof"`
}

type OptionValue_DurationValue added in v1.3.0

type OptionValue_DurationValue struct {
	DurationValue int64 `protobuf:"varint,6,opt,name=durationValue,proto3,oneof"`
}

type OptionValue_IntValue added in v1.3.0

type OptionValue_IntValue struct {
	IntValue int64 `protobuf:"varint,3,opt,name=intValue,proto3,oneof"`
}

type OptionValue_StringValue added in v1.3.0

type OptionValue_StringValue struct {
	StringValue string `protobuf:"bytes,5,opt,name=stringValue,proto3,oneof"`
}

type Point added in v1.3.0

type Point struct {
	Time            int64              `protobuf:"varint,1,opt,name=time,proto3" json:"time,omitempty"`
	Name            string             `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	Database        string             `protobuf:"bytes,3,opt,name=database,proto3" json:"database,omitempty"`
	RetentionPolicy string             `protobuf:"bytes,4,opt,name=retentionPolicy,proto3" json:"retentionPolicy,omitempty"`
	Group           string             `protobuf:"bytes,5,opt,name=group,proto3" json:"group,omitempty"`
	Dimensions      []string           `protobuf:"bytes,6,rep,name=dimensions,proto3" json:"dimensions,omitempty"`
	Tags            map[string]string  `` /* 149-byte string literal not displayed */
	FieldsDouble    map[string]float64 `` /* 167-byte string literal not displayed */
	FieldsInt       map[string]int64   `` /* 160-byte string literal not displayed */
	FieldsString    map[string]string  `` /* 166-byte string literal not displayed */
	FieldsBool      map[string]bool    `` /* 163-byte string literal not displayed */
	ByName          bool               `protobuf:"varint,11,opt,name=byName,proto3" json:"byName,omitempty"`
	// contains filtered or unexported fields
}

Message containing information about a single data point. Can be sent on its own or bookended by BeginBatch and EndBatch messages.

func (*Point) Descriptor deprecated added in v1.3.0

func (*Point) Descriptor() ([]byte, []int)

Deprecated: Use Point.ProtoReflect.Descriptor instead.

func (*Point) GetByName added in v1.5.1

func (x *Point) GetByName() bool

func (*Point) GetDatabase added in v1.5.1

func (x *Point) GetDatabase() string

func (*Point) GetDimensions added in v1.5.1

func (x *Point) GetDimensions() []string

func (*Point) GetFieldsBool added in v1.4.0

func (x *Point) GetFieldsBool() map[string]bool

func (*Point) GetFieldsDouble added in v1.3.0

func (x *Point) GetFieldsDouble() map[string]float64

func (*Point) GetFieldsInt added in v1.3.0

func (x *Point) GetFieldsInt() map[string]int64

func (*Point) GetFieldsString added in v1.3.0

func (x *Point) GetFieldsString() map[string]string

func (*Point) GetGroup added in v1.5.1

func (x *Point) GetGroup() string

func (*Point) GetName added in v1.5.1

func (x *Point) GetName() string

func (*Point) GetRetentionPolicy added in v1.5.1

func (x *Point) GetRetentionPolicy() string

func (*Point) GetTags added in v1.3.0

func (x *Point) GetTags() map[string]string

func (*Point) GetTime added in v1.5.1

func (x *Point) GetTime() int64

func (*Point) ProtoMessage added in v1.3.0

func (*Point) ProtoMessage()

func (*Point) ProtoReflect added in v1.5.8

func (x *Point) ProtoReflect() protoreflect.Message

func (*Point) Reset added in v1.3.0

func (x *Point) Reset()

func (*Point) String added in v1.3.0

func (x *Point) String() string

type Request added in v1.3.0

type Request struct {

	// Types that are assignable to Message:
	//
	//	*Request_Info
	//	*Request_Init
	//	*Request_Keepalive
	//	*Request_Snapshot
	//	*Request_Restore
	//	*Request_Begin
	//	*Request_Point
	//	*Request_End
	Message isRequest_Message `protobuf_oneof:"message"`
	// contains filtered or unexported fields
}

Request message wrapper -- sent from Kapacitor to process
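
Handling a wrapped request comes down to switching on the concrete type held in the oneof. The sketch below shows that dispatch with hypothetical local stand-ins instead of the generated Request_* types, and it covers only three of the message kinds. Echoing the request's Time in the keepalive response is an assumption of this sketch.

```go
package main

import "fmt"

// request and the message types are local stand-ins for the Request wrapper
// and a few of its oneof members.
type request interface{ isRequest() }

type infoRequest struct{}
type keepaliveRequest struct{ Time int64 }
type pointRequest struct{ Name string }

func (infoRequest) isRequest()      {}
func (keepaliveRequest) isRequest() {}
func (pointRequest) isRequest()     {}

// dispatch returns a description of the immediate response for req, or ""
// when the request has no immediate response.
func dispatch(req request) string {
	switch r := req.(type) {
	case infoRequest:
		// Management request: answered directly.
		return "info response"
	case keepaliveRequest:
		// Assumption: the response echoes the request's Time.
		return fmt.Sprintf("keepalive response time=%d", r.Time)
	case pointRequest:
		// Data flow request: passed to the handler, which decides if and
		// when to send points back.
		return ""
	default:
		return "error response: unknown request"
	}
}

func main() {
	reqs := []request{infoRequest{}, keepaliveRequest{Time: 1}, pointRequest{Name: "cpu"}}
	for _, r := range reqs {
		fmt.Printf("%T -> %q\n", r, dispatch(r))
	}
}
```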

func (*Request) Descriptor deprecated added in v1.3.0

func (*Request) Descriptor() ([]byte, []int)

Deprecated: Use Request.ProtoReflect.Descriptor instead.

func (*Request) GetBegin added in v1.3.0

func (x *Request) GetBegin() *BeginBatch

func (*Request) GetEnd added in v1.3.0

func (x *Request) GetEnd() *EndBatch

func (*Request) GetInfo added in v1.3.0

func (x *Request) GetInfo() *InfoRequest

func (*Request) GetInit added in v1.3.0

func (x *Request) GetInit() *InitRequest

func (*Request) GetKeepalive added in v1.3.0

func (x *Request) GetKeepalive() *KeepaliveRequest

func (*Request) GetMessage added in v1.3.0

func (m *Request) GetMessage() isRequest_Message

func (*Request) GetPoint added in v1.3.0

func (x *Request) GetPoint() *Point

func (*Request) GetRestore added in v1.3.0

func (x *Request) GetRestore() *RestoreRequest

func (*Request) GetSnapshot added in v1.3.0

func (x *Request) GetSnapshot() *SnapshotRequest

func (*Request) ProtoMessage added in v1.3.0

func (*Request) ProtoMessage()

func (*Request) ProtoReflect added in v1.5.8

func (x *Request) ProtoReflect() protoreflect.Message

func (*Request) Reset added in v1.3.0

func (x *Request) Reset()

func (*Request) String added in v1.3.0

func (x *Request) String() string

type Request_Begin added in v1.3.0

type Request_Begin struct {
	// Data flow responses
	Begin *BeginBatch `protobuf:"bytes,16,opt,name=begin,proto3,oneof"`
}

type Request_End added in v1.3.0

type Request_End struct {
	End *EndBatch `protobuf:"bytes,18,opt,name=end,proto3,oneof"`
}

type Request_Info added in v1.3.0

type Request_Info struct {
	// Management requests
	Info *InfoRequest `protobuf:"bytes,1,opt,name=info,proto3,oneof"`
}

type Request_Init added in v1.3.0

type Request_Init struct {
	Init *InitRequest `protobuf:"bytes,2,opt,name=init,proto3,oneof"`
}

type Request_Keepalive added in v1.3.0

type Request_Keepalive struct {
	Keepalive *KeepaliveRequest `protobuf:"bytes,3,opt,name=keepalive,proto3,oneof"`
}

type Request_Point added in v1.3.0

type Request_Point struct {
	Point *Point `protobuf:"bytes,17,opt,name=point,proto3,oneof"`
}

type Request_Restore added in v1.3.0

type Request_Restore struct {
	Restore *RestoreRequest `protobuf:"bytes,5,opt,name=restore,proto3,oneof"`
}

type Request_Snapshot added in v1.3.0

type Request_Snapshot struct {
	Snapshot *SnapshotRequest `protobuf:"bytes,4,opt,name=snapshot,proto3,oneof"`
}

type Response added in v1.3.0

type Response struct {

	// Types that are assignable to Message:
	//
	//	*Response_Info
	//	*Response_Init
	//	*Response_Keepalive
	//	*Response_Snapshot
	//	*Response_Restore
	//	*Response_Error
	//	*Response_Begin
	//	*Response_Point
	//	*Response_End
	Message isResponse_Message `protobuf_oneof:"message"`
	// contains filtered or unexported fields
}

Response message wrapper -- sent from process to Kapacitor

func (*Response) Descriptor deprecated added in v1.3.0

func (*Response) Descriptor() ([]byte, []int)

Deprecated: Use Response.ProtoReflect.Descriptor instead.

func (*Response) GetBegin added in v1.3.0

func (x *Response) GetBegin() *BeginBatch

func (*Response) GetEnd added in v1.3.0

func (x *Response) GetEnd() *EndBatch

func (*Response) GetError added in v1.3.0

func (x *Response) GetError() *ErrorResponse

func (*Response) GetInfo added in v1.3.0

func (x *Response) GetInfo() *InfoResponse

func (*Response) GetInit added in v1.3.0

func (x *Response) GetInit() *InitResponse

func (*Response) GetKeepalive added in v1.3.0

func (x *Response) GetKeepalive() *KeepaliveResponse

func (*Response) GetMessage added in v1.3.0

func (m *Response) GetMessage() isResponse_Message

func (*Response) GetPoint added in v1.3.0

func (x *Response) GetPoint() *Point

func (*Response) GetRestore added in v1.3.0

func (x *Response) GetRestore() *RestoreResponse

func (*Response) GetSnapshot added in v1.3.0

func (x *Response) GetSnapshot() *SnapshotResponse

func (*Response) ProtoMessage added in v1.3.0

func (*Response) ProtoMessage()

func (*Response) ProtoReflect added in v1.5.8

func (x *Response) ProtoReflect() protoreflect.Message

func (*Response) Reset added in v1.3.0

func (x *Response) Reset()

func (*Response) String added in v1.3.0

func (x *Response) String() string

type Response_Begin added in v1.3.0

type Response_Begin struct {
	// Data flow responses
	Begin *BeginBatch `protobuf:"bytes,16,opt,name=begin,proto3,oneof"`
}

type Response_End added in v1.3.0

type Response_End struct {
	End *EndBatch `protobuf:"bytes,18,opt,name=end,proto3,oneof"`
}

type Response_Error added in v1.3.0

type Response_Error struct {
	Error *ErrorResponse `protobuf:"bytes,6,opt,name=error,proto3,oneof"`
}

type Response_Info added in v1.3.0

type Response_Info struct {
	// Management responses
	Info *InfoResponse `protobuf:"bytes,1,opt,name=info,proto3,oneof"`
}

type Response_Init added in v1.3.0

type Response_Init struct {
	Init *InitResponse `protobuf:"bytes,2,opt,name=init,proto3,oneof"`
}

type Response_Keepalive added in v1.3.0

type Response_Keepalive struct {
	Keepalive *KeepaliveResponse `protobuf:"bytes,3,opt,name=keepalive,proto3,oneof"`
}

type Response_Point added in v1.3.0

type Response_Point struct {
	Point *Point `protobuf:"bytes,17,opt,name=point,proto3,oneof"`
}

type Response_Restore added in v1.3.0

type Response_Restore struct {
	Restore *RestoreResponse `protobuf:"bytes,5,opt,name=restore,proto3,oneof"`
}

type Response_Snapshot added in v1.3.0

type Response_Snapshot struct {
	Snapshot *SnapshotResponse `protobuf:"bytes,4,opt,name=snapshot,proto3,oneof"`
}
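
The Response_* wrapper types above follow the usual generated protobuf oneof layout: a Response holds at most one wrapper, and callers either use the GetX accessors or switch on the wrapper type. The sketch below illustrates that pattern with local stand-in types only. It does not import the real package, the lower-case names and trimmed-down payloads are hypothetical, and only two of the variants are shown.

```go
package main

import "fmt"

// Stand-in payloads; the real types are generated from udf.proto.
type Point struct{ Name string }
type ErrorResponse struct{ Error string }

// isResponseMessage plays the role of the unexported oneof interface.
type isResponseMessage interface{ isResponseMessage() }

// One wrapper per oneof variant (hypothetical names).
type responsePoint struct{ Point *Point }
type responseError struct{ Error *ErrorResponse }

func (*responsePoint) isResponseMessage() {}
func (*responseError) isResponseMessage() {}

// response is a stand-in for Response with a single oneof field.
type response struct{ Message isResponseMessage }

// GetPoint returns the point payload, or nil when the receiver is nil
// or a different variant is set.
func (r *response) GetPoint() *Point {
	if r == nil {
		return nil
	}
	if m, ok := r.Message.(*responsePoint); ok {
		return m.Point
	}
	return nil
}

// describe dispatches on whichever variant is set.
func describe(r *response) string {
	switch m := r.Message.(type) {
	case *responsePoint:
		return "point: " + m.Point.Name
	case *responseError:
		return "error: " + m.Error.Error
	default:
		return "unset"
	}
}

func main() {
	fmt.Println(describe(&response{Message: &responsePoint{Point: &Point{Name: "cpu"}}}))
	fmt.Println(describe(&response{Message: &responseError{Error: &ErrorResponse{Error: "boom"}}}))
	fmt.Println(describe(&response{}))
}
```

A type switch is convenient when every variant needs handling; the accessors are simpler when only one variant matters, since they return nil for all the others.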

type RestoreRequest added in v1.3.0

type RestoreRequest struct {
	Snapshot []byte `protobuf:"bytes,1,opt,name=snapshot,proto3" json:"snapshot,omitempty"`
	// contains filtered or unexported fields
}

Request that the process restore its state from a snapshot.

func (*RestoreRequest) Descriptor deprecated added in v1.3.0

func (*RestoreRequest) Descriptor() ([]byte, []int)

Deprecated: Use RestoreRequest.ProtoReflect.Descriptor instead.

func (*RestoreRequest) GetSnapshot added in v1.5.1

func (x *RestoreRequest) GetSnapshot() []byte

func (*RestoreRequest) ProtoMessage added in v1.3.0

func (*RestoreRequest) ProtoMessage()

func (*RestoreRequest) ProtoReflect added in v1.5.8

func (x *RestoreRequest) ProtoReflect() protoreflect.Message

func (*RestoreRequest) Reset added in v1.3.0

func (x *RestoreRequest) Reset()

func (*RestoreRequest) String added in v1.3.0

func (x *RestoreRequest) String() string

type RestoreResponse added in v1.3.0

type RestoreResponse struct {
	Success bool   `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
	Error   string `protobuf:"bytes,2,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

Respond with success or failure to a RestoreRequest.

func (*RestoreResponse) Descriptor deprecated added in v1.3.0

func (*RestoreResponse) Descriptor() ([]byte, []int)

Deprecated: Use RestoreResponse.ProtoReflect.Descriptor instead.

func (*RestoreResponse) GetError added in v1.5.1

func (x *RestoreResponse) GetError() string

func (*RestoreResponse) GetSuccess added in v1.5.1

func (x *RestoreResponse) GetSuccess() bool

func (*RestoreResponse) ProtoMessage added in v1.3.0

func (*RestoreResponse) ProtoMessage()

func (*RestoreResponse) ProtoReflect added in v1.5.8

func (x *RestoreResponse) ProtoReflect() protoreflect.Message

func (*RestoreResponse) Reset added in v1.3.0

func (x *RestoreResponse) Reset()

func (*RestoreResponse) String added in v1.3.0

func (x *RestoreResponse) String() string

type Server added in v0.13.0

type Server struct {
	// contains filtered or unexported fields
}

A Server accepts connections on a listener and spawns a new Agent for each connection.

func NewServer added in v0.13.0

func NewServer(l net.Listener, a Accepter) *Server

Create a new server.

func (*Server) Serve added in v0.13.0

func (s *Server) Serve() error

Serve starts the server and blocks.

func (*Server) Stop added in v0.13.0

func (s *Server) Stop()

Stop closes the listener and stops all server activity.

func (*Server) StopOnSignals added in v0.13.0

func (s *Server) StopOnSignals(signals ...os.Signal)

StopOnSignals registers a signal handler to stop the Server for the given signals.
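
The accept-and-dispatch pattern described for Server can be sketched with the standard library alone. Everything below is a stand-in: miniServer, accepter, and countingAccepter are hypothetical names, the accepter is assumed to expose a single Accept(net.Conn) method, and returning nil from Serve after Stop is this sketch's own choice rather than documented behavior of the real type. The demo listens on TCP loopback for simplicity.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"sync"
)

// accepter is a hypothetical stand-in for the value passed to NewServer.
type accepter interface{ Accept(net.Conn) }

// miniServer is illustrative only; it is not the package's Server.
type miniServer struct {
	l net.Listener
	a accepter
}

// Serve blocks, handing each new connection to the accepter,
// until the listener is closed.
func (s *miniServer) Serve() error {
	for {
		conn, err := s.l.Accept()
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return nil // listener closed by Stop
			}
			return err
		}
		s.a.Accept(conn)
	}
}

// Stop closes the listener, which unblocks Serve.
func (s *miniServer) Stop() { s.l.Close() }

// countingAccepter closes each connection and counts it. A real
// accepter would start a per-connection agent here instead.
type countingAccepter struct {
	mu   sync.Mutex
	n    int
	seen chan struct{}
}

func (c *countingAccepter) Accept(conn net.Conn) {
	conn.Close()
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
	c.seen <- struct{}{}
}

// runDemo dials the server conns times, stops it, and reports how many
// connections the accepter saw.
func runDemo(conns int) (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	acc := &countingAccepter{seen: make(chan struct{}, conns)}
	s := &miniServer{l: l, a: acc}
	done := make(chan error, 1)
	go func() { done <- s.Serve() }()

	for i := 0; i < conns; i++ {
		c, err := net.Dial("tcp", l.Addr().String())
		if err != nil {
			s.Stop()
			return 0, err
		}
		c.Close()
		<-acc.seen // wait until the server has accepted this connection
	}
	s.Stop()
	err = <-done

	acc.mu.Lock()
	defer acc.mu.Unlock()
	return acc.n, err
}

func main() {
	n, err := runDemo(3)
	fmt.Println("accepted:", n, "err:", err)
}
```

Because a socket-based UDF runs as one long-lived process, each connection handed to the accepter would normally get its own agent and handler so that state is not shared between TICKscript uses by accident.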

type SnapshotRequest added in v1.3.0

type SnapshotRequest struct {
	// contains filtered or unexported fields
}

Request that the process provide a snapshot of its state.

func (*SnapshotRequest) Descriptor deprecated added in v1.3.0

func (*SnapshotRequest) Descriptor() ([]byte, []int)

Deprecated: Use SnapshotRequest.ProtoReflect.Descriptor instead.

func (*SnapshotRequest) ProtoMessage added in v1.3.0

func (*SnapshotRequest) ProtoMessage()

func (*SnapshotRequest) ProtoReflect added in v1.5.8

func (x *SnapshotRequest) ProtoReflect() protoreflect.Message

func (*SnapshotRequest) Reset added in v1.3.0

func (x *SnapshotRequest) Reset()

func (*SnapshotRequest) String added in v1.3.0

func (x *SnapshotRequest) String() string

type SnapshotResponse added in v1.3.0

type SnapshotResponse struct {
	Snapshot []byte `protobuf:"bytes,1,opt,name=snapshot,proto3" json:"snapshot,omitempty"`
	// contains filtered or unexported fields
}

Respond to Kapacitor with a serialized snapshot of the running state.
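
The snapshot and restore messages only carry opaque bytes, so the UDF decides how its state is serialized. The following is a minimal sketch of one way to do that, assuming encoding/gob as the format and a hypothetical state struct. The three message structs are local stand-ins that copy just the exported fields listed in this reference; the real generated types are not imported.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Local stand-ins mirroring the exported fields of the generated types.
type SnapshotResponse struct{ Snapshot []byte }
type RestoreRequest struct{ Snapshot []byte }
type RestoreResponse struct {
	Success bool
	Error   string
}

// state is a hypothetical running state for a UDF.
type state struct {
	Count int64
	Sum   float64
}

// snapshot serializes the state into a SnapshotResponse.
func (s *state) snapshot() (*SnapshotResponse, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(s); err != nil {
		return nil, err
	}
	return &SnapshotResponse{Snapshot: buf.Bytes()}, nil
}

// restore decodes the request into a scratch value first, so a bad
// snapshot leaves the current state untouched, and reports the outcome.
func (s *state) restore(req *RestoreRequest) *RestoreResponse {
	var next state
	if err := gob.NewDecoder(bytes.NewReader(req.Snapshot)).Decode(&next); err != nil {
		return &RestoreResponse{Success: false, Error: err.Error()}
	}
	*s = next
	return &RestoreResponse{Success: true}
}

func main() {
	src := &state{Count: 3, Sum: 4.5}
	snap, err := src.snapshot()
	if err != nil {
		panic(err)
	}

	var dst state
	ok := dst.restore(&RestoreRequest{Snapshot: snap.Snapshot})
	fmt.Println(ok.Success, dst.Count, dst.Sum)

	bad := dst.restore(&RestoreRequest{Snapshot: []byte("not gob")})
	fmt.Println(bad.Success, bad.Error != "")
}
```

Decoding into a scratch value before assigning keeps a failed restore from leaving the process with half-applied state, which matters because the failure is reported back through the Error field rather than by exiting.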

func (*SnapshotResponse) Descriptor deprecated added in v1.3.0

func (*SnapshotResponse) Descriptor() ([]byte, []int)

Deprecated: Use SnapshotResponse.ProtoReflect.Descriptor instead.

func (*SnapshotResponse) GetSnapshot added in v1.5.1

func (x *SnapshotResponse) GetSnapshot() []byte

func (*SnapshotResponse) ProtoMessage added in v1.3.0

func (*SnapshotResponse) ProtoMessage()

func (*SnapshotResponse) ProtoReflect added in v1.5.8

func (x *SnapshotResponse) ProtoReflect() protoreflect.Message

func (*SnapshotResponse) Reset added in v1.3.0

func (x *SnapshotResponse) Reset()

func (*SnapshotResponse) String added in v1.3.0

func (x *SnapshotResponse) String() string

type ValueType added in v1.3.0

type ValueType int32
const (
	ValueType_BOOL     ValueType = 0
	ValueType_INT      ValueType = 1
	ValueType_DOUBLE   ValueType = 2
	ValueType_STRING   ValueType = 3
	ValueType_DURATION ValueType = 4
)

func (ValueType) Descriptor added in v1.5.8

func (ValueType) Descriptor() protoreflect.EnumDescriptor

func (ValueType) Enum added in v1.5.8

func (x ValueType) Enum() *ValueType

func (ValueType) EnumDescriptor deprecated added in v1.3.0

func (ValueType) EnumDescriptor() ([]byte, []int)

Deprecated: Use ValueType.Descriptor instead.

func (ValueType) Number added in v1.5.8

func (x ValueType) Number() protoreflect.EnumNumber

func (ValueType) String added in v1.3.0

func (x ValueType) String() string

func (ValueType) Type added in v1.5.8

Directories

Path Synopsis
examples
