ekanite

Version: v1.3.4
Published: Sep 18, 2021 License: MIT Imports: 32 Imported by: 0

README

For a detailed look at the goals, design, and implementation of this project, check out these blog posts.

Ekanite

Ekanite is a high-performance syslog server with built-in text search. Its goal is to do a couple of things, and do them well -- accept log messages over the network, and make it easy to search the messages. What it lacks in features, it makes up for in focus. Built in Go, it has no external dependencies, which makes deployment easy.

Features include:

  • Supports reception of log messages over UDP, TCP, and TCP with TLS.
  • Full text search of all received log messages.
  • Full parsing of RFC5424 headers.
  • Log messages are indexed by parsed timestamp, if one is available. This means search results are presented in the order the messages occurred, not in the order they were received, ensuring sensible display even with delayed senders.
  • Automatic data-retention management. Ekanite deletes indexed log data older than a configurable time period.
  • Not a JVM in sight.

Search is implemented using the bleve search library. For some performance analysis of bleve, and of the sharding techniques used by Ekanite, check out this post.

Getting started

The quickest way to get running on macOS and Linux is to download a pre-built release binary. You can find these binaries on the GitHub releases page. Once installed, you can start Ekanite like so:

ekanited -datadir ~/ekanite_data # Or any directory of your choice.

To see all Ekanite options pass -h on the command line.

If you want to build Ekanite, either because you want the latest code or because a pre-built binary for your platform is not available, take a look at CONTRIBUTING.md.

Sending logs to Ekanite

For now, for Ekanite to accept logs, your syslog client must be configured such that the log lines are RFC5424 compliant, and in the following format:

<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROC-ID MSGID MSG

Consult the RFC to learn what each of these fields is. The TIMESTAMP field must be in RFC3339 format. Both rsyslog and syslog-ng support templating, which makes it easy for those programs to format logs correctly and transmit them to Ekanite. Templates and installation instructions for both systems are below.

rsyslog

# Send messages to Ekanite over TCP using the template. Assumes Ekanite is listening on 127.0.0.1:5514
$template Ekanite,"<%pri%>%protocol-version% %timestamp:::date-rfc3339% %HOSTNAME% %app-name% %procid% - %msg%\n"
*.*             @@127.0.0.1:5514;Ekanite

Add this template to /etc/rsyslog.d/23-ekanite.conf and then restart rsyslog using the command sudo service rsyslog restart.

syslog-ng

source s_ekanite {
	system();	# Check which OS & collect system logs
	internal();	# Collect syslog-ng logs
};
template Ekanite { template("<${PRI}>1 ${ISODATE} ${HOST} ${PROGRAM} ${PID} - $MSG\n"); template_escape(no) };
destination d_ekanite {
	tcp("127.0.0.1" port(5514) template(Ekanite));
};

log {
	source(s_ekanite);
	destination(d_ekanite);
};

Add this template to /etc/syslog-ng/syslog-ng.conf and then restart syslog-ng using the command /etc/init.d/syslog-ng restart.

With these changes in place rsyslog or syslog-ng will continue to send logs to any existing destination, and also forward the logs to Ekanite.

Searching the logs

Search support is pretty simple at the moment. You have two options -- a simple telnet-like interface, and a browser-based interface.

Telnet interface

Telnet to the query server (see the command-line options) and enter a search term. The query language supported is the simple language supported by bleve, but a more sophisticated query syntax, including searching for specific field values, may be supported soon.

For example, below is a search session showing accesses to the login URL of a WordPress site. The telnet client connects to the query server and enters the string login:

$ telnet 127.0.0.1 9950
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
login
<134>0 2015-05-05T23:50:17.025568+00:00 fisher apache-access - - 65.98.59.154 - - [05/May/2015:23:50:12 +0000] "GET /wp-login.php HTTP/1.0" 200 206 "-" "-"
<134>0 2015-05-06T01:24:41.232890+00:00 fisher apache-access - - 104.140.83.221 - - [06/May/2015:01:24:40 +0000] "GET /wp-login.php?action=register HTTP/1.0" 200 206 "http://www.philipotoole.com/" "Opera/9.80 (Windows NT 6.2; Win64; x64) Presto/2.12.388 Version/12.17"
<134>0 2015-05-06T01:24:41.232895+00:00 fisher apache-access - - 104.140.83.221 - - [06/May/2015:01:24:40 +0000] "GET /wp-login.php?action=register HTTP/1.1" 200 243 "http://www.philipotoole.com/wp-login.php?action=register" "Opera/9.80 (Windows NT 6.2; Win64; x64) Presto/2.12.388 Version/12.17"
<134>0 2015-05-06T02:47:54.612953+00:00 fisher apache-access - - 184.68.20.22 - - [06/May/2015:02:47:51 +0000] "GET /wp-login.php HTTP/1.1" 200 243 "-" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/24.0.1309.0 Safari/537.17"
<134>0 2015-05-06T04:20:49.008609+00:00 fisher apache-access - - 193.104.41.186 - - [06/May/2015:04:20:46 +0000] "POST /wp-login.php HTTP/1.1" 200 206 "-" "Opera 10.00"

Perhaps you only want to see POST accesses to that URL. Excluding GET requests does the trick:

login -GET
<134>0 2015-05-06T04:20:49.008609+00:00 fisher apache-access - - 193.104.41.186 - - [06/May/2015:04:20:46 +0000] "POST /wp-login.php HTTP/1.1" 200 206 "-" "Opera 10.00"

A more sophisticated client program is planned.

Browser interface

The browser-based interface also accepts bleve-style queries, identical to those described in the Telnet section. By default the browser interface is available at http://localhost:8080. An example session is shown below.

[Figure: Data Diagram]

Diagnostics

Basic statistics and diagnostics are available. Visit http://localhost:9951/debug/vars to retrieve this information. The host and port can be changed via the -diag command-line option.

Project Status

The project is actively developed and is early stage software -- contributions in the form of bug reports and pull requests are welcome. Much work remains around performance and scaling, and you can check out the issues for more details.

Documentation


Constants

const (
	DefaultNumShards       = 4
	DefaultIndexDuration   = 24 * time.Hour
	DefaultRetentionPeriod = 7 * 24 * time.Hour

	RetentionCheckInterval = time.Hour
)

Engine defaults

const (

	// MaxSearchHitSize is the maximum number of hits returned in search results.
	MaxSearchHitSize = 10000

	SegoDictPath = "lib/sego_dictionary.txt"
)

Variables

var ErrNotFound = errors.New("not found")

Functions

func CloseWith added in v1.3.1

func CloseWith(closer io.Closer)

func Convert added in v1.3.1

func Convert(pa string, delta time.Duration, create func(pa string) (Writer, error)) error

func DeleteIndex

func DeleteIndex(i *Index) error

DeleteIndex deletes the index.

func ErrArray added in v1.3.1

func ErrArray(errList []error) error

func GroupBy added in v1.3.1

func GroupBy(seacher Searcher, ctx context.Context, startAt, endAt time.Time, q query.Query, field string,
	cb func(map[string]uint64) error) error

func GroupByNumeric added in v1.3.2

func GroupByNumeric(seacher Searcher, ctx context.Context, startAt, endAt time.Time, q query.Query, field string, start, end, step int64,
	cb func(req *bleve.SearchRequest, resp *bleve.SearchResult, results []*search.NumericRangeFacet) error) error

func GroupByTime added in v1.3.1

func GroupByTime(seacher Searcher, ctx context.Context, startAt, endAt time.Time, q query.Query, field string, value time.Duration,
	cb func(req *bleve.SearchRequest, resp *bleve.SearchResult, results []*search.DateRangeFacet) error) error

func MultiSearch added in v1.3.1

func MultiSearch(ctx context.Context, req *bleve.SearchRequest, indexes []*LazyIndex) (*bleve.SearchResult, error)

MultiSearch executes a SearchRequest across multiple Index objects, then merges the results. The indexes must honor any ctx deadline.

func ParseTime added in v1.3.1

func ParseTime(s string) time.Time

func SearchString added in v1.3.1

func SearchString(ctx context.Context, logger *log.Logger, searcher Searcher, q string) (<-chan string, error)

func SetSegoRootDir added in v1.3.3

func SetSegoRootDir(dir string)

Types

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher accepts "input events" and, once it has a certain number or a certain amount of time has passed, sends them as indexable Events to an Indexer. It also supports a maximum number of unprocessed Events it will keep pending. Once this limit is reached, it will not accept any more until outstanding Events are processed.

func NewBatcher

func NewBatcher(e EventIndexer, sz int, dur time.Duration, max int) *Batcher

NewBatcher returns a Batcher for EventIndexer e, a batching size of sz, a maximum duration of dur, and a maximum outstanding count of max.

func (*Batcher) C

func (b *Batcher) C() chan<- Document

C returns the channel on the batcher to which events should be sent.

func (*Batcher) Start

func (b *Batcher) Start(errChan chan<- error) error

Start starts the batching process.

func (*Batcher) Stop added in v1.3.1

func (b *Batcher) Stop()

Stop stops the batching process.

type Continuation added in v1.3.1

type Continuation struct {
	// contains filtered or unexported fields
}

func (*Continuation) Close added in v1.3.1

func (c *Continuation) Close() error

type DocID

type DocID string

DocID is a 32-character string encoding two 64-bit unsigned integers. When sorting DocIDs, the first 16 characters, reading from the left, represent the most-significant 64-bit number, and the next 16 characters represent the least-significant 64-bit number.

type DocIDs

type DocIDs []DocID

DocIDs is a slice of DocIDs.

func (DocIDs) Len

func (a DocIDs) Len() int

func (DocIDs) Less

func (a DocIDs) Less(i, j int) bool

func (DocIDs) Swap

func (a DocIDs) Swap(i, j int)

type Document

type Document interface {
	ID() DocID
	Data() interface{}
	ReferenceTime() time.Time
}

Document specifies the interface required by an object if it is to be indexed.

type Engine

type Engine struct {
	NumShards       int           // Number of shards to use when creating an index.
	NumCaches       int           // Number of caches to use when searching an index.
	IndexDuration   time.Duration // Duration of created indexes.
	RetentionPeriod time.Duration // How long after Index end-time to hang onto data.

	Logger *log.Logger
	// contains filtered or unexported fields
}

Engine is the component that performs all indexing.

func NewEngine

func NewEngine(path string) *Engine

NewEngine returns a new indexing engine, which will use any data located at path.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine.

func (*Engine) FieldDict added in v1.3.1

func (e *Engine) FieldDict(ctx context.Context, startTime, endTime time.Time, field string) ([]bleve_index.DictEntry, error)

func (*Engine) Fields added in v1.3.1

func (e *Engine) Fields(ctx context.Context, startTime, endTime time.Time) ([]string, error)

func (*Engine) Index

func (e *Engine) Index(ctx *Continuation, events []Document) error

Index indexes a batch of Events. It blocks until all processing has completed.

func (*Engine) Open

func (e *Engine) Open() error

Open opens the engine.

func (*Engine) Path

func (e *Engine) Path() string

Path returns the path to the directory of indexed data.

func (*Engine) Query added in v1.3.1

func (e *Engine) Query(ctx context.Context, startTime, endTime time.Time, req *bleve.SearchRequest, cb func(*bleve.SearchRequest, *bleve.SearchResult) error) error

func (*Engine) Total

func (e *Engine) Total() (uint64, error)

Total returns the total number of documents indexed.

type EventIndexer

type EventIndexer interface {
	Index(ctx *Continuation, events []Document) error
}

EventIndexer is the interface that a system that can index events must implement.

type Index

type Index struct {
	Shards []*Shard         // Individual bleve indexes
	Alias  bleve.IndexAlias // All bleve indexes as one reference, for search
	// contains filtered or unexported fields
}

Index represents a collection of shards. It contains data for a specific time range.

func NewIndex

func NewIndex(id int, path string, startTime, endTime time.Time, numShards int) (*Index, error)

NewIndex returns an Index for the given start and end time, with the requested shards. It returns an error if an index already exists at the path.

func OpenIndex

func OpenIndex(id int, path string, exceptstartTime, exceptEndTime time.Time) (*Index, error)

OpenIndex opens an existing index, at the given path.

func (*Index) Close

func (i *Index) Close() error

Close closes the index.

func (*Index) Contains

func (i *Index) Contains(t time.Time) bool

Contains returns whether the index's time range includes the given reference time.

func (*Index) Document

func (i *Index) Document(id DocID) ([]byte, error)

Document returns the source from the index for the given ID.

func (*Index) EndTime

func (i *Index) EndTime() time.Time

EndTime returns the exclusive end time of the index.

func (*Index) Expired

func (i *Index) Expired(t time.Time, r time.Duration) bool

Expired returns whether the index has expired at the given time, if the retention period is r.

func (*Index) Index

func (i *Index) Index(documents []Document) error

Index indexes the slice of documents in the index. It takes care of all shard routing.

func (*Index) Path

func (i *Index) Path() string

Path returns the path to storage for the index.

func (*Index) Search

func (i *Index) Search(q string) (DocIDs, error)

Search performs a search of the index using the given query. Returns IDs of documents which satisfy all queries. Returns Doc IDs in sorted order, ascending.

func (*Index) Shard

func (i *Index) Shard(docId DocID) *Shard

Shard returns the shard from the index, for the given doc ID.

func (*Index) StartTime

func (i *Index) StartTime() time.Time

StartTime returns the inclusive start time of the index.

func (*Index) Total

func (i *Index) Total() (uint64, error)

Total returns the number of documents in the index.

type IndexLoader added in v1.3.1

type IndexLoader struct {
	// contains filtered or unexported fields
}

func (*IndexLoader) Close added in v1.3.1

func (loader *IndexLoader) Close() error

func (*IndexLoader) Do added in v1.3.1

func (loader *IndexLoader) Do(cb func(loader *IndexLoader, switchFunc func()))

func (*IndexLoader) GetIndexes added in v1.3.1

func (loader *IndexLoader) GetIndexes(startTime, endTime time.Time) []*LazyIndex

func (*IndexLoader) Load added in v1.3.1

func (loader *IndexLoader) Load(ctx context.Context, li *LazyIndex) (*ResourceIndex, error)

func (*IndexLoader) Open added in v1.3.1

func (il *IndexLoader) Open(pa string, numShards, numCaches int, indexDuration time.Duration) error

Open opens the index loader.

type Indexes

type Indexes []*Index

Indexes is a slice of indexes.

func (Indexes) Len

func (i Indexes) Len() int

Indexes are ordered by decreasing end time. If two indexes have the same end time, then order by decreasing start time. This means that the first index in the slice covers the latest time range.

func (Indexes) Less

func (i Indexes) Less(u, v int) bool

func (Indexes) Swap

func (i Indexes) Swap(u, v int)

type LazyIndex added in v1.3.1

type LazyIndex struct {
	// contains filtered or unexported fields
}

func OpenLazyIndex added in v1.3.1

func OpenLazyIndex(path string, indexDuration time.Duration) (*LazyIndex, error)

OpenLazyIndex opens an existing index, at the given path.

func (*LazyIndex) Contains added in v1.3.1

func (i *LazyIndex) Contains(t time.Time) bool

Contains returns whether the index's time range includes the given reference time.

func (*LazyIndex) EndTime added in v1.3.1

func (i *LazyIndex) EndTime() time.Time

EndTime returns the exclusive end time of the index.

func (*LazyIndex) Expired added in v1.3.1

func (i *LazyIndex) Expired(t time.Time, r time.Duration) bool

Expired returns whether the index has expired at the given time, if the retention period is r.

func (*LazyIndex) Load added in v1.3.1

func (i *LazyIndex) Load(ctx context.Context) (*ResourceIndex, error)

func (*LazyIndex) Path added in v1.3.1

func (i *LazyIndex) Path() string

Path returns the path to storage for the index.

func (*LazyIndex) StartTime added in v1.3.1

func (i *LazyIndex) StartTime() time.Time

StartTime returns the inclusive start time of the index.

type LazyIndexes added in v1.3.1

type LazyIndexes []*LazyIndex

LazyIndexes is a slice of lazy indexes.

func (LazyIndexes) Len added in v1.3.1

func (i LazyIndexes) Len() int

Indexes are ordered by decreasing end time. If two indexes have the same end time, then order by decreasing start time. This means that the first index in the slice covers the latest time range.

func (LazyIndexes) Less added in v1.3.1

func (i LazyIndexes) Less(u, v int) bool

func (LazyIndexes) Swap added in v1.3.1

func (i LazyIndexes) Swap(u, v int)

type ResourceIndex added in v1.3.1

type ResourceIndex struct {
	*Index
	// contains filtered or unexported fields
}

func (*ResourceIndex) Close added in v1.3.1

func (ri *ResourceIndex) Close() error

type Searcher

type Searcher interface {
	Query(ctx context.Context, startTime, endTime time.Time, req *bleve.SearchRequest,
		cb func(*bleve.SearchRequest, *bleve.SearchResult) error) error
	Fields(ctx context.Context, startTime, endTime time.Time) ([]string, error)
	FieldDict(ctx context.Context, startTime, endTime time.Time, field string) ([]bleve_index.DictEntry, error)
}

Searcher is the interface any object that performs searches should implement.

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard is the basic data store for indexed data. Indexing operations are not goroutine-safe, and only one indexing operation should occur at a time.

func NewShard

func NewShard(path string) *Shard

NewShard returns a shard using the data at the given path.

func (*Shard) Close

func (s *Shard) Close() error

Close closes the shard.

func (*Shard) Document

func (s *Shard) Document(id DocID) ([]byte, error)

Document returns the source document from the shard for the given ID.

func (*Shard) Index

func (s *Shard) Index(documents []Document) error

Index indexes a slice of Documents into the shard.

func (*Shard) Open

func (s *Shard) Open() error

Open opens the shard. If no data exists at the shard's path, an empty shard will be created.

func (*Shard) Total

func (s *Shard) Total() (uint64, error)

Total returns the number of events in the shard.

type Writer added in v1.3.1

type Writer interface {
	Output(string, *document.Document, map[string]interface{}) error
	Close() error
}

func NewCsvWriter added in v1.3.1

func NewCsvWriter(out io.Writer) (Writer, error)

func NewShardWriter added in v1.3.1

func NewShardWriter(pa string) (Writer, error)

Directories

Path Synopsis
cmd
Package query implements a parser for the Ekanite query language.
Package rfc5424 is a state-machine parser of RFC5424-formatted log lines.
