loghive

package module
v0.0.0-...-db1b28a
Published: Mar 21, 2020 License: MIT Imports: 16 Imported by: 0

README

loghive

Loghive is an engine for storing and retrieving logs, built on SQLite.

Why

Because storing and querying logs should be fast and easy, and it currently isn't. I've yet to find an open-source log manager that isn't either hideously overwrought or a freemium commercial offering. All I want is to put logs in, and get them back out fast, and be able to easily specify only the logs I need.

Concepts

A Domain is a label that represents a single log history.

A Segment is a SQLite database that stores a contiguous chunk of a Domain's history starting from a point in time. Logs in a Segment are keyed by timestamp.
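
Because each Segment starts at a point in time, finding the Segment that covers a given timestamp amounts to finding the last Segment that starts at or before it. The following is a minimal sketch of that lookup, not loghive's actual implementation: `segmentFor`, `demoIndex`, and the segment paths are hypothetical names introduced for illustration, and the sketch assumes the segments of one Domain are already sorted by start time.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// Segment mirrors the exported fields of loghive's Segment type.
type Segment struct {
	Path      string
	Domain    string
	Timestamp time.Time
}

// segmentFor is a hypothetical helper, not part of loghive's API.
// segs must be sorted by ascending Timestamp. It returns the index of the
// last segment that starts at or before t, or -1 if t precedes all of them.
func segmentFor(segs []Segment, t time.Time) int {
	// Find the first segment that starts strictly after t.
	firstAfter := sort.Search(len(segs), func(i int) bool {
		return segs[i].Timestamp.After(t)
	})
	// The segment just before it is the one covering t.
	return firstAfter - 1
}

// demoIndex builds three one-day segments (the paths are made up) and looks
// up the segment covering base + hoursAfterBase.
func demoIndex(hoursAfterBase int) int {
	base := time.Date(2020, time.March, 1, 0, 0, 0, 0, time.UTC)
	segs := []Segment{
		{Path: "segment-0.db", Domain: "test", Timestamp: base},
		{Path: "segment-1.db", Domain: "test", Timestamp: base.Add(24 * time.Hour)},
		{Path: "segment-2.db", Domain: "test", Timestamp: base.Add(48 * time.Hour)},
	}
	return segmentFor(segs, base.Add(time.Duration(hoursAfterBase)*time.Hour))
}

func main() {
	for _, h := range []int{-1, 0, 30, 100} {
		fmt.Printf("base%+dh -> segment index %d\n", h, demoIndex(h))
	}
}
```

A binary search keeps the lookup logarithmic in the number of segments, which matters little for a handful of files but costs nothing to get right.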

A Log is a struct with 3 fields:

type Log struct {
	Domain    string    `json:"domain"`
	Timestamp time.Time `json:"timestamp"`
	Line      []byte    `json:"line"`
}

Logs can be queried by domain, timestamp, and line contents. Queries that span multiple domains will have their results serialized and delivered in timestamp order.
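
To illustrate what timestamp-ordered delivery across domains means, here is a small sketch that merges per-domain streams into one ordered stream. It is an assumption-laden illustration rather than the library's code: `mergeByTimestamp` and `demo` are hypothetical helpers, the local `Log` type only mirrors the struct above, and each input is assumed to be sorted by timestamp already.

```go
package main

import (
	"fmt"
	"time"
)

// Log mirrors the three-field struct described above.
type Log struct {
	Domain    string
	Timestamp time.Time
	Line      []byte
}

// mergeByTimestamp is a hypothetical helper, not part of loghive's API.
// Each input slice holds one domain's logs, already sorted by timestamp.
// It repeatedly emits the earliest unread log among all inputs.
func mergeByTimestamp(streams ...[]Log) []Log {
	var out []Log
	// idx[i] is the position of the next unread log in streams[i].
	idx := make([]int, len(streams))
	for {
		best := -1
		for i, s := range streams {
			if idx[i] >= len(s) {
				continue // this stream is exhausted
			}
			if best == -1 || s[idx[i]].Timestamp.Before(streams[best][idx[best]].Timestamp) {
				best = i
			}
		}
		if best == -1 {
			return out // every stream is exhausted
		}
		out = append(out, streams[best][idx[best]])
		idx[best]++
	}
}

// demo merges two made-up domains and returns "domain: line" strings.
func demo() []string {
	base := time.Date(2020, time.March, 21, 0, 0, 0, 0, time.UTC)
	web := []Log{
		{Domain: "web", Timestamp: base.Add(1 * time.Second), Line: []byte("GET /")},
		{Domain: "web", Timestamp: base.Add(4 * time.Second), Line: []byte("GET /health")},
	}
	db := []Log{
		{Domain: "db", Timestamp: base.Add(2 * time.Second), Line: []byte("connect")},
		{Domain: "db", Timestamp: base.Add(3 * time.Second), Line: []byte("query")},
	}
	var lines []string
	for _, l := range mergeByTimestamp(web, db) {
		lines = append(lines, l.Domain+": "+string(l.Line))
	}
	return lines
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

The linear scan over stream heads is fine for a few domains; a heap would be the usual choice if the number of domains grew large.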

Usage

package loghive

import (
	"fmt"
	"os"
	"testing"
	"time"
)

func TestRoundtripQuery(t *testing.T) {
	// Start from an empty storage directory and clean up afterwards.
	path := "./fixtures/roundtrip_query"
	os.RemoveAll(path)
	defer os.RemoveAll(path)

	// Copy the defaults and make the "test" domain writable.
	config := DefaultConfig
	config.WritableDomains = []string{"test"}
	h, err := NewHive(path, config)
	if err != nil {
		// Fatalf stops the test here; nothing below works without a hive.
		t.Fatalf("Should be able to create hive: %v", err)
	}

	flushed, err := h.Enqueue("test", []byte("foo"))
	if err != nil {
		t.Fatalf("Should be able to enqueue message: %v", err)
	}
	// Wait until the enqueued log has been flushed.
	<-flushed

	oneMinuteAgo := time.Now().Add(-time.Minute)
	q := NewQuery([]string{"test"}, oneMinuteAgo, time.Now(), FilterMatchAll())
	// Run the query concurrently and consume q.Results below.
	go func() {
		// A goroutine-local err avoids racing on the outer variable.
		if err := h.Query(q); err != nil {
			t.Errorf("Should be able to execute query %v %v", q, err)
		}
	}()

	gotResults := false
	for log := range q.Results {
		gotResults = true
		fmt.Printf("Result 1: %v\n", log)
		if string(log.Line) != "foo" {
			t.Errorf("Expected line 'foo', got '%v'", string(log.Line))
		}
	}
	if !gotResults {
		t.Error("Expected to get results")
	}
}

License

Released under The MIT License (see LICENSE.txt).

Copyright 2019 Duncan Smith

Documentation

Index

Constants

const SyntheticFailureFlush = "!~!~!~_internal_::thislogshouldfail👎Flush"

SyntheticFailureFlush is an ugly hack that exists because I can't figure out how to force a failure during tests otherwise.

const SyntheticFailureWrite = "!~!~!~_internal_::thislogshouldfail👎Write"

SyntheticFailureWrite is an ugly hack that exists because I can't figure out how to force a failure during tests otherwise.

Variables

var DefaultConfig = Config{
	InternalLogLevel:   logrus.InfoLevel,
	InternalLogFormat:  LogFormatLogfmt,
	WritableDomains:    []string{"_"},
	SegmentMaxDuration: time.Duration(336) * time.Hour,
	SegmentMaxBytes:    128 * 1024 * 1024,
	LineMaxBytes:       8 * 1024,
	FlushAfterItems:    128,
	FlushAfterDuration: time.Duration(1000) * time.Millisecond,
}

DefaultConfig is the default configuration, which will be written to the config database if a config is not found
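
The defaults above are written as arithmetic expressions, so it can help to see them in everyday units. This short sketch copies four of the values from DefaultConfig and converts them; the constant and function names are local to the example and are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// These values are copied from DefaultConfig above.
const (
	segmentMaxDuration = 336 * time.Hour
	segmentMaxBytes    = 128 * 1024 * 1024
	lineMaxBytes       = 8 * 1024
	flushAfterDuration = 1000 * time.Millisecond
)

// segmentMaxDays converts the segment duration limit to whole days.
func segmentMaxDays() int {
	return int(segmentMaxDuration / (24 * time.Hour))
}

// flushAfterSeconds converts the flush interval to whole seconds.
func flushAfterSeconds() int {
	return int(flushAfterDuration / time.Second)
}

func main() {
	fmt.Println("SegmentMaxDuration:", segmentMaxDays(), "days")
	fmt.Println("SegmentMaxBytes:", segmentMaxBytes/(1024*1024), "MiB")
	fmt.Println("LineMaxBytes:", lineMaxBytes/1024, "KiB")
	fmt.Println("FlushAfterDuration:", flushAfterSeconds(), "second(s)")
}
```

In other words, the defaults cap a segment at 14 days or 128 MiB and a line at 8 KiB, with a flush interval of one second.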

Functions

func FilterContainsString

func FilterContainsString(s string) func(l *Log) bool

FilterContainsString takes a string and returns a filter matching lines containing that string

func FilterExactString

func FilterExactString(s string) func(l *Log) bool

FilterExactString takes a string and returns a filter matching lines with exactly that string

func FilterMatchAll

func FilterMatchAll() func(l *Log) bool

FilterMatchAll returns a filter matching all logs
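
All three constructors return a plain `func(l *Log) bool`, so filters are ordinary closures and can be combined freely. The sketch below shows one plausible way such filters could be written and composed. It does not reproduce loghive's source: the lowercase functions are local stand-ins, `allOf` and `matches` are hypothetical helpers, and case-sensitive matching is an assumption.

```go
package main

import (
	"bytes"
	"fmt"
)

// Log is a stand-in for loghive's Log; only Line matters to these filters.
type Log struct {
	Domain string
	Line   []byte
}

// Filter has the same shape as the functions the Filter* constructors return.
type Filter = func(l *Log) bool

// containsString sketches FilterContainsString (assumed case-sensitive).
func containsString(s string) Filter {
	needle := []byte(s)
	return func(l *Log) bool { return bytes.Contains(l.Line, needle) }
}

// exactString sketches FilterExactString.
func exactString(s string) Filter {
	return func(l *Log) bool { return string(l.Line) == s }
}

// matchAll sketches FilterMatchAll.
func matchAll() Filter {
	return func(*Log) bool { return true }
}

// allOf is a hypothetical combinator: it matches only when every filter does.
func allOf(filters ...Filter) Filter {
	return func(l *Log) bool {
		for _, f := range filters {
			if !f(l) {
				return false
			}
		}
		return true
	}
}

// matches applies f to a log carrying the given line.
func matches(f Filter, line string) bool {
	return f(&Log{Domain: "test", Line: []byte(line)})
}

func main() {
	f := allOf(containsString("error"), containsString("disk"))
	fmt.Println(matches(f, "disk error on /dev/sda"))
	fmt.Println(matches(f, "network error"))
	fmt.Println(matches(exactString("foo"), "foo"))
	fmt.Println(matches(matchAll(), ""))
}
```

Since Query.Filter accepts any function of this shape, a custom predicate can be passed to NewQuery in place of the built-in constructors.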

Types

type Config

type Config struct {
	InternalLogLevel   logrus.Level
	InternalLogFormat  LogFormat
	WritableDomains    []string
	SegmentMaxDuration time.Duration
	SegmentMaxBytes    int64
	LineMaxBytes       int
	FlushAfterItems    int
	FlushAfterDuration time.Duration
}

Config describes the configuration that Loghive needs to function

type DBMap

type DBMap = map[string]*sql.DB

DBMap is a map of DB paths to open database handles

type Hive

type Hive struct {
	*mutable.RW

	StoragePath string
	// contains filtered or unexported fields
}

Hive is a running Loghive instance pointed at a storage path

func NewHive

func NewHive(path string, config Config) (*Hive, error)

NewHive returns a pointer to a Hive at the given path. Configuration will be loaded from a file found there or populated from defaults.

func (*Hive) Enqueue

func (h *Hive) Enqueue(domain string, line []byte) (bbq.Callback, error)

Enqueue constructs a Log from the given domain and line, and puts it on the queue to be written
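
The Config fields FlushAfterItems and FlushAfterDuration suggest that queued logs are written in batches, flushed once enough items accumulate or enough time passes. The sketch below shows that general batching pattern using only the standard library. It is an assumption based on the field names, not the behavior of the queue loghive actually uses; `batch` and `demoBatchSizes` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// batch reads lines from in and calls flush whenever maxItems lines have
// accumulated or maxWait has elapsed since the first unflushed line arrived.
// Any remaining lines are flushed when in is closed.
func batch(in <-chan string, maxItems int, maxWait time.Duration, flush func([]string)) {
	var buf []string
	var timeout <-chan time.Time // nil (never ready) while the buffer is empty

	emit := func() {
		if len(buf) > 0 {
			flush(buf)
			buf = nil // start a fresh slice so flushed data is never overwritten
		}
		timeout = nil
	}

	for {
		select {
		case line, ok := <-in:
			if !ok {
				emit()
				return
			}
			if len(buf) == 0 {
				// The first line of a new batch starts the clock.
				timeout = time.After(maxWait)
			}
			buf = append(buf, line)
			if len(buf) >= maxItems {
				emit()
			}
		case <-timeout:
			emit()
		}
	}
}

// demoBatchSizes pushes five lines through batch with a limit of two items
// and a wait long enough that only the item limit triggers flushes.
func demoBatchSizes() []int {
	in := make(chan string, 5)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("line %d", i)
	}
	close(in)

	var sizes []int
	batch(in, 2, time.Hour, func(lines []string) {
		sizes = append(sizes, len(lines))
	})
	return sizes
}

func main() {
	fmt.Println(demoBatchSizes())
}
```

Batching like this trades a bounded delay for fewer, larger writes, which is why both an item limit and a time limit are useful together.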

func (*Hive) MarkSegmentForDeletion

func (h *Hive) MarkSegmentForDeletion(s Segment) error

MarkSegmentForDeletion marks a segment to be deleted during the next call to DeleteMarkedSegments

func (*Hive) Query

func (h *Hive) Query(q *Query) error

Query executes the given query and delivers its results on the query's Results channel

func (*Hive) ValidateQuery

func (h *Hive) ValidateQuery(query *Query) error

ValidateQuery will return any error with a query's parameters

type IC

type IC struct {
	Method  string
	Log     *Log
	Segment *Segment
}

IC is short for Internal (Log) Context. This makes it easy to do structured logging when processing logs

func (IC) L

func (ic IC) L() *logrus.Entry

L returns a logrus logger with fields set from the log context

type Log

type Log struct {
	Domain    string    `json:"domain"`
	Timestamp time.Time `json:"timestamp"`
	Line      []byte    `json:"line"`
}

Log is a single log line within a domain

func NewLog

func NewLog(domain string, line []byte) *Log

NewLog constructs a timestamped Log from a domain and line

func (*Log) Order

func (l *Log) Order() int64

Order implements march.Ordered

func (*Log) String

func (l *Log) String() string

type LogFormat

type LogFormat string

LogFormat describes how internal logs will be formatted

const LogFormatJSON LogFormat = "json"

LogFormatJSON will print JSON logs

const LogFormatLogfmt LogFormat = "logfmt"

LogFormatLogfmt will print Logfmt logs

type LogWriteFailure

type LogWriteFailure struct {
	Err   error `json:"err"`
	Log   Log   `json:"log"`
	Count int   `json:"count"`
}

LogWriteFailure provides information about why one or more logs were not written

type Query

type Query struct {
	Domains []string
	Start   time.Time
	End     time.Time
	Filter  func(*Log) bool
	Results chan *Log
}

Query is a set of parameters for querying logs

func NewQuery

func NewQuery(domains []string, start time.Time, end time.Time, filter func(*Log) bool) *Query

NewQuery validates and builds a query from the given parameters

type Segment

type Segment struct {
	Path      string    `json:"path"`
	Domain    string    `json:"domain"`
	Timestamp time.Time `json:"timestamp"`
}

Segment is a file that represents a contiguous subset of logs in a domain starting at a timestamp

type SegmentManager

type SegmentManager struct {
	*mutable.RW
	DBMap
	SegmentMap
	Path          string
	PathsToDelete []string
}

SegmentManager manages connections to Segments stored in SQLite databases

func NewSegmentManager

func NewSegmentManager(path string) *SegmentManager

NewSegmentManager creates a new SegmentManager for a specified path

func (*SegmentManager) Close

func (m *SegmentManager) Close()

Close closes all managed DBs

func (*SegmentManager) CreateNeededSegments

func (m *SegmentManager) CreateNeededSegments(maxBytes int64, maxDuration time.Duration) error

CreateNeededSegments checks the latest segment in each domain and creates a new segment in any domain whose latest segment exceeds the given constraints
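
The rollover decision described here can be pictured as a simple predicate over a segment's size and age. This is a sketch under stated assumptions rather than the package's logic: `needsNewSegment` and `demoNeedsNew` are hypothetical, "exceed" is read as strictly greater than, and the limits used in the demo are the DefaultConfig values.

```go
package main

import (
	"fmt"
	"time"
)

// needsNewSegment is a hypothetical predicate: it reports whether the latest
// segment has outgrown either limit. "Exceed" is read as strictly greater.
func needsNewSegment(sizeBytes, maxBytes int64, started, now time.Time, maxDuration time.Duration) bool {
	tooBig := sizeBytes > maxBytes
	tooOld := now.Sub(started) > maxDuration
	return tooBig || tooOld
}

// demoNeedsNew checks a segment of the given size and age against the
// DefaultConfig limits (128 MiB and 336 hours).
func demoNeedsNew(sizeBytes int64, ageHours int) bool {
	const maxBytes = 128 * 1024 * 1024
	const maxDuration = 336 * time.Hour
	started := time.Date(2020, time.March, 1, 0, 0, 0, 0, time.UTC)
	now := started.Add(time.Duration(ageHours) * time.Hour)
	return needsNewSegment(sizeBytes, maxBytes, started, now, maxDuration)
}

func main() {
	fmt.Println("small and young:", demoNeedsNew(1024, 1))
	fmt.Println("too large:      ", demoNeedsNew(129*1024*1024, 1))
	fmt.Println("too old:        ", demoNeedsNew(1024, 337))
}
```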

func (*SegmentManager) CreateSegment

func (m *SegmentManager) CreateSegment(domain string, timestamp time.Time) (Segment, error)

CreateSegment creates a SQLite file for a segment and stores its metadata in it

func (*SegmentManager) DeleteMarkedSegments

func (m *SegmentManager) DeleteMarkedSegments() error

DeleteMarkedSegments will delete any segments marked for deletion

func (*SegmentManager) Iterate

func (m *SegmentManager) Iterate(domains []string, start, end time.Time, chunkSize int, bufferSize int) []chan []Log

Iterate takes a list of domains and a range of time, returning a list of channels (one per domain) on which chunks of the requested size will be delivered
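
To make the chunked, channel-based delivery concrete, the following sketch produces one such channel for a single domain from an in-memory slice. It only illustrates the shape of the pattern: `chunked` and `demoChunkSizes` are hypothetical, the local `Log` type is a stand-in, and the real Iterate reads from segment databases rather than a slice.

```go
package main

import "fmt"

// Log is a stand-in for loghive's Log type.
type Log struct {
	Domain string
	Line   []byte
}

// chunked is a hypothetical helper: it delivers logs on a buffered channel in
// slices of at most chunkSize and closes the channel when done.
func chunked(logs []Log, chunkSize, bufferSize int) chan []Log {
	if chunkSize < 1 {
		chunkSize = 1 // guard against a loop that never advances
	}
	if bufferSize < 0 {
		bufferSize = 0 // make panics on a negative buffer size
	}
	out := make(chan []Log, bufferSize)
	go func() {
		defer close(out)
		for start := 0; start < len(logs); start += chunkSize {
			end := start + chunkSize
			if end > len(logs) {
				end = len(logs)
			}
			out <- logs[start:end]
		}
	}()
	return out
}

// demoChunkSizes sends total logs through chunked and records chunk lengths.
func demoChunkSizes(total, chunkSize int) []int {
	logs := make([]Log, total)
	for i := range logs {
		logs[i] = Log{Domain: "test", Line: []byte(fmt.Sprintf("line %d", i))}
	}
	var sizes []int
	for chunk := range chunked(logs, chunkSize, 1) {
		sizes = append(sizes, len(chunk))
	}
	return sizes
}

func main() {
	fmt.Println(demoChunkSizes(5, 2))
}
```

The buffer size bounds how many chunks the producer can prepare ahead of the consumer, so a slow reader limits memory use instead of letting chunks pile up.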

func (*SegmentManager) MarkSegmentForDeletion

func (m *SegmentManager) MarkSegmentForDeletion(s Segment) error

MarkSegmentForDeletion marks a segment to be deleted during the next call to DeleteMarkedSegments

func (*SegmentManager) ScanDir

func (m *SegmentManager) ScanDir() ([]Segment, error)

ScanDir scans the directory at `m.Path` for segment DB files and opens them

func (*SegmentManager) Write

func (m *SegmentManager) Write(logs []*Log) error

Write stores a group of logs in their appropriate segment files, returning any failures

type SegmentMap

type SegmentMap = map[string][]Segment

SegmentMap is a map of Segments by Domain
