timeline

package module
v1.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2020 License: MIT Imports: 21 Imported by: 1

README

timeline

A library for sending data points to an OpenTSDB backend.

Documentation

Index

Constants

View Source
// FlattenerName - the name
const FlattenerName string = "flattener"

// FlatOperation values, numbered sequentially starting at zero.
const (
	// Avg - aggregation (value 0)
	Avg FlatOperation = iota

	// Sum - aggregation (value 1)
	Sum

	// Count - aggregation (value 2)
	Count

	// Max - aggregation (value 3)
	Max

	// Min - aggregation (value 4)
	Min
)
View Source
// AccumulatorName - the name
const AccumulatorName string = "accumulator"

Variables

View Source
// ErrInvalidPayloadSize - raised when the transport receives an invalid payload size
var ErrInvalidPayloadSize error = errors.New("invalid payload size")
View Source
// ErrNotStored - thrown when a hash was not stored
var ErrNotStored error = fmt.Errorf("hash is not stored")

Functions

This section is empty.

Types

type Accumulator added in v1.1.0

// Accumulator - the struct
type Accumulator struct {
	// embedded mutex: its Lock/Unlock methods are promoted onto Accumulator
	sync.Mutex
	// contains filtered or unexported fields
}

Accumulator - the struct

func NewAccumulator added in v1.1.0

func NewAccumulator(configuration *DataTransformerConfig) *Accumulator

NewAccumulator - creates a new instance

func (*Accumulator) Add added in v1.1.0

func (a *Accumulator) Add(hash string) error

Add - adds one more to the reference

func (*Accumulator) BuildContextualLogger added in v1.7.0

func (a *Accumulator) BuildContextualLogger(path ...string)

BuildContextualLogger - build the contextual logger using more info

func (*Accumulator) GetName added in v1.1.0

func (a *Accumulator) GetName() string

GetName - returns the processor's name

func (*Accumulator) ProcessCycle added in v1.10.0

func (d *Accumulator) ProcessCycle()

ProcessCycle - forces a new cycle process

func (*Accumulator) ProcessMapEntry added in v1.1.0

func (a *Accumulator) ProcessMapEntry(entry DataProcessorEntry) bool

ProcessMapEntry - sends the data to the transport

func (*Accumulator) SetTransport added in v1.1.0

func (d *Accumulator) SetTransport(transport Transport)

SetTransport - sets the transport

func (*Accumulator) Start added in v1.1.0

func (d *Accumulator) Start()

Start - starts the processor cycle

func (*Accumulator) Stop added in v1.1.0

func (a *Accumulator) Stop()

Stop - terminates the processing cycle

func (*Accumulator) Store added in v1.1.0

func (a *Accumulator) Store(item interface{}, ttl time.Duration) (string, error)

Store - stores a reference

func (*Accumulator) StoreCustomHash added in v1.3.0

func (a *Accumulator) StoreCustomHash(item interface{}, ttl time.Duration, hash string) error

StoreCustomHash - stores a custom reference

type Backend

// Backend - the destination OpenTSDB backend
type Backend struct {
	// Host - JSON key "host"; omitted from the output when empty
	Host string `json:"host,omitempty"`
	// Port - JSON key "port"; omitted from the output when zero
	Port int    `json:"port,omitempty"`
}

Backend - the destination OpenTSDB backend

type CustomSerializerConfig added in v1.8.0

// CustomSerializerConfig - configures a customized serialization transport
type CustomSerializerConfig struct {
	// TimestampProperty - JSON key "timestampProperty"
	// NOTE(review): presumably the name of the property holding the timestamp — confirm
	TimestampProperty string `json:"timestampProperty,omitempty"`
	// ValueProperty - JSON key "valueProperty"
	// NOTE(review): presumably the name of the property holding the value — confirm
	ValueProperty     string `json:"valueProperty,omitempty"`
}

CustomSerializerConfig - configures a customized serialization transport

type DataProcessor added in v1.1.0

// DataProcessor - an interface for data processors.
// Both *Accumulator and *Flattener declare every method listed here.
type DataProcessor interface {

	// Start - starts the data processor (do not start if you want to call ProcessCycle() manually)
	Start()

	// Stop - stops the data processor
	Stop()

	// GetName - returns the processor's name
	GetName() string

	// SetTransport - sets the transport
	SetTransport(transport Transport)

	// ProcessMapEntry - process a map entry and return true to delete the entry
	ProcessMapEntry(entry DataProcessorEntry) (deleteAfter bool)

	// BuildContextualLogger - build the contextual logger using more info
	BuildContextualLogger(path ...string)

	// ProcessCycle - forces a new cycle process
	ProcessCycle()
}

