processor

package
v1.16.0
Warning

This package is not in the latest version of its module.

Published: Mar 10, 2021 License: AGPL-3.0 Imports: 30 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ParsedEventBufferSize is the size of the buffer of the Go channel containing the parsed events.
	// Different goroutines write to and read from that channel, each with different I/O characteristics,
	// so we buffer the channel to avoid blocking the writer goroutines while the reader goroutine is
	// temporarily busy. The writer goroutines block only when the buffer is full, which is necessary
	// to avoid using up a lot of memory.
	// see also: https://golang.org/doc/effective_go.html#channels
	ParsedEventBufferSize = 1000
)
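
The buffering behavior described in the comment above can be illustrated with a small standalone sketch. It does not use this package; `parsedEventBufferSize` and `fillWithoutReader` are hypothetical names introduced only for the illustration. The sketch counts how many sends succeed on a buffered channel that has no reader before a plain send would block.

```go
package main

import "fmt"

// parsedEventBufferSize mirrors the documented default value.
// The lowercase name is local to this sketch, not part of the package.
const parsedEventBufferSize = 1000

// fillWithoutReader sends into a buffered channel that nobody reads from
// and returns how many sends succeeded before a send would have blocked.
func fillWithoutReader(size, attempts int) int {
	ch := make(chan int, size)
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			// The buffer is full: a plain `ch <- i` would block here
			// until a reader drained at least one element.
			return sent
		}
	}
	return sent
}

func main() {
	fmt.Println(fillWithoutReader(parsedEventBufferSize, 1500)) // prints 1000
}
```

A larger buffer lets writers run further ahead of a busy reader, at the cost of holding more parsed events in memory at once.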

Functions

func PollEvents added in v1.13.0

func PollEvents(
	ctx context.Context,
	sqsClient sqsiface.SQSAPI,
	resolver pantherlog.ParserResolver,
) (sqsMessageCount int, err error)

PollEvents aggregates SQS messages to avoid creating many small S3 files under load. The function attempts to read more messages whenever the queue is not empty. Under load, the Lambda continues to read events and aggregates as much data as possible to produce fewer, bigger files, which make Athena queries much faster.

func Process

func Process(
	ctx context.Context,
	dataStreams <-chan *common.DataStream,
	destination destinations.Destination,
	newProcessor func(stream *common.DataStream) (*Processor, error),
) error

Process orchestrates parsing, classifying, and normalizing logs and forwarding them to the appropriate destination. Any error causes the Lambda invocation to fail.
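
The shape of this orchestration (a channel of streams, a per-stream processor constructor, and a destination) can be sketched with stand-in types. Everything below is hypothetical: `dataStream`, `processor`, `process`, and `newProcessor` only mimic the roles of `common.DataStream`, `Processor`, `Process`, and the `newProcessor` argument, and the sketch runs sequentially, which may differ from what the package does.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// dataStream is a hypothetical stand-in for common.DataStream.
type dataStream struct {
	logType string
	lines   []string
}

// processor is a hypothetical stand-in for Processor.
type processor struct{ stream *dataStream }

// run "parses" each line by upper-casing it and forwards the result.
func (p *processor) run(send func(string)) {
	for _, l := range p.stream.lines {
		send(p.stream.logType + ":" + strings.ToUpper(l))
	}
}

// newProcessor rejects streams whose log type is unknown (empty here).
func newProcessor(s *dataStream) (*processor, error) {
	if s.logType == "" {
		return nil, errors.New("unknown log type")
	}
	return &processor{stream: s}, nil
}

// process drains the channel, builds one processor per stream, and returns
// the first error so that the caller can fail the whole invocation.
func process(
	streams <-chan *dataStream,
	newProc func(*dataStream) (*processor, error),
	send func(string),
) error {
	for s := range streams {
		p, err := newProc(s)
		if err != nil {
			return fmt.Errorf("stream %q: %w", s.logType, err)
		}
		p.run(send)
	}
	return nil
}

func main() {
	streams := make(chan *dataStream, 1)
	streams <- &dataStream{logType: "app", lines: []string{"a", "b"}}
	close(streams)

	var out []string
	err := process(streams, newProcessor, func(e string) { out = append(out, e) })
	fmt.Println(out, err)
}
```

Passing the constructor as a function argument, as the real signature also does, makes it easy to substitute a test double for the processor.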

func RunScalingDecisions added in v1.13.0

func RunScalingDecisions(ctx context.Context, sqsClient sqsiface.SQSAPI, lambdaClient lambdaiface.LambdaAPI, interval time.Duration)

RunScalingDecisions periodically makes adaptive decisions to scale up based on the SQS queue stats.

Types

type Factory added in v1.11.0

type Factory func(r *common.DataStream) (*Processor, error)

func NewFactory added in v1.11.0

func NewFactory(resolver pantherlog.ParserResolver) Factory

type ProcessFunc added in v1.11.0

type ProcessFunc func(streamCh <-chan *common.DataStream, dest destinations.Destination) error

type Processor

type Processor struct {
	// contains filtered or unexported fields
}
