Version: v0.3.749 Latest Latest

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2019 License: Apache-2.0 Imports: 61 Imported by: 11



Package agent defines the Agent interface and related concepts. An agent is an entity that knows how to execute an Fn function.

The Agent Interface

The Agent interface is the heart of this package. Agent exposes an api to create calls from various parameters and then execute those calls. An Agent has a few roles:

* manage the memory pool for a given server
* manage the container lifecycle for calls
* execute calls against containers
* invoke Start and End for each call appropriately

For more information about how an agent executes a call see the documentation on the Agent interface.


There are two flavors of runner, the local Docker agent and a load-balancing agent. To create an agent that uses Docker containers to execute calls, use New().

To create an agent that can load-balance across a pool of sub-agents, use NewLBAgent().



View Source
const (
	// EnvContainerLabelTag is a classifier label tag that is used to distinguish fn managed containers
	EnvContainerLabelTag = "FN_CONTAINER_LABEL_TAG"
	// EnvImageCleanMaxSize enables image cleaner and sets the high water mark for image cache in bytes
	EnvImageCleanMaxSize = "FN_IMAGE_CLEAN_MAX_SIZE"
	// EnvImageCleanExemptTags list of image names separated by whitespace that are exempt from removal in image cleaner
	EnvImageCleanExemptTags = "FN_IMAGE_CLEAN_EXEMPT_TAGS"
	// EnvImageEnableVolume allows image to contain VOLUME definitions
	EnvImageEnableVolume = "FN_IMAGE_ENABLE_VOLUME"
	// EnvDockerNetworks is a comma separated list of networks to attach to each container started
	EnvDockerNetworks = "FN_DOCKER_NETWORKS"
	// EnvDockerLoadFile is a file location for a file that contains a tarball of a docker image to load on startup
	EnvDockerLoadFile = "FN_DOCKER_LOAD_FILE"
	// EnvDisableUnprivilegedContainers disables docker security features like user name, cap drop etc.
	EnvDisableUnprivilegedContainers = "FN_DISABLE_UNPRIVILEGED_CONTAINERS"
	// EnvFreezeIdle is the delay between a container being last used and being frozen
	// EnvHotPoll is the interval to ping for a slot manager thread to check if a container should be
	// launched for a given function
	// EnvHotLauncherTimeout is the timeout for a hot container queue to persist if idle
	// EnvHotStartTimeout is the timeout for a hot container to be created including docker-pull
	// EnvHotStartTimeout is the timeout for a hot container to become available for use for requests after EnvHotStartTimeout
	// EnvMaxResponseSize is the maximum number of bytes that a function may return from an invocation
	EnvMaxResponseSize = "FN_MAX_RESPONSE_SIZE"
	// EnvHdrMaxResponseSize is the maximum number of bytes that a function may return in an invocation header
	EnvMaxHdrResponseSize = "FN_MAX_HDR_RESPONSE_SIZE"
	// EnvMaxLogSize is the maximum size that a function's log may reach
	// EnvMaxTotalCPU is the maximum CPU that will be reserved across all containers
	// EnvMaxTotalMemory is the maximum memory that will be reserved across all containers
	// EnvMaxFsSize is the maximum filesystem size that a function may use
	EnvMaxFsSize = "FN_MAX_FS_SIZE_MB"
	// EnvMaxPIDs is the maximum number of PIDs that a function is allowed to create
	// EnvMaxOpenFiles is the maximum number open files handles the process in a
	// function is allowed to have
	EnvMaxOpenFiles = "FN_MAX_OPEN_FILES"
	// EnvMaxLockedMemory the maximum number of bytes of memory that may be
	// locked into RAM
	EnvMaxLockedMemory = "FN_MAX_LOCKED_MEMORY"
	// EnvMaxPendingSignals limit on the number of signals that may be queued
	EnvMaxPendingSignals = "FN_MAX_PENDING_SIGNALS"
	// EnvMaxMessageQueue limit on the number of bytes that can be allocated for
	// POSIX message queues
	EnvMaxMessageQueue = "FN_MAX_MESSAGE_QUEUE"
	// EnvPreForkPoolSize is the number of containers pooled to steal network from, this may reduce latency
	// EnvPreForkImage is the image to use for the pre-fork pool
	// EnvPreForkCmd is the command to run for images in the pre-fork pool, it should run for a long time
	// EnvPreForkUseOnce limits the number of times a pre-fork pool container may be used to one, they are otherwise recycled
	// EnvPreForkNetworks is the equivalent of EnvDockerNetworks but for pre-fork pool containers
	// EnvEnableNBResourceTracker makes every request to the resource tracker non-blocking, meaning the resources are either
	// available or it will return an error immediately
	// EnvMaxTmpFsInodes is the maximum number of inodes for /tmp in a container
	EnvMaxTmpFsInodes = "FN_MAX_TMPFS_INODES"
	// EnvDisableReadOnlyRootFs makes the root fs for a container have rw permissions, by default it is read only
	// EnvDisableDebugUserLogs disables user function logs being logged at level debug. wise to enable for production.
	EnvDisableDebugUserLogs = "FN_DISABLE_DEBUG_USER_LOGS"

	// EnvIOFSEnableTmpfs enables creating a per-container tmpfs mount for the IOFS
	EnvIOFSEnableTmpfs = "FN_IOFS_TMPFS"
	// EnvIOFSPath is the path within fn server container of a directory to configure for unix socket files for each container
	// EnvIOFSDockerPath determines the relative location on the docker host where iofs mounts should be prefixed with
	// EnvIOFSOpts are the options to set when mounting the iofs directory for unix socket files

	// EnvDetachedHeadroom is the extra room we want to give to a detached function to run.
	EnvDetachedHeadroom = "FN_EXECUTION_HEADROOM"

	// MaxMsDisabled is used to determine whether mr freeze is lying in wait. TODO remove this manuever
	MaxMsDisabled = time.Duration(math.MaxInt64)

	// DefaultHotPoll is the default value for EnvHotPoll
	DefaultHotPoll = 200 * time.Millisecond
View Source
const (
	// Here we give 5 seconds of timeout inside the container. We hardcode these numbers here to
	// ensure we control idle timeout & timeout as well as how long should cache be valid.
	// A cache duration of idleTimeout + 500 msecs allows us to reuse the cache, for about 1.5 secs,
	// and during this time, since we allow no queries to go through, the hot container times out.
	// For now, status tests a single case: a new hot container is spawned when cache is expired
	// and when a query is allowed to run.
	// TODO: we might want to mix this up and perhaps allow that hot container to handle
	// more than one query to test both 'new hot container' and 'old hot container' cases.
	StatusCallTimeout       = int32(5)
	StatusCallIdleTimeout   = int32(1)
	StatusCallCacheDuration = time.Duration(500)*time.Millisecond + time.Duration(StatusCallIdleTimeout)*time.Second

	// Total context timeout (scheduler+execution.) We need to allocate plenty of time here.
	// 60 seconds should be enough to provoke disk I/O errors, docker timeouts. etc.
	StatusCtxTimeout = time.Duration(60 * time.Second)
View Source
const (
	Mem1MB = 1024 * 1024
	Mem1GB = 1024 * 1024 * 1024

	// Assume 2GB RAM on non-linux systems
	DefaultNonLinuxMemory = 2048 * Mem1MB
View Source
const (
	// max buffer size for grpc data messages, 10K
	MaxDataChunk          = 10 * 1024
	DefaultConnectTimeout = 100 * time.Millisecond
View Source
const RegistryToken = "FN_REGISTRY_TOKEN"

RegistryToken is a reserved call extensions key to pass registry token



View Source
var (
	ErrorExpectedTry  = errors.New("Protocol failure: expected ClientMsg_Try")
	ErrorExpectedData = errors.New("Protocol failure: expected ClientMsg_Data")
View Source
var (
	ErrorRunnerClosed    = errors.New("Runner is closed")
	ErrorPureRunnerNoEOF = errors.New("Purerunner missing EOF response")
View Source
var (

	// AppIDMetricKey is a tag for metrics
	AppIDMetricKey = common.MakeKey("app_id")
	// FnIDMetricKey is a tag for metrics
	FnIDMetricKey = common.MakeKey("fn_id")
	// ImageNameMetricKey is a tag for metrics
	ImageNameMetricKey = common.MakeKey("image_name")
View Source
var CapacityFull = errors.New("max capacity reached")


func DefaultStaticRunnerPool

func DefaultStaticRunnerPool(runnerAddresses []string) pool.RunnerPool

func GetCallLatencies

func GetCallLatencies(c *call) (time.Duration, time.Duration)

func NewCallHandle

func NewCallHandle(engagement runner.RunnerProtocol_EngageServer) *callHandle

func NewDockerDriver

func NewDockerDriver(cfg *Config) (drivers.Driver, error)

NewDockerDriver creates a default docker driver from agent config

func NewSlotQueue

func NewSlotQueue(key string) *slotQueue

func NewSlotQueueMgr

func NewSlotQueueMgr() *slotQueueMgr

func NewStaticRunnerPool

func NewStaticRunnerPool(runnerAddresses []string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) pool.RunnerPool

func NewStatusTracker

func NewStatusTracker() *statusTracker

func NewStatusTrackerWithAgent

func NewStatusTrackerWithAgent(a Agent) *statusTracker

func NewgRPCRunner

func NewgRPCRunner(addr string, tlsConf *tls.Config, dialOpts ...grpc.DialOption) (pool.Runner, error)

func NewgRPCRunnerWithTimeout

func NewgRPCRunnerWithTimeout(addr string, tlsConf *tls.Config, timeout time.Duration, dialOpts ...grpc.DialOption) (pool.Runner, error)

func RegisterAgentViews

func RegisterAgentViews(tagKeys []string, latencyDist []float64)

RegisterAgentViews creates and registers all agent views

func RegisterContainerViews

func RegisterContainerViews(tagKeys []string, latencyDist []float64)

RegisterContainerViews creates and register containers views with provided tag keys

func RegisterDockerViews

func RegisterDockerViews(tagKeys []string, latencyDist, ioNetDist, ioDiskDist, memoryDist, cpuDist []float64)

RegisterDockerViews creates a and registers Docker views with provided tag keys

func RegisterLBAgentViews

func RegisterLBAgentViews(tagKeys []string, latencyDist []float64)

func RegisterRunnerViews

func RegisterRunnerViews(tagKeys []string, latencyDist []float64)

RegisterRunnerViews creates and registers all runner views

func TranslateGRPCStatusToRunnerStatus

func TranslateGRPCStatusToRunnerStatus(status *pb.RunnerStatus) *pool.RunnerStatus

TranslateGRPCStatusToRunnerStatus runner.RunnerStatus to runnerpool.RunnerStatus


type Agent

type Agent interface {
	// GetCall will return a Call that is executable by the Agent, which
	// can be built via various CallOpt's provided to the method.
	GetCall(...CallOpt) (Call, error)

	// Submit will attempt to execute a call locally, a Call may store information
	// about itself in its Start and End methods, which will be called in Submit
	// immediately before and after the Call is executed, respectively. An error
	// will be returned if there is an issue executing the call or the error
	// may be from the call's execution itself (if, say, the container dies,
	// or the call times out).
	Submit(Call) error

	// Close will wait for any outstanding calls to complete and then exit.
	// Closing the agent will invoke Close on the underlying DataAccess.
	// Close is not safe to be called from multiple threads.


Agent exposes an api to create calls from various parameters and then submit those calls, it also exposes a 'safe' shutdown mechanism via its Close method. Agent has a few roles:

* manage the memory pool for a given server
* manage the container lifecycle for calls
* execute calls against containers
* invoke Start and End for each call appropriately

Overview: Upon submission of a call, Agent will start the call's timeout timer immediately. If the call is hot, Agent will attempt to find an active hot container for that route, and if necessary launch another container. calls will be able to read/write directly from/to a socket in the container. If it's necessary to launch a container, first an attempt will be made to try to reserve the ram required while waiting for any hot 'slot' to become available [if applicable]. If there is an error launching the container, an error will be returned provided the call has not yet timed out or found another hot 'slot' to execute in [if applicable]. call.Start will be called immediately before sending any input to a container. call.End will be called regardless of the timeout timer's status if the call was executed, and that error returned may be returned from Submit.

func DefaultPureRunner

func DefaultPureRunner(cancel context.CancelFunc, addr string, tlsCfg *tls.Config) (Agent, error)

func New

func New(options ...Option) Agent

New creates an Agent that executes functions locally as Docker containers.

func NewLBAgent

func NewLBAgent(rp pool.RunnerPool, p pool.Placer, options ...LBAgentOption) (Agent, error)

NewLBAgent creates an Agent that knows how to load-balance function calls across a group of runner nodes.

func NewPureRunner

func NewPureRunner(cancel context.CancelFunc, addr string, options ...PureRunnerOption) (Agent, error)

type Call

type Call interface {
	// Model will return the underlying models.Call configuration for this call.
	Model() *models.Call

	// Start will be called before this call is executed, it may be used to
	// guarantee mutual exclusion, check docker permissions, update timestamps,
	// etc.
	// TODO Start and End can likely be unexported as they are only used in the agent,
	// and on a type which is constructed in a specific agent. meh.
	Start(ctx context.Context) error

	// End will be called immediately after attempting a call execution,
	// regardless of whether the execution failed or not. An error will be passed
	// to End, which if nil indicates a successful execution. Any error returned
	// from End will be returned as the error from Submit.
	End(ctx context.Context, err error) error

Call is an agent specific instance of a call object, that is runnable.

type CallOpt

type CallOpt func(c *call) error

CallOpt allows configuring a call before execution TODO(reed): consider the interface here, all options must be defined in agent and flexible enough for usage by extenders of fn, this straddling is painful. consider models.Call?

func FromHTTPFnRequest

func FromHTTPFnRequest(app *models.App, fn *models.Fn, req *http.Request) CallOpt

FromHTTPFnRequest Sets up a call from an http trigger request

func FromModel

func FromModel(mCall *models.Call) CallOpt

FromModel creates a call object from an existing stored call model object, reading the body from the stored call payload

func FromModelAndInput

func FromModelAndInput(mCall *models.Call, in io.ReadCloser) CallOpt

FromModelAndInput creates a call object from an existing stored call model object , reading the body from a provided stream

func InvokeDetached

func InvokeDetached() CallOpt

InvokeDetached mark a call to be a detached call

func WithContext

func WithContext(ctx context.Context) CallOpt

WithContext overrides the context on the call

func WithDockerAuth

func WithDockerAuth(auth docker.Auther) CallOpt

WithDockerAuth configures a call to retrieve credentials for an image pull

func WithExtensions

func WithExtensions(extensions map[string]string) CallOpt

WithExtensions adds internal attributes to the call that can be interpreted by extensions in the agent Pure runner can use this to pass an extension to the call

func WithLogger

func WithLogger(w io.ReadWriteCloser) CallOpt

WithLogger sets stderr to the provided one

func WithTrigger

func WithTrigger(t *models.Trigger) CallOpt

WithTrigger adds trigger specific bits to a call. TODO consider removal, this is from a shuffle

func WithWriter

func WithWriter(w io.Writer) CallOpt

WithWriter sets the writer that the call uses to send its output message to TODO this should be required

type CallOverrider

type CallOverrider func(*http.Request, *models.Call, map[string]string) (map[string]string, error)

CallOverrider should die. Interceptor in GetCall

type Config

type Config struct {
	MinDockerVersion              string        `json:"min_docker_version"`
	ContainerLabelTag             string        `json:"container_label_tag"`
	DockerNetworks                string        `json:"docker_networks"`
	DockerLoadFile                string        `json:"docker_load_file"`
	DisableUnprivilegedContainers bool          `json:"disable_unprivileged_containers"`
	FreezeIdle                    time.Duration `json:"freeze_idle_msecs"`
	HotPoll                       time.Duration `json:"hot_poll_msecs"`
	HotLauncherTimeout            time.Duration `json:"hot_launcher_timeout_msecs"`
	HotPullTimeout                time.Duration `json:"hot_pull_timeout_msecs"`
	HotStartTimeout               time.Duration `json:"hot_start_timeout_msecs"`
	DetachedHeadRoom              time.Duration `json:"detached_head_room_msecs"`
	MaxResponseSize               uint64        `json:"max_response_size_bytes"`
	MaxHdrResponseSize            uint64        `json:"max_hdr_response_size_bytes"`
	MaxLogSize                    uint64        `json:"max_log_size_bytes"`
	MaxTotalCPU                   uint64        `json:"max_total_cpu_mcpus"`
	MaxTotalMemory                uint64        `json:"max_total_memory_bytes"`
	MaxFsSize                     uint64        `json:"max_fs_size_mb"`
	MaxPIDs                       uint64        `json:"max_pids"`
	MaxOpenFiles                  *uint64       `json:"max_open_files"`
	MaxLockedMemory               *uint64       `json:"max_locked_memory"`
	MaxPendingSignals             *uint64       `json:"max_pending_signals"`
	MaxMessageQueue               *uint64       `json:"max_message_queue"`
	PreForkPoolSize               uint64        `json:"pre_fork_pool_size"`
	PreForkImage                  string        `json:"pre_fork_image"`
	PreForkCmd                    string        `json:"pre_fork_pool_cmd"`
	PreForkUseOnce                uint64        `json:"pre_fork_use_once"`
	PreForkNetworks               string        `json:"pre_fork_networks"`
	EnableNBResourceTracker       bool          `json:"enable_nb_resource_tracker"`
	MaxTmpFsInodes                uint64        `json:"max_tmpfs_inodes"`
	DisableReadOnlyRootFs         bool          `json:"disable_readonly_rootfs"`
	DisableDebugUserLogs          bool          `json:"disable_debug_user_logs"`
	IOFSEnableTmpfs               bool          `json:"iofs_enable_tmpfs"`
	EnableFDKDebugInfo            bool          `json:"enable_fdk_debug_info"`
	IOFSAgentPath                 string        `json:"iofs_path"`
	IOFSMountRoot                 string        `json:"iofs_mount_root"`
	IOFSOpts                      string        `json:"iofs_opts"`
	ImageCleanMaxSize             uint64        `json:"image_clean_max_size"`
	ImageCleanExemptTags          string        `json:"image_clean_exempt_tags"`
	ImageEnableVolume             bool          `json:"image_enable_volume"`

Config specifies various settings for an agent

func NewConfig

func NewConfig() (*Config, error)

NewConfig returns a config set from env vars, plus defaults

type ContainerState

type ContainerState interface {
	UpdateState(ctx context.Context, newState ContainerStateType, call *call)
	GetState() string

func NewContainerState

func NewContainerState() ContainerState

type ContainerStateType

type ContainerStateType int
const (
	ContainerStateNone   ContainerStateType = iota // uninitialized
	ContainerStateWait                             // resource (cpu + mem) waiting
	ContainerStateStart                            // launching
	ContainerStateIdle                             // running: idle but not paused
	ContainerStatePaused                           // running: idle but paused
	ContainerStateBusy                             // running: busy
	ContainerStateDone                             // exited/failed/done

type DataAccess

type DataAccess interface {

XXX(reed): replace all uses of ReadDataAccess with DataAccess or vice versa, whatever is easier

type DetachedResponseWriter

type DetachedResponseWriter struct {
	Headers http.Header
	// contains filtered or unexported fields

func NewDetachedResponseWriter

func NewDetachedResponseWriter(h http.Header, statusCode int) *DetachedResponseWriter

func (*DetachedResponseWriter) Header

func (w *DetachedResponseWriter) Header() http.Header

func (*DetachedResponseWriter) Status

func (w *DetachedResponseWriter) Status() int

func (*DetachedResponseWriter) Write

func (w *DetachedResponseWriter) Write(data []byte) (int, error)

func (*DetachedResponseWriter) WriteHeader

func (w *DetachedResponseWriter) WriteHeader(statusCode int)

type EvictToken

type EvictToken struct {
	C        chan struct{}
	DoneChan chan struct{}
	// contains filtered or unexported fields

func (*EvictToken) SetEvictable

func (token *EvictToken) SetEvictable(isEvictable bool)

type Evictor

type Evictor interface {
	// CreateEvictToken creates an eviction token to be used in evictor tracking. Returns
	// an eviction token.
	CreateEvictToken(slotId string, mem, cpu uint64) *EvictToken

	// DeleteEvictToken deletes an eviction token from evictor system
	DeleteEvictToken(token *EvictToken)

	// PerformEviction performs evictions to satisfy cpu & mem arguments
	// and returns a slice of channels for evictions performed. The callers
	// can wait on these channel to ensure evictions are completed.
	PerformEviction(slotId string, mem, cpu uint64) []chan struct{}

func NewEvictor

func NewEvictor() Evictor

type LBAgentOption

type LBAgentOption func(*lbAgent) error

func WithLBAgentConfig

func WithLBAgentConfig(cfg *Config) LBAgentOption

WithLBAgentConfig sets the agent config to the provided Config

func WithLBCallOptions

func WithLBCallOptions(opts ...CallOpt) LBAgentOption

WithLBCallOptions adds additional call options to each call created from GetCall, these options will be executed after any other options supplied to GetCall

func WithLBCallOverrider

func WithLBCallOverrider(fn CallOverrider) LBAgentOption

WithLBCallOverrider is for LB agents to register a CallOverrider to modify a Call and extensions

type LogStreamer

type LogStreamer interface {
	StreamLogs(runner.RunnerProtocol_StreamLogsServer) error

Log Streamer to manage log gRPC interface

type MockAgent

type MockAgent struct {

func (*MockAgent) AddCallListener

func (m *MockAgent) AddCallListener(f fnext.CallListener)

func (*MockAgent) Close

func (m *MockAgent) Close() error

func (*MockAgent) GetCall

func (m *MockAgent) GetCall(opts ...CallOpt) (Call, error)

func (*MockAgent) Submit

func (m *MockAgent) Submit(c Call) error

type Option

type Option func(*agent) error

Option configures an agent at startup

func WithCallOptions

func WithCallOptions(opts ...CallOpt) Option

WithCallOptions adds additional call options to each call created from GetCall, these options will be executed after any other options supplied to GetCall

func WithCallOverrider

func WithCallOverrider(fn CallOverrider) Option

WithCallOverrider registers register a CallOverrider to modify a Call and extensions on call construction

func WithConfig

func WithConfig(cfg *Config) Option

WithConfig sets the agent config to the provided config

func WithDockerDriver

func WithDockerDriver(drv drivers.Driver) Option

WithDockerDriver Provides a customer driver to agent

type PureRunnerOption

type PureRunnerOption func(*pureRunner) error

func PureRunnerWithAgent

func PureRunnerWithAgent(a Agent) PureRunnerOption

func PureRunnerWithConfigFunc

func PureRunnerWithConfigFunc(configFunc func(context.Context, *runner.ConfigMsg) (*runner.ConfigStatus, error)) PureRunnerOption

func PureRunnerWithCustomHealthCheckerFunc

func PureRunnerWithCustomHealthCheckerFunc(customHealthCheckerFunc func(context.Context) (map[string]string, error)) PureRunnerOption

func PureRunnerWithDetached

func PureRunnerWithDetached() PureRunnerOption

func PureRunnerWithGRPCServerOptions

func PureRunnerWithGRPCServerOptions(options ...grpc.ServerOption) PureRunnerOption

func PureRunnerWithKdumpsOnDisk

func PureRunnerWithKdumpsOnDisk(numKdumps uint64) PureRunnerOption

PureRunnerWithKdumpsOnDisk returns a PureRunnerOption that indicates that kdumps have been found on disk. The argument numKdump is a counter that indicates how many dumps were on disk at the time the runner was created.

func PureRunnerWithLogStreamer

func PureRunnerWithLogStreamer(logStreamer LogStreamer) PureRunnerOption

func PureRunnerWithSSL

func PureRunnerWithSSL(tlsCfg *tls.Config) PureRunnerOption

func PureRunnerWithStatusImage

func PureRunnerWithStatusImage(imgName string) PureRunnerOption

PureRunnerWithStatusImage returns a PureRunnerOption that annotates a PureRunner with a statusImageName attribute. This attribute names an image name to use for the status checks. Optionally, the status image can be pre-loaded into docker using FN_DOCKER_LOAD_FILE to avoid docker pull during status checks.

func PureRunnerWithStatusNetworkEnabler

func PureRunnerWithStatusNetworkEnabler(barrierPath string) PureRunnerOption

type ReadDataAccess

type ReadDataAccess interface {
	GetAppID(ctx context.Context, appName string) (string, error)
	// GetAppByID abstracts querying the datastore for an app.
	GetAppByID(ctx context.Context, appID string) (*models.App, error)
	GetTriggerBySource(ctx context.Context, appID string, triggerType, source string) (*models.Trigger, error)
	GetFnByID(ctx context.Context, fnID string) (*models.Fn, error)

ReadDataAccess represents read operations required to operate a load balancer node

func NewCachedDataAccess

func NewCachedDataAccess(da ReadDataAccess) ReadDataAccess

NewCachedDataAccess is a wrapper that caches entries temporarily

func NewMetricReadDataAccess

func NewMetricReadDataAccess(rda ReadDataAccess) ReadDataAccess

NewMetricReadDataAccess adds metrics to a ReadDataAccess

type RequestState

type RequestState interface {
	UpdateState(ctx context.Context, newState RequestStateType, slots *slotQueue)

func NewRequestState

func NewRequestState() RequestState

type RequestStateType

type RequestStateType int
const (
	RequestStateNone RequestStateType = iota // uninitialized
	RequestStateWait                         // request is waiting
	RequestStateExec                         // request is executing
	RequestStateDone                         // request is done

type ResourceToken

type ResourceToken interface {
	// Close must be called by any thread that receives a token.
	Error() error
	NeededCapacity() (uint64, models.MilliCPUs)

type ResourceTracker

type ResourceTracker interface {
	// GetResourceToken returns a resource token.
	// Memory is expected to be provided in MB units.
	GetResourceToken(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) ResourceToken

	// GetResourceTokenNB is the non-blocking equivalent of GetResourceToken. The return value is the
	// resource token itself. If the request cannot be satisfied, a token with CapacityFull error set is
	// returned.
	// Memory is expected to be provided in MB units.
	GetResourceTokenNB(ctx context.Context, memory uint64, cpuQuota models.MilliCPUs) ResourceToken

	// IsResourcePossible returns whether it's possible to fulfill the requested resources on this machine.
	// Memory is expected to be provided in MB units.
	IsResourcePossible(memory uint64, cpuQuota models.MilliCPUs) bool

	// Retrieve current stats/usage
	GetUtilization() ResourceUtilization

A simple resource (memory, cpu, disk, etc.) tracker for scheduling. TODO: disk, network IO for future

func NewResourceTracker

func NewResourceTracker(cfg *Config) ResourceTracker

type ResourceUtilization

type ResourceUtilization struct {
	// CPU in use
	CpuUsed models.MilliCPUs
	// CPU available
	CpuAvail models.MilliCPUs
	// Memory in use in bytes
	MemUsed uint64
	// Memory available in bytes
	MemAvail uint64

type Slot

type Slot interface {
	SetError(err error)
	// contains filtered or unexported methods


Path Synopsis
Package drivers is intended as a general purpose container abstraction library.
Package drivers is intended as a general purpose container abstraction library.
Package docker provides a Docker driver for Fn.
Package docker provides a Docker driver for Fn.
Package mock provides a fake Driver implementation that is only used for testing.
Package mock provides a fake Driver implementation that is only used for testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
t or T : Toggle theme light dark auto