logbuffer

Version: v1.8.1
Warning: this package is not in the latest version of its module.

Published: May 22, 2026 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

const (
	DfltPrevNumReqsSampleSize = 10
)
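The constant's name suggests it is the default for `BotDetectionConf.PrevNumReqsSampleSize`, although this page does not say so explicitly. A minimal sketch, assuming that a missing or non-positive configured value falls back to the constant; `botDetectionConf` and `effectiveSampleSize` are hypothetical names, not part of the package:

```go
package main

import "fmt"

// DfltPrevNumReqsSampleSize mirrors the constant documented above.
const DfltPrevNumReqsSampleSize = 10

// botDetectionConf is a hypothetical, trimmed-down mirror of BotDetectionConf
// containing only the field relevant here.
type botDetectionConf struct {
	PrevNumReqsSampleSize int `json:"prevNumReqsSampleSize"`
}

// effectiveSampleSize is a hypothetical helper: it assumes a missing or
// non-positive configured size falls back to DfltPrevNumReqsSampleSize.
func effectiveSampleSize(conf *botDetectionConf) int {
	if conf == nil || conf.PrevNumReqsSampleSize <= 0 {
		return DfltPrevNumReqsSampleSize
	}
	return conf.PrevNumReqsSampleSize
}

func main() {
	fmt.Println(effectiveSampleSize(&botDetectionConf{}))                          // 10
	fmt.Println(effectiveSampleSize(&botDetectionConf{PrevNumReqsSampleSize: 30})) // 30
}
```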

Variables

This section is empty.

Functions

This section is empty.

Types

type AbstractRecentRecords

type AbstractRecentRecords[T Storable, U SerializableState] interface {
	AddRecord(rec T)

	// ConfirmRecordCheck sets the time of the last check
	// for the records sharing the clustering ID
	// of `rec`.
	ConfirmRecordCheck(rec Storable)

	// GetLastCheck returns the time when items with
	// the specified `clusteringID` were last checked.
	GetLastCheck(clusteringID string) time.Time

	// RemoveAnalyzedRecords removes all the records with the specified
	// `clusteringID` up to the time `dt`.
	RemoveAnalyzedRecords(clusteringID string, dt time.Time)

	NumOfRecords(clusteringID string) int

	// ClearOldRecords is a maintenance function called
	// randomly by the respective log processor to keep
	// the number of records in RAM at a reasonable level.
	// The method should return the number of removed items
	// (mostly for overview purposes; the value is not essential).

	// TotalNumOfRecordsSince returns the total number of records,
	// across all clustering IDs, whose time is greater than
	// or equal to `dt`.
	TotalNumOfRecordsSince(dt time.Time) int

	ForEach(clusteringID string, fn func(item T))

	// TotalForEach applies the provided function to all items,
	// regardless of the clusteringID they belong to.
	TotalForEach(fn func(item T))

	// SetStateData sets the current state data for later reuse.
	// It may or may not back up the data to disk or a database.
	// If it does, `GetStateData` should load the stored data
	// in case nothing has been set yet.
	SetStateData(stateData U)

	GetStateData(dtNow time.Time) U

	EmptyStateData() U
}

type BotDetectionConf

type BotDetectionConf struct {
	// IPOutlierCoeff specifies how far above Q3 a value must be
	// to be considered an outlier (the threshold is `Q3 + ipOutlierCoeff * IQR`).
	IPOutlierCoeff float64 `json:"ipOutlierCoeff"`

	// IPOutlierMinFreq specifies the minimum number of requests an IP
	// must make (per interval defined by the buffer config `AnalysisIntervalSecs`)
	// to be actually reported. This matters under low traffic, where even
	// requests from legitimate IPs may be evaluated as outliers.
	IPOutlierMinFreq int `json:"ipOutlierMinFreq"`

	// BlocklistIP lists "known" IPs for reporting only (no actual
	// blocking is involved; klogproc cannot block anything).
	BlocklistIP []string `json:"blocklistIp"`

	// TrafficReportingThreshold specifies how much the number of
	// requests must change from the last check (see `AnalysisIntervalSecs`)
	// to be considered abnormal.
	// Please note that this number is hard to tune: traffic naturally
	// rises and falls during the day, and without knowing the typical
	// (or even the current) daily request progression, exceeding it is
	// rather a hint than 100% evidence of bot activity.
	TrafficReportingThreshold float64 `json:"trafficReportingThreshold"`

	PrevNumReqsSampleSize int `json:"prevNumReqsSampleSize"`
}
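The `IPOutlierCoeff` and `IPOutlierMinFreq` rules can be sketched as follows: compute Q1 and Q3 of the per-IP request counts for one analysis interval, then report IPs that exceed `Q3 + coeff * IQR` and also reach the minimum frequency. The quartile method (linear interpolation between ranks) and the strict "greater than" comparison are assumptions; the package may compute quartiles differently, and `quantile` and `ipOutliers` are hypothetical helpers:

```go
package main

import (
	"fmt"
	"sort"
)

// quantile returns the q-quantile (0 <= q <= 1) of sorted data using linear
// interpolation between closest ranks, one of several common definitions.
func quantile(sorted []float64, q float64) float64 {
	if len(sorted) == 0 {
		return 0
	}
	pos := q * float64(len(sorted)-1)
	lo := int(pos)
	if lo+1 >= len(sorted) {
		return sorted[len(sorted)-1]
	}
	frac := pos - float64(lo)
	return sorted[lo] + frac*(sorted[lo+1]-sorted[lo])
}

// ipOutliers reports IPs whose request count exceeds Q3 + coeff*IQR and is
// at least minFreq (mirroring IPOutlierCoeff and IPOutlierMinFreq).
func ipOutliers(counts map[string]int, coeff float64, minFreq int) []string {
	values := make([]float64, 0, len(counts))
	for _, c := range counts {
		values = append(values, float64(c))
	}
	sort.Float64s(values)
	q1, q3 := quantile(values, 0.25), quantile(values, 0.75)
	limit := q3 + coeff*(q3-q1)
	var out []string
	for ip, c := range counts {
		if float64(c) > limit && c >= minFreq {
			out = append(out, ip)
		}
	}
	sort.Strings(out) // deterministic output order
	return out
}

func main() {
	counts := map[string]int{
		"10.0.0.1": 12, "10.0.0.2": 15, "10.0.0.3": 11,
		"10.0.0.4": 14, "10.0.0.5": 13, "10.0.0.6": 240,
	}
	fmt.Println(ipOutliers(counts, 1.5, 50)) // [10.0.0.6]
}
```

Here Q1 = 12.25 and Q3 = 14.75, so the limit is 18.5; with `minFreq` set to 300 nothing would be reported, which is the low-traffic safeguard the field describes.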

type BufferConf

type BufferConf struct {

	// ID: buffers with an ID can be shared between multiple log readers.
	// This makes sense mostly for services composed of multiple
	// homogeneous processes, each writing to its own log file (e.g. Node.js).
	ID string `json:"id"`

	HistoryLookupItems int `json:"historyLookupItems"`

	// AnalysisIntervalSecs specifies how often klogproc analyzes previous
	// records. The interval also serves as the base for other configured
	// values (typically various limits and thresholds).
	AnalysisIntervalSecs int                   `json:"analysisIntervalSecs"`
	ClusteringDBScan     *ClusteringDBScanConf `json:"clusteringDbScan"`
	BotDetection         *BotDetectionConf     `json:"botDetection"`
}

func (*BufferConf) HasConfiguredBufferProcessing

func (bc *BufferConf) HasConfiguredBufferProcessing() bool

func (*BufferConf) IsReference

func (bc *BufferConf) IsReference() bool

func (*BufferConf) IsShared

func (bc *BufferConf) IsShared() bool

func (*BufferConf) Validate

func (bc *BufferConf) Validate() error

type ClusteringDBScanConf

type ClusteringDBScanConf struct {
	MinDensity int     `json:"minDensity"`
	Epsilon    float64 `json:"epsilon"`
}

type DummyRecentRecords

type DummyRecentRecords[T Storable, U SerializableState] struct {
	// contains filtered or unexported fields
}

func NewDummyStorage

func NewDummyStorage[T Storable, U SerializableState](stateDataFactory func() U) *DummyRecentRecords[T, U]

func (*DummyRecentRecords[T, U]) AddRecord

func (st *DummyRecentRecords[T, U]) AddRecord(rec T)

func (*DummyRecentRecords[T, U]) ClearOldRecords

func (st *DummyRecentRecords[T, U]) ClearOldRecords(maxAge time.Time) int

func (*DummyRecentRecords[T, U]) ConfirmRecordCheck

func (st *DummyRecentRecords[T, U]) ConfirmRecordCheck(rec Storable)

func (*DummyRecentRecords[T, U]) EmptyStateData

func (st *DummyRecentRecords[T, U]) EmptyStateData() U

func (*DummyRecentRecords[T, U]) ForEach

func (st *DummyRecentRecords[T, U]) ForEach(clusteringID string, fn func(item T))

func (*DummyRecentRecords[T, U]) GetLastCheck

func (st *DummyRecentRecords[T, U]) GetLastCheck(clusteringID string) time.Time

func (*DummyRecentRecords[T, U]) GetStateData

func (st *DummyRecentRecords[T, U]) GetStateData(dtNow time.Time) U

func (*DummyRecentRecords[T, U]) NumOfRecords

func (st *DummyRecentRecords[T, U]) NumOfRecords(clusteringID string) int

func (*DummyRecentRecords[T, U]) RemoveAnalyzedRecords

func (st *DummyRecentRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)

func (*DummyRecentRecords[T, U]) Report

func (st *DummyRecentRecords[T, U]) Report() map[string]any

func (*DummyRecentRecords[T, U]) SetStateData

func (st *DummyRecentRecords[T, U]) SetStateData(stateData U)

func (*DummyRecentRecords[T, U]) TotalForEach

func (st *DummyRecentRecords[T, U]) TotalForEach(fn func(item T))

func (*DummyRecentRecords[T, U]) TotalNumOfRecordsSince

func (st *DummyRecentRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int

type PrevRecords

type PrevRecords[T Storable, U SerializableState] struct {
	// contains filtered or unexported fields
}

PrevRecords keeps:

1. a defined number of log records in memory (using a circular list for each "clustering ID"),
2. a custom state object (typically used to store stats in bot detection),
3. the last check time, both global and for individual clustering IDs (a clustering ID combines session and IP).

All methods are safe for concurrent use. The `T` type represents the log record type stored by this PrevRecords; the `U` type is used to store state data when dealing with persistence.

func NewStorage

func NewStorage[T Storable, U SerializableState](
	bufferConf *BufferConf,
	worklogReset bool,
	storageDirPath string,
	analyzedLogFilePath string,
	stateDataFactory func() U,
) *PrevRecords[T, U]

NewStorage is the recommended factory for creating a `PrevRecords` storage.

func (*PrevRecords[T, U]) AddRecord

func (st *PrevRecords[T, U]) AddRecord(rec T)

func (*PrevRecords[T, U]) ClearOldRecords

func (st *PrevRecords[T, U]) ClearOldRecords(maxAge time.Time) int

func (*PrevRecords[T, U]) ConfirmRecordCheck

func (st *PrevRecords[T, U]) ConfirmRecordCheck(rec Storable)

func (*PrevRecords[T, U]) EmptyStateData

func (st *PrevRecords[T, U]) EmptyStateData() U

func (*PrevRecords[T, U]) ForEach

func (st *PrevRecords[T, U]) ForEach(clusteringID string, fn func(item T))

ForEach iterates over stored records with the provided `clusteringID` and calls the provided `fn` with each item as an argument.

func (*PrevRecords[T, U]) GetLastCheck

func (st *PrevRecords[T, U]) GetLastCheck(clusteringID string) time.Time

func (*PrevRecords[T, U]) GetStateData

func (st *PrevRecords[T, U]) GetStateData(dtNow time.Time) U

func (*PrevRecords[T, U]) NumOfRecords

func (st *PrevRecords[T, U]) NumOfRecords(clusteringID string) int

NumOfRecords returns the number of stored records with the specified `clusteringID`.

func (*PrevRecords[T, U]) RemoveAnalyzedRecords

func (st *PrevRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)

RemoveAnalyzedRecords removes all log records older than `dt` with the provided `clusteringID` (which is typically something like a user ID, session, or IP).

func (*PrevRecords[T, U]) SetStateData

func (st *PrevRecords[T, U]) SetStateData(stateData U)

func (*PrevRecords[T, U]) TotalForEach

func (st *PrevRecords[T, U]) TotalForEach(fn func(item T))

TotalForEach iterates over all stored records (regardless of their clustering ID) and calls the provided `fn` with each item as an argument.

Please note that the records are not sorted by date here, as the method iterates in two nested loops: the first goes through all the record groups (= records with the same clustering ID), and then, for each group, the second iterates through all of its items.

func (*PrevRecords[T, U]) TotalNumOfRecordsSince

func (st *PrevRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int

TotalNumOfRecordsSince returns the total number of stored records, regardless of their clustering ID, whose time is greater than or equal to `dt`.

type SampleWithReplac

type SampleWithReplac[T any] struct {
	Data []T `json:"data"`
	Cap  int `json:"cap"`
}

func NewSampleWithReplac

func NewSampleWithReplac[T any](initialCap int) *SampleWithReplac[T]

func (*SampleWithReplac[T]) Add

func (sample *SampleWithReplac[T]) Add(item T) int

Add adds a new value to the sample. It returns the sample size after the value has been added.
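This page documents only the `Data` and `Cap` fields and that `Add` returns the new size; the replacement policy is not described. A sketch assuming that, once `Cap` is reached, a randomly chosen slot is overwritten (a FIFO policy would fit the description equally well) and that `Resize` truncates when shrinking. `sampleSketch` and its seeded random source are hypothetical:

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampleSketch mirrors the documented fields of SampleWithReplac. The
// replacement policy (overwrite a random slot once Cap is reached) is an
// assumption made for this sketch.
type sampleSketch[T any] struct {
	Data []T `json:"data"`
	Cap  int `json:"cap"`
	rnd  *rand.Rand
}

func newSampleSketch[T any](initialCap int, seed int64) *sampleSketch[T] {
	if initialCap < 0 {
		initialCap = 0
	}
	return &sampleSketch[T]{
		Data: make([]T, 0, initialCap),
		Cap:  initialCap,
		rnd:  rand.New(rand.NewSource(seed)),
	}
}

// Add inserts item and returns the sample size after the insertion.
func (s *sampleSketch[T]) Add(item T) int {
	if len(s.Data) < s.Cap {
		s.Data = append(s.Data, item)
	} else if len(s.Data) > 0 {
		s.Data[s.rnd.Intn(len(s.Data))] = item // replace a random slot
	}
	return len(s.Data)
}

// Resize sets a new capacity and drops surplus items when shrinking.
func (s *sampleSketch[T]) Resize(newSize int) {
	if newSize < 0 {
		newSize = 0
	}
	if len(s.Data) > newSize {
		s.Data = s.Data[:newSize]
	}
	s.Cap = newSize
}

func (s *sampleSketch[T]) Len() int { return len(s.Data) }

func main() {
	s := newSampleSketch[int](3, 7)
	for i := 1; i <= 5; i++ {
		fmt.Println(s.Add(i)) // 1, 2, 3, then 3 and 3
	}
	s.Resize(2)
	fmt.Println(s.Len(), s.Cap) // 2 2
}
```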

func (*SampleWithReplac[T]) GetAll

func (sample *SampleWithReplac[T]) GetAll() []T

func (*SampleWithReplac[T]) Len

func (sample *SampleWithReplac[T]) Len() int

func (*SampleWithReplac[T]) Resize

func (sample *SampleWithReplac[T]) Resize(newSize int)

type SerializableState

type SerializableState interface {
	ToJSON() ([]byte, error)

	// AfterLoadNormalize should make sure loaded
	// data matches the provided `conf`.
	// E.g. if stored samples have length of 100
	// and the current configuration requires 20,
	// the method should cut the sample so it matches
	// the configuration.
	// It should also fix broken stored data (e.g. samples
	// with size 0).
	AfterLoadNormalize(conf *BufferConf, dt time.Time)

	// Report is mainly for debugging and overview
	// purposes. It should show relevant values of the
	// state object.
	Report() map[string]any
}

type Storable

type Storable interface {
	GetTime() time.Time
	ClusteringClientID() string
}
