agent

package
v0.0.0-...-c0df949 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2018 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024
)

Variables

This section is empty.

Functions

func NewSlotQueue

func NewSlotQueue(key string) *slotQueue

func NewSlotQueueMgr

func NewSlotQueueMgr() *slotQueueMgr

func StatsComplete

func StatsComplete(ctx context.Context)

func StatsDequeue

func StatsDequeue(ctx context.Context)

Call when a function has been queued but cannot be started because of an error

func StatsDequeueAndFail

func StatsDequeueAndFail(ctx context.Context)

func StatsDequeueAndStart

func StatsDequeueAndStart(ctx context.Context)

func StatsEnqueue

func StatsEnqueue(ctx context.Context)

func StatsFailed

func StatsFailed(ctx context.Context)

func StatsIncrementErrors

func StatsIncrementErrors(ctx context.Context)

func StatsIncrementTimedout

func StatsIncrementTimedout(ctx context.Context)

func StatsIncrementTooBusy

func StatsIncrementTooBusy(ctx context.Context)

Types

type Agent

type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Close is not safe to be called from multiple threads.
	io.Closer

	// Return the http.Handler used to handle Prometheus metric requests
	PromHandler() http.Handler
	AddCallListener(fnext.CallListener)

	// Enqueue is to use the agent's sweet sweet client bindings to remotely
	// queue async tasks and should be removed from Agent interface ASAP.
	Enqueue(context.Context, *models.Call) error
}

func New

func New(da DataAccess) Agent

type CachedDataAccess

type CachedDataAccess struct {
	DataAccess
	// contains filtered or unexported fields
}

CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.

func (*CachedDataAccess) GetApp

func (da *CachedDataAccess) GetApp(ctx context.Context, appName string) (*models.App, error)

func (*CachedDataAccess) GetRoute

func (da *CachedDataAccess) GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error)

type Call

type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	// TODO we could respond to async correctly from agent but layering, this
	// is only because the front end has different responses based on call type.
	// try to discourage use elsewhere until this gets pushed down more...
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}

type CallOpt

type CallOpt func(a *agent, c *call) error

TODO build w/o closures... lazy

func FromModel

func FromModel(mCall *models.Call) CallOpt

TODO this currently relies on FromRequest having happened before to create the model here, to be a fully qualified model. We probably should double check but having a way to bypass will likely be what's used anyway unless forced.

func FromRequest

func FromRequest(appName, path string, req *http.Request) CallOpt

func WithContext

func WithContext(ctx context.Context) CallOpt

func WithWriter

func WithWriter(w io.Writer) CallOpt

TODO this should be required

type ContainerState

type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
}

func NewContainerState

func NewContainerState() ContainerState

type ContainerStateType

type ContainerStateType int
const (
	ContainerStateNone  ContainerStateType = iota // uninitialized
	ContainerStateWait                            // resource (cpu + mem) waiting
	ContainerStateStart                           // launching
	ContainerStateIdle                            // running idle
	ContainerStateBusy                            // running busy
	ContainerStateDone                            // exited/failed/done
	ContainerStateMax
)

type DataAccess

type DataAccess interface {
	// GetApp abstracts querying the datastore for an app.
	GetApp(ctx context.Context, appName string) (*models.App, error)

	// GetRoute abstracts querying the datastore for a route within an app.
	GetRoute(ctx context.Context, appName string, routePath string) (*models.Route, error)

	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error

	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)

	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error

	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}

DataAccess abstracts the datastore and message queue operations done by the agent, so that API nodes and runner nodes can work with the same interface but actually operate on the data in different ways (by direct access or by mediation through an API node).

func NewCachedDataAccess

func NewCachedDataAccess(da DataAccess) DataAccess

func NewDirectDataAccess

func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess

type Param

type Param struct {
	Key   string
	Value string
}

type Params

type Params []Param

type RequestState

type RequestState interface {
	UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}

func NewRequestState

func NewRequestState() RequestState

type RequestStateType

type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done
	RequestStateMax
)

type ResourceToken

type ResourceToken interface {
	// Close must be called by any thread that receives a token.
	io.Closer
}

type ResourceTracker

type ResourceTracker interface {
	// WaitAsyncResource returns a channel that will send once when there seem to be sufficient
	// resource levels to run an async task, it is up to the implementer to create policy here.
	WaitAsyncResource(ctx context.Context) chan struct{}

	// GetResourceToken returns a channel to wait for a resource token on. If the provided context is canceled,
	// the channel will never receive anything. If it is not possible to fulfill this resource, the channel
	// will never receive anything (use IsResourcePossible). If a resource token is available for the provided
	// resource parameters, it will otherwise be sent once on the returned channel. The channel is never closed.
	// Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync bool) <-chan ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool

	// returns number of waiters waiting for a resource token blocked on condition variable
	GetResourceTokenWaiterCount() uint64
}

A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: add cpu, disk, network IO for future

func NewResourceTracker

func NewResourceTracker() ResourceTracker

type Slot

type Slot interface {
	Close(ctx context.Context) error
	Error() error
	// contains filtered or unexported methods
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL