crunchrun

package
v0.0.0-...-52d6522 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 17, 2024 License: AGPL-3.0, Apache-2.0, CC-BY-SA-3.0 Imports: 64 Imported by: 0

Documentation

Overview

Copyright (C) The Arvados Authors. All rights reserved.

SPDX-License-Identifier: AGPL-3.0

Copyright (C) The Arvados Authors. All rights reserved.

SPDX-License-Identifier: AGPL-3.0

Index

Constants

View Source
const DockerAPIVersion = "1.35"

DockerAPIVersion is the API version we use to communicate with the docker service. The oldest OS we support is Ubuntu 18.04 (bionic) which originally shipped docker 1.17.12 / API 1.35 so there is no reason to use an older API version. See https://dev.arvados.org/issues/15370#note-38 and https://docs.docker.com/engine/api/.

View Source
const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)
View Source
const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.

Variables

View Source
var Command = command{}
View Source
var ErrCancelled = errors.New("Cancelled")

ErrCancelled is the error returned when the container is cancelled.

Functions

func Detach

func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int

Detach acquires a lock for the given uuid, and starts the current program as a child process (with -no-detach prepended to the given arguments so the child knows not to detach again). The lock is passed along to the child process.

Stdout and stderr in the child process are sent to the systemd journal using the systemd-cat program.

func KillProcess

func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int

KillProcess finds the crunch-run process corresponding to the given uuid, and sends the given signal to it. It then waits up to 1 second for the process to die. It returns 0 if the process is successfully killed or didn't exist in the first place.

func ListProcesses

func ListProcesses(stdin io.Reader, stdout, stderr io.Writer) int

ListProcesses lists UUIDs of active crunch-run processes.

func RFC3339Timestamp

func RFC3339Timestamp(t time.Time) string

RFC3339Timestamp formats t as RFC3339NanoFixed.

func ReadWriteLines

func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool)

ReadWriteLines reads lines from a reader and writes to a Writer, with long line splitting.

Types

type ArvLogWriter

type ArvLogWriter struct {
	ArvClient IArvadosClient
	UUID      string
	// contains filtered or unexported fields
}

ArvLogWriter is an io.WriteCloser that processes each write by writing it through to another io.WriteCloser (typically a CollectionFileWriter) and creating an Arvados log entry.

func (*ArvLogWriter) Close

func (arvlog *ArvLogWriter) Close() (err error)

Close the underlying writer

func (*ArvLogWriter) Write

func (arvlog *ArvLogWriter) Write(p []byte) (int, error)

type ConfigData

type ConfigData struct {
	Env          map[string]string
	KeepBuffers  int
	EC2SpotCheck bool
	Cluster      *arvados.Cluster
}

ConfigData contains environment variables and (when needed) cluster configuration, passed from dispatchcloud to crunch-run on stdin.

type ContainerRunner

type ContainerRunner struct {
	DispatcherArvClient  IArvadosClient
	DispatcherKeepClient IKeepClient

	ContainerArvClient  IArvadosClient
	ContainerKeepClient IKeepClient

	Container arvados.Container

	ExitCode     *int
	NewLogWriter NewLogWriter
	CrunchLog    *ThrottledLogger

	LogCollection arvados.CollectionFileSystem
	LogsPDH       *string
	RunArvMount   RunArvMount
	MkTempDir     MkTempDir
	ArvMount      *exec.Cmd
	ArvMountPoint string
	HostOutputDir string
	Volumes       map[string]struct{}
	OutputPDH     *string
	SigChan       chan os.Signal
	ArvMountExit  chan error
	SecretMounts  map[string]arvados.Mount
	MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
	// contains filtered or unexported fields
}

ContainerRunner is the main stateful struct used for a single execution of a container.

func NewContainerRunner

func NewContainerRunner(dispatcherClient *arvados.Client,
	dispatcherArvClient IArvadosClient,
	dispatcherKeepClient IKeepClient,
	containerUUID string) (*ContainerRunner, error)

NewContainerRunner creates a new container runner.

func (*ContainerRunner) ArvMountCmd

func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error)

func (*ContainerRunner) CaptureOutput

func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error

CaptureOutput saves data from the container's output directory if needed, and updates the container output accordingly.

func (*ContainerRunner) CleanupDirs

func (runner *ContainerRunner) CleanupDirs()

func (*ContainerRunner) CommitLogs

func (runner *ContainerRunner) CommitLogs() error

CommitLogs posts the collection containing the final container logs.

func (*ContainerRunner) ContainerToken

func (runner *ContainerRunner) ContainerToken() (string, error)

ContainerToken returns the api_token the container (and any arv-mount processes) are allowed to use.

func (*ContainerRunner) CreateContainer

func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error

CreateContainer creates the docker container.

func (*ContainerRunner) IsCancelled

func (runner *ContainerRunner) IsCancelled() bool

IsCancelled returns the value of Cancelled, with goroutine safety.

func (*ContainerRunner) LoadImage

func (runner *ContainerRunner) LoadImage() (string, error)

LoadImage determines the docker image id from the container record and checks if it is available in the local Docker image store. If not, it loads the image from Keep.

func (*ContainerRunner) LogContainerRecord

func (runner *ContainerRunner) LogContainerRecord() error

LogContainerRecord gets and saves the raw JSON container record from the API server

func (*ContainerRunner) LogHostInfo

func (runner *ContainerRunner) LogHostInfo() (err error)

LogHostInfo logs info about the current host, for debugging and accounting purposes. Although it's logged as "node-info", this is about the environment where crunch-run is actually running, which might differ from what's described in the node record (see LogNodeRecord).

func (*ContainerRunner) LogNodeRecord

func (runner *ContainerRunner) LogNodeRecord() error

LogNodeRecord logs the current host's InstanceType config entry (or the arvados#node record, if running via crunch-dispatch-slurm).

func (*ContainerRunner) NewArvLogWriter

func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error)

NewArvLogWriter creates an ArvLogWriter

func (*ContainerRunner) Run

func (runner *ContainerRunner) Run() (err error)

Run the full container lifecycle.

func (*ContainerRunner) SetupArvMountPoint

func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error)

func (*ContainerRunner) SetupMounts

func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error)

func (*ContainerRunner) StartContainer

func (runner *ContainerRunner) StartContainer() error

StartContainer starts the docker container created by CreateContainer.

func (*ContainerRunner) UpdateContainerFinal

func (runner *ContainerRunner) UpdateContainerFinal() error

UpdateContainerFinal updates the container record state on API server to "Complete" or "Cancelled"

func (*ContainerRunner) UpdateContainerRunning

func (runner *ContainerRunner) UpdateContainerRunning(logId string) error

UpdateContainerRunning updates the container state to "Running"

func (*ContainerRunner) WaitFinish

func (runner *ContainerRunner) WaitFinish() error

WaitFinish waits for the container to terminate, capture the exit code, and close the stdout/stderr logging.

type Gateway

type Gateway struct {
	ContainerUUID string
	// Caller should set Address to "", or "host:0" or "host:port"
	// where host is a known external IP address; port is a
	// desired port number to listen on; and ":0" chooses an
	// available dynamic port.
	//
	// If Address is "", Start() listens only on the loopback
	// interface (and changes Address to "127.0.0.1:port").
	// Otherwise it listens on all interfaces.
	//
	// If Address is "host:0", Start() updates Address to
	// "host:port".
	Address    string
	AuthSecret string
	Target     GatewayTarget
	Log        interface {
		Printf(fmt string, args ...interface{})
	}
	// If non-nil, set up a ContainerGatewayTunnel, so that the
	// controller can connect to us even if our external IP
	// address is unknown or not routable from controller.
	ArvadosClient *arvados.Client

	// When a tunnel is connected or reconnected, this func (if
	// not nil) will be called with the InternalURL of the
	// controller process at the other end of the tunnel.
	UpdateTunnelURL func(url string)

	// Source for serving WebDAV requests with
	// X-Webdav-Source: /log
	LogCollection arvados.CollectionFileSystem
	// contains filtered or unexported fields
}

func (*Gateway) ServeHTTP

func (gw *Gateway) ServeHTTP(w http.ResponseWriter, req *http.Request)

func (*Gateway) Start

func (gw *Gateway) Start() error

Start starts an http server that allows authenticated clients to open an interactive "docker exec" session and (in future) connect to tcp ports inside the docker container.

type GatewayTarget

type GatewayTarget interface {
	// Command that will execute cmd inside the container
	InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error)

	// IP address inside container
	IPAddress() (string, error)
}

type GatewayTargetStub

type GatewayTargetStub struct{}

func (GatewayTargetStub) IPAddress

func (GatewayTargetStub) IPAddress() (string, error)

func (GatewayTargetStub) InjectCommand

func (GatewayTargetStub) InjectCommand(ctx context.Context, detachKeys, username string, usingTTY bool, cmd []string) (*exec.Cmd, error)

type IArvadosClient

type IArvadosClient interface {
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
	CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
	Discovery(key string) (interface{}, error)
}

IArvadosClient is the minimal Arvados API methods used by crunch-run.

type IKeepClient

type IKeepClient interface {
	BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
	ReadAt(locator string, p []byte, off int) (int, error)
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	LocalLocator(locator string) (string, error)
	SetStorageClasses(sc []string)
}

IKeepClient is the minimal Keep API methods used by crunch-run.

type MkTempDir

type MkTempDir func(string, string) (string, error)

type NewLogWriter

type NewLogWriter func(name string) (io.WriteCloser, error)

NewLogWriter is a factory function to create a new log writer.

type PsProcess

type PsProcess interface {
	CmdlineSlice() ([]string, error)
}

type RunArvMount

type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)

type ThrottledLogger

type ThrottledLogger struct {
	*log.Logger

	sync.Mutex

	Timestamper
	Immediate *log.Logger
	// contains filtered or unexported fields
}

ThrottledLogger accepts writes, prepends a timestamp to each line of the write, and periodically flushes to a downstream writer. It supports the "Logger" and "WriteCloser" interfaces.

func NewThrottledLogger

func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger

NewThrottledLogger creates a new thottled logger that

  • prepends timestamps to each line, and
  • batches log messages and only calls the underlying Writer at most once per "crunchLogSecondsBetweenEvents" seconds.

func (*ThrottledLogger) Close

func (tl *ThrottledLogger) Close() error

Close the flusher goroutine and wait for it to complete, then close the underlying Writer.

func (*ThrottledLogger) Write

func (tl *ThrottledLogger) Write(p []byte) (n int, err error)

Write prepends a timestamp to each line of the input data and appends to the internal buffer. Each line is also logged to tl.Immediate, if tl.Immediate is not nil.

type Timestamper

type Timestamper func(t time.Time) string

Timestamper is the signature for a function that takes a timestamp and return a formated string value.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL