Package agent

Documentation

Overview

Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.

The Agent Interface

The Agent interface is the heart of this package. Agent exposes an API to create calls from various parameters and then execute those calls. An Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls (hot+cold)
  • execute calls against containers
  • invoke Start and End for each call appropriately
  • check the mq for any async calls, and submit them

For more information about how an agent executes a call see the documentation on the Agent interface.

Variants

There are two flavors of agent: the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().

To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().

Constants

const (
	EnvDockerNetworks          = "FN_DOCKER_NETWORKS"
	EnvFreezeIdle              = "FN_FREEZE_IDLE_MSECS"
	EnvEjectIdle               = "FN_EJECT_IDLE_MSECS"
	EnvHotPoll                 = "FN_HOT_POLL_MSECS"
	EnvHotLauncherTimeout      = "FN_HOT_LAUNCHER_TIMEOUT_MSECS"
	EnvAsyncChewPoll           = "FN_ASYNC_CHEW_POLL_MSECS"
	EnvCallEndTimeout          = "FN_CALL_END_TIMEOUT_MSECS"
	EnvMaxCallEndStacking      = "FN_MAX_CALL_END_STACKING"
	EnvMaxResponseSize         = "FN_MAX_RESPONSE_SIZE"
	EnvMaxLogSize              = "FN_MAX_LOG_SIZE_BYTES"
	EnvMaxTotalCPU             = "FN_MAX_TOTAL_CPU_MCPUS"
	EnvMaxTotalMemory          = "FN_MAX_TOTAL_MEMORY_BYTES"
	EnvMaxFsSize               = "FN_MAX_FS_SIZE_MB"
	EnvPreForkPoolSize         = "FN_EXPERIMENTAL_PREFORK_POOL_SIZE"
	EnvPreForkImage            = "FN_EXPERIMENTAL_PREFORK_IMAGE"
	EnvPreForkCmd              = "FN_EXPERIMENTAL_PREFORK_CMD"
	EnvPreForkUseOnce          = "FN_EXPERIMENTAL_PREFORK_USE_ONCE"
	EnvPreForkNetworks         = "FN_EXPERIMENTAL_PREFORK_NETWORKS"
	EnvEnableNBResourceTracker = "FN_ENABLE_NB_RESOURCE_TRACKER"

	MaxDisabledMsecs = time.Duration(math.MaxInt64)
)
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024
)

Variables

var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
)
var (
	ErrorPoolClosed       = errors.New("Runner pool closed")
	ErrorPoolRunnerExists = errors.New("Runner already exists")
)
var CapacityFull = errors.New("max capacity reached")
var (
	ErrorRunnerClosed = errors.New("Runner is closed")
)

Functions

func DefaultStaticRunnerPool

func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool

func GetGroupID

func GetGroupID(call *models.Call) string

func NewCallHandle

func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle

func NewHotContainer

func NewHotContainer(call *call, cfg *AgentConfig) (*container, func())

func NewSlotQueue

func NewSlotQueue(key string) *slotQueue

func NewSlotQueueMgr

func NewSlotQueueMgr() *slotQueueMgr

func NewStaticRunnerPool

func NewStaticRunnerPool(runnerAddresses []string, pki *pool.PKIData, runnerCN string, runnerFactory pool.MTLSRunnerFactory) pool.RunnerPool

func SecureGRPCRunnerFactory

func SecureGRPCRunnerFactory(addr, runnerCertCN string, pki *pool.PKIData) (pool.Runner, error)

Types

type Agent

type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Close is not safe to be called from multiple threads.
	io.Closer

	AddCallListener(fnext.CallListener)

	// Enqueue is to use the agent's sweet sweet client bindings to remotely
	// queue async tasks and should be removed from Agent interface ASAP.
	Enqueue(context.Context, *models.Call) error

	// GetAppID returns the ID of the app with the given name.
	GetAppID(ctx context.Context, appName string) (string, error)

	// GetAppByID returns the app with the given ID.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)

	// GetRoute returns the route for the given app ID and path.
	GetRoute(ctx context.Context, appID string, path string) (*models.Route, error)
}

Agent exposes an API to create calls from various parameters and then submit those calls. It also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:

  • manage the memory pool for a given server
  • manage the container lifecycle for calls (hot+cold)
  • execute calls against containers
  • invoke Start and End for each call appropriately
  • check the mq for any async calls, and submit them

Overview: Upon submission of a call, Agent starts the call's timeout timer immediately. If the call is hot, Agent attempts to find an active hot container for that route, launching another container if necessary. Cold calls each launch their own container. Cold calls get container input and output directly, whereas hot calls read from and write to a pipe in the container via Dispatch. If a container must be launched, an attempt is first made to reserve the required RAM while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, that error is returned, provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start is called immediately before starting a container, if cold (i.e. after pulling), or immediately before sending any input, if hot. call.End is called regardless of the timeout timer's status if the call was executed, and any error it returns may be returned from Submit.
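For orientation, here is a minimal sketch of driving an Agent from an HTTP handler. The function name serveInvoke is hypothetical, the agent and app are assumed to have been created and looked up elsewhere (e.g. via New and GetAppByID), and error handling is abbreviated; this is illustrative, not a prescribed usage pattern.

func serveInvoke(a agent.Agent, app *models.App, w http.ResponseWriter, req *http.Request) {
	// Build an executable Call from the request; WithWriter directs the
	// function's output to the HTTP response.
	call, err := a.GetCall(
		agent.FromRequest(a, app, req.URL.Path, req),
		agent.WithWriter(w),
	)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// Submit executes the call, invoking call.Start and call.End around the
	// execution; the returned error may come from execution itself or from End.
	if err := a.Submit(call); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}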

func DefaultPureRunner

func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (Agent, error)

func New

func New(da DataAccess, options ...AgentOption) Agent

New creates an Agent that executes functions locally as Docker containers.

func NewLBAgent

func NewLBAgent(da DataAccess, rp pool.RunnerPool, p pool.Placer) (Agent, error)

NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.
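A rough sketch of wiring a load-balancing agent together follows. It assumes a DataAccess value da and some pool.Placer implementation placer are available, and the runner addresses and port are placeholders.

// Build a fixed pool of runner nodes and load-balance calls across it.
rp := agent.DefaultStaticRunnerPool([]string{"runner-a:9120", "runner-b:9120"})
lb, err := agent.NewLBAgent(da, rp, placer)
if err != nil {
	return err
}
defer lb.Close()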

func NewPureRunner

func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (Agent, error)

func UnsecuredPureRunner

func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (Agent, error)

type AgentConfig

type AgentConfig struct {
	MinDockerVersion        string        `json:"min_docker_version"`
	DockerNetworks          string        `json:"docker_networks"`
	FreezeIdle              time.Duration `json:"freeze_idle_msecs"`
	EjectIdle               time.Duration `json:"eject_idle_msecs"`
	HotPoll                 time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout      time.Duration `json:"hot_launcher_timeout_msecs"`
	AsyncChewPoll           time.Duration `json:"async_chew_poll_msecs"`
	CallEndTimeout          time.Duration `json:"call_end_timeout"`
	MaxCallEndStacking      uint64        `json:"max_call_end_stacking"`
	MaxResponseSize         uint64        `json:"max_response_size_bytes"`
	MaxLogSize              uint64        `json:"max_log_size_bytes"`
	MaxTotalCPU             uint64        `json:"max_total_cpu_mcpus"`
	MaxTotalMemory          uint64        `json:"max_total_memory_bytes"`
	MaxFsSize               uint64        `json:"max_fs_size_mb"`
	PreForkPoolSize         uint64        `json:"pre_fork_pool_size"`
	PreForkImage            string        `json:"pre_fork_image"`
	PreForkCmd              string        `json:"pre_fork_pool_cmd"`
	PreForkUseOnce          uint64        `json:"pre_fork_use_once"`
	PreForkNetworks         string        `json:"pre_fork_networks"`
	EnableNBResourceTracker bool          `json:"enable_nb_resource_tracker"`
}

func NewAgentConfig

func NewAgentConfig() (*AgentConfig, error)

type AgentOption

type AgentOption func(*agent) error

func WithConfig

func WithConfig(cfg *AgentConfig) AgentOption
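As a sketch, configuration is typically loaded via NewAgentConfig (which appears to honor the FN_* environment variables listed under Constants) and then passed to New via WithConfig. The field values below are arbitrary examples and da is an assumed DataAccess.

cfg, err := agent.NewAgentConfig()
if err != nil {
	return err
}
cfg.MaxLogSize = 2 * agent.Mem1MB     // cap per-call log output at 2 MB
cfg.MaxTotalMemory = 4 * agent.Mem1GB // overall memory budget for containers
a := agent.New(da, agent.WithConfig(cfg))
defer a.Close()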

type CachedDataAccess

type CachedDataAccess struct {
	DataAccess
	// contains filtered or unexported fields
}

CachedDataAccess wraps a DataAccess and caches the results of GetApp and GetRoute.

func (*CachedDataAccess) GetAppByID

func (da *CachedDataAccess) GetAppByID(ctx context.Context, appID string) (*models.App, error)

func (*CachedDataAccess) GetAppID

func (da *CachedDataAccess) GetAppID(ctx context.Context, appName string) (string, error)

func (*CachedDataAccess) GetRoute

func (da *CachedDataAccess) GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error)

type Call

type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	// TODO we could respond to async correctly from agent but layering, this
	// is only because the front end has different responses based on call type.
	// try to discourage use elsewhere until this gets pushed down more...
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error
}

type CallOpt

type CallOpt func(c *call) error

TODO build w/o closures... lazy

func FromModel

func FromModel(mCall *models.Call) CallOpt

TODO: this currently relies on FromRequest having been applied beforehand so that the model here is fully qualified. We should probably double-check that, but having a way to bypass the check is likely what will be used anyway unless forced.

func FromModelAndInput

func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt

func FromRequest

func FromRequest(a Agent, app *models.App, path string, req *http.Request) CallOpt

func WithContext

func WithContext(ctx context.Context) CallOpt

func WithWriter

func WithWriter(w io.Writer) CallOpt

TODO this should be required

type CapacityGate

type CapacityGate interface {
	// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
	// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided units
	// of capacity have been committed.
	CheckAndReserveCapacity(units uint64) error

	// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity under
	// zero; implementations are free to panic in that case.
	ReleaseCapacity(units uint64)
}
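To illustrate the contract, here is a hypothetical fixed-limit implementation (not part of the package). It assumes the caller interprets the package's CapacityFull error as a rejected reservation, and it needs the standard "sync" package.

// fixedGate tracks reserved capacity against a static limit.
type fixedGate struct {
	mu    sync.Mutex
	used  uint64
	limit uint64
}

func (g *fixedGate) CheckAndReserveCapacity(units uint64) error {
	g.mu.Lock()
	defer g.mu.Unlock()
	// Atomic check-plus-reserve: either commit all units or commit nothing.
	if g.used+units > g.limit {
		return agent.CapacityFull
	}
	g.used += units
	return nil
}

func (g *fixedGate) ReleaseCapacity(units uint64) {
	g.mu.Lock()
	defer g.mu.Unlock()
	// Releasing more than was reserved would take capacity below zero,
	// which the interface allows implementations to treat as a panic.
	if units > g.used {
		panic("capacity released below zero")
	}
	g.used -= units
}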

type ContainerState

type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, slots *slotQueue)
}

func NewContainerState

func NewContainerState() ContainerState

type ContainerStateType

type ContainerStateType int
const (
	ContainerStateNone  ContainerStateType = iota // uninitialized
	ContainerStateWait                            // resource (cpu + mem) waiting
	ContainerStateStart                           // launching
	ContainerStateIdle                            // running idle
	ContainerStateBusy                            // running busy
	ContainerStateDone                            // exited/failed/done
	ContainerStateMax
)

type DataAccess

type DataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)

	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)

	// GetRoute abstracts querying the datastore for a route within an app.
	GetRoute(ctx context.Context, appID string, routePath string) (*models.Route, error)

	// Enqueue will add a Call to the queue (ultimately forwards to mq.Push).
	Enqueue(ctx context.Context, mCall *models.Call) error

	// Dequeue will query the queue for the next available Call that can be run
	// by this Agent, and reserve it (ultimately forwards to mq.Reserve).
	Dequeue(ctx context.Context) (*models.Call, error)

	// Start will attempt to start the provided Call within an appropriate
	// context.
	Start(ctx context.Context, mCall *models.Call) error

	// Finish will notify the system that the Call has been processed, and
	// fulfill the reservation in the queue if the call came from a queue.
	Finish(ctx context.Context, mCall *models.Call, stderr io.Reader, async bool) error
}

DataAccess abstracts the datastore and message queue operations done by the agent, so that API nodes and runner nodes can work with the same interface but actually operate on the data in different ways (by direct access or by mediation through an API node).

func NewCachedDataAccess

func NewCachedDataAccess(da DataAccess) DataAccess

func NewDirectDataAccess

func NewDirectDataAccess(ds models.Datastore, ls models.LogStore, mq models.MessageQueue) DataAccess
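A brief sketch of composing these constructors on a node with direct access to its stores, where ds, ls, and mq are assumed to be configured elsewhere; the cache wrapper avoids repeated app and route lookups.

da := agent.NewCachedDataAccess(agent.NewDirectDataAccess(ds, ls, mq))
a := agent.New(da)
defer a.Close()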

type Param

type Param struct {
	Key   string
	Value string
}

type Params

type Params []Param

type RequestState

type RequestState interface {
	UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)
}

func NewRequestState

func NewRequestState() RequestState

type RequestStateType

type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done
	RequestStateMax
)

type ResourceToken

type ResourceToken interface {
	// Close must be called by any thread that receives a token.
	io.Closer
	Error() error
}

type ResourceTracker

type ResourceTracker interface {
	// WaitAsyncResource returns a channel that will send once when there appear to be sufficient
	// resource levels to run an async task; it is up to the implementer to define policy here.
	WaitAsyncResource(ctx context.Context) chan struct{}

	// GetResourceToken returns a channel to wait on for a resource token. If the provided context is
	// canceled, the channel will never receive anything. If it is not possible to fulfill this resource
	// request, the channel will also never receive anything (use IsResourcePossible first). Otherwise a
	// resource token will be sent once on the returned channel; the channel is never closed.
	// If isNB is set, the resource check is done without blocking and an error token is returned.
	// If isAsync is set, resource allocation specific to async requests is considered (e.g. always keeping
	// a sync-only reserve area). Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory, cpuQuota uint64, isAsync, isNB bool) <-chan ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this
	// machine. It must be called before GetResourceToken or GetResourceToken may hang.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory, cpuQuota uint64, isAsync bool) bool

	// GetResourceTokenWaiterCount returns the number of waiters blocked on a condition variable
	// waiting for a resource token.
	GetResourceTokenWaiterCount() uint64
}

ResourceTracker is a simple resource (memory, CPU, disk, etc.) tracker for scheduling. TODO: add CPU, disk, and network IO in the future.

func NewResourceTracker

func NewResourceTracker(cfg *AgentConfig) ResourceTracker
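An illustrative sketch of acquiring a token before doing work, using the units described above (memory in MB, CPU in mCPUs). The surrounding context, cfg, and the requested amounts are assumptions.

rt := agent.NewResourceTracker(cfg)

// Guard against requests this machine can never satisfy, since
// GetResourceToken would otherwise block forever.
if !rt.IsResourcePossible(128, 100, false) {
	return errors.New("request exceeds machine capacity")
}

select {
case tok := <-rt.GetResourceToken(ctx, 128, 100, false, false):
	if tok.Error() != nil {
		return tok.Error()
	}
	defer tok.Close()
	// ... run the call while holding the token ...
case <-ctx.Done():
	return ctx.Err()
}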

type Slot

type Slot interface {
	Close(ctx context.Context) error
	Error() error
	// contains filtered or unexported methods
}

Directories

Path	Synopsis
drivers	Package drivers is intended as a general purpose container abstraction library.
drivers/docker	Package docker provides a Docker driver for Fn.
drivers/mock	Package mock provides a fake Driver implementation that is only used for testing.
	Package runner is a generated protocol buffer package.
	Package protocol defines the protocol between the Fn Agent and the code running inside of a container.
