inpututils

Version: v0.1.0-alpha

Warning: this package is not in the latest version of its module.
Published: Mar 19, 2021 License: MIT Imports: 22 Imported by: 0

Documentation


Constants

const (
	MetadataLastModified = "last_modified"
	MetadataURL          = "url"
)

These keys identify values in the record Metadata cache.


Functions

func SetGCPercentIfNotSet

func SetGCPercentIfNotSet(percent int)

SetGCPercentIfNotSet sets the GC target percentage, unless the GOGC environment variable is set, in which case SetGCPercentIfNotSet does not override it and leaves it as is.

Types

type CompressedInput

type CompressedInput struct {
	Opener func(fn string) (io.ReadCloser, int64, time.Time, *url.URL, error)
	Sizer  func(fn string) (int64, error)
	Done   chan bool
	// contains filtered or unexported fields
}

CompressedInput is a base for creating input components that process multiple gzip- or zstd-compressed logs coming from arbitrary sources.

This type implements an internal queue of files (identified by filename) and starts a number of workers to process them. Types that embed CompressedInput, such as S3Input, can enqueue a file for processing through CompressedInput.ProcessFile().

CompressedInput must be configured with an Opener function that can open a file given its filename and return an io.ReadCloser for that file, along with the other values listed in its signature.

func NewCompressedInput

func NewCompressedInput(opener func(fn string) (io.ReadCloser, int64, time.Time, *url.URL, error), sizer func(fn string) (int64, error), done chan bool) *CompressedInput

func (*CompressedInput) FreeMem

func (s *CompressedInput) FreeMem(data *baker.Data)

func (*CompressedInput) NoMoreFiles

func (s *CompressedInput) NoMoreFiles()

NoMoreFiles signals CompressedInput that no more files will be enqueued, so it can exit as soon as it has finished processing the files already enqueued. This is useful for an input that has a fixed set of files to process.

func (*CompressedInput) ParseFile

func (s *CompressedInput) ParseFile(fn string)

func (*CompressedInput) ProcessFile

func (s *CompressedInput) ProcessFile(fn string) error

ProcessFile enqueues a file for processing by CompressedInput. Types that embed CompressedInput must call it to schedule processing of a gzip or zstd log file. It only enqueues the file and returns, so it is normally fast, but it may block if the backlog exceeds the internal channel size (default: 1024 files).
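The blocking behavior can be illustrated with a bounded Go channel, assuming (as the mention of an "internal channel size" suggests) that the backlog is a buffered channel. `tryEnqueue` is a hypothetical helper that uses `select` with a `default` case to report whether a send would block; ProcessFile itself is documented to simply block in that situation.

```go
package main

import "fmt"

// tryEnqueue is a hypothetical helper that reports whether a send to a
// bounded queue succeeds right now, instead of blocking when it is full.
func tryEnqueue(queue chan<- string, fn string) bool {
	select {
	case queue <- fn:
		return true
	default:
		return false // queue full: a plain send would block here
	}
}

func main() {
	queue := make(chan string, 2) // tiny backlog for the demo (documented default: 1024)
	fmt.Println(tryEnqueue(queue, "a.gz")) // true
	fmt.Println(tryEnqueue(queue, "b.gz")) // true
	fmt.Println(tryEnqueue(queue, "c.gz")) // false: nobody is draining the queue
	<-queue                                // a worker takes one file
	fmt.Println(tryEnqueue(queue, "c.gz")) // true
}
```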

func (*CompressedInput) SetOutputChannel

func (s *CompressedInput) SetOutputChannel(data chan<- *baker.Data)

func (*CompressedInput) Stats

func (s *CompressedInput) Stats() baker.InputStats

func (*CompressedInput) Stop

func (s *CompressedInput) Stop()

type S3Input

type S3Input struct {
	*CompressedInput

	Bucket string
	// contains filtered or unexported fields
}

func NewS3Input

func NewS3Input(region, bucket string) *S3Input

func (*S3Input) ProcessDirectory

func (s *S3Input) ProcessDirectory(prefix string) error

ProcessDirectory enqueues all files matching a specific prefix for processing by S3Input. If prefix is an S3 URL, the bucket in that URL is used instead of the one provided at creation time.
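A sketch of the prefix-or-URL rule, using `net/url` from the standard library. `resolveBucketAndPrefix` is hypothetical; it assumes the URL form is `s3://bucket/key-prefix` and that the leading slash of the URL path should be dropped to form an S3 key prefix. How ProcessDirectory actually parses its argument is not documented here.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// resolveBucketAndPrefix is a hypothetical helper: if prefix is an s3:// URL,
// the bucket comes from the URL host and the key prefix from its path;
// otherwise defaultBucket and prefix are returned unchanged.
func resolveBucketAndPrefix(defaultBucket, prefix string) (bucket, keyPrefix string, err error) {
	if !strings.HasPrefix(prefix, "s3://") {
		return defaultBucket, prefix, nil
	}
	u, err := url.Parse(prefix)
	if err != nil {
		return "", "", err
	}
	// S3 keys have no leading slash, so drop the one url.Parse leaves in Path.
	return u.Host, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	fmt.Println(resolveBucketAndPrefix("default-bucket", "logs/2021/"))
	// default-bucket logs/2021/ <nil>
	fmt.Println(resolveBucketAndPrefix("default-bucket", "s3://other-bucket/logs/2021/"))
	// other-bucket logs/2021/ <nil>
}
```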

ProcessDirectory makes one or more remote calls to list all files matching the specified prefix in the bucket, and enqueues them for processing.
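S3 listing APIs return results in pages (ListObjectsV2 returns at most 1,000 keys per response), which is why several remote calls may be needed. The loop below sketches that pattern against a hypothetical `listPage` function type standing in for a real S3 client; an in-memory fake replaces the remote calls, and `enqueueAll` is likewise an illustrative name, not part of this package.

```go
package main

import "fmt"

// listPage is a hypothetical listing call: it returns one page of keys and a
// continuation token, or "" when there are no more pages. With a real S3
// client each call would be one remote request.
type listPage func(bucket, prefix, token string) (keys []string, next string, err error)

// enqueueAll keeps listing until the continuation token is empty, passing
// every key to enqueue, and returns how many keys were enqueued.
func enqueueAll(list listPage, bucket, prefix string, enqueue func(key string) error) (int, error) {
	n, token := 0, ""
	for {
		keys, next, err := list(bucket, prefix, token)
		if err != nil {
			return n, err
		}
		for _, k := range keys {
			if err := enqueue(k); err != nil {
				return n, err
			}
			n++
		}
		if next == "" {
			return n, nil
		}
		token = next
	}
}

func main() {
	// In-memory fake with two pages standing in for remote calls.
	pages := map[string]struct {
		keys []string
		next string
	}{
		"":   {[]string{"logs/a.gz", "logs/b.gz"}, "p2"},
		"p2": {[]string{"logs/c.zst"}, ""},
	}
	fake := func(bucket, prefix, token string) ([]string, string, error) {
		p := pages[token]
		return p.keys, p.next, nil
	}
	var got []string
	n, err := enqueueAll(fake, "my-bucket", "logs/", func(k string) error {
		got = append(got, k)
		return nil
	})
	fmt.Println(n, err, got) // 3 <nil> [logs/a.gz logs/b.gz logs/c.zst]
}
```

The `enqueue` parameter has the same shape as `ProcessFile(fn string) error`, so a method value such as `s.ProcessFile` could be passed in its place.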
