crunchrun

package
v0.0.0-...-c92af4d
Published: Jul 1, 2021 License: AGPL-3.0, Apache-2.0, CC-BY-SA-3.0 Imports: 53 Imported by: 0

Documentation

Overview

Copyright (C) The Arvados Authors. All rights reserved.

SPDX-License-Identifier: AGPL-3.0

Constants

const (
	// MaxLogLine is the maximum length of stdout/stderr lines before they are split.
	MaxLogLine = 1 << 12
)

const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"

RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.

Variables

var Command = command{}

var ErrCancelled = errors.New("Cancelled")

ErrCancelled is the error returned when the container is cancelled.

Functions

func Detach

func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int

Detach acquires a lock for the given uuid, and starts the current program as a child process (with -no-detach prepended to the given arguments so the child knows not to detach again). The lock is passed along to the child process.

Stdout and stderr in the child process are sent to the systemd journal using the systemd-cat program.
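The argument handling described above can be sketched as follows. This is an illustration only: childCommand is a hypothetical helper, the program name and UUID are placeholders, and the lock passing and systemd-cat redirection that Detach performs are left out.

```go
package main

import (
	"fmt"
	"os/exec"
)

// childCommand (hypothetical helper) builds the command for the child
// process, putting -no-detach in front of the caller's arguments so the
// child knows not to detach again. It does not start the process.
func childCommand(prog string, args []string) *exec.Cmd {
	childArgs := append([]string{"-no-detach"}, args...)
	return exec.Command(prog, childArgs...)
}

func main() {
	// "crunch-run" and "example-uuid" are placeholder values.
	cmd := childCommand("crunch-run", []string{"example-uuid"})
	fmt.Println(cmd.Args) // [crunch-run -no-detach example-uuid]
}
```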

func KillProcess

func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int

KillProcess finds the crunch-run process corresponding to the given uuid, and sends the given signal to it. It then waits up to 1 second for the process to die. It returns 0 if the process is successfully killed or didn't exist in the first place.

func ListProcesses

func ListProcesses(stdout, stderr io.Writer) int

ListProcesses lists UUIDs of active crunch-run processes.

func RFC3339Timestamp

func RFC3339Timestamp(t time.Time) string

RFC3339Timestamp formats t as RFC3339NanoFixed.

func ReadWriteLines

func ReadWriteLines(in io.Reader, writer io.Writer, done chan<- bool)

ReadWriteLines reads lines from a reader and writes to a Writer, with long line splitting.

Types

type ArvLogWriter

type ArvLogWriter struct {
	ArvClient IArvadosClient
	UUID      string
	// contains filtered or unexported fields
}

ArvLogWriter is an io.WriteCloser that processes each write by writing it through to another io.WriteCloser (typically a CollectionFileWriter) and creating an Arvados log entry.

func (*ArvLogWriter) Close

func (arvlog *ArvLogWriter) Close() (err error)

Close closes the underlying writer.

func (*ArvLogWriter) Write

func (arvlog *ArvLogWriter) Write(p []byte) (int, error)

type ContainerRunner

type ContainerRunner struct {
	DispatcherArvClient  IArvadosClient
	DispatcherKeepClient IKeepClient

	ContainerArvClient  IArvadosClient
	ContainerKeepClient IKeepClient

	Container arvados.Container

	ExitCode     *int
	NewLogWriter NewLogWriter
	CrunchLog    *ThrottledLogger
	Stdout       io.WriteCloser
	Stderr       io.WriteCloser

	LogCollection arvados.CollectionFileSystem
	LogsPDH       *string
	RunArvMount   RunArvMount
	MkTempDir     MkTempDir
	ArvMount      *exec.Cmd
	ArvMountPoint string
	HostOutputDir string
	Volumes       map[string]struct{}
	OutputPDH     *string
	SigChan       chan os.Signal
	ArvMountExit  chan error
	SecretMounts  map[string]arvados.Mount
	MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
	// contains filtered or unexported fields
}

ContainerRunner is the main stateful struct used for a single execution of a container.

func NewContainerRunner

func NewContainerRunner(dispatcherClient *arvados.Client,
	dispatcherArvClient IArvadosClient,
	dispatcherKeepClient IKeepClient,
	containerUUID string) (*ContainerRunner, error)

NewContainerRunner creates a new container runner.

func (*ContainerRunner) ArvMountCmd

func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error)

func (*ContainerRunner) CaptureOutput

func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error

CaptureOutput saves data from the container's output directory if needed, and updates the container output accordingly.

func (*ContainerRunner) CleanupDirs

func (runner *ContainerRunner) CleanupDirs()

func (*ContainerRunner) CommitLogs

func (runner *ContainerRunner) CommitLogs() error

CommitLogs posts the collection containing the final container logs.

func (*ContainerRunner) ContainerToken

func (runner *ContainerRunner) ContainerToken() (string, error)

ContainerToken returns the api_token the container (and any arv-mount processes) are allowed to use.

func (*ContainerRunner) CreateContainer

func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error

CreateContainer creates the docker container.

func (*ContainerRunner) IsCancelled

func (runner *ContainerRunner) IsCancelled() bool

IsCancelled returns the value of Cancelled, with goroutine safety.

func (*ContainerRunner) LoadImage

func (runner *ContainerRunner) LoadImage() (string, error)

LoadImage determines the docker image id from the container record and checks if it is available in the local Docker image store. If not, it loads the image from Keep.

func (*ContainerRunner) LogContainerRecord

func (runner *ContainerRunner) LogContainerRecord() error

LogContainerRecord gets and saves the raw JSON container record from the API server.

func (*ContainerRunner) LogHostInfo

func (runner *ContainerRunner) LogHostInfo() (err error)

LogHostInfo logs info about the current host, for debugging and accounting purposes. Although it's logged as "node-info", this is about the environment where crunch-run is actually running, which might differ from what's described in the node record (see LogNodeRecord).

func (*ContainerRunner) LogNodeRecord

func (runner *ContainerRunner) LogNodeRecord() error

LogNodeRecord logs the current host's InstanceType config entry (or the arvados#node record, if running via crunch-dispatch-slurm).

func (*ContainerRunner) NewArvLogWriter

func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error)

NewArvLogWriter creates an ArvLogWriter.

func (*ContainerRunner) Run

func (runner *ContainerRunner) Run() (err error)

Run runs the full container lifecycle.

func (*ContainerRunner) SetupArvMountPoint

func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error)

func (*ContainerRunner) SetupMounts

func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error)

func (*ContainerRunner) StartContainer

func (runner *ContainerRunner) StartContainer() error

StartContainer starts the docker container created by CreateContainer.

func (*ContainerRunner) UpdateContainerFinal

func (runner *ContainerRunner) UpdateContainerFinal() error

UpdateContainerFinal updates the container record state on the API server to "Complete" or "Cancelled".

func (*ContainerRunner) UpdateContainerRunning

func (runner *ContainerRunner) UpdateContainerRunning() error

UpdateContainerRunning updates the container state to "Running".

func (*ContainerRunner) WaitFinish

func (runner *ContainerRunner) WaitFinish() error

WaitFinish waits for the container to terminate, captures the exit code, and closes the stdout/stderr logging.

type Gateway

type Gateway struct {
	DockerContainerID *string
	ContainerUUID     string
	Address           string // listen host:port; if port=0, Start() will change it to the selected port
	AuthSecret        string
	Log               interface {
		Printf(fmt string, args ...interface{})
	}
	// return local ip address of running container, or "" if not available
	ContainerIPAddress func() (string, error)
	// contains filtered or unexported fields
}

func (*Gateway) Start

func (gw *Gateway) Start() error

Start starts an http server that allows authenticated clients to open an interactive "docker exec" session and (in future) connect to tcp ports inside the docker container.

type IArvadosClient

type IArvadosClient interface {
	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
	Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error
	Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) error
	CallRaw(method string, resourceType string, uuid string, action string, parameters arvadosclient.Dict) (reader io.ReadCloser, err error)
	Discovery(key string) (interface{}, error)
}

IArvadosClient is the minimal set of Arvados API methods used by crunch-run.

type IKeepClient

type IKeepClient interface {
	PutB(buf []byte) (string, int, error)
	ReadAt(locator string, p []byte, off int) (int, error)
	ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
	LocalLocator(locator string) (string, error)
	ClearBlockCache()
}

IKeepClient is the minimal set of Keep API methods used by crunch-run.

type MkTempDir

type MkTempDir func(string, string) (string, error)

type NewLogWriter

type NewLogWriter func(name string) (io.WriteCloser, error)

NewLogWriter is a factory function to create a new log writer.

type PsProcess

type PsProcess interface {
	CmdlineSlice() ([]string, error)
}

type RunArvMount

type RunArvMount func(args []string, tok string) (*exec.Cmd, error)

type ThrottledLogger

type ThrottledLogger struct {
	*log.Logger

	sync.Mutex

	Timestamper
	Immediate *log.Logger
	// contains filtered or unexported fields
}

ThrottledLogger accepts writes, prepends a timestamp to each line of the write, and periodically flushes to a downstream writer. It supports the "Logger" and "WriteCloser" interfaces.

func NewThrottledLogger

func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger

NewThrottledLogger creates a new throttled logger that (a) prepends timestamps to each line and (b) batches log messages and only calls the underlying Writer at most once per "crunchLogSecondsBetweenEvents" seconds.

func (*ThrottledLogger) Close

func (tl *ThrottledLogger) Close() error

Close stops the flusher goroutine and waits for it to complete, then closes the underlying Writer.

func (*ThrottledLogger) Write

func (tl *ThrottledLogger) Write(p []byte) (n int, err error)

Write prepends a timestamp to each line of the input data and appends to the internal buffer. Each line is also logged to tl.Immediate, if tl.Immediate is not nil.

type Timestamper

type Timestamper func(t time.Time) string

Timestamper is the signature for a function that takes a timestamp and returns a formatted string value.
