v0.2.1

Published: Nov 16, 2015 | License: MIT | Imports: 13 | Imported by: 0




const (
	DefaultRecomputePreviousN = 2

	DefaultRecomputeNoOlderThan = 10 * time.Minute

	DefaultComputeRunsPerInterval = 10

	DefaultComputeNoMoreThan = 2 * time.Minute
)

const (
	// NoChunkingSize, when passed while planning a select statement, tells the planner
	// not to chunk results. It applies only to raw queries.
	NoChunkingSize = 0
)



type Config

type Config struct {
	// Enables logging in the CQ service to display when CQs are processed and how many points are written.
	LogEnabled bool `toml:"log-enabled"`

	// If this flag is set to false, both the brokers and data nodes should ignore any CQ processing.
	Enabled bool `toml:"enabled"`

	// When continuous queries are run, previous intervals are automatically recomputed
	// in case lagged data came in. Set to zero if you never have lagged data. It is done
	// this way because invalidating previously computed intervals would be very hard
	// and expensive.
	RecomputePreviousN int `toml:"recompute-previous-n"`

	// The RecomputePreviousN setting provides guidance for how far back to recompute; the
	// RecomputeNoOlderThan setting puts a ceiling on how far back in time it will go. For example,
	// if RecomputePreviousN is 2 and this is set to 10m, then the previous two intervals are
	// recomputed only for CQs that have a group by time <= 5m. For all others, only the previous
	// window is recomputed.
	RecomputeNoOlderThan toml.Duration `toml:"recompute-no-older-than"`

	// ComputeRunsPerInterval determines how many times the current and previous N intervals
	// will be computed. The group by time is divided by this value to get the time between runs:
	//   group by time seconds / runs per interval
	// This gives partial results for the current group by interval and determines how long it will
	// be until lagged data is recomputed. For example, if this number is 10 and the group by time is
	// 10m, it will be a minute past the previous 10m bucket of time before lagged data is picked up.
	ComputeRunsPerInterval int `toml:"compute-runs-per-interval"`

	// ComputeNoMoreThan, paired with ComputeRunsPerInterval, sets a ceiling on how often queries with
	// smaller group by times are computed. For example, if ComputeRunsPerInterval is set to 10 and this
	// setting is 1m, then a group by time(1m) is computed only once per interval (and once per PreviousN).
	// A group by time(5m) gets five computes per interval. Any group by time window larger
	// than 10m gets computed 10 times for each interval.
	ComputeNoMoreThan toml.Duration `toml:"compute-no-more-than"`
}

Config represents a configuration for the continuous query service.

func NewConfig

func NewConfig() Config

NewConfig returns a new instance of Config with defaults.

type ContinuousQuerier

type ContinuousQuerier interface {
	// Run executes the named query in the named database. A blank database or name matches all.
	Run(database, name string, t time.Time) error
}

ContinuousQuerier represents a service that executes continuous queries.

type ContinuousQuery

type ContinuousQuery struct {
	Database string
	Info     *meta.ContinuousQueryInfo
	LastRun  time.Time
	// contains filtered or unexported fields
}

ContinuousQuery is a local wrapper / helper around continuous queries.

func NewContinuousQuery

func NewContinuousQuery(database string, cqi *meta.ContinuousQueryInfo) (*ContinuousQuery, error)

NewContinuousQuery returns a ContinuousQuery object with a parsed influxql.CreateContinuousQueryStatement.

type RunRequest added in v0.2.0

type RunRequest struct {
	// Now tells the CQ service what the current time is.
	Now time.Time
	// CQs tells the CQ service which queries to run.
	// If nil, all queries will be run.
	CQs []string
}

RunRequest is a request to run one or more CQs.

type Service

type Service struct {
	MetaStore     metaStore
	QueryExecutor queryExecutor
	Config        *Config
	RunInterval   time.Duration
	// RunCh can be used by clients to signal the service to run CQs.
	RunCh  chan *RunRequest
	Logger *log.Logger
	// contains filtered or unexported fields
}

Service manages continuous query execution.

func NewService

func NewService(c Config) *Service

NewService returns a new instance of Service.

func (*Service) Close

func (s *Service) Close() error

Close stops the service.

func (*Service) ExecuteContinuousQuery

func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.ContinuousQueryInfo, now time.Time) error

ExecuteContinuousQuery executes a single CQ.

func (*Service) Open

func (s *Service) Open() error

Open starts the service.

func (*Service) Run

func (s *Service) Run(database, name string, t time.Time) error

Run runs the specified continuous query, or all CQs if none is specified.

func (*Service) SetLogger

func (s *Service) SetLogger(l *log.Logger)

SetLogger sets the internal logger to the logger passed in.
