linesd

package module
v0.0.0-...-a90adf1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2019 License: MIT Imports: 38 Imported by: 0

README

linesd

builds

Build Status

data aggregation

seemlesly collect and uppload batches of data (lines) to AWS S3 for long term storage and / or Elastic Search cluster for indexing and searching.

example

the following example will consume lines from stdin and push batches of 60 lines / 60 seconds worth of data to S3 bucket and/or Elastic Search cluster of your choice.

while [ yes ] ; do date; sleep 1; done | \
    go run cmd/linesd/main.go --config config.json \
       --batch_size_seconds 60 \
       --batch_size_lines 60

batches of data are accumulated in memory before being uploaded. https://github.com/prometheus is used for monitoring health and perf of the running process.

references

  1. see: scribed - https://github.com/facebookarchive/scribe
  2. see: splunk - https://www.splunk.com/

coming soon:

  1. binary releases
  2. tests

Documentation

Index

Constants

View Source
const (
	CONTENT_TYPE      = "Content-Type"
	CONTENT_TYPE_AAC  = "audio/aac"
	CONTENT_TYPE_JPEG = "image/jpeg"
	CONTENT_TYPE_RAW  = "application/octet-stream"
	CONTENT_TYPE_BIN  = "application/octet-stream"
	CONTENT_TYPE_FLAC = "audio/flac"
	CONTENT_TYPE_XML  = "text/xml"
	CONTENT_TYPE_CSV  = "text/csv"
	CONTENT_TYPE_JSON = "application/json"
	CONTENT_TYPE_HTML = "text/html"
	CONTENT_TYPE_TXT  = "text/plain"
	CONTENT_TYPE_MP4  = "video/mp4"
	CONTENT_TYPE_PNG  = "image/png"
	CONTENT_TYPE_KML  = "application/vnd.google-earth.kml+xml"
	CONTENT_TYPE_GZ   = "application/x-gzip"
)
View Source
const HTTP_OK_STATUS_MAX = 300
View Source
const HTTP_OK_STATUS_MIN = 200
View Source
const ID_LINESD = "LINESD"
View Source
const LOG_FILES_CURRENT_LOG = "current" + LOG_FILES_EXT
View Source
const LOG_FILES_EXT = ".log"
View Source
const LOG_FILES_MAX_NUM = 10
View Source
const SEP = ":::"
View Source
const SERVICE_NAME = "linesd"
View Source
const VERSION_NUMBER = "0.0.5"

Variables

View Source
var DurationREMult = regexp.MustCompile(`\d+([sSmMhHdDwW])`)
View Source
var DurationREValue = regexp.MustCompile(`(\d+).*`)
View Source
var (
	ERR_INVALID_S3_URL = errors.New("S3 URL must be of form 's3://{bucket}/{key}'")
)
View Source
var (
	ERR_S3_FAILURE = errors.New("can't establish S3 connection")
)
View Source
var FilterASCIIRE = regexp.MustCompile(`[^a-zA-Z_0-9\-]`)

Functions

func CheckFatal

func CheckFatal(e error) error

func CheckNotFatal

func CheckNotFatal(e error) error

func CleanupStringASCII

func CleanupStringASCII(s string, isToLower bool) string

func CompressBlob

func CompressBlob(in []byte) ([]byte, error)

func ElasticSearchPut

func ElasticSearchPut(timeout time.Duration, endpoint string, indexPrefix string, env string, itemType string, items map[string]interface{}) error

func Explode

func Explode(x string) (a, b string)

func FromJson

func FromJson(buf []byte, o interface{}) error

func FromJsonBytes

func FromJsonBytes(buf []byte, o interface{}) error

func FromJsonString

func FromJsonString(buf string, o interface{}) error

func GetMD5Hash

func GetMD5Hash(text string) string

func HttpGetJson

func HttpGetJson(url string, o interface{}, username string, password string,
	timeout time.Duration) error

func HumanFloat

func HumanFloat(value float64, units string) string

func Implode

func Implode(a, b string) string

func IsPrivateIPv4

func IsPrivateIPv4(ip net.IP) bool

func IsValidURL

func IsValidURL(urlString string) error

func Lower16BitPrivateIP

func Lower16BitPrivateIP() (uint16, error)

func ParseDurationString

func ParseDurationString(duration string) int

func ParseFloatOrDefault

func ParseFloatOrDefault(val string, bits int, defaultVal float64) float64

func ParseS3URL

func ParseS3URL(urlStr string) (string, string, error)

func ParseStringOrDefault

func ParseStringOrDefault(s *string, defaultVal string) string

func PercentBar

func PercentBar(percent float64) string

func Prefix

func Prefix(s string, l int) string

func PrivateIPv4

