shuttle

package
v0.1.0
Warning

This package is not in the latest version of its module.

Published: Jul 8, 2015 License: BSD-2-Clause Imports: 20 Imported by: 0

README


Log Shuttle

Log-shuttle is an open source UNIX program that delivers messages from applications and daemons to log routers and processors via HTTPS.

One of the motivations behind log-shuttle is to provide a simpler form of encrypted and authenticated log delivery. Using HTTPS and Basic Authentication is simpler than the techniques described in RFC 5425: the TLS transport mapping for Syslog requires that you maintain both client and server certificates for authentication, and in multi-tenant environments certificate management can be quite burdensome.

When using log-shuttle with Logplex, it is recommended that you run one log-shuttle process per Logplex token. This isolates data between customers and ensures good QoS. Log-shuttle accepts input from stdin in a newline (\n) delimited format.

When using log-shuttle with Amazon Kinesis, all the details for the region, stream and access credentials are supplied in -logs-url (or the $LOGS_URL environment variable). See the Kinesis section of this document.

To block as little as possible, log-shuttle drops outstanding batches once more than -back-buff of them have accumulated.

Kinesis

log-shuttle sends data into Kinesis using the PutRecords API call. Each Kinesis record holds one log line, encoded as an RFC 5424 message with an RFC 6587 length prefix (the same format Logplex accepts).
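The record format above is RFC 6587 octet counting: the byte length of the RFC 5424 message, a space, then the message itself. Below is a minimal Go sketch of that framing, plus the reverse split for a body made of concatenated frames, which is how a Logplex request body is laid out. `frameMessage` and `splitFrames` are hypothetical helpers and are not log-shuttle functions. The messages are placeholders.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"strconv"
)

// frameMessage prefixes a syslog message with its length in bytes and a
// space ("MSG-LEN SP SYSLOG-MSG", RFC 6587 octet counting).
func frameMessage(msg []byte) []byte {
	prefix := strconv.Itoa(len(msg)) + " "
	return append([]byte(prefix), msg...)
}

// splitFrames reverses frameMessage for a body of concatenated frames.
func splitFrames(body []byte) ([][]byte, error) {
	var msgs [][]byte
	for len(body) > 0 {
		sp := bytes.IndexByte(body, ' ')
		if sp <= 0 {
			return nil, errors.New("missing length prefix")
		}
		n, err := strconv.Atoi(string(body[:sp]))
		if err != nil || n < 0 || sp+1+n > len(body) {
			return nil, fmt.Errorf("bad frame length %q", body[:sp])
		}
		msgs = append(msgs, body[sp+1:sp+1+n])
		body = body[sp+1+n:]
	}
	return msgs, nil
}

func main() {
	body := append(frameMessage([]byte("hello")), frameMessage([]byte("world"))...)
	fmt.Printf("%s\n", body)
	msgs, err := splitFrames(body)
	if err != nil {
		panic(err)
	}
	for _, m := range msgs {
		fmt.Printf("%s\n", m)
	}
}
```

Because the length is counted in bytes, the message itself may contain spaces or newlines without confusing the receiver.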

Log-shuttle expects the following encoding of -logs-url when using Amazon Kinesis:

```
https://<AWS_KEY>:<AWS_SECRET>@kinesis.<AMAZON_REGION>.amazonaws.com/<STREAM NAME>
```

See the Amazon Endpoints documentation for supported regions and hostnames.

Kinesis Caveats

Things that should be handled better, and things you should know:

  1. AWS_SECRET, AWS_KEY, AMAZON_REGION and STREAM NAME must be properly URL-encoded.
  2. log-shuttle assumes a 200 response means everything is good. However, Kinesis can return a 200 (meaning the HTTP request itself was good) while the response body still reports per-record errors.
  3. The maximum number of records in a PutRecords request is 500, so set the batch size no higher than 498. This is 500 minus 2 records reserved for possible drop/lost notices.
  4. The Logplex max line length is 10k, while the Kinesis max record size is 50k of base64-encoded data. Base64 turns every 3 bytes into 4, so 50k of encoded data holds at most 37,500 raw bytes. A -max-line-length somewhere below 37500 should therefore work for Kinesis without causing errors.
  5. Kinesis does not support the -gzip option, because that option compresses the body of the request.
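Caveat 2 means a 200 from PutRecords can still hide failures. The sketch below checks the response body using the PutRecords response fields FailedRecordCount and Records[].ErrorCode; results come back in the same order as the request records. log-shuttle itself does not do this check. `failedRecords` is a hypothetical helper, and the sample body is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// putRecordsOutput models the PutRecords response fields used here.
type putRecordsOutput struct {
	FailedRecordCount int `json:"FailedRecordCount"`
	Records           []struct {
		SequenceNumber string `json:"SequenceNumber"`
		ShardID        string `json:"ShardId"`
		ErrorCode      string `json:"ErrorCode"`
		ErrorMessage   string `json:"ErrorMessage"`
	} `json:"Records"`
}

// failedRecords returns the request positions of records Kinesis rejected
// even though the HTTP status was 200.
func failedRecords(body []byte) ([]int, error) {
	var out putRecordsOutput
	if err := json.Unmarshal(body, &out); err != nil {
		return nil, err
	}
	if out.FailedRecordCount == 0 {
		return nil, nil
	}
	var failed []int
	for i, r := range out.Records {
		if r.ErrorCode != "" {
			failed = append(failed, i)
		}
	}
	return failed, nil
}

func main() {
	// Made-up response in which the second of two records failed.
	body := []byte(`{"FailedRecordCount":1,"Records":[` +
		`{"SequenceNumber":"1","ShardId":"shardId-000000000000"},` +
		`{"ErrorCode":"ProvisionedThroughputExceededException","ErrorMessage":"Rate exceeded"}]}`)
	failed, err := failedRecords(body)
	if err != nil {
		panic(err)
	}
	fmt.Println("failed record indexes:", failed)
}
```

A caller could resend only the listed records, or count them as lost.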

Install

$ go get -u github.com/heroku/log-shuttle/...

After that $GOPATH/bin/log-shuttle should be available.

Making Debs

Requires:

  • dpkg (see also brew install dpkg)
  • go & gox (updated via Makefile)

$ make debs

Hacking on log-shuttle

Fork the repo, hack, submit PRs.

Testing

$ go test -v ./...

Submitting Code

  • Open an issue on GitHub.
  • Keep changes in a feature branch.
  • Submit a PR.

Replacing local syslog

Libc uses a local AF_UNIX SOCK_DGRAM (or SOCK_STREAM) socket for syslog(3) calls, and most Unix utilities use syslog(3) to log to syslog. With a little help from some other standard Unix programs, log-shuttle can transport those messages too.

  1. Stop your local syslog daemon.
  2. rm -f /dev/log
  3. Use netcat (nc), tr and stdbuf to read connections to /dev/log and convert the \0 terminator to \n.

Like so...

sudo /etc/init.d/rsyslog stop
sudo rm -f /dev/log
(sudo nc -n -k -d -Ul /dev/log & until [ ! -e /dev/log ]; do sleep 0.01; done; sudo chmod a+rw /dev/log) | stdbuf -i0 -o0 tr \\0 \\n | ./log-shuttle -logs-url=... ... -input-format=1
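As an alternative sketch (not part of log-shuttle), a small Go program can do the same job. It listens on a Unix datagram socket at /dev/log and writes one newline-terminated line per message to stdout, ready to be piped into log-shuttle with -input-format=1. The sketch rests on a few assumptions:

  • The stale socket was removed first (step 2).
  • libc connects with SOCK_DGRAM. A SOCK_STREAM listener, like the nc one above, would need a different read loop.
  • The program name `devlog-relay` is hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"net"
	"os"
)

// nulToNewline turns one syslog(3) message into a newline-terminated line,
// the same job `tr \\0 \\n` does in the shell pipeline.
func nulToNewline(msg []byte) []byte {
	trimmed := bytes.TrimRight(msg, "\x00\n")
	out := make([]byte, 0, len(trimmed)+1)
	out = append(out, trimmed...)
	return append(out, '\n')
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: devlog-relay /dev/log | log-shuttle ... -input-format=1")
		return
	}
	path := os.Args[1]
	conn, err := net.ListenUnixgram("unixgram", &net.UnixAddr{Name: path, Net: "unixgram"})
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// Let unprivileged processes log, like `chmod a+rw /dev/log` above.
	if err := os.Chmod(path, 0o666); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	buf := make([]byte, 64*1024)
	for {
		n, err := conn.Read(buf) // one datagram per syslog(3) call
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		if _, err := os.Stdout.Write(nulToNewline(buf[:n])); err != nil {
			os.Exit(1)
		}
	}
}
```

Writes to Go's os.Stdout are not buffered in user space, so no stdbuf equivalent is needed.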

License

Copyright (c) 2013-15 Heroku Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Documentation


Constants

const (
	InputFormatRaw = iota
	InputFormatRFC3164
	InputFormatRFC5424
)

Input format constants. TODO: ensure these are really used properly
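The constants are declared with iota, so their values are 0, 1 and 2. Assuming -input-format takes these numeric values (Config.InputFormat is an int), the -input-format=1 used in the /dev/log example selects InputFormatRFC3164. That is the traditional BSD syslog style that syslog(3) emits. `inputFormatName` is a hypothetical helper.

```go
package main

import "fmt"

// Same iota layout as the package constants.
const (
	InputFormatRaw = iota
	InputFormatRFC3164
	InputFormatRFC5424
)

// inputFormatName labels a numeric -input-format value.
func inputFormatName(f int) string {
	switch f {
	case InputFormatRaw:
		return "raw"
	case InputFormatRFC3164:
		return "RFC 3164"
	case InputFormatRFC5424:
		return "RFC 5424"
	}
	return "unknown"
}

func main() {
	fmt.Println(inputFormatName(1)) // the value used in the /dev/log example
}
```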

const (
	DefaultMaxLineLength = 10000 // Logplex max is 10000 bytes, so default to that
	DefaultInputFormat   = InputFormatRaw
	DefaultFrontBuff     = 1000
	DefaultBackBuff      = 50
	DefaultTimeout       = 5 * time.Second
	DefaultWaitDuration  = 250 * time.Millisecond
	DefaultMaxAttempts   = 3
	DefaultStatsInterval = 0 * time.Second
	DefaultStatsSource   = ""
	DefaultPrintVersion  = false
	DefaultVerbose       = false
	DefaultSkipVerify    = false
	DefaultPriVal        = "190"
	DefaultVersion       = "1"
	DefaultProcID        = "shuttle"
	DefaultAppName       = "token"
	DefaultHostname      = "shuttle"
	DefaultMsgID         = "- -"
	DefaultLogsURL       = ""
	DefaultNumBatchers   = 2
	DefaultNumOutlets    = 4
	DefaultBatchSize     = 500
	DefaultID            = ""
	DefaultDrop          = true
)

Default option values

const (
	// EOFRetrySleep is the amount of time to sleep between retries caused by an io.EOF, in ms.
	EOFRetrySleep = 100
	// OtherRetrySleep is the time to sleep between retries for any other error, in ms.
	OtherRetrySleep = 1000
	// DepthHighWatermark is the high watermark, beyond which the outlet loses batches instead of retrying.
	DepthHighWatermark = 0.6
	// RetryFormat is the format string for retries
	RetryFormat = "at=post retry=%t msgcount=%d inbox.length=%d request_id=%q attempts=%d error=%q\n"
	// RetryWithTypeFormat is the format string for retries that also have a type
	RetryWithTypeFormat = "at=post retry=%t msgcount=%d inbox.length=%d request_id=%q attempts=%d error=%q errtype=\"%T\"\n"
)
const (
	// LogplexBatchTimeFormat is the format of timestamps as expected by Logplex
	LogplexBatchTimeFormat = "2006-01-02T15:04:05.000000+00:00"
	// LogplexContentType is the content type logplex expects
	LogplexContentType = "application/logplex-1"
)

Variables

var (
	DefaultFormatterFunc = NewLogplexBatchFormatter
)

Defaults that can't be constants

Functions

This section is empty.

Types

type Batch

type Batch struct {
	UUID *uuid.UUID
	// contains filtered or unexported fields
}

Batch holds incoming log lines and provides some helpers for dealing with the grouping of log lines.

func NewBatch

func NewBatch(capacity int) Batch

NewBatch returns a new batch with its capacity preset.

func (*Batch) Add

func (b *Batch) Add(ll LogLine) bool

Add adds a LogLine to the batch and returns a boolean indicating whether the batch is full.

func (*Batch) MsgCount

func (b *Batch) MsgCount() int

MsgCount returns the number of msgs in the batch
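Here is a simplified stand-in for Batch and its helpers. It assumes Batch wraps a slice of LogLines with a fixed capacity; the real fields are unexported, and the UUID field is omitted here.

```go
package main

import (
	"fmt"
	"time"
)

// LogLine and Batch are simplified stand-ins for the package's types.
type LogLine struct {
	line []byte
	when time.Time
}

type Batch struct {
	logLines []LogLine
	capacity int
}

// NewBatch preallocates space for capacity lines.
func NewBatch(capacity int) Batch {
	return Batch{logLines: make([]LogLine, 0, capacity), capacity: capacity}
}

// Add appends ll and reports whether the batch is now full.
func (b *Batch) Add(ll LogLine) bool {
	b.logLines = append(b.logLines, ll)
	return len(b.logLines) >= b.capacity
}

// MsgCount returns the number of lines held.
func (b *Batch) MsgCount() int {
	return len(b.logLines)
}

func main() {
	b := NewBatch(3)
	for _, s := range []string{"a", "b", "c"} {
		full := b.Add(LogLine{line: []byte(s), when: time.Now()})
		fmt.Println(s, "full:", full)
	}
	fmt.Println("count:", b.MsgCount())
}
```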

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher coalesces logs coming via inLogs into batches, which are sent out via outBatches

func NewBatcher

func NewBatcher(s *Shuttle) Batcher

NewBatcher creates an empty Batcher for the provided shuttle

func (Batcher) Batch

func (b Batcher) Batch()

Batch loops getting an empty batch and filling it. Filled batches are sent via the outBatches channel. If outBatches is full, then the batch is dropped and the drops counter is incremented by the number of messages dropped.

type Config

type Config struct {
	MaxLineLength int
	BackBuff      int
	FrontBuff     int
	BatchSize     int
	NumBatchers   int
	NumOutlets    int
	InputFormat   int
	MaxAttempts   int
	LogsURL       string
	Prival        string
	Version       string
	Procid        string
	Hostname      string
	Appname       string
	Msgid         string
	StatsSource   string
	SkipVerify    bool
	PrintVersion  bool
	Verbose       bool
	UseGzip       bool
	Drop          bool
	WaitDuration  time.Duration
	Timeout       time.Duration
	StatsInterval time.Duration

	ID            string
	FormatterFunc NewHTTPFormatterFunc

	// Loggers
	Logger    *log.Logger
	ErrLogger *log.Logger
	// contains filtered or unexported fields
}

Config holds the various config options for a shuttle

func NewConfig

func NewConfig() Config

NewConfig returns a newly created Config, filled in with defaults

func (*Config) ComputeHeader

func (c *Config) ComputeHeader()

ComputeHeader computes the syslogFrameHeaderFormat once, so that it doesn't have to be recomputed on every formatter iteration. It should be called after setting up the rest of the config, or whenever the config changes.

type Counter

type Counter struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Counter is used to track 2 values for a given metric. The first is the "all time" metric counter, and the second is the value accumulated since the last call to ReadAndReset. Counters are safe for concurrent use.

func NewCounter

func NewCounter(initial int) *Counter

NewCounter returns a new Counter initialized to the initial value

func (*Counter) Add

func (c *Counter) Add(u int) int

Add increments the counter (alltime and current), returning the new value

func (*Counter) AllTime

func (c *Counter) AllTime() int

AllTime returns the current alltime value of the Counter

func (*Counter) Read

func (c *Counter) Read() int

Read returns the current value of the Counter

func (*Counter) ReadAndReset

func (c *Counter) ReadAndReset() (int, time.Time)

ReadAndReset returns the current value and the last time it was reset, then resets the value and the last reset time to time.Now()
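Here is a minimal re-implementation of the documented Counter behaviour, for illustration only. The unexported fields (`value`, `allTime`, `lastReset`) are assumptions, and NewCounter is assumed to seed both values with `initial`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Counter tracks an all-time total and a resettable current value.
type Counter struct {
	sync.Mutex
	value, allTime int
	lastReset      time.Time
}

func NewCounter(initial int) *Counter {
	return &Counter{value: initial, allTime: initial, lastReset: time.Now()}
}

// Add increments both values and returns the new current value.
func (c *Counter) Add(u int) int {
	c.Lock()
	defer c.Unlock()
	c.value += u
	c.allTime += u
	return c.value
}

func (c *Counter) Read() int {
	c.Lock()
	defer c.Unlock()
	return c.value
}

func (c *Counter) AllTime() int {
	c.Lock()
	defer c.Unlock()
	return c.allTime
}

// ReadAndReset returns the current value and the previous reset time, then
// zeroes the value and records now as the new reset time.
func (c *Counter) ReadAndReset() (int, time.Time) {
	c.Lock()
	defer c.Unlock()
	v, t := c.value, c.lastReset
	c.value, c.lastReset = 0, time.Now()
	return v, t
}

func main() {
	c := NewCounter(0)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Add(1)
		}()
	}
	wg.Wait()
	n, since := c.ReadAndReset()
	fmt.Println(n, c.AllTime(), c.Read(), !since.IsZero())
}
```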

type GzipFormatter

type GzipFormatter struct {
	// contains filtered or unexported fields
}

GzipFormatter is an HTTPFormatter that is built with a delegate HTTPFormatter but which compresses the request body.

func NewGzipFormatter

func NewGzipFormatter(delegate HTTPFormatter) *GzipFormatter

NewGzipFormatter builds a new GzipFormatter with the supplied delegate

func (*GzipFormatter) Close

func (g *GzipFormatter) Close() error

Close the stream

func (*GzipFormatter) MsgCount

func (g *GzipFormatter) MsgCount() int

MsgCount returns the number of messages contained in the formatted batch

func (*GzipFormatter) Read

func (g *GzipFormatter) Read(p []byte) (int, error)

Read bytes from the formatter stream

func (*GzipFormatter) Request

func (g *GzipFormatter) Request() (*http.Request, error)

Request returns an http.Request to be used with an http.Client. The request has its body and headers set as necessary.

type HTTPFormatter

type HTTPFormatter interface {
	Request() (*http.Request, error) // Request() returns a *http.Request ready to be handled by an outlet
	SubFormatter
}

HTTPFormatter is the interface that http outlets use to format an HTTP request.

func NewKinesisFormatter

func NewKinesisFormatter(b Batch, eData []errData, config *Config) HTTPFormatter

NewKinesisFormatter constructs a proper HTTPFormatter for Kinesis http targets

func NewLogplexBatchFormatter

func NewLogplexBatchFormatter(b Batch, eData []errData, config *Config) HTTPFormatter

NewLogplexBatchFormatter returns a new LogplexBatchFormatter wrapping the provided batch

type HTTPOutlet

type HTTPOutlet struct {

	// User supplied loggers
	Logger *log.Logger
	// contains filtered or unexported fields
}

HTTPOutlet handles delivery of batches to HTTP endpoints by creating formatters for each request. HTTPOutlets handle retries, response parsing and lost counters.

func NewHTTPOutlet

func NewHTTPOutlet(s *Shuttle) *HTTPOutlet

NewHTTPOutlet returns a properly constructed HTTPOutlet for the given shuttle

func (*HTTPOutlet) Outlet

func (h *HTTPOutlet) Outlet()

Outlet receives batches from the inbox and submits them to logplex via HTTP.

type KinesisFormatter

type KinesisFormatter struct {
	io.Reader
	// contains filtered or unexported fields
}

KinesisFormatter formats batches destined for AWS Kinesis HTTP endpoints. Kinesis has a very small payload size, so setting config.BatchSize in the 1-3 range is recommended so as not to lose logs by exceeding the payload limit. Kinesis formats the Data using the LogplexLineFormatter, which is additionally base64 encoded.

func (*KinesisFormatter) MsgCount

func (kf *KinesisFormatter) MsgCount() int

MsgCount returns the number of records that the formatter is formatting

func (*KinesisFormatter) Request

func (kf *KinesisFormatter) Request() (*http.Request, error)

Request constructs a request for this formatter. See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html

type KinesisRecord

type KinesisRecord struct {
	// contains filtered or unexported fields
}

KinesisRecord is used to marshal LogplexLineFormatters to Kinesis Records for the PutRecords API call

func (KinesisRecord) WriteTo

func (r KinesisRecord) WriteTo(w io.Writer) (n int64, err error)

WriteTo writes the LogplexLineFormatter to the provided writer in Kinesis' PutRecords format. Conforms to the io.WriterTo interface.

Since the data should be relatively small, it just reads the LogplexLineFormatter into RAM, creating a bunch of temporary garbage. AFAICT it's just not worth it to try and string these together with io.Pipes and the like. :-(

type LogLine

type LogLine struct {
	// contains filtered or unexported fields
}

LogLine holds a newline-terminated log message and the time shuttle received it.

func NewLogLine

func NewLogLine(line []byte, when time.Time) LogLine

NewLogLine returns a newly constructed LogLine containing the provided line and time

func (LogLine) Length

func (ll LogLine) Length() int

Length returns the length of the raw bytes of the LogLine

type LogLineReader

type LogLineReader struct {
	// contains filtered or unexported fields
}

LogLineReader performs the reading of lines from an io.ReadCloser, encapsulating lines into a LogLine and emitting them on outbox.

func NewLogLineReader

func NewLogLineReader(o chan<- LogLine, m metrics.Registry) LogLineReader

NewLogLineReader constructs a new reader with its own Outbox.

func (LogLineReader) Enqueue

func (rdr LogLineReader) Enqueue(ll LogLine)

Enqueue a single log line and increment the line counters

func (LogLineReader) ReadLogLines

func (rdr LogLineReader) ReadLogLines(input io.ReadCloser) error

ReadLogLines reads lines from the ReadCloser

type LogplexBatchFormatter

type LogplexBatchFormatter struct {
	io.Reader
	// contains filtered or unexported fields
}

LogplexBatchFormatter implements an io.Reader that returns Logplex formatted log lines. It wraps log lines in length prefixed rfc5424 formatting, splitting them as necessary to config.MaxLineLength.
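The doc says lines are split as necessary to fit config.MaxLineLength, but not how. Below is a sketch of one possible policy, fixed-size byte chunks. `splitLine` is hypothetical, and the real formatter may split differently (for example, on UTF-8 boundaries). Splitting is also why, as SubFormatter notes, a formatter's MsgCount can exceed the batch's.

```go
package main

import "fmt"

// splitLine cuts line into pieces of at most maxLen bytes, each of which
// would be framed as its own message. It splits on raw byte offsets, so a
// multi-byte UTF-8 character can be cut in two.
func splitLine(line []byte, maxLen int) [][]byte {
	if maxLen <= 0 || len(line) <= maxLen {
		return [][]byte{line}
	}
	var chunks [][]byte
	for len(line) > maxLen {
		chunks = append(chunks, line[:maxLen])
		line = line[maxLen:]
	}
	return append(chunks, line)
}

func main() {
	for _, c := range splitLine([]byte("abcdefghijklmnopqrstuvwxy"), 10) {
		fmt.Printf("%q\n", c)
	}
}
```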

func (*LogplexBatchFormatter) MsgCount

func (bf *LogplexBatchFormatter) MsgCount() int

MsgCount returns the number of messages in the wrapped batch.

func (*LogplexBatchFormatter) Request

func (bf *LogplexBatchFormatter) Request() (*http.Request, error)

Request returns a properly constructed *http.Request, complete with headers and ContentLength set.
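The sketch below builds a Logplex POST request with LogplexContentType. http.NewRequest sets ContentLength from a bytes.Reader body, and http.Client sends credentials found in the URL's userinfo as a Basic Authorization header. The `Logplex-Msg-Count` header name is an assumption; check what your endpoint expects. `newLogplexRequest` and the URL are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strconv"
)

// Copied from the constants above.
const LogplexContentType = "application/logplex-1"

// newLogplexRequest POSTs an already-framed body of msgCount messages.
func newLogplexRequest(logsURL string, body []byte, msgCount int) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, logsURL, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", LogplexContentType)
	// Assumed header name for the message count.
	req.Header.Set("Logplex-Msg-Count", strconv.Itoa(msgCount))
	return req, nil
}

func main() {
	req, err := newLogplexRequest("https://token:t.placeholder@logs.example.invalid/logs", []byte("5 hello5 world"), 2)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.Header.Get("Content-Type"), req.ContentLength, req.URL.User.Username())
}
```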

type LogplexLineFormatter

type LogplexLineFormatter struct {
	// contains filtered or unexported fields
}

LogplexLineFormatter formats individual loglines into length prefixed rfc5424 messages via an io.Reader interface.

func NewLogplexErrorFormatter

func NewLogplexErrorFormatter(err errData, config *Config) *LogplexLineFormatter

NewLogplexErrorFormatter returns a LogplexLineFormatter for the error data. These can be used to inject error data into the log stream.

func NewLogplexLineFormatter

func NewLogplexLineFormatter(ll LogLine, config *Config) *LogplexLineFormatter

NewLogplexLineFormatter returns a new LogplexLineFormatter wrapping the provided LogLine

func (*LogplexLineFormatter) AppName

func (llf *LogplexLineFormatter) AppName() string

AppName returns the value of the app name field, based on the inputFormat, for use in syslog framing.

func (*LogplexLineFormatter) MsgCount

func (llf *LogplexLineFormatter) MsgCount() int

MsgCount is always 1 for a Line

func (*LogplexLineFormatter) Read

func (llf *LogplexLineFormatter) Read(p []byte) (n int, err error)

Read implements the io.Reader interface. It tries to fill p as fully as possible before returning.

func (*LogplexLineFormatter) Reset

func (llf *LogplexLineFormatter) Reset()

Reset the reader so that the log line can be re-read

type NewHTTPFormatterFunc

type NewHTTPFormatterFunc func(b Batch, eData []errData, config *Config) HTTPFormatter

NewHTTPFormatterFunc defines the function type used to create and return a new HTTPFormatter.

type Shuttle

type Shuttle struct {
	LogLineReader

	LogLines        chan LogLine
	Batches         chan Batch
	MetricsRegistry metrics.Registry

	Drops, Lost      *Counter
	NewFormatterFunc NewHTTPFormatterFunc
	Logger           *log.Logger
	ErrLogger        *log.Logger
	// contains filtered or unexported fields
}

Shuttle is the main entry point into the library.

func NewShuttle

func NewShuttle(config Config) *Shuttle

NewShuttle returns a properly constructed Shuttle with a given config

func (*Shuttle) Land

func (s *Shuttle) Land()

Land gracefully terminates the shuttle instance, ensuring that anything read is batched and delivered

func (*Shuttle) Launch

func (s *Shuttle) Launch()

Launch a shuttle by spawning its outlets and batchers (in that order), which is the reverse of the shutdown order.

type SubFormatter

type SubFormatter interface {
	MsgCount() int // MsgCount is the number of messages after formatting
	io.Reader
}

SubFormatter formats a complete batch or a subsection of a batch. It may split lines in the batch as needed by the destination, making the MsgCount() of the formatter different from the MsgCount of the source batch. A formatter may emit more (likely) or fewer bytes for a given LogLine than the original LogLine.