DataProcessor - an interface for data processors

type DataProcessorEntry added in v1.9.3

// DataProcessorEntry - an item from the data processor.
// It is the argument type of DataProcessor.ProcessMapEntry.
type DataProcessorEntry interface {

	// Lock - locks this resource
	Lock()

	// Unlock - unlocks this resource
	Unlock()

	// Release - release the stored resources
	Release()
}

DataProcessorEntry - an item from the data processor

type DataTransformerConfig added in v1.7.0

// DataTransformerConfig - configuration accepted by both NewFlattener and NewAccumulator.
// Every exported field is tagged omitempty, so zero values are left out of the JSON output.
type DataTransformerConfig struct {
	// NOTE(review): presumably the interval between processing cycles — confirm
	CycleDuration     funks.Duration    `json:"cycleDuration,omitempty"`
	// NOTE(review): presumably the algorithm used to compute the point hashes — confirm
	HashingAlgorithm  hashing.Algorithm `json:"hashingAlgorithm,omitempty"`
	// NOTE(review): unit (bytes vs. characters) is not visible here — confirm
	HashSize          int               `json:"hashSize,omitempty"`
	// NOTE(review): presumably enables stack traces in error logs — confirm
	PrintStackOnError bool              `json:"printStackOnError,omitempty"`
	// contains filtered or unexported fields
}

DataTransformerConfig - configuration shared by the flattener and the accumulator

type DefaultTransportConfig added in v1.7.0

// DefaultTransportConfig - the default fields used by the transport configuration.
// It is embedded in HTTPTransportConfig and OpenTSDBTransportConfig.
// Every field is tagged omitempty, so zero values are left out of the JSON output;
// time spans use funks.Duration.
type DefaultTransportConfig struct {
	TransportBufferSize  int            `json:"transportBufferSize,omitempty"`
	BatchSendInterval    funks.Duration `json:"batchSendInterval,omitempty"`
	RequestTimeout       funks.Duration `json:"requestTimeout,omitempty"`
	SerializerBufferSize int            `json:"serializerBufferSize,omitempty"`
	// NOTE(review): DebugInput/DebugOutput presumably toggle debug logging of incoming/outgoing data — confirm
	DebugInput           bool           `json:"debugInput,omitempty"`
	DebugOutput          bool           `json:"debugOutput,omitempty"`
	TimeBetweenBatches   funks.Duration `json:"timeBetweenBatches,omitempty"`
	PrintStackOnError    bool           `json:"printStackOnError,omitempty"`
}

DefaultTransportConfig - the default fields used by the transport configuration

func (*DefaultTransportConfig) Validate added in v1.7.0

func (c *DefaultTransportConfig) Validate() error

Validate - validates the default items from the configuration

type FlatOperation

// FlatOperation - the type of the aggregation used (one of Avg, Sum, Count, Max, Min)
type FlatOperation uint8

FlatOperation - the type of the aggregation used

type Flattener

// Flattener - controls the timeline's point flattening
type Flattener struct {
	// contains filtered or unexported fields
}

Flattener - controls the timeline's point flattening

func NewFlattener

func NewFlattener(configuration *DataTransformerConfig) *Flattener

NewFlattener - creates a new flattener

func (*Flattener) Add

func (f *Flattener) Add(point *FlattenerPoint) error

Add - adds a new entry to the flattening process

func (*Flattener) BuildContextualLogger added in v1.7.0

func (f *Flattener) BuildContextualLogger(path ...string)

BuildContextualLogger - build the contextual logger using more info

func (*Flattener) GetName added in v1.1.0

func (f *Flattener) GetName() string

GetName - returns the processor's name

func (*Flattener) ProcessCycle added in v1.10.0

func (d *Flattener) ProcessCycle()

ProcessCycle - forces a new cycle process

func (*Flattener) ProcessMapEntry added in v1.1.0

func (f *Flattener) ProcessMapEntry(entry DataProcessorEntry) bool

ProcessMapEntry - process the values from an entry

func (*Flattener) SetTransport added in v1.1.0

func (d *Flattener) SetTransport(transport Transport)

SetTransport - sets the transport

func (*Flattener) Start

func (d *Flattener) Start()

Start - starts the processor cycle

func (*Flattener) Stop added in v1.1.0

func (d *Flattener) Stop()

Stop - terminates the processing cycle

type FlattenerPoint

// FlattenerPoint - a flattener's point containing the value
type FlattenerPoint struct {
	// contains filtered or unexported fields
}

FlattenerPoint - a flattener's point containing the value

func (*FlattenerPoint) GetHash added in v1.1.0

func (fp *FlattenerPoint) GetHash() string

GetHash - returns the hash

type HTTPTransport

// HTTPTransport - implements the HTTP transport
type HTTPTransport struct {
	// contains filtered or unexported fields
}

HTTPTransport - implements the HTTP transport

func NewHTTPTransport

func NewHTTPTransport(configuration *HTTPTransportConfig, customSerializer serializer.Serializer) (*HTTPTransport, error)

NewHTTPTransport - creates a new HTTP event manager with a customized serializer

func (*HTTPTransport) AccumulatedDataToDataChannelItem added in v1.1.0

func (t *HTTPTransport) AccumulatedDataToDataChannelItem(point *accumulatedData) (interface{}, error)

AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item

func (*HTTPTransport) BuildContextualLogger added in v1.7.0

func (t *HTTPTransport) BuildContextualLogger(path ...string)

BuildContextualLogger - build the contextual logger using more info

func (*HTTPTransport) Close

func (t *HTTPTransport) Close()

Close - closes this transport

func (*HTTPTransport) ConfigureBackend

func (t *HTTPTransport) ConfigureBackend(backend *Backend) error

ConfigureBackend - configures the backend

func (*HTTPTransport) DataChannel

func (t *HTTPTransport) DataChannel(item interface{})

DataChannel - send a new point

func (*HTTPTransport) DataChannelItemToAccumulatedData added in v1.1.0

func (t *HTTPTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConfig, instance interface{}, calculateHash bool) (Hashable, error)

DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data

func (*HTTPTransport) DataChannelItemToFlattenerPoint added in v1.1.0

func (t *HTTPTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConfig, instance interface{}, operation FlatOperation) (Hashable, error)

DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point one

func (*HTTPTransport) FlattenerPointToDataChannelItem added in v1.1.0

func (t *HTTPTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)

FlattenerPointToDataChannelItem - converts the flattened point to the data channel one

func (*HTTPTransport) MatchType

func (t *HTTPTransport) MatchType(tt transportType) bool

MatchType - checks if this transport implementation matches the given type

func (*HTTPTransport) SendData added in v1.10.0

func (t *HTTPTransport) SendData() error

SendData - releases the point buffer and send all data

func (*HTTPTransport) Serialize

func (t *HTTPTransport) Serialize(item interface{}) (string, error)

Serialize - renders the text using the configured serializer

func (*HTTPTransport) SerializePayload added in v1.8.0

func (t *HTTPTransport) SerializePayload(dataList []interface{}) (payload []string, err error)

SerializePayload - serializes a list of generic data

func (*HTTPTransport) Start

func (t *HTTPTransport) Start(manualMode bool) error

Start - starts this transport

func (*HTTPTransport) TransferData

func (t *HTTPTransport) TransferData(payload []string) error

TransferData - transfers the data to the backend through this transport

type HTTPTransportConfig

// HTTPTransportConfig - has all http transport configurations
type HTTPTransportConfig struct {
	// embedded: the common transport fields are promoted onto this struct
	DefaultTransportConfig
	ServiceEndpoint        string            `json:"serviceEndpoint,omitempty"`
	Method                 string            `json:"method,omitempty"`
	// NOTE(review): presumably the HTTP status code treated as success — confirm
	ExpectedResponseStatus int               `json:"expectedResponseStatus,omitempty"`
	Headers                map[string]string `json:"headers,omitempty"`
	// embedded: the custom serializer property names are promoted onto this struct
	CustomSerializerConfig
}

HTTPTransportConfig - has all http transport configurations

type Hashable added in v1.1.0

// Hashable - a struct with hash function.
// It is the result type of the transports' DataChannelItemTo* conversion methods.
type Hashable interface {

	// GetHash - return this instance hash
	GetHash() string
}

Hashable - a struct with hash function

type Manager

// Manager - the parent of all event managers
type Manager struct {
	// contains filtered or unexported fields
}

Manager - the parent of all event managers

func NewManager

func NewManager(transport Transport, flattener, accumulator DataProcessor, backend *Backend, customContext ...string) (*Manager, error)

NewManager - creates a timeline manager

func (*Manager) Flatten added in v1.8.0

func (m *Manager) Flatten(operation FlatOperation, genericItem interface{}) error

Flatten - flatten a point

func (*Manager) FlattenJSON added in v1.8.0

func (m *Manager) FlattenJSON(operation FlatOperation, name string, parameters ...interface{}) error

FlattenJSON - flatten a point

func (*Manager) FlattenOpenTSDB

func (m *Manager) FlattenOpenTSDB(operation FlatOperation, value float64, timestamp int64, metric string, tags ...interface{}) error

FlattenOpenTSDB - flatten a point

func (*Manager) GetTransport

func (m *Manager) GetTransport() Transport

GetTransport - returns the configured transport

func (*Manager) IncrementAccumulatedData added in v1.1.0

func (m *Manager) IncrementAccumulatedData(hash string) error

IncrementAccumulatedData - increments the accumulated data referenced by the given hash

func (*Manager) ProcessCycle added in v1.10.0

func (m *Manager) ProcessCycle()

ProcessCycle - call process cycle manually

func (*Manager) Send added in v1.8.0

func (m *Manager) Send(genericItem interface{})

Send - sends a new data using the current transport

func (*Manager) SendData added in v1.10.0

func (m *Manager) SendData() error

SendData - send data manually

func (*Manager) SendJSON added in v1.8.0

func (m *Manager) SendJSON(schemaName string, parameters ...interface{}) error

SendJSON - sends a new data using the json transport

func (*Manager) SendOpenTSDB

func (m *Manager) SendOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) error

SendOpenTSDB - sends a new data using the openTSDB transport

func (*Manager) Serialize added in v1.8.0

func (m *Manager) Serialize(genericItem interface{}) (string, error)

Serialize - serializes a point using the json serializer

func (*Manager) SerializeJSON added in v1.8.0

func (m *Manager) SerializeJSON(schemaName string, parameters ...interface{}) (string, error)

SerializeJSON - serializes a point using the json serializer

func (*Manager) SerializeOpenTSDB

func (m *Manager) SerializeOpenTSDB(value float64, timestamp int64, metric string, tags ...interface{}) (string, error)

SerializeOpenTSDB - serializes a point using the opentsdb serializer

func (*Manager) Shutdown

func (m *Manager) Shutdown()

Shutdown - shuts down the transport

func (*Manager) Start

func (m *Manager) Start(manualMode bool) error

Start - starts the manager

func (*Manager) StoreDataToAccumulate added in v1.8.0

func (m *Manager) StoreDataToAccumulate(ttl time.Duration, genericItem interface{}) (string, error)

StoreDataToAccumulate - stores a data to accumulate

func (*Manager) StoreDataToAccumulateJSON added in v1.8.0

func (m *Manager) StoreDataToAccumulateJSON(ttl time.Duration, name string, parameters ...interface{}) (string, error)

StoreDataToAccumulateJSON - stores a data to accumulate

func (*Manager) StoreDataToAccumulateOpenTSDB added in v1.1.0

func (m *Manager) StoreDataToAccumulateOpenTSDB(ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) (string, error)

StoreDataToAccumulateOpenTSDB - stores a data to accumulate

func (*Manager) StoreHashedDataToAccumulateJSON added in v1.8.0

func (m *Manager) StoreHashedDataToAccumulateJSON(hash string, ttl time.Duration, name string, parameters ...interface{}) error

StoreHashedDataToAccumulateJSON - stores a data with custom hash to accumulate

func (*Manager) StoreHashedDataToAccumulateOpenTSDB added in v1.3.0

func (m *Manager) StoreHashedDataToAccumulateOpenTSDB(hash string, ttl time.Duration, value float64, timestamp int64, metric string, tags ...interface{}) error

StoreHashedDataToAccumulateOpenTSDB - stores a data with custom hash to accumulate

type OpenTSDBTransport

// OpenTSDBTransport - implements the openTSDB transport
type OpenTSDBTransport struct {
	// contains filtered or unexported fields
}

OpenTSDBTransport - implements the openTSDB transport

func NewOpenTSDBTransport

func NewOpenTSDBTransport(configuration *OpenTSDBTransportConfig) (*OpenTSDBTransport, error)

NewOpenTSDBTransport - creates a new openTSDB event manager

func (*OpenTSDBTransport) AccumulatedDataToDataChannelItem added in v1.1.0

func (t *OpenTSDBTransport) AccumulatedDataToDataChannelItem(point *accumulatedData) (interface{}, error)

AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item

func (*OpenTSDBTransport) BuildContextualLogger added in v1.7.0

func (t *OpenTSDBTransport) BuildContextualLogger(path ...string)

BuildContextualLogger - build the contextual logger using more info

func (*OpenTSDBTransport) Close

func (t *OpenTSDBTransport) Close()

Close - closes this transport

func (*OpenTSDBTransport) ConfigureBackend

func (t *OpenTSDBTransport) ConfigureBackend(backend *Backend) error

ConfigureBackend - configures the backend

func (*OpenTSDBTransport) DataChannel

func (t *OpenTSDBTransport) DataChannel(item interface{})

DataChannel - send a new point

func (*OpenTSDBTransport) DataChannelItemToAccumulatedData added in v1.1.0

func (t *OpenTSDBTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConfig, instance interface{}, calculateHash bool) (Hashable, error)

DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data

func (*OpenTSDBTransport) DataChannelItemToFlattenerPoint added in v1.1.0

func (t *OpenTSDBTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConfig, instance interface{}, operation FlatOperation) (Hashable, error)

DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point one

func (*OpenTSDBTransport) FlattenerPointToDataChannelItem added in v1.1.0

func (t *OpenTSDBTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)

FlattenerPointToDataChannelItem - converts the flattened point to the data channel one

func (*OpenTSDBTransport) MatchType

func (t *OpenTSDBTransport) MatchType(tt transportType) bool

MatchType - checks if this transport implementation matches the given type

func (*OpenTSDBTransport) SendData added in v1.10.0

func (t *OpenTSDBTransport) SendData() error

SendData - releases the point buffer and send all data

func (*OpenTSDBTransport) Serialize

func (t *OpenTSDBTransport) Serialize(item interface{}) (string, error)

Serialize - renders the text using the configured serializer

func (*OpenTSDBTransport) SerializePayload added in v1.8.0

func (t *OpenTSDBTransport) SerializePayload(dataList []interface{}) (payload []string, err error)

SerializePayload - serializes a list of generic data

func (*OpenTSDBTransport) Start

func (t *OpenTSDBTransport) Start(manualMode bool) error

Start - starts this transport

func (*OpenTSDBTransport) TransferData

func (t *OpenTSDBTransport) TransferData(payload []string) error

TransferData - transfers the data to the backend through this transport

type OpenTSDBTransportConfig

// OpenTSDBTransportConfig - has all opentsdb transport configurations
type OpenTSDBTransportConfig struct {
	// embedded: the common transport fields are promoted onto this struct
	DefaultTransportConfig
	ReadBufferSize int            `json:"readBufferSize,omitempty"`
	MaxReadTimeout funks.Duration `json:"maxReadTimeout,omitempty"`
	// embedded: the shared tcp/udp connection parameters are promoted onto this struct
	TCPUDPTransportConfig
}

OpenTSDBTransportConfig - has all opentsdb transport configurations

type TCPUDPTransportConfig added in v1.8.0

// TCPUDPTransportConfig - defines some common parameters for a tcp/udp connection.
// Every field is tagged omitempty, so zero values are left out of the JSON output.
type TCPUDPTransportConfig struct {
	// NOTE(review): presumably the wait applied before a reconnection attempt — confirm
	ReconnectionTimeout    funks.Duration `json:"reconnectionTimeout,omitempty"`
	MaxReconnectionRetries int            `json:"maxReconnectionRetries,omitempty"`
	// NOTE(review): presumably closes the connection after each write — confirm
	DisconnectAfterWrites  bool           `json:"disconnectAfterWrites,omitempty"`
}

TCPUDPTransportConfig - defines some common parameters for a tcp/udp connection

type Transport

// Transport - the implementation type to send an event.
// *HTTPTransport, *OpenTSDBTransport and *UDPTransport declare every method listed here.
// MatchType and AccumulatedDataToDataChannelItem mention unexported types, so the
// interface can only be implemented from inside this package.
type Transport interface {

	// DataChannel - send a new point
	DataChannel(item interface{})

	// ConfigureBackend - configures the backend
	ConfigureBackend(backend *Backend) error

	// TransferData - transfers the data using this specific implementation
	TransferData(payload []string) error

	// SerializePayload - serializes a list of generic data
	SerializePayload(dataList []interface{}) (payload []string, err error)

	// Start - starts this transport (pass true if you want to call SendData() manually)
	Start(manualMode bool) error

	// SendData - releases the point buffer and send all data
	SendData() error

	// Close - closes this transport
	Close()

	// MatchType - checks if this transport implementation matches the given type
	MatchType(tt transportType) bool

	// Serialize - renders the text using the configured serializer
	Serialize(item interface{}) (string, error)

	// DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point
	DataChannelItemToFlattenerPoint(configuration *DataTransformerConfig, item interface{}, operation FlatOperation) (Hashable, error)

	// FlattenerPointToDataChannelItem - converts the flattened point to the data channel item
	FlattenerPointToDataChannelItem(item *FlattenerPoint) (interface{}, error)

	// DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data
	DataChannelItemToAccumulatedData(configuration *DataTransformerConfig, item interface{}, calculateHash bool) (Hashable, error)

	// AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item
	AccumulatedDataToDataChannelItem(item *accumulatedData) (interface{}, error)

	// BuildContextualLogger - build the contextual logger using more info
	BuildContextualLogger(path ...string)
}

Transport - the implementation type to send an event

type UDPTransport added in v1.8.0

// UDPTransport - implements the UDP transport
type UDPTransport struct {
	// contains filtered or unexported fields
}

UDPTransport - implements the UDP transport

func NewUDPTransport added in v1.8.0

func NewUDPTransport(configuration *UDPTransportConfig, customSerializer serializer.Serializer) (*UDPTransport, error)

NewUDPTransport - creates a new UDP event manager with a customized serializer

func (*UDPTransport) AccumulatedDataToDataChannelItem added in v1.8.0

func (t *UDPTransport) AccumulatedDataToDataChannelItem(point *accumulatedData) (interface{}, error)

AccumulatedDataToDataChannelItem - converts the accumulated data to the data channel item

func (*UDPTransport) BuildContextualLogger added in v1.8.0

func (t *UDPTransport) BuildContextualLogger(path ...string)

BuildContextualLogger - build the contextual logger using more info

func (*UDPTransport) Close added in v1.8.0

func (t *UDPTransport) Close()

Close - closes this transport

func (*UDPTransport) ConfigureBackend added in v1.8.0

func (t *UDPTransport) ConfigureBackend(backend *Backend) error

ConfigureBackend - configures the backend

func (*UDPTransport) DataChannel added in v1.8.0

func (t *UDPTransport) DataChannel(item interface{})

DataChannel - send a new point

func (*UDPTransport) DataChannelItemToAccumulatedData added in v1.8.0

func (t *UDPTransport) DataChannelItemToAccumulatedData(configuration *DataTransformerConfig, instance interface{}, calculateHash bool) (Hashable, error)

DataChannelItemToAccumulatedData - converts the data channel item to the accumulated data

func (*UDPTransport) DataChannelItemToFlattenerPoint added in v1.8.0

func (t *UDPTransport) DataChannelItemToFlattenerPoint(configuration *DataTransformerConfig, instance interface{}, operation FlatOperation) (Hashable, error)

DataChannelItemToFlattenerPoint - converts the data channel item to the flattened point one

func (*UDPTransport) FlattenerPointToDataChannelItem added in v1.8.0

func (t *UDPTransport) FlattenerPointToDataChannelItem(point *FlattenerPoint) (interface{}, error)

FlattenerPointToDataChannelItem - converts the flattened point to the data channel one

func (*UDPTransport) MatchType added in v1.8.0

func (t *UDPTransport) MatchType(tt transportType) bool

MatchType - checks if this transport implementation matches the given type

func (*UDPTransport) SendData added in v1.10.0

func (t *UDPTransport) SendData() error

SendData - releases the point buffer and send all data

func (*UDPTransport) Serialize added in v1.8.0

func (t *UDPTransport) Serialize(item interface{}) (string, error)

Serialize - renders the text using the configured serializer

func (*UDPTransport) SerializePayload added in v1.8.0

func (t *UDPTransport) SerializePayload(dataList []interface{}) (payload []string, err error)

SerializePayload - serializes a list of generic data

func (*UDPTransport) Start added in v1.8.0

func (t *UDPTransport) Start(manualMode bool) error

Start - starts this transport

func (*UDPTransport) TransferData added in v1.8.0

func (t *UDPTransport) TransferData(payload []string) error

TransferData - transfers the data to the backend through this transport

type UDPTransportConfig added in v1.8.0

UDPTransportConfig - has all udp transport configurations

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL