active

package
v2.4.3+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 5, 2020 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Overview

Package active provides code for managing processing of an entire directory of task files.

Package active provides code for managing processing of an entire directory of task files.

Index

Constants

This section is empty.

Variables

View Source
var JobFailures = promauto.NewCounterVec(
	prometheus.CounterOpts{
		Name: "etl_job_failures",
		Help: "Job level failures.",
	},

	[]string{"prefix", "year", "type"},
)

JobFailures counts the all errors that result in test loss.

Provides metrics:

etl_job_failures{prefix, year, kind}

Example usage:

JobFailures.WithLabelValues("ndt/tcpinfo" "2019", "insert").Inc()

Functions

func MustStorageClient

func MustStorageClient(ctx context.Context) stiface.Client

MustStorageClient creates a default GCS client.

Types

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context implements context.Context, but allows injection of an alternate Err().

func WithFail

func WithFail(ctx context.Context) *Context

WithFail wraps a context to allow specifying custom error with Fail()

func (*Context) Err

func (c *Context) Err() error

Err returns nil, otherErr, or Context.Err()

func (*Context) Fail

func (c *Context) Fail(err error)

Fail cancels the context, and sets the result of context.Err()

type FileLister

type FileLister func(ctx context.Context) ([]*storage.ObjectAttrs, int64, error)

FileLister defines a function that returns a list of storage.ObjectAttrs.

func FileListerFunc

func FileListerFunc(sc stiface.Client, prefix string, filter *regexp.Regexp) FileLister

FileListerFunc creates a function that returns a slice of *storage.ObjectAttrs. On certain GCS errors, it may return partial result and an error. TODO - consider moving this to GardenerAPI.

type GCSSource

type GCSSource struct {
	// contains filtered or unexported fields
}

GCSSource implements RunnableSource for a GCS bucket/prefix.

func NewGCSSource

func NewGCSSource(ctx context.Context, label string, fl FileLister, toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error)

NewGCSSource creates a new source for active processing.

func (*GCSSource) CancelStreaming

func (src *GCSSource) CancelStreaming()

CancelStreaming terminates the streaming goroutine context.

func (*GCSSource) Label

func (src *GCSSource) Label() string

Label implements Source.Label

func (*GCSSource) Next

func (src *GCSSource) Next(ctx context.Context) (Runnable, error)

Next implements RunnableSource. It returns

the next pending job to run, OR
iterator.Done OR
ctx.Err() OR
gcs error

type GardenerAPI

type GardenerAPI struct {
	// contains filtered or unexported fields
}

GardenerAPI encapsulates the backend paths and clients to connect to gardener and GCS.

func NewGardenerAPI

func NewGardenerAPI(trackerBase url.URL, gcs stiface.Client) *GardenerAPI

NewGardenerAPI creates a GardenerAPI.

func (*GardenerAPI) JobFileSource

func (g *GardenerAPI) JobFileSource(ctx context.Context, job tracker.Job,
	toRunnable func(*storage.ObjectAttrs) Runnable) (*GCSSource, error)

JobFileSource creates a gcsSource for the job.

func (*GardenerAPI) NextJob

NextJob requests a new job from Gardener service.

func (*GardenerAPI) Poll

func (g *GardenerAPI) Poll(ctx context.Context,
	toRunnable func(o *storage.ObjectAttrs) Runnable, maxWorkers int, period time.Duration)

Poll requests work items from gardener, and processes them.

func (*GardenerAPI) RunAll

func (g *GardenerAPI) RunAll(ctx context.Context, rSrc RunnableSource, job tracker.Job) (*errgroup.Group, error)

RunAll will execute functions provided by Next() until there are no more, or the context is canceled.

func (*GardenerAPI) Status

func (g *GardenerAPI) Status(w http.ResponseWriter)

Status adds a small amount of status info to w.

type Runnable

type Runnable interface {
	Run(context.Context) error
	Info() string
}

Runnable is just a function that does something and returns an error. A Runnable may return ErrShouldRetry if there was a non-persistent error. TODO - should this instead be and interface, with Run() and ShouldRetry()?

type RunnableSource

type RunnableSource interface {
	// Next should return iterator.Done when there are no more Runnables.
	// It may block if there are no more runnables available right now,
	// (or if throttling is applied)
	Next(ctx context.Context) (Runnable, error)

	// Label returns a string for use in metrics and debug logs'
	Label() string
}

RunnableSource provides a Next function that returns Runnables.

func Throttle

func Throttle(src RunnableSource, tokens TokenSource) RunnableSource

Throttle applies a provided TokenSource to throttle a Source. This returns an interface, which is discouraged by Go advocates, but seems like the right thing to do here, as there is no reason to export the concrete type.

type TokenSource

type TokenSource interface {
	Acquire(ctx context.Context) error
	Release()
}

TokenSource specifies the interface for a source of tokens for throttling.

func NewWSTokenSource

func NewWSTokenSource(n int) TokenSource

NewWSTokenSource returns a TokenSource based on semaphore.Weighted.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL