Documentation
¶
Index ¶
- Constants
- type AbstractRecentRecords
- type BotDetectionConf
- type BufferConf
- type ClusteringDBScanConf
- type DummyRecentRecords
- func (st *DummyRecentRecords[T, U]) AddRecord(rec T)
- func (st *DummyRecentRecords[T, U]) ClearOldRecords(maxAge time.Time) int
- func (st *DummyRecentRecords[T, U]) ConfirmRecordCheck(rec Storable)
- func (st *DummyRecentRecords[T, U]) EmptyStateData() U
- func (st *DummyRecentRecords[T, U]) ForEach(clusteringID string, fn func(item T))
- func (st *DummyRecentRecords[T, U]) GetLastCheck(clusteringID string) time.Time
- func (st *DummyRecentRecords[T, U]) GetStateData(dtNow time.Time) U
- func (st *DummyRecentRecords[T, U]) NumOfRecords(clusteringID string) int
- func (st *DummyRecentRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)
- func (st *DummyRecentRecords[T, U]) Report() map[string]any
- func (st *DummyRecentRecords[T, U]) SetStateData(stateData U)
- func (st *DummyRecentRecords[T, U]) TotalForEach(fn func(item T))
- func (st *DummyRecentRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int
- type PrevRecords
- func (st *PrevRecords[T, U]) AddRecord(rec T)
- func (st *PrevRecords[T, U]) ClearOldRecords(maxAge time.Time) int
- func (st *PrevRecords[T, U]) ConfirmRecordCheck(rec Storable)
- func (st *PrevRecords[T, U]) EmptyStateData() U
- func (st *PrevRecords[T, U]) ForEach(clusteringID string, fn func(item T))
- func (st *PrevRecords[T, U]) GetLastCheck(clusteringID string) time.Time
- func (st *PrevRecords[T, U]) GetStateData(dtNow time.Time) U
- func (st *PrevRecords[T, U]) NumOfRecords(clusteringID string) int
- func (st *PrevRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)
- func (st *PrevRecords[T, U]) SetStateData(stateData U)
- func (st *PrevRecords[T, U]) TotalForEach(fn func(item T))
- func (st *PrevRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int
- type SampleWithReplac
- type SerializableState
- type Storable
Constants ¶
const (
DfltPrevNumReqsSampleSize = 10
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AbstractRecentRecords ¶
type AbstractRecentRecords[T Storable, U SerializableState] interface { AddRecord(rec T) // ConfirmRecordCheck sets a time of last check // for the records with the same clustering ID // as `rec`. ConfirmRecordCheck(rec Storable) // GetLastCheck returns the time when items // with the specified `clusteringID` were last checked. GetLastCheck(clusteringID string) time.Time // RemoveAnalyzedRecords removes all the records with specified // `clusteringID` up until the defined time. RemoveAnalyzedRecords(clusteringID string, dt time.Time) NumOfRecords(clusteringID string) int // ClearOldRecords is a maintenance function called // randomly by a respective log processor to keep // the number of records in RAM at a reasonable level. // The method should return number of removed items // (it is mostly for better overview, i.e. not essential) ClearOldRecords(maxAge time.Time) int // TotalNumOfRecordsSince returns number of records // for each clusteringID with its time greater or equal // to the `dt`. TotalNumOfRecordsSince(dt time.Time) int ForEach(clusteringID string, fn func(item T)) // TotalForEach applies a provided function on all items // no matter what clusteringID they belong to TotalForEach(fn func(item T)) // SetStateData sets the current state data for later reuse // It may or may not back up data to disk or database. // If applicable then `GetStateData` should load the data // in case nothing was set yet. SetStateData(stateData U) GetStateData(dtNow time.Time) U EmptyStateData() U }
type BotDetectionConf ¶
type BotDetectionConf struct {
// IPOutlierCoeff specifies how far from the Q3 must a value be
// to be considered an outlier (the formula is `Q3 + ipOutlierCoeff * IQR`)
IPOutlierCoeff float64 `json:"ipOutlierCoeff"`
// IPOutlierMinFreq specifies minimum number of requests for
// an IP (per interval defined in buffer config `AnalysisIntervalSecs`)
// to be actually reported. Because in case there is small traffic, even
// legit IP requests may be evaluated as outliers.
IPOutlierMinFreq int `json:"ipOutlierMinFreq"`
// BlocklistIP is just for "known" IPs reporting (i.e. there is no
// actual blocking involved - klogproc indeed cannot block anything).
BlocklistIP []string `json:"blocklistIp"`
// TrafficReportingThreshold defines a number specifying how much
// a number of requests must have changed from the last check
// (see `AnalysisIntervalSecs`) to be considered abnormal.
// Please note that this number is really hard to tune as during
// day, there are natural increases of traffic and without knowing
// a typical (or even current) day requests progression, this is
// rather a hint than a 100% evidence of bot activity.
TrafficReportingThreshold float64 `json:"trafficReportingThreshold"`
PrevNumReqsSampleSize int `json:"prevNumReqsSampleSize"`
}
type BufferConf ¶
type BufferConf struct {
// ID buffers with ID can be shared between multiple log readers.
// This makes sense mostly for services composed of multiple
// homogeneous processes each writing to its log file (e.g. Node.JS)
ID string `json:"id"`
HistoryLookupItems int `json:"historyLookupItems"`
// AnalysisIntervalSecs specifies how often klogproc analyses previous
// records. The interval is also important because it is a base for other
// configured values (typically different limits/thresholds)
AnalysisIntervalSecs int `json:"analysisIntervalSecs"`
ClusteringDBScan *ClusteringDBScanConf `json:"clusteringDbScan"`
BotDetection *BotDetectionConf `json:"botDetection"`
}
func (*BufferConf) HasConfiguredBufferProcessing ¶
func (bc *BufferConf) HasConfiguredBufferProcessing() bool
func (*BufferConf) IsReference ¶
func (bc *BufferConf) IsReference() bool
func (*BufferConf) IsShared ¶
func (bc *BufferConf) IsShared() bool
func (*BufferConf) Validate ¶
func (bc *BufferConf) Validate() error
type ClusteringDBScanConf ¶
type DummyRecentRecords ¶
type DummyRecentRecords[T Storable, U SerializableState] struct { // contains filtered or unexported fields }
func NewDummyStorage ¶
func NewDummyStorage[T Storable, U SerializableState](stateDataFactory func() U) *DummyRecentRecords[T, U]
func (*DummyRecentRecords[T, U]) AddRecord ¶
func (st *DummyRecentRecords[T, U]) AddRecord(rec T)
func (*DummyRecentRecords[T, U]) ClearOldRecords ¶
func (st *DummyRecentRecords[T, U]) ClearOldRecords(maxAge time.Time) int
func (*DummyRecentRecords[T, U]) ConfirmRecordCheck ¶
func (st *DummyRecentRecords[T, U]) ConfirmRecordCheck(rec Storable)
func (*DummyRecentRecords[T, U]) EmptyStateData ¶
func (st *DummyRecentRecords[T, U]) EmptyStateData() U
func (*DummyRecentRecords[T, U]) ForEach ¶
func (st *DummyRecentRecords[T, U]) ForEach(clusteringID string, fn func(item T))
func (*DummyRecentRecords[T, U]) GetLastCheck ¶
func (st *DummyRecentRecords[T, U]) GetLastCheck(clusteringID string) time.Time
func (*DummyRecentRecords[T, U]) GetStateData ¶
func (st *DummyRecentRecords[T, U]) GetStateData(dtNow time.Time) U
func (*DummyRecentRecords[T, U]) NumOfRecords ¶
func (st *DummyRecentRecords[T, U]) NumOfRecords(clusteringID string) int
func (*DummyRecentRecords[T, U]) RemoveAnalyzedRecords ¶
func (st *DummyRecentRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)
func (*DummyRecentRecords[T, U]) Report ¶
func (st *DummyRecentRecords[T, U]) Report() map[string]any
func (*DummyRecentRecords[T, U]) SetStateData ¶
func (st *DummyRecentRecords[T, U]) SetStateData(stateData U)
func (*DummyRecentRecords[T, U]) TotalForEach ¶
func (st *DummyRecentRecords[T, U]) TotalForEach(fn func(item T))
func (*DummyRecentRecords[T, U]) TotalNumOfRecordsSince ¶
func (st *DummyRecentRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int
type PrevRecords ¶
type PrevRecords[T Storable, U SerializableState] struct { // contains filtered or unexported fields }
PrevRecords keeps: 1. a defined number of log records in memory (using a circular list for each "clustering ID") 2. custom state object (used typically to store stats in bot detection) 3. last check (both global and for individual "clustering IDs" (which is session + IP))
All the functions are safe to be used concurrently. The `T` type represents a log record type stored by this PrevRecords. The `U` type is a type used to store state data when dealing with persistence.
func NewStorage ¶
func NewStorage[T Storable, U SerializableState]( bufferConf *BufferConf, worklogReset bool, storageDirPath string, analyzedLogFilePath string, stateDataFactory func() U, ) *PrevRecords[T, U]
NewStorage is a recommended factory for creating `PrevRecords`
func (*PrevRecords[T, U]) AddRecord ¶
func (st *PrevRecords[T, U]) AddRecord(rec T)
func (*PrevRecords[T, U]) ClearOldRecords ¶
func (st *PrevRecords[T, U]) ClearOldRecords(maxAge time.Time) int
func (*PrevRecords[T, U]) ConfirmRecordCheck ¶
func (st *PrevRecords[T, U]) ConfirmRecordCheck(rec Storable)
func (*PrevRecords[T, U]) EmptyStateData ¶
func (st *PrevRecords[T, U]) EmptyStateData() U
func (*PrevRecords[T, U]) ForEach ¶
func (st *PrevRecords[T, U]) ForEach(clusteringID string, fn func(item T))
ForEach iterates over stored records with the provided `clusteringID` and calls the provided `fn` with each item as an argument.
func (*PrevRecords[T, U]) GetLastCheck ¶
func (st *PrevRecords[T, U]) GetLastCheck(clusteringID string) time.Time
func (*PrevRecords[T, U]) GetStateData ¶
func (st *PrevRecords[T, U]) GetStateData(dtNow time.Time) U
func (*PrevRecords[T, U]) NumOfRecords ¶
func (st *PrevRecords[T, U]) NumOfRecords(clusteringID string) int
NumOfRecords gets the number of stored records for a specific group of records (identified by their `clusteringID`).
func (*PrevRecords[T, U]) RemoveAnalyzedRecords ¶
func (st *PrevRecords[T, U]) RemoveAnalyzedRecords(clusteringID string, dt time.Time)
RemoveAnalyzedRecords removes all the log records older than `dt` with provided `clusteringID` (which is typically something like userID, session, IP)
func (*PrevRecords[T, U]) SetStateData ¶
func (st *PrevRecords[T, U]) SetStateData(stateData U)
func (*PrevRecords[T, U]) TotalForEach ¶
func (st *PrevRecords[T, U]) TotalForEach(fn func(item T))
TotalForEach iterates over all stored records (no matter what clustering ID they have) and calls the provided `fn` with each item as an argument.
Please note that the records are not sorted by date here as the method iterates in two nested loops - the first one goes through all the record groups (= records with the same clustering ID) and then, for each such group, it iterates through all its items.
func (*PrevRecords[T, U]) TotalNumOfRecordsSince ¶
func (st *PrevRecords[T, U]) TotalNumOfRecordsSince(dt time.Time) int
TotalNumOfRecordsSince returns the total number of stored records, no matter what clustering ID they have, but with their time greater or equal to `dt`
type SampleWithReplac ¶
func NewSampleWithReplac ¶
func NewSampleWithReplac[T any](initialCap int) *SampleWithReplac[T]
func (*SampleWithReplac[T]) Add ¶
func (sample *SampleWithReplac[T]) Add(item T) int
Add adds a new value to the sample. It returns the sample size after the value was added
func (*SampleWithReplac[T]) GetAll ¶
func (sample *SampleWithReplac[T]) GetAll() []T
func (*SampleWithReplac[T]) Len ¶
func (sample *SampleWithReplac[T]) Len() int
func (*SampleWithReplac[T]) Resize ¶
func (sample *SampleWithReplac[T]) Resize(newSize int)
type SerializableState ¶
type SerializableState interface {
ToJSON() ([]byte, error)
// AfterLoadNormalize should make sure loaded
// data matches the provided `conf`.
// E.g. if stored samples have length of 100
// and the current configuration requires 20,
// the method should cut the sample so it matches
// the configuration.
// It should also fix broken stored data (e.g. samples
// with size 0)
AfterLoadNormalize(conf *BufferConf, dt time.Time)
// Report is mainly for debugging and overview
// purposes. It should show relevant values of the
// state object.
Report() map[string]any
}