series

package
v0.3.7 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 5, 2023 License: Apache-2.0 Imports: 29 Imported by: 2

Documentation

Overview

Package series provides tools for collecting and aggregating timeseries events as part of the logging infrastructure.

The series "system" includes a few basic types and concepts: an Event, which is a single data point; a Metric, which is a single series of data points; and a Collector, which is responsible for tracking and publishing metrics.

In general, to use grip/series for your metrics, you configure a series.Collector, embed it in your grip sending pipeline, and then embed metric events in your messages.

The x/metrics package contains message types that use github.com/shirou/gopsutil to collect and generate structured logging messages with metrics information. These tools also integrate with the `tychoish/birch` bson library and its `birch/x/ftdc` timeseries compression format. Additionally, `bson`-formatted output renderers for metric events are provided here.

WARNING: This implementation is alpha quality at the moment. Pull requests welcome.

Constants

This section is empty.

Variables

This section is empty.

Functions

func GoRuntimeEventProducer

func GoRuntimeEventProducer(labels ...dt.Pair[string, string]) fun.Producer[[]*Event]

func RenderHistogramJSON

func RenderHistogramJSON(
	buf *bytes.Buffer,
	key string,
	labels fun.Future[*dt.Pairs[string, string]],
	sample *dt.Pairs[float64, int64],
	ts time.Time,
)

func RenderMetricGraphite

func RenderMetricGraphite(buf *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time)

func RenderMetricJSON

func RenderMetricJSON(buf *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time)

func RenderMetricOpenTSB

func RenderMetricOpenTSB(buf *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time)

func Sender

func Sender(s send.Sender, coll *Collector) send.Sender

Sender wraps a send.Sender and a collector and unifies them: if there are any events wrapped or embedded in the message the sender extracts them, propagating the events to the collector and then separately passing the message along to the underlying sender.

The events are, typically, not part of the message sent to the underlying sender: while Events do have a string form that can be logged, and most senders will handle them appropriately, events are not logged with *this* sender.
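The wrapping behavior described above can be sketched with the standard library alone. This is a simplified model, not the package's implementation: the `event`, `message`, `sender`, `collector`, and `metricsSender` types are hypothetical stand-ins for the grip types.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a metric event.
type event struct {
	ID    string
	Value int64
}

// String gives the event a loggable form, although the wrapper below
// never logs events itself.
func (e event) String() string { return fmt.Sprintf("%s=%d", e.ID, e.Value) }

// message is a hypothetical log message that may carry events.
type message struct {
	Text   string
	Events []event
}

// sender is a hypothetical, minimal logging sink.
type sender interface {
	Send(text string)
}

// collector is a hypothetical sink for metric events.
type collector struct{ received []event }

func (c *collector) Push(evs ...event) { c.received = append(c.received, evs...) }

// memorySender records the text it is asked to log.
type memorySender struct{ lines []string }

func (m *memorySender) Send(text string) { m.lines = append(m.lines, text) }

// metricsSender routes embedded events to the collector and forwards
// only the message text to the wrapped sender.
type metricsSender struct {
	next sender
	coll *collector
}

func (s metricsSender) SendMessage(m message) {
	if len(m.Events) > 0 {
		s.coll.Push(m.Events...)
	}
	s.next.Send(m.Text)
}

func main() {
	out := &memorySender{}
	coll := &collector{}
	s := metricsSender{next: out, coll: coll}
	s.SendMessage(message{Text: "request served", Events: []event{{ID: "requests", Value: 1}}})
	fmt.Println(out.lines, coll.received)
}
```

The point of the design is that callers keep a single logging call site: the message text goes to the log, and the events travel separately to the collector.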

func WithMetrics

func WithMetrics(c any, events ...*Event) message.Composer

WithMetrics inspects a value that might have *Event values (or related types, including functions that produce events and slices of events) embedded in it.

Types

type Collector

type Collector struct {
	CollectorConf
	// contains filtered or unexported fields
}

Collector maintains the local state of collected metrics: metric series are registered lazily when they are first sent, and the collector tracks the value and is responsible for orchestrating.
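Lazy registration can be illustrated with a small map-backed registry. This is a minimal sketch under assumptions: `registry` and `seriesKey` are hypothetical names, the key format is invented for the example, and the sketch is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// seriesKey builds a stable key from a metric ID and its labels, so the
// same ID with different labels is tracked as a separate series.
func seriesKey(id string, labels map[string]string) string {
	parts := make([]string, 0, len(labels))
	for k, v := range labels {
		parts = append(parts, k+"="+v)
	}
	sort.Strings(parts)
	return fmt.Sprintf("%s{%s}", id, strings.Join(parts, ","))
}

// registry is a hypothetical stand-in for the collector's local state.
type registry struct {
	values map[string]int64
}

// add creates the series on first use, then accumulates into it.
func (r *registry) add(id string, labels map[string]string, delta int64) int64 {
	if r.values == nil {
		r.values = make(map[string]int64)
	}
	key := seriesKey(id, labels)
	r.values[key] += delta
	return r.values[key]
}

func main() {
	var r registry
	r.add("requests", map[string]string{"route": "index"}, 1)
	r.add("requests", map[string]string{"route": "index"}, 1)
	r.add("requests", map[string]string{"route": "health"}, 1)
	fmt.Println(len(r.values), r.values[seriesKey("requests", map[string]string{"route": "index"})])
}
```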

func NewCollector

func NewCollector(ctx context.Context, opts ...CollectorOptionProvider) (*Collector, error)

NewCollector constructs a collector service that is responsible for collecting and distributing metric events. There are several basic modes of operation:

- Embedded: Use series.Sender to wrap a grip/send.Sender: here the collector wraps the sender and intercepts events from normal logger messages. The series.WithMetrics helper can attach metrics to a message.

- Directly: You can use the Push/Publish/Stream/PushEvent methods to send events to the collector.

- Background: Using the Register() method, you can add a function to the Collector; the Collector collects the function's results and distributes them to the provided backend.

Output from a collector is managed by CollectorBackends, which may be implemented externally: a backend is a fun.Processor function that consumes (and processes!) fun.Iterator[series.MetricPublisher] objects. Metric publishers, then, are closures that write the metrics format to an io.Writer, while the formatting of a message is controlled by the Renderer functions (MetricValueRenderer and MetricHistogramRenderer) in the Collector configuration.

func (*Collector) Close

func (c *Collector) Close() error

func (*Collector) Iterator

func (c *Collector) Iterator() *fun.Iterator[MetricSnapshot]

Iterator iterates through every metric and label combination, and takes a (rough) snapshot of each metric. Rough only because the timestamps and last metric may not always be (exactly) synchronized with regard to each other.

func (*Collector) Publish

func (c *Collector) Publish(events []*Event)

func (*Collector) Push

func (c *Collector) Push(events ...*Event)

func (*Collector) PushEvent

func (c *Collector) PushEvent(e *Event)

func (*Collector) Register

func (c *Collector) Register(prod fun.Producer[[]*Event], dur time.Duration)

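Register adds an event-producing function to the collector for periodic collection; the events it produces are distributed to the configured backends.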

func (*Collector) Stream

func (c *Collector) Stream(
	iter *fun.Iterator[*Event],
	opts ...fun.OptionProvider[*fun.WorkerGroupConf],
) fun.Worker

type CollectorBackend

type CollectorBackend fun.Processor[*fun.Iterator[MetricPublisher]]

func LoggerBackend

func LoggerBackend(sender send.Sender, r Renderer) CollectorBackend

func PassthroughBackend added in v0.3.6

func PassthroughBackend(r Renderer, handler fun.Handler[string], opts ...fun.OptionProvider[*fun.WorkerGroupConf]) CollectorBackend

func (CollectorBackend) Worker

type CollectorBackendFileConf

type CollectorBackendFileConf struct {
	Directory      string
	FilePrefix     string
	Extension      string
	CounterPadding int
	Megabytes      int
	Gzip           bool
	Renderer       Renderer `json:"-" yaml:"-" db:"-" bson:"-"`
}

func (*CollectorBackendFileConf) RotatingFilePath

func (conf *CollectorBackendFileConf) RotatingFilePath() fun.Producer[string]

func (*CollectorBackendFileConf) RotatingFileProducer

func (conf *CollectorBackendFileConf) RotatingFileProducer() fun.Producer[io.WriteCloser]

func (*CollectorBackendFileConf) Validate

func (conf *CollectorBackendFileConf) Validate() error

type CollectorBackendSocketConf

type CollectorBackendSocketConf struct {
	Dialer  net.Dialer
	Network string // tcp or udp
	Address string

	DialWorkers       int
	IdleConns         int
	MinDialRetryDelay time.Duration
	MaxDialRetryDelay time.Duration
	DialErrorHandling CollectorBackendSocketErrorOption

	MessageWorkers       int
	NumMessageRetries    int
	MinMessageRetryDelay time.Duration
	MaxMessageRetryDelay time.Duration
	MessageErrorHandling CollectorBackendSocketErrorOption

	Renderer Renderer
}

func (*CollectorBackendSocketConf) Validate

func (conf *CollectorBackendSocketConf) Validate() error

type CollectorBackendSocketErrorOption

type CollectorBackendSocketErrorOption int8

const (
	CollectorBackendSocketErrorINVALID CollectorBackendSocketErrorOption = iota
	CollectorBackendSocketErrorAbort
	CollectorBackendSocketErrorContinue
	CollectorBackendSocketErrorCollect
	CollectorBackendSocketErrorPanic
	CollectorBackendSocketErrorUNSPECIFIED
)

func (CollectorBackendSocketErrorOption) Validate

type CollectorBakendFileOptionProvider

type CollectorBakendFileOptionProvider = fun.OptionProvider[*CollectorBackendFileConf]

func CollectorBackendFileConfCounterPadding

func CollectorBackendFileConfCounterPadding(v int) CollectorBakendFileOptionProvider

func CollectorBackendFileConfDirectory

func CollectorBackendFileConfDirectory(path string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfExtension

func CollectorBackendFileConfExtension(ext string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfPrefix

func CollectorBackendFileConfPrefix(prefix string) CollectorBakendFileOptionProvider

func CollectorBackendFileConfRotationSizeMB

func CollectorBackendFileConfRotationSizeMB(v int) CollectorBakendFileOptionProvider

func CollectorBackendFileConfWithRenderer added in v0.3.6

func CollectorBackendFileConfWithRenderer(r Renderer) CollectorBakendFileOptionProvider

type CollectorBakendSocketOptionProvider

type CollectorBakendSocketOptionProvider = fun.OptionProvider[*CollectorBackendSocketConf]

func CollectorBackendSocketConfAddress

func CollectorBackendSocketConfAddress(addr string) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfDialWorkers

func CollectorBackendSocketConfDialWorkers(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfDialer

func CollectorBackendSocketConfDialer(d net.Dialer) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfIdleConns

func CollectorBackendSocketConfIdleConns(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMaxDialRetryDelay

func CollectorBackendSocketConfMaxDialRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMaxMessageRetryDelay

func CollectorBackendSocketConfMaxMessageRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMessageWorkers

func CollectorBackendSocketConfMessageWorkers(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMinDialRetryDelay

func CollectorBackendSocketConfMinDialRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfMinMessageRetryDelay

func CollectorBackendSocketConfMinMessageRetryDelay(d time.Duration) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNetowrkTCP

func CollectorBackendSocketConfNetowrkTCP() CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNetowrkUDP

func CollectorBackendSocketConfNetowrkUDP() CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfNumMessageRetries

func CollectorBackendSocketConfNumMessageRetries(n int) CollectorBakendSocketOptionProvider

func CollectorBackendSocketConfWithRenderer added in v0.3.6

func CollectorBackendSocketConfWithRenderer(r Renderer) CollectorBakendSocketOptionProvider

type CollectorConf

type CollectorConf struct {
	Backends      []CollectorBackend
	BrokerOptions pubsub.BrokerOptions
	Buffer        int
}

func (*CollectorConf) Validate

func (conf *CollectorConf) Validate() error

type CollectorOptionProvider

type CollectorOptionProvider = fun.OptionProvider[*CollectorConf]
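The option-provider types in this package follow the functional-options pattern. The sketch below shows that pattern in isolation, assuming an option is a function that mutates a configuration and may return an error; `conf`, `option`, `withBuffer`, and `apply` are hypothetical names, not part of this package or of the fun library.

```go
package main

import "fmt"

// conf is a hypothetical configuration with one validated field.
type conf struct {
	Buffer int
}

// option mutates a conf and may reject a value.
type option func(*conf) error

// withBuffer is a hypothetical option constructor.
func withBuffer(size int) option {
	return func(c *conf) error {
		if size < 0 {
			return fmt.Errorf("buffer size %d must not be negative", size)
		}
		c.Buffer = size
		return nil
	}
}

// apply runs every option in order and stops at the first error.
func apply(c *conf, opts ...option) error {
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	c := &conf{}
	if err := apply(c, withBuffer(128)); err != nil {
		fmt.Println("invalid configuration:", err)
		return
	}
	fmt.Println(c.Buffer)
}
```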

func CollectorConfAppendBackends

func CollectorConfAppendBackends(bs ...CollectorBackend) CollectorOptionProvider

func CollectorConfBuffer

func CollectorConfBuffer(size int) CollectorOptionProvider

func CollectorConfFileBackend added in v0.3.6

func CollectorConfFileBackend(opts *CollectorBackendFileConf) CollectorOptionProvider

func CollectorConfSet

func CollectorConfSet(c *CollectorConf) CollectorOptionProvider

func CollectorConfWithFileBacked added in v0.3.6

func CollectorConfWithFileBacked(opts ...CollectorBakendFileOptionProvider) CollectorOptionProvider

func CollectorConfWithLoggerBackend

func CollectorConfWithLoggerBackend(sender send.Sender, r Renderer) CollectorOptionProvider

type Event

type Event struct {
	// contains filtered or unexported fields
}

func Extract

func Extract(c any) []*Event

Extract takes an arbitrary object and attempts to introspect it to find events.

func (*Event) Export

func (e *Event) Export() Record

func (*Event) MarshalJSON

func (e *Event) MarshalJSON() ([]byte, error)

func (*Event) String

func (e *Event) String() string

type EventExtractor

type EventExtractor interface {
	Events() []*Event
}

EventExtractor is an interface that arbitrary types can implement to create events.
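Introspection of this kind is commonly written as a type switch. The sketch below is an illustration only: `event`, `eventExtractor`, and `extract` are hypothetical local names, and the set of cases handled by the real Extract function is not documented here.

```go
package main

import "fmt"

// event is a hypothetical stand-in for a metric event.
type event struct {
	ID    string
	Value int64
}

func (e *event) String() string { return fmt.Sprintf("%s=%d", e.ID, e.Value) }

// eventExtractor mirrors the shape of the documented interface.
type eventExtractor interface {
	Events() []*event
}

// extract inspects an arbitrary value and returns any events it finds.
func extract(v any) []*event {
	switch t := v.(type) {
	case *event:
		return []*event{t}
	case []*event:
		return t
	case func() *event:
		return []*event{t()}
	case eventExtractor:
		return t.Events()
	default:
		return nil
	}
}

// request is an application type that exposes its own events.
type request struct{ bytes int64 }

func (r request) Events() []*event {
	return []*event{{ID: "request.bytes", Value: r.bytes}}
}

func main() {
	fmt.Println(extract(request{bytes: 512}), len(extract("no events here")))
}
```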

type HistogramConf

type HistogramConf struct {
	Min               int64
	Max               int64
	SignificantDigits int
	Quantiles         []float64
	OutOfRange        HistogramOutOfRangeOption
	Interval          time.Duration
}

func MakeDefaultHistogramConf

func MakeDefaultHistogramConf() *HistogramConf

func (*HistogramConf) Apply

func (conf *HistogramConf) Apply(opts ...HistogramOptionProvider) error

func (*HistogramConf) Validate

func (conf *HistogramConf) Validate() error

type HistogramOptionProvider

type HistogramOptionProvider = fun.OptionProvider[*HistogramConf]

func HistogramConfBounds

func HistogramConfBounds(min, max int64) HistogramOptionProvider

func HistogramConfInterval

func HistogramConfInterval(dur time.Duration) HistogramOptionProvider

func HistogramConfLowerBound

func HistogramConfLowerBound(in int64) HistogramOptionProvider

func HistogramConfReset

func HistogramConfReset() HistogramOptionProvider

func HistogramConfSet

func HistogramConfSet(arg *HistogramConf) HistogramOptionProvider

func HistogramConfSetQuantiles

func HistogramConfSetQuantiles(quant []float64) HistogramOptionProvider

func HistogramConfSignifcantDigits

func HistogramConfSignifcantDigits(in int) HistogramOptionProvider

func HistogramConfUpperBound

func HistogramConfUpperBound(in int64) HistogramOptionProvider

type HistogramOutOfRangeOption

type HistogramOutOfRangeOption int8

const (
	HistogramOutOfRangeINVALID HistogramOutOfRangeOption = iota
	HistogramOutOfRangePanic
	HistogramOutOfRangeIgnore
	HistogramOutOfRangeTruncate
	HistogramOutOfRangeUNSPECIFIED
)

type Metric

type Metric struct {
	ID   string
	Type MetricType
	// contains filtered or unexported fields
}

func Collect

func Collect(id string) *Metric

func Counter

func Counter(id string) *Metric

func Delta

func Delta(id string) *Metric

func Gauge

func Gauge(id string) *Metric

func Histogram

func Histogram(id string, opts ...HistogramOptionProvider) *Metric

func (*Metric) Add

func (m *Metric) Add(v int64) *Event

func (*Metric) Annotate

func (m *Metric) Annotate(pairs ...dt.Pair[string, string]) *Metric

func (*Metric) Collect

func (m *Metric) Collect(fn fun.Future[int64]) *Event

func (*Metric) CollectAdd

func (m *Metric) CollectAdd(fn fun.Future[int64]) *Event

func (*Metric) Dec

func (m *Metric) Dec() *Event

func (*Metric) Equal

func (m *Metric) Equal(two *Metric) bool

func (*Metric) Inc

func (m *Metric) Inc() *Event

func (*Metric) Label

func (m *Metric) Label(k, v string) *Metric

func (*Metric) Labels

func (m *Metric) Labels(set *dt.Set[dt.Pair[string, string]]) *Metric

func (*Metric) MetricType

func (m *Metric) MetricType(t MetricType) *Metric

func (*Metric) Periodic

func (m *Metric) Periodic(dur time.Duration) *Metric

Periodic sets an interval at which the metric is reported. Once it is set, new events for this metric (ID plus labels) are not reported individually, whether or not future matching events also set Periodic; the periodic reporting remains in effect.

This periodicity only refers to the _reporting_ of the event, not the collection of the event. Register a fun.Producer[[]*Event] on the series.Collector for periodic collection.

func (*Metric) Set

func (m *Metric) Set(v int64) *Event

type MetricHistogramRenderer

type MetricHistogramRenderer func(
	wr *bytes.Buffer,
	key string,
	labels fun.Future[*dt.Pairs[string, string]],
	sample *dt.Pairs[float64, int64],
	ts time.Time,
)

func MakeDefaultHistogramMetricRenderer

func MakeDefaultHistogramMetricRenderer(mr MetricValueRenderer) MetricHistogramRenderer

type MetricMessage

type MetricMessage struct {
	message.Composer
	Events []*Event
}

MetricMessage is a collection of events and a message.Composer object that can be used as a message.Composer but that also contains some number of events.

func Message

func Message(m message.Composer, events ...*Event) *MetricMessage

Message is a simple constructor for *MetricMessage (which implements message.Composer) and includes a slice of event pointers.

func (*MetricMessage) Raw

func (m *MetricMessage) Raw() any

func (*MetricMessage) String

func (m *MetricMessage) String() string

func (*MetricMessage) Structured

func (m *MetricMessage) Structured() bool

type MetricPublisher

type MetricPublisher func(io.Writer, Renderer) error

type MetricSnapshot

type MetricSnapshot struct {
	Name      string
	Labels    string
	Value     int64
	Timestamp time.Time
}

MetricSnapshot is the export format for a metric series at a given point in time.

type MetricType

type MetricType string

MetricType determines the kind of metric, in particular how the state of the metric is tracked over the lifetime of the application.

const (
	// MetricTypeDeltas represents an integer/numeric value that
	// is rendered as deltas: the difference since the last time
	// the metric was reported.
	MetricTypeDeltas MetricType = "deltas"
	// MetricTypeCounter represents an incrementing metric that
	// increases over the lifetime of the process. When
	// reported, the total value of the counter is displayed.
	MetricTypeCounter   MetricType = "counter"
	MetricTypeGuage     MetricType = "gauge"
	MetricTypeHistogram MetricType = "histogram"
)
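The difference between the deltas and counter types, as described in the comments above, comes down to what is reported. The sketch below models both with one hypothetical `state` type; it does not cover gauges or histograms and is not the package's implementation.

```go
package main

import "fmt"

// state holds the running total and the total at the previous report.
type state struct {
	total        int64
	lastReported int64
}

func (s *state) add(v int64) { s.total += v }

// reportCounter returns the total accumulated value.
func (s *state) reportCounter() int64 { return s.total }

// reportDelta returns the change since the previous report and
// records the new baseline.
func (s *state) reportDelta() int64 {
	d := s.total - s.lastReported
	s.lastReported = s.total
	return d
}

func main() {
	counter, deltas := &state{}, &state{}
	// Two reporting periods: three increments, then two increments.
	for _, batch := range [][]int64{{1, 1, 1}, {1, 1}} {
		for _, v := range batch {
			counter.add(v)
			deltas.add(v)
		}
		fmt.Println("counter:", counter.reportCounter(), "deltas:", deltas.reportDelta())
	}
}
```

After the second period the counter reports 5 (the total), while the deltas metric reports 2 (the change since the first report).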

type MetricValueRenderer

type MetricValueRenderer func(writer *bytes.Buffer, key string, labels fun.Future[*dt.Pairs[string, string]], value int64, ts time.Time)

MetricValueRenderer takes an event and writes the output to a buffer. This provides the ability to add support for arbitrary output formats and targets via dependency injection.

type Record

type Record struct {
	ID     string                    `bson:"metric" json:"metric" yaml:"metric"`
	Value  int64                     `bson:"Value" json:"Value" yaml:"Value"`
	Labels *dt.Pairs[string, string] `bson:"labels" json:"labels" yaml:"labels"`
}

type Renderer added in v0.3.6

type Renderer struct {
	Metric    MetricValueRenderer
	Histogram MetricHistogramRenderer
}

func MakeGraphiteRenderer added in v0.3.6

func MakeGraphiteRenderer() Renderer

func MakeJSONRenderer added in v0.3.6

func MakeJSONRenderer() Renderer

func MakeOpenTSBLineRenderer added in v0.3.6

func MakeOpenTSBLineRenderer() Renderer
