hammy

package
v0.0.0-...-374fadf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2013 License: BSD-3-Clause Imports: 26 Imported by: 0

Documentation

Overview

Trigger processor for monitoring data

Index

Constants

View Source
// CouchbaseDataBucketQuantum is 7200, i.e. 2 hours expressed in seconds.
// NOTE(review): presumably the time span grouped under one key built by
// CouchbaseSaverBucketKey — confirm against the saver implementation.
const CouchbaseDataBucketQuantum = 7200 // 2 hours

Variables

This section is empty.

Functions

func CouchbaseSaverBucketKey

func CouchbaseSaverBucketKey(hostKey string, itemKey string, timestamp uint64) string

func SetConfigDefaults

func SetConfigDefaults(cfg *Config) error

Sets up defaults for empty values in the config. Returns an error if a mandatory field is omitted.

func StartHttp

func StartHttp(rh RequestHandler, cfg Config, metricsNamespace string) error

Starts the HTTP interface and blocks the calling goroutine until a fatal error occurs.

func StatesEq

func StatesEq(a *State, b *State) bool

Compares two States; returns true if they are equal, false otherwise.

Types

type Cmd

// Cmd is a single command. It is the element type of CmdBuffer.
type Cmd struct {
	// NOTE(review): presumably the command name — confirm against the
	// code that produces and consumes commands.
	Cmd     string
	// Options carries the command's options as arbitrary key/value pairs.
	Options map[string]interface{}
}

Command

type CmdBuffer

// CmdBuffer is a queue of commands (Cmd values) produced by a trigger.
type CmdBuffer []Cmd

Queue of commands produced by a trigger

func NewCmdBuffer

func NewCmdBuffer(size uint32) *CmdBuffer

Create new CmdBuffer

type CmdBufferProcessor

// CmdBufferProcessor is the interface for a command committer.
type CmdBufferProcessor interface {
	// Process handles the command buffer cmdb associated with key and
	// returns an error value.
	// NOTE(review): key is presumably the host key — confirm with callers.
	Process(key string, cmdb *CmdBuffer) error
}

Interface for command committer. Returns data for next processing stage or error

type CmdBufferProcessorImpl

// CmdBufferProcessorImpl has a Process method with the same signature as the
// one declared by the CmdBufferProcessor interface.
type CmdBufferProcessorImpl struct {
	// SBuffer is the send buffer.
	SBuffer SendBuffer
	// Saver is the data saver; it shares the SendBuffer interface with SBuffer.
	Saver SendBuffer
	// FIXME Other commands
	// NOTE(review): presumably the destination for commands not handled by
	// SBuffer or Saver — confirm in Process.
	CmdOutput io.Writer
}

func (*CmdBufferProcessorImpl) Process

func (cbp *CmdBufferProcessorImpl) Process(key string, cmdb *CmdBuffer) error

type Config

// Config is the program configuration. Each nested struct groups the options
// of one subsystem (HTTP interfaces, logging, debug, workers, send buffer and
// the Couchbase/MySQL backends). Backend sections carry an Active flag that
// selects that implementation.
type Config struct {
	// Http interface for incoming data
	IncomingHttp struct {
		// Addr for incoming data
		// e.g. "0.0.0.0:4000" or ":4000" for ipv6
		Addr string
	}
	// Http interface for data requests
	DataHttp struct {
		// Addr for data request
		// e.g. "0.0.0.0:4000" or ":4000" for ipv6
		Addr string
		// Http prefix
		// e.g. /data (default)
		Prefix string
	}
	// Logging options
	Log struct {
		// Files for logging (stderr if empty)
		// For hammyd daemon
		HammyDFile string
		// For hammydatad daemon
		HammyDataDFile string
		// FIXME CmdOutput
		CmdOutputFile string
	}
	// Debug and statistics
	Debug struct {
		// Addrs for debug and statistic information
		// e.g. "localhost:6060" (default)
		// For hammyd daemon
		HammyDAddr string
		// For hammydatad daemon
		HammyDataDAddr string
	}
	// Workers
	Workers struct {
		// Count of workers
		PoolSize uint
		// Worker cmd
		CmdLine string
		// Worker live limit
		MaxIter uint
		// Worker timeout (before kill)
		// NOTE(review): time unit is not stated here — confirm.
		Timeout uint
	}
	// Send buffer
	SendBuffer struct {
		// NOTE(review): time unit is not stated here (presumably seconds) — confirm.
		SleepTime float32
	}
	// Couchbase for triggers configuration
	CouchbaseTriggers struct {
		// Use this implementation
		Active bool
		// e.g. "http://dev-couchbase.example.com:8091/"
		ConnectTo string
		// e.g. "default"
		Pool string
		// e.g. "default"
		Bucket string
	}
	// Couchbase for state storage
	CouchbaseStates struct {
		// Use this implementation
		Active bool
		// e.g. "http://dev-couchbase.example.com:8091/"
		ConnectTo string
		// e.g. "default"
		Pool string
		// e.g. "default"
		Bucket string
		// TTL in seconds, default 86400 (day)
		Ttl int
	}
	// Data saver
	CouchbaseSaver struct {
		// Use this implementation
		Active bool
		// e.g. "http://dev-couchbase.example.com:8091/"
		ConnectTo string
		// e.g. "default"
		Pool string
		// e.g. "default"
		Bucket string
		// Internal write queue size
		QueueSize uint
		// Connections for saving
		SavePoolSize uint
		// TTL in seconds, default 259200 (3 days)
		Ttl int
	}
	// Data reader
	CouchbaseDataReader struct {
		// Use this implementation
		Active bool
		// e.g. "http://dev-couchbase.example.com:8091/"
		ConnectTo string
		// e.g. "default"
		Pool string
		// e.g. "default"
		Bucket string
	}
	// MySQL for triggers configuration
	MySQLTriggers struct {
		// Use this implementation
		Active bool
		// Database to connect
		Database string
		// DB user
		User string
		// DB user password
		Password string
		// table that contains triggers (host, trigger)
		Table string
		// Limit for parallel connections
		MaxConn int
	}
	// MySQL for states
	MySQLStates struct {
		// Use this implementation
		Active bool
		// Database to connect
		Database string
		// DB user
		User string
		// DB user password
		Password string
		// table that contains states (host, state, cas)
		Table string
		// Limit for parallel connections
		MaxConn int
	}
	// MySQL historical data saver
	MySQLSaver struct {
		// Use this implementation
		Active bool
		// Database to connect
		Database string
		// DB user
		User string
		// DB user password
		Password string
		// table that contains numeric history
		Table string
		// table that contains text history
		LogTable string
		// table that contains hosts
		HostTable string
		// table that contains items
		ItemTable string
		// Limit for parallel connections
		MaxConn int
	}
	// MySQL historical data reader
	MySQLDataReader struct {
		// Use this implementation
		Active bool
		// Database to connect
		Database string
		// DB user
		User string
		// DB user password
		Password string
		// table that contains numeric history
		Table string
		// table that contains text history
		LogTable string
		// table that contains hosts
		HostTable string
		// table that contains items
		ItemTable string
		// Limit for parallel connections
		MaxConn int
	}
}

Program configuration

type CouchbaseDataReader

type CouchbaseDataReader struct {
	// contains filtered or unexported fields
}

Reads data from write cache (couchbase-based)

func NewCouchbaseDataReader

func NewCouchbaseDataReader(cfg Config) (*CouchbaseDataReader, error)

Creates a new reader

func (*CouchbaseDataReader) Read

func (cr *CouchbaseDataReader) Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)

type CouchbaseSaver

type CouchbaseSaver struct {
	Ttl uint32
	// contains filtered or unexported fields
}

Saves historical data to the write cache (based on Couchbase)

func NewCouchbaseSaver

func NewCouchbaseSaver(cfg Config) (*CouchbaseSaver, error)

Create new saver

func (*CouchbaseSaver) Push

func (s *CouchbaseSaver) Push(data *IncomingData)

Enqueue data for saving

type CouchbaseStateKeeper

// CouchbaseStateKeeper bundles the Couchbase handles used for state storage.
// Its MGet and Set methods have the same signatures as those of the
// StateKeeper interface.
type CouchbaseStateKeeper struct {
	// Couchbase client, pool and bucket handles.
	Client *couchbase.Client
	Pool   *couchbase.Pool
	Bucket *couchbase.Bucket
	// NOTE(review): presumably the TTL in seconds taken from
	// Config.CouchbaseStates.Ttl — confirm in NewCouchbaseStateKeeper.
	Ttl    uint32
}

func NewCouchbaseStateKeeper

func NewCouchbaseStateKeeper(cfg Config) (*CouchbaseStateKeeper, error)

func (*CouchbaseStateKeeper) Get

func (*CouchbaseStateKeeper) MGet

func (sk *CouchbaseStateKeeper) MGet(keys []string) (states map[string]StateKeeperAnswer)

func (*CouchbaseStateKeeper) Set

func (sk *CouchbaseStateKeeper) Set(key string, data State, cas *uint64) (retry bool, err error)

type CouchbaseTriggersGetter

// CouchbaseTriggersGetter bundles the Couchbase handles used to fetch
// triggers. Its MGet method has the same signature as the one declared by
// the TriggersGetter interface.
type CouchbaseTriggersGetter struct {
	// Couchbase client, pool and bucket handles.
	Client *couchbase.Client
	Pool   *couchbase.Pool
	Bucket *couchbase.Bucket
}

func NewCouchbaseTriggersGetter

func NewCouchbaseTriggersGetter(cfg Config) (*CouchbaseTriggersGetter, error)

func (*CouchbaseTriggersGetter) MGet

func (tg *CouchbaseTriggersGetter) MGet(keys []string) (triggers map[string]string, err error)

type CounterMetric

type CounterMetric struct {
	// contains filtered or unexported fields
}

func (*CounterMetric) Add

func (m *CounterMetric) Add(n uint64)

type DataReader

// DataReader reads data from the write cache or from storage.
type DataReader interface {
	// Read returns the values recorded for itemKey of hostKey.
	// NOTE(review): from/to presumably bound the Timestamp range of the
	// returned values; inclusiveness is not shown here — confirm.
	Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)
}

Reads data from write cache or storage

type DataTimeSorter

// DataTimeSorter is a helper for sorting a []IncomingValueData slice by
// Timestamp. It provides Len, Less and Swap methods.
type DataTimeSorter struct {
	// Data points to the slice being sorted.
	Data *[]IncomingValueData
}

Struct for sorting []IncomingValueData slice by Timestamp

func (*DataTimeSorter) Len

func (ds *DataTimeSorter) Len() int

func (*DataTimeSorter) Less

func (ds *DataTimeSorter) Less(i, j int) bool

func (*DataTimeSorter) Swap

func (ds *DataTimeSorter) Swap(i, j int)

type Executer

// Executer is the interface for a trigger executer.
type Executer interface {
	// ProcessTrigger processes the trigger for one host.
	// It takes the host key, the trigger text, the current state and the
	// incoming data of that host, and returns the new state, a buffer of
	// commands and an error value.
	ProcessTrigger(key string, trigger string, state *State,
		data IncomingHostData) (newState *State, cmdb *CmdBuffer, err error)
}

Interface for trigger executer

type HttpServer

type HttpServer struct {
	// Request handler  object
	RHandler RequestHandler
	// contains filtered or unexported fields
}

HTTP server object. InitMetrics must be called before use.

func (*HttpServer) InitMetrics

func (h *HttpServer) InitMetrics(metricsNamespace string)

Initialize metric objects

func (*HttpServer) ServeHTTP

func (h *HttpServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

Request handler

type IncomingData

// IncomingData is the type for incoming monitoring data.
// It maps a host name to that host's data (IncomingHostData).
type IncomingData map[string]IncomingHostData

Type for incoming monitoring data. Format (in JSON notation):

{
  "host1": {
    "key1.1": [{
      "Timestamp": 1361785778,
      "Value": 3.14
    }]
  },
  "host2": {
    "key2.1": [{
      "Timestamp": 1361785817,
      "Value": "test string"
    }],
    "key2.2": [{
      "Timestamp": 1361785858,
      "Value": 12345
    },
    {
      "Timestamp": 1361785927,
      "Value": 999.3
    }]
  }
}

type IncomingHostData

// IncomingHostData holds the incoming data of one host: it maps an item key
// to the list of timestamped values received for that key.
type IncomingHostData map[string][]IncomingValueData

type IncomingMessage

// IncomingMessage is the type for an incoming monitoring data request.
type IncomingMessage struct {
	// Incoming monitoring data
	Data IncomingData
	// Processing level (0 for new data).
	// Incremented after each resend.
	Level uint32
}

Type for incoming monitoring data request

type IncomingValueData

// IncomingValueData is a single timestamped value of an item.
type IncomingValueData struct {
	// Timestamp of the value.
	// NOTE(review): the JSON examples suggest Unix time in seconds — confirm.
	Timestamp uint64
	// Value is the measured value; the JSON examples show both numbers and
	// strings.
	Value     interface{}
}

type MetricSet

type MetricSet struct {
	// contains filtered or unexported fields
}

Namespaced set of metrics

func NewMetricSet

func NewMetricSet(namespace string, tickTime time.Duration) *MetricSet

Creates a new metric set, or panics if the namespace already exists

func (*MetricSet) NewCounter

func (ms *MetricSet) NewCounter(name string) *CounterMetric

func (*MetricSet) NewTimer

func (ms *MetricSet) NewTimer(name string) *TimerMetric

type MetricType

// MetricType enumerates the kinds of metric.
type MetricType int
const (
	// METRIC_COUNTER has value 0.
	// NOTE(review): presumably identifies CounterMetric — confirm.
	METRIC_COUNTER MetricType = iota
	// METRIC_TIMER has value 1.
	// NOTE(review): presumably identifies TimerMetric — confirm.
	METRIC_TIMER
)

type MySQLDataReader

type MySQLDataReader struct {
	// contains filtered or unexported fields
}

Driver for retrieving historical data from a MySQL database. See mysql_saver.go for details about the schema.

func NewMySQLDataReader

func NewMySQLDataReader(cfg Config) (dr *MySQLDataReader, err error)

func (*MySQLDataReader) Read

func (dr *MySQLDataReader) Read(hostKey string, itemKey string, from uint64, to uint64) (data []IncomingValueData, err error)

type MySQLSaver

type MySQLSaver struct {
	// contains filtered or unexported fields
}

Driver for saving historical data in a MySQL database. It assumes a table structure like this:

CREATE TABLE `history_host` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `by_name` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `history_item` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `host_id` int(11) NOT NULL,
  `name` varchar(255) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `by_name` (`host_id`, `name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `history` (
  `item_id` int(11) NOT NULL,
  `timestamp` DATETIME NOT NULL,
  `value` DOUBLE NOT NULL,
  PRIMARY KEY (`item_id`, `timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `history_log` (
  `item_id` int(11) NOT NULL,
  `timestamp` DATETIME NOT NULL,
  `value` TEXT NOT NULL,
  PRIMARY KEY (`item_id`, `timestamp`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

func NewMySQLSaver

func NewMySQLSaver(cfg Config) (s *MySQLSaver, err error)

func (*MySQLSaver) Push

func (s *MySQLSaver) Push(data *IncomingData)

type MySQLStateKeeper

type MySQLStateKeeper struct {
	// contains filtered or unexported fields
}

Driver for retrieving and saving state in a MySQL database. It assumes a table structure like this:

CREATE TABLE `states` (
  `host` varchar(255) NOT NULL,
  `state` text,
  `cas` BIGINT NOT NULL DEFAULT 0,
  PRIMARY KEY (`host`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

func NewMySQLStateKeeper

func NewMySQLStateKeeper(cfg Config) (sk *MySQLStateKeeper, err error)

func (*MySQLStateKeeper) Get

func (sk *MySQLStateKeeper) Get(key string) (ans StateKeeperAnswer)

func (*MySQLStateKeeper) MGet

func (sk *MySQLStateKeeper) MGet(keys []string) (states map[string]StateKeeperAnswer)

func (*MySQLStateKeeper) Set

func (sk *MySQLStateKeeper) Set(key string, data State, cas *uint64) (retry bool, err error)

type MySQLTriggersGetter

type MySQLTriggersGetter struct {
	// contains filtered or unexported fields
}

Driver for retrieving triggers from a MySQL database. It assumes a table structure like this:

CREATE TABLE `triggers` (
  `host` varchar(255) NOT NULL,
  `trigger` text,
  PRIMARY KEY (`host`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

func NewMySQLTriggersGetter

func NewMySQLTriggersGetter(cfg Config) (tg *MySQLTriggersGetter, err error)

func (*MySQLTriggersGetter) MGet

func (tg *MySQLTriggersGetter) MGet(keys []string) (triggers map[string]string, err error)

type RequestHandler

// RequestHandler is the interface for an incoming data handler.
type RequestHandler interface {
	// Handle processes data and returns a map of errors keyed by string.
	// NOTE(review): the keys are presumably host names — confirm.
	Handle(data IncomingData) map[string]error
}

Interface for incoming data handler

type RequestHandlerImpl

// RequestHandlerImpl is the main data processor implementation.
// InitMetrics must be called before a new object is used.
type RequestHandlerImpl struct {
	// Interface for retrieving triggers
	TGetter TriggersGetter
	// Interface for state storage
	SKeeper StateKeeper
	// Interface for executer
	Exec Executer
	// Interface for command committer
	CBProcessor CmdBufferProcessor
	// contains filtered or unexported fields
}

Main data processor implementation. You must call InitMetrics before using a new object.

func (*RequestHandlerImpl) Handle

func (rh *RequestHandlerImpl) Handle(data IncomingData) (errs map[string]error)

func (*RequestHandlerImpl) InitMetrics

func (rh *RequestHandlerImpl) InitMetrics(metricsNamespace string)

Initializes statistical metrics

type ResponseMessage

// ResponseMessage is the response type.
type ResponseMessage struct {
	// Errors maps a string key to an error message.
	// NOTE(review): presumably the string form of the map returned by
	// RequestHandler.Handle — confirm in the HTTP server code.
	Errors map[string]string
}

Response

type SPExecuter

type SPExecuter struct {
	// contains filtered or unexported fields
}

Executer implementation for subprocesses with MessagePack-based RPC

func NewSPExecuter

func NewSPExecuter(cfg Config, metricNamespace string) *SPExecuter

Creates a new instance of SPExecuter per process

func (*SPExecuter) ProcessTrigger

func (e *SPExecuter) ProcessTrigger(key string, trigger string, state *State,
	data IncomingHostData) (newState *State, cmdb *CmdBuffer, err error)

type SendBuffer

// SendBuffer is the interface for internal send buffers and for external
// data savers.
type SendBuffer interface {
	// Push hands data over to the buffer or saver. It returns nothing, so
	// any failure is not reported to the caller through this interface.
	Push(data *IncomingData)
}

Interface for internal sendbuffers and for external data savers

type SendBufferImpl

type SendBufferImpl struct {
	// contains filtered or unexported fields
}

Buffer for reprocessed data

func NewSendBufferImpl

func NewSendBufferImpl(rh RequestHandler, cfg Config, metricsNamespace string) (sb *SendBufferImpl)

Creates and initializes a new SendBuffer

func (*SendBufferImpl) Listen

func (sb *SendBufferImpl) Listen()

Locks and begins data processing

func (*SendBufferImpl) Push

func (sb *SendBufferImpl) Push(data *IncomingData)

Enqueue data for reprocessing

type State

// State is the state of a host: it maps keys to values together with the
// timestamp of their last update (see StateElem).
type State map[string]StateElem

Type for the state of a host: maps keys to values with timestamps of last update. Format (in JSON notation):

{
  "key1": {
    "Value": 10.3,
    "LastUpdate": 1361785927
  },
  "key2": {
    "Value": "booo!",
    "LastUpdate": 1361785778
  }
}

func NewState

func NewState() *State

type StateElem

// StateElem is one element of a State.
type StateElem struct {
	// LastUpdate is the timestamp of the last update of Value.
	LastUpdate uint64
	// Value is the stored value; the JSON example for State shows both a
	// number and a string.
	Value      interface{}
}

Type for State element

type StateKeeper

// StateKeeper is the interface for state storage.
type StateKeeper interface {
	// Get fetches the state stored under key.
	Get(key string) StateKeeperAnswer
	// MGet fetches the states for several keys; the result is keyed by string.
	MGet(keys []string) map[string]StateKeeperAnswer
	// Set stores data under key.
	// NOTE(review): cas is presumably a compare-and-swap token as returned
	// in StateKeeperAnswer.Cas, and retry presumably signals that the write
	// should be attempted again — confirm in the implementations.
	Set(key string, data State, cas *uint64) (retry bool, err error)
}

Interface for state storage

type StateKeeperAnswer

// StateKeeperAnswer is the answer to a StateKeeper get request (Get / MGet).
type StateKeeperAnswer struct {
	// State is embedded.
	State
	// NOTE(review): presumably the compare-and-swap token to pass back to
	// StateKeeper.Set — confirm.
	Cas *uint64
	// Err is the error value associated with this answer.
	Err error
}

Answer of StateKeeper's get requests

type TimerMetric

type TimerMetric struct {
	// contains filtered or unexported fields
}

func (*TimerMetric) Add

func (m *TimerMetric) Add(τ time.Duration)

func (*TimerMetric) NewObservation

func (m *TimerMetric) NewObservation() *TimerMetricObservation

type TimerMetricObservation

type TimerMetricObservation struct {
	// contains filtered or unexported fields
}

func (*TimerMetricObservation) End

func (τ *TimerMetricObservation) End()

type TriggersGetter

// TriggersGetter is the interface for trigger configuration.
type TriggersGetter interface {
	// MGet fetches the triggers for keys and returns them as a string map.
	// NOTE(review): the result is presumably keyed by the requested keys
	// (host names) — confirm in the implementations.
	MGet(keys []string) (triggers map[string]string, err error)
}

Interface for trigger configuration

type WorkerProcessInput

// WorkerProcessInput groups a host name, a trigger, a state and the host's
// incoming data.
// NOTE(review): presumably the request sent to a worker subprocess by
// SPExecuter.ProcessTrigger — confirm.
type WorkerProcessInput struct {
	Hostname string
	Trigger  string
	State    *State
	IData    IncomingHostData
}

type WorkerProcessOutput

// WorkerProcessOutput groups a command buffer and a state.
// NOTE(review): presumably the reply received from a worker subprocess,
// matching the (newState, cmdb) results of Executer.ProcessTrigger — confirm.
type WorkerProcessOutput struct {
	CmdBuffer *CmdBuffer
	State     *State
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL