worker

package
v6.7.3-0...-1f455d7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 7, 2020 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Overview

Package worker will eventually evolve into a concrete implementation of a runtime. As such, Concourse core shouldn't depend on abstractions defined in this package or its child packages. General Runtime abstractions will be ported over to the Runtime package. The Client interface is the main interface consumed by Concourse core, and it will be shifted to the Runtime package.

Index

Constants

View Source
const GetResourceLockInterval = 5 * time.Second

Variables

View Source
var (
	ErrNoWorkers             = errors.New("no workers")
	ErrFailedAcquirePoolLock = errors.New("failed to acquire pool lock")
)
View Source
var ErrBaseResourceTypeNotFound = errors.New("base resource type not found")
View Source
var ErrFailedToGetLock = errors.New("failed to get lock")
View Source
var ErrMissingVolume = errors.New("volume mounted to container is missing")
View Source
var GardenLimitDefault = uint64(0)
View Source
var ResourceConfigCheckSessionExpiredError = errors.New("no db container was found for owner")

Functions

func NewClient

func NewClient(pool Pool,
	provider WorkerProvider,
	compression compression.Compression,
	workerPollingInterval time.Duration,
	workerStatusPublishInterval time.Duration,
	enabledP2pStreaming bool,
	p2pStreamingTimeout time.Duration,
) *client

func NewContainerPlacementStrategy

func NewContainerPlacementStrategy(opts ContainerPlacementStrategyOptions) (*containerPlacementStrategy, error)

Types

type ArtifactDestination

type ArtifactDestination interface {
	// StreamIn is called with a destination directory and the tar stream to
	// expand into the destination directory.
	StreamIn(context.Context, string, baggageclaim.Encoding, io.Reader) error

	GetStreamInP2pUrl(ctx context.Context, path string) (string, error)
}

ArtifactDestination is the inverse of ArtifactSource. This interface allows the receiving end to determine the location of the data, e.g. based on a task's input configuration.

type ArtifactSource

type ArtifactSource interface {
	// ExistsOn attempts to locate a volume equivalent to this source on the
	// given worker. If a volume can be found, it will be used directly. If not,
	// `StreamTo` will be used to copy the data to the destination instead.
	ExistsOn(lager.Logger, Worker) (Volume, bool, error)
}

func NewCacheArtifactSource

func NewCacheArtifactSource(artifact runtime.CacheArtifact) ArtifactSource

type BindMountSource

type BindMountSource interface {
	VolumeOn(Worker) (garden.BindMount, bool, error)
}

type CertsVolumeMount

type CertsVolumeMount struct {
	Logger lager.Logger
}

func (*CertsVolumeMount) VolumeOn

func (s *CertsVolumeMount) VolumeOn(worker Worker) (garden.BindMount, bool, error)

type CheckResult

type CheckResult struct {
	Versions []atc.Version
}

type Container

type Container interface {
	gclient.Container
	runtime.Runner

	// TODO: get rid of this, it's not used anywhere
	Destroy() error

	VolumeMounts() []VolumeMount

	// TODO: get rid of this, it's not used anywhere
	WorkerName() string

	UpdateLastHijack() error
}

type ContainerLimits

type ContainerLimits struct {
	CPU    *uint64
	Memory *uint64
}

func (ContainerLimits) ToGardenLimits

func (cl ContainerLimits) ToGardenLimits() garden.Limits

type ContainerPlacementStrategy

type ContainerPlacementStrategy interface {
	//TODO: Don't pass around container metadata since it's not guaranteed to be deterministic.
	// Change this after check containers stop being reused
	Choose(lager.Logger, []Worker, ContainerSpec) (Worker, error)
	ModifiesActiveTasks() bool
}

func NewRandomPlacementStrategy

func NewRandomPlacementStrategy() ContainerPlacementStrategy

type ContainerPlacementStrategyChainNode

type ContainerPlacementStrategyChainNode interface {
	Choose(lager.Logger, []Worker, ContainerSpec) ([]Worker, error)
	ModifiesActiveTasks() bool
}

type ContainerPlacementStrategyOptions

type ContainerPlacementStrategyOptions struct {
	ContainerPlacementStrategy []string `` /* 351-byte string literal not displayed */
	MaxActiveTasksPerWorker    int      `` /* 205-byte string literal not displayed */
}

type ContainerSpec

type ContainerSpec struct {
	TeamID    int
	ImageSpec ImageSpec
	Env       []string
	Type      db.ContainerType

	// Working directory for processes run in the container.
	Dir string

	// Artifacts configured as usable. The key represents the mount path of the
	// input artifact and the value is the artifact itself.
	ArtifactByPath map[string]runtime.Artifact

	// Inputs to provide to the container. Inputs with a volume local to the
	// selected worker will be made available via a COW volume; others will be
	// streamed.
	Inputs []InputSource

	// Outputs for which volumes should be created and mounted into the container.
	Outputs OutputPaths

	// Resource limits to be set on the container when creating in garden.
	Limits ContainerLimits

	// Local volumes to bind mount directly to the container when creating in garden.
	BindMounts []BindMountSource

	// Optional user to run processes as. Overwrites the one specified in the docker image.
	User string
}

func (*ContainerSpec) Get

func (cs *ContainerSpec) Get(key string) string

func (*ContainerSpec) Set

func (cs *ContainerSpec) Set(key string, value string)

type ErrCreatedVolumeNotFound

type ErrCreatedVolumeNotFound struct {
	Handle     string
	WorkerName string
}

func (ErrCreatedVolumeNotFound) Error

func (e ErrCreatedVolumeNotFound) Error() string

type FetchSource

type FetchSource interface {
	Find() (GetResult, Volume, bool, error)
	Create(context.Context) (GetResult, Volume, error)
}

type FetchSourceFactory

type FetchSourceFactory interface {
	NewFetchSource(
		logger lager.Logger,
		worker Worker,
		owner db.ContainerOwner,
		cache db.UsedResourceCache,
		resource resource.Resource,
		containerSpec ContainerSpec,
		processSpec runtime.ProcessSpec,
		containerMetadata db.ContainerMetadata,
	) FetchSource
}

func NewFetchSourceFactory

func NewFetchSourceFactory(
	resourceCacheFactory db.ResourceCacheFactory,
) FetchSourceFactory

type FetchedImage

type FetchedImage struct {
	Metadata   ImageMetadata
	Version    atc.Version
	URL        string
	Privileged bool
}

type Fetcher

type Fetcher interface {
	Fetch(
		ctx context.Context,
		logger lager.Logger,
		containerMetadata db.ContainerMetadata,
		gardenWorker Worker,
		containerSpec ContainerSpec,
		processSpec runtime.ProcessSpec,
		resource resource.Resource,
		owner db.ContainerOwner,
		cache db.UsedResourceCache,
		lockName string,
	) (GetResult, Volume, error)
}

func NewFetcher

func NewFetcher(
	clock clock.Clock,
	lockFactory lock.LockFactory,
	fetchSourceFactory FetchSourceFactory,
) Fetcher

type FewestBuildContainersPlacementStrategyNode

type FewestBuildContainersPlacementStrategyNode struct{}

func (*FewestBuildContainersPlacementStrategyNode) Choose

func (strategy *FewestBuildContainersPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)

func (*FewestBuildContainersPlacementStrategyNode) ModifiesActiveTasks

func (strategy *FewestBuildContainersPlacementStrategyNode) ModifiesActiveTasks() bool

type GetResult

type GetResult struct {
	ExitStatus    int
	VersionResult runtime.VersionResult
	GetArtifact   runtime.GetArtifact
}

type Image

type Image interface {
	FetchForContainer(
		ctx context.Context,
		logger lager.Logger,
		container db.CreatingContainer,
	) (FetchedImage, error)
}

type ImageFactory

type ImageFactory interface {
	GetImage(
		logger lager.Logger,
		worker Worker,
		volumeClient VolumeClient,
		imageSpec ImageSpec,
		teamID int,
	) (Image, error)
}

type ImageMetadata

type ImageMetadata struct {
	Env  []string `json:"env"`
	User string   `json:"user"`
}

type ImageSpec

type ImageSpec struct {
	ResourceType        string
	ImageURL            string
	ImageArtifactSource StreamableArtifactSource
	ImageArtifact       runtime.Artifact
	Privileged          bool
}

type InputSource

type InputSource interface {
	Source() ArtifactSource
	DestinationPath() string
}

type LimitActiveTasksPlacementStrategyNode

type LimitActiveTasksPlacementStrategyNode struct {
	// contains filtered or unexported fields
}

func (*LimitActiveTasksPlacementStrategyNode) Choose

func (strategy *LimitActiveTasksPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)

func (*LimitActiveTasksPlacementStrategyNode) ModifiesActiveTasks

func (strategy *LimitActiveTasksPlacementStrategyNode) ModifiesActiveTasks() bool

type NoCompatibleWorkersError

type NoCompatibleWorkersError struct {
	Spec WorkerSpec
}

func (NoCompatibleWorkersError) Error

func (err NoCompatibleWorkersError) Error() string

type OutputPaths

type OutputPaths map[string]string

OutputPaths is a mapping from output name to its path in the container.

type Pool

type Pool interface {
	FindOrChooseWorker(
		lager.Logger,
		WorkerSpec,
	) (Worker, error)

	ContainerInWorker(
		lager.Logger,
		db.ContainerOwner,
		WorkerSpec,
	) (bool, error)

	FindOrChooseWorkerForContainer(
		context.Context,
		lager.Logger,
		db.ContainerOwner,
		ContainerSpec,
		WorkerSpec,
		ContainerPlacementStrategy,
	) (Worker, error)
}

func NewPool

func NewPool(
	provider WorkerProvider,
) Pool

type PutResult

type PutResult struct {
	ExitStatus    int
	VersionResult runtime.VersionResult
}

type StreamableArtifactSource

type StreamableArtifactSource interface {
	ArtifactSource
	// StreamTo copies the data from the source to the destination. Note that
	// this potentially uses a lot of network transfer, for larger artifacts, as
	// the ATC will effectively act as a middleman.
	StreamTo(context.Context, ArtifactDestination) error

	// StreamFile returns the contents of a single file in the artifact source.
	// This is used for loading a task's configuration at runtime.
	StreamFile(context.Context, string) (io.ReadCloser, error)
}

StreamableArtifactSource represents data produced by steps that can be transferred to other steps.

func NewStreamableArtifactSource

func NewStreamableArtifactSource(
	artifact runtime.Artifact,
	volume Volume,
	compression compression.Compression,
	enabledP2pStreaming bool,
	p2pStreamingTimeout time.Duration,
) StreamableArtifactSource

type TaskResult

type TaskResult struct {
	ExitStatus   int
	VolumeMounts []VolumeMount
}

type Volume

type Volume interface {
	Handle() string
	Path() string

	SetProperty(key string, value string) error
	Properties() (baggageclaim.VolumeProperties, error)

	SetPrivileged(bool) error

	StreamIn(ctx context.Context, path string, encoding baggageclaim.Encoding, tarStream io.Reader) error
	StreamOut(ctx context.Context, path string, encoding baggageclaim.Encoding) (io.ReadCloser, error)

	GetStreamInP2pUrl(ctx context.Context, path string) (string, error)
	StreamP2pOut(ctx context.Context, path string, destUrl string, encoding baggageclaim.Encoding) error

	COWStrategy() baggageclaim.COWStrategy

	InitializeResourceCache(db.UsedResourceCache) error
	GetResourceCacheID() int
	InitializeTaskCache(logger lager.Logger, jobID int, stepName string, path string, privileged bool) error
	InitializeArtifact(name string, buildID int) (db.WorkerArtifact, error)

	CreateChildForContainer(db.CreatingContainer, string) (db.CreatingVolume, error)

	WorkerName() string
	Destroy() error
}

func NewVolume

func NewVolume(
	bcVolume baggageclaim.Volume,
	dbVolume db.CreatedVolume,
	volumeClient VolumeClient,
) Volume

type VolumeClient

type VolumeClient interface {
	FindOrCreateVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		db.CreatingContainer,
		int,
		string,
	) (Volume, error)
	FindOrCreateCOWVolumeForContainer(
		lager.Logger,
		VolumeSpec,
		db.CreatingContainer,
		Volume,
		int,
		string,
	) (Volume, error)
	FindOrCreateVolumeForBaseResourceType(
		lager.Logger,
		VolumeSpec,
		int,
		string,
	) (Volume, error)
	CreateVolume(
		lager.Logger,
		VolumeSpec,
		int,
		string,
		db.VolumeType,
	) (Volume, error)
	FindVolumeForResourceCache(
		lager.Logger,
		db.UsedResourceCache,
	) (Volume, bool, error)
	FindVolumeForTaskCache(
		logger lager.Logger,
		teamID int,
		jobID int,
		stepName string,
		path string,
	) (Volume, bool, error)
	CreateVolumeForTaskCache(
		logger lager.Logger,
		volumeSpec VolumeSpec,
		teamID int,
		jobID int,
		stepName string,
		path string,
	) (Volume, error)
	FindOrCreateVolumeForResourceCerts(
		logger lager.Logger,
	) (volume Volume, found bool, err error)

	LookupVolume(lager.Logger, string) (Volume, bool, error)
}

func NewVolumeClient

func NewVolumeClient(
	baggageclaimClient baggageclaim.Client,
	dbWorker db.Worker,
	clock clock.Clock,

	lockFactory lock.LockFactory,
	dbVolumeRepository db.VolumeRepository,
	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
	dbTaskCacheFactory db.TaskCacheFactory,
	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
) VolumeClient

type VolumeLocalityPlacementStrategyNode

type VolumeLocalityPlacementStrategyNode struct{}

func (*VolumeLocalityPlacementStrategyNode) Choose

func (strategy *VolumeLocalityPlacementStrategyNode) Choose(logger lager.Logger, workers []Worker, spec ContainerSpec) ([]Worker, error)

func (*VolumeLocalityPlacementStrategyNode) ModifiesActiveTasks

func (strategy *VolumeLocalityPlacementStrategyNode) ModifiesActiveTasks() bool

type VolumeMount

type VolumeMount struct {
	Volume    Volume
	MountPath string
}

type VolumeProperties

type VolumeProperties map[string]string

type VolumeSpec

type VolumeSpec struct {
	Strategy   baggageclaim.Strategy
	Properties VolumeProperties
	Privileged bool
	TTL        time.Duration
}

type Worker

type Worker interface {
	BuildContainers() int

	Description() string
	Name() string
	ResourceTypes() []atc.WorkerResourceType
	Tags() atc.Tags
	Uptime() time.Duration
	IsOwnedByTeam() bool
	Ephemeral() bool
	IsVersionCompatible(lager.Logger, version.Version) bool
	Satisfies(lager.Logger, WorkerSpec) bool
	FindContainerByHandle(lager.Logger, int, string) (Container, bool, error)

	FindOrCreateContainer(
		context.Context,
		lager.Logger,
		db.ContainerOwner,
		db.ContainerMetadata,
		ContainerSpec,
	) (Container, error)

	FindVolumeForResourceCache(logger lager.Logger, resourceCache db.UsedResourceCache) (Volume, bool, error)
	FindResourceCacheForVolume(volume Volume) (db.UsedResourceCache, bool, error)
	FindVolumeForTaskCache(lager.Logger, int, int, string, string) (Volume, bool, error)
	Fetch(
		context.Context,
		lager.Logger,
		db.ContainerMetadata,
		Worker,
		ContainerSpec,
		runtime.ProcessSpec,
		resource.Resource,
		db.ContainerOwner,
		db.UsedResourceCache,
		string,
	) (GetResult, Volume, error)

	CertsVolume(lager.Logger) (volume Volume, found bool, err error)
	LookupVolume(lager.Logger, string) (Volume, bool, error)
	CreateVolume(logger lager.Logger, spec VolumeSpec, teamID int, volumeType db.VolumeType) (Volume, error)

	GardenClient() gclient.Client
	ActiveTasks() (int, error)
	IncreaseActiveTasks() error
	DecreaseActiveTasks() error
}

func NewGardenWorker

func NewGardenWorker(
	gardenClient gclient.Client,
	volumeRepository db.VolumeRepository,
	volumeClient VolumeClient,
	imageFactory ImageFactory,
	fetcher Fetcher,
	dbTeamFactory db.TeamFactory,
	dbWorker db.Worker,
	resourceCacheFactory db.ResourceCacheFactory,
	numBuildContainers int,

) Worker

NewGardenWorker constructs a Worker using the gardenWorker runtime implementation and allows container and volume creation on a specific Garden worker. A Garden Worker is composed of a db.Worker, a garden Client, a container provider, and a volume client.

type WorkerProvider

type WorkerProvider interface {
	RunningWorkers(lager.Logger) ([]Worker, error)

	FindWorkerForContainer(
		logger lager.Logger,
		teamID int,
		handle string,
	) (Worker, bool, error)

	FindWorkerForVolume(
		logger lager.Logger,
		teamID int,
		handle string,
	) (Worker, bool, error)

	FindWorkersForContainerByOwner(
		logger lager.Logger,
		owner db.ContainerOwner,
	) ([]Worker, error)

	NewGardenWorker(
		logger lager.Logger,
		savedWorker db.Worker,
		numBuildWorkers int,
	) Worker
}

func NewDBWorkerProvider

func NewDBWorkerProvider(
	lockFactory lock.LockFactory,
	retryBackOffFactory retryhttp.BackOffFactory,
	fetcher Fetcher,
	imageFactory ImageFactory,
	dbResourceCacheFactory db.ResourceCacheFactory,
	dbResourceConfigFactory db.ResourceConfigFactory,
	dbWorkerBaseResourceTypeFactory db.WorkerBaseResourceTypeFactory,
	dbTaskCacheFactory db.TaskCacheFactory,
	dbWorkerTaskCacheFactory db.WorkerTaskCacheFactory,
	dbVolumeRepository db.VolumeRepository,
	dbTeamFactory db.TeamFactory,
	workerFactory db.WorkerFactory,
	workerVersion version.Version,
	baggageclaimResponseHeaderTimeout, gardenRequestTimeout time.Duration,
) WorkerProvider

type WorkerSpec

type WorkerSpec struct {
	Platform     string
	ResourceType string
	Tags         []string
	TeamID       int
}

func (WorkerSpec) Description

func (spec WorkerSpec) Description() string

Directories

Path Synopsis
connection/connectionfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
gclientfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
transportfakes
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.
Code generated by counterfeiter.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL