Version: v0.12.0

Published: Sep 18, 2019 License: BSD-2-Clause-Views Imports: 18 Imported by: 0



Like bufio.Writer (Copyright 2009 The Go Authors. All rights reserved.) but with instrumentation around flushing. Because this codebase is the only user, it doesn't implement the entire bufio.Writer API (it doesn't need to), and some simplifications can be made: fewer edge cases, etc.





func NewKeepSafe

func NewKeepSafe(initialCap int, periodKeep time.Duration) *keepSafe

func NewSlowChan

func NewSlowChan(backend chan []byte, sleep time.Duration) chan []byte

Note: the goroutine doesn't get cleaned up until backend closes. Don't instantiate gazillions of these.

func Pickle added in v0.11.0

func Pickle(dp *Datapoint) []byte


type Conn

type Conn struct {
	In chan []byte
	// contains filtered or unexported fields
}

Conn represents a connection to a TCP endpoint. As long as conn.isAlive(), the caller may write data to conn.In. When no longer alive, the caller must call either getRedo or clearRedo:

* getRedo: returns the last batch of data which may not have made it to the TCP endpoint, and clears the keepSafe buffer. After calling getRedo(), no data may be written to conn.In.
* clearRedo: releases the keepSafe buffer.

NOTE: in the future, this design could be much simpler: keepSafe doesn't need to be a separate structure. We could have an in-line buffer between the dest and the conn: since we write buffered chunks in the bufWriter, we may as well "keep those safe", i.e. buffered writing and keepSafe could share the same buffer. But this requires significant refactoring.

func NewConn

func NewConn(key, addr string, periodFlush time.Duration, pickle bool, connBufSize, ioBufSize int) (*Conn, error)

func (*Conn) Close

func (c *Conn) Close()

Close closes the connection and releases all resources, with the exception of the keepSafe buffer, because the caller of conn needs a chance to collect that data.

func (*Conn) Flush

func (c *Conn) Flush() error

func (*Conn) HandleData

func (c *Conn) HandleData()

func (*Conn) Write

func (c *Conn) Write(buf []byte) (int, error)

Write returns a network/write error, so that it can be retried later. It deals with pickle errors internally, because retrying wouldn't help anyway.

type Datapoint

type Datapoint struct {
	Name string
	Val  float64
	Time uint32
}

func ParseDataPoint added in v0.11.0

func ParseDataPoint(buf []byte) (*Datapoint, error)

type Destination

type Destination struct {
	Matcher matcher.Matcher `json:"matcher"`

	Addr         string `json:"address"`  // tcp dest
	Instance     string `json:"instance"` // Optional carbon instance name, useful only with consistent hashing
	SpoolDir     string // where to store spool files (if enabled)
	Key          string // unique key per destination, based on routeName and destination addr/port combination
	Spool        bool   `json:"spool"`        // spool metrics to disk while dest down?
	Pickle       bool   `json:"pickle"`       // send in pickle format?
	Online       bool   `json:"online"`       // state of connection online/offline.
	SlowNow      bool   `json:"slowNow"`      // did we have to drop packets in the current loop?
	SlowLastLoop bool   `json:"slowLastLoop"` // ditto, for the last loop

	SpoolBufSize         int
	SpoolMaxBytesPerFile int64
	SpoolSyncEvery       int64
	SpoolSyncPeriod      time.Duration
	SpoolSleep           time.Duration // how long to wait between stores to spool
	UnspoolSleep         time.Duration // how long to wait between loads from spool
	RouteName            string

	// set in/via Run()
	In chan []byte `json:"-"` // incoming metrics
	// contains filtered or unexported fields
}

func New

func New(routeName, prefix, sub, regex, addr, spoolDir string, spool, pickle bool, periodFlush, periodReConn time.Duration, connBufSize, ioBufSize, spoolBufSize int, spoolMaxBytesPerFile, spoolSyncEvery int64, spoolSyncPeriod, spoolSleep, unspoolSleep time.Duration) (*Destination, error)

New creates a destination object. Note that it still needs to be told to run via Run().

func (*Destination) Flush

func (dest *Destination) Flush() error

func (*Destination) GetMatcher

func (dest *Destination) GetMatcher() matcher.Matcher

func (*Destination) Match

func (dest *Destination) Match(s []byte) bool

func (*Destination) Run

func (dest *Destination) Run()

func (*Destination) Shutdown

func (dest *Destination) Shutdown() error

func (*Destination) Snapshot

func (dest *Destination) Snapshot() *Destination

Snapshot returns a "basic" static copy of the dest; it is not actually running.

func (*Destination) Update

func (dest *Destination) Update(opts map[string]string) error

Can't be changed yet: pickle, spool, flush, reconn.

func (*Destination) UpdateMatcher

func (dest *Destination) UpdateMatcher(matcher matcher.Matcher)

func (*Destination) WaitOnline

func (dest *Destination) WaitOnline() chan struct{}

type Spool

type Spool struct {
	InRT   chan []byte
	InBulk chan []byte
	Out    chan []byte
	// contains filtered or unexported fields
}

Spool sits in front of the nsqd diskqueue. It provides buffering (to accept input while storage is slow, e.g. while sync() runs, every 1000 items), QoS (RT vs Bulk), and controllable I/O rates.
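The RT-vs-Bulk QoS mentioned above can be sketched with a standard Go two-stage select (this is my own minimal version, not the package's actual loop): real-time traffic on InRT is always drained before bulk/unspooled traffic on InBulk is considered.

```go
package main

import "fmt"

// dispatch forwards items to out, always preferring inRT over inBulk.
// It returns when inRT is closed.
func dispatch(inRT, inBulk chan []byte, out chan []byte) {
	for {
		// First, try RT without blocking.
		select {
		case buf, ok := <-inRT:
			if !ok {
				return
			}
			out <- buf
			continue
		default:
		}
		// RT is empty: accept whichever arrives first.
		select {
		case buf, ok := <-inRT:
			if !ok {
				return
			}
			out <- buf
		case buf := <-inBulk:
			out <- buf
		}
	}
}

func main() {
	inRT := make(chan []byte, 2)
	inBulk := make(chan []byte, 2)
	out := make(chan []byte, 4)

	inBulk <- []byte("bulk")
	inRT <- []byte("rt")

	go dispatch(inRT, inBulk, out)
	fmt.Println(string(<-out)) // RT wins even though bulk was queued first
	fmt.Println(string(<-out))
}
```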

func NewSpool

func NewSpool(key, spoolDir string, bufSize int, maxBytesPerFile, syncEvery int64, syncPeriod, spoolSleep, unspoolSleep time.Duration) *Spool

Parameters should be tuned so that the spool can buffer packets for the duration of one sync, and buffers no more than needed, especially if we know the queue is slower than the ingest rate.

func (*Spool) Buffer

func (s *Spool) Buffer()

func (*Spool) Close

func (s *Spool) Close()

func (*Spool) Ingest

func (s *Spool) Ingest(bulkData [][]byte)

func (*Spool) Writer

func (s *Spool) Writer()

Writer provides a channel-based API to the queue.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer implements buffering for an io.Writer object. If an error occurs writing to a Writer, no more data will be accepted and all subsequent writes will return the error. After all data has been written, the client should call the Flush method to guarantee all data has been forwarded to the underlying io.Writer.

func NewWriter

func NewWriter(w io.Writer, size int, key string) *Writer

NewWriter returns a new Writer whose buffer has at least the specified size. If the argument io.Writer is already a Writer with large enough size, it returns the underlying Writer.

func (*Writer) Available

func (b *Writer) Available() int

Available returns how many bytes are unused in the buffer.

func (*Writer) Buffered

func (b *Writer) Buffered() int

Buffered returns the number of bytes that have been written into the current buffer.

func (*Writer) Flush

func (b *Writer) Flush() error

Flush writes any buffered data to the underlying io.Writer.

func (*Writer) Write

func (b *Writer) Write(p []byte) (nn int, err error)

Write writes the contents of p into the buffer. It returns the number of bytes written. If nn < len(p), it also returns an error explaining why the write is short.