func PrivateIPv4() (net.IP, error)

func ReadJsonFile

func ReadJsonFile(filename string, o interface{}) error

func S3Exists

func S3Exists(region *string, bucketName string, remoteFilename string) (bool, *s3.HeadObjectOutput, error)

func S3Get

func S3Get(region *string, bucketName string, key string) ([]byte, string, error)

func S3GetSignedURL

func S3GetSignedURL(region *string, bucketName string, key string) (string, error)

func S3GetToLocalfile

func S3GetToLocalfile(region *string, bucketName string, key string, localFilename string) (string, error)

func S3Head

func S3Head(region *string, bucketName string, remoteFilename string) (bool, error)

func S3PutBlob

func S3PutBlob(
	region *string,
	timeout time.Duration,
	bucketName string,
	blob []byte,
	remoteFilename string,
	contentType string,
) (*s3.PutObjectOutput, string, error)

func S3PutLocalFile

func S3PutLocalFile(
	region *string,
	timeout time.Duration,
	bucketName string,
	localFilename string,
	remoteFilename string,
	contentType string) (*s3.PutObjectOutput, string, error)

func S3PutReader

func S3PutReader(
	region *string,
	bucketName string,
	blob io.ReadSeeker,
	remoteFilename string,
	contentType string,
) (*s3.PutObjectOutput, error)

func ToHexString

func ToHexString(buf []byte) string

func ToJsonBytes

func ToJsonBytes(v interface{}) []byte

func ToJsonBytesNoIndent

func ToJsonBytesNoIndent(v interface{}) []byte

func ToJsonString

func ToJsonString(v interface{}) string

func ToJsonStringNoIndent

func ToJsonStringNoIndent(v interface{}) string

Types

type Config

type Config struct {
	AWSRegion           string                   `json:"aws_region"`
	AWSBucket           string                   `json:"aws_bucket"`
	AWSKeyPrefix        string                   `json:"aws_key_prefix"`
	AWSElasticSearchURL string                   `json:"aws_elastic_search"`
	Env                 string                   `json:"env"`
	ConcLimit           int                      `json:"conc_limit"`
	IsShowBatches       bool                     `json:"is_show_batches"`
	IsShowLines         bool                     `json:"is_show_lines"`
	BatchSizeInLines    int                      `json:"batch_size_in_lines"`
	BatchSizeInSeconds  int                      `json:"batch_size_in_seconds"`
	Progress            int                      `json:"progress"`
	Address             string                   `json:"address"`
	Streams             map[string]*ConfigStream `json:"streams"`

	// HTTP timeout:
	TimeoutSeconds int `json:"timeout_seconds"`
	// log to local files as well:
	LogFilesFolder string `json:"log_files_folder"`
	// max local file size in bytes:
	LogFilesFileSizeBytes int `json:"log_files_size_bytes"`
	// S3 destination prefix:
	AWSKeyPrefixEnv string
}

type ConfigStream

type ConfigStream struct {
	Name         string `json:"name"`
	TaiCmd       string `json:"tail_cmd"`
	TailFilename string `json:"tail_filename"`
	IsStdin      bool   `json:"is_stdin"`
	// contains filtered or unexported fields
}

type LinesBatch

type LinesBatch struct {
	Name           string    `json:"name"`
	Hostname       string    `json:"hostname"`
	BatchId        string    `json:"batch_id"`
	TimestampStart int64     `json:"ts_start"`
	TimestampEnd   int64     `json:"ts_end"`
	Lines          []*string `json:"lines"`
}

type Server

type Server struct {
	Stats  Stats
	Config Config
	// contains filtered or unexported fields
}

func (*Server) AppendDebugLineToLogFile

func (t *Server) AppendDebugLineToLogFile(line string)

func (*Server) GenerateUniqueId

func (s *Server) GenerateUniqueId(idType string) (string, string, time.Time)

func (*Server) Initialize

func (t *Server) Initialize(config *Config)

func (*Server) ProcessLine

func (t *Server) ProcessLine(streamAddress *string, stream *ConfigStream, line *string)

func (*Server) ProcessLineToLocalFile

func (t *Server) ProcessLineToLocalFile(line *string)

func (*Server) ReadStream

func (t *Server) ReadStream(streamAddress *string, stream *ConfigStream)

func (*Server) RunForever

func (t *Server) RunForever()

func (*Server) UploadBatch

func (t *Server) UploadBatch(batch *LinesBatch)

func (*Server) Write

func (t *Server) Write(p []byte) (n int, err error)

This will be called from log.print* function as log.SetOutput was called on it

type Stats

type Stats struct {
	Counters *prometheus.CounterVec `json:"-"`
	Gauges   *prometheus.GaugeVec   `json:"-"`
}

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL