Package sink provides a server for aggregating test results and sending them to the ResultDB backend.



const (
	// DefaultAddr is the TCP address that the Server listens on by default.
	DefaultAddr = "localhost:62115"
	// AuthTokenKey is the key of the HTTP request header where the auth token should
	// be specified. Note that an HTTP request must carry the auth token in the header
	// in the following format.
	// Authorization: ResultSink <auth_token>
	AuthTokenKey = "Authorization"
	// AuthTokenPrefix is embedded into the value of the Authorization HTTP request header,
	// where the auth_token must be present. For details about the value format of
	// the Authorization HTTP request header, see the description of `AuthTokenKey`.
	AuthTokenPrefix = "ResultSink"

	// DefaultArtChannelMaxLeases is the default value of ServerConfig.ArtChannelMaxLeases.
	DefaultArtChannelMaxLeases = 16

	// DefaultTestResultChannelMaxLeases is the default value of ServerConfig.TestResultChannelMaxLeases.
	DefaultTestResultChannelMaxLeases = 4
)

const DefaultMaxInMemoryFileSize = 64 * 1024

DefaultMaxInMemoryFileSize is the default value for ArtifactUploader.MaxInMemoryFileSize.
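
As an illustration of the header format described for AuthTokenKey and AuthTokenPrefix, the following is a minimal sketch of building and parsing the `Authorization: ResultSink <auth_token>` value. The constants are local copies, and `authHeaderValue` and `parseAuthHeaderValue` are hypothetical helpers that are not part of this package.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

// Local copies of the documented constants, so that the sketch needs nothing
// beyond the standard library.
const (
	authTokenKey    = "Authorization"
	authTokenPrefix = "ResultSink"
)

// authHeaderValue formats a token as "ResultSink <auth_token>".
func authHeaderValue(token string) string {
	return authTokenPrefix + " " + token
}

// parseAuthHeaderValue is a hypothetical helper that extracts the token from a
// header value. It reports false if the prefix or the token is missing.
func parseAuthHeaderValue(v string) (string, bool) {
	prefix := authTokenPrefix + " "
	if !strings.HasPrefix(v, prefix) {
		return "", false
	}
	token := strings.TrimPrefix(v, prefix)
	return token, token != ""
}

func main() {
	// Attach the token to an outgoing request header.
	h := http.Header{}
	h.Set(authTokenKey, authHeaderValue("secret-token"))
	fmt.Println(h.Get(authTokenKey))

	// Recover the token from the header value.
	token, ok := parseAuthHeaderValue(h.Get(authTokenKey))
	fmt.Println(token, ok)
}
```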


var ErrCloseBeforeStart error = errors.Reason("the server is not started yet").Err()

ErrCloseBeforeStart is returned by Close(), when it was invoked before the server started.


func Run

func Run(ctx context.Context, cfg ServerConfig, callback func(context.Context, ServerConfig) error) (err error)

Run starts a server and runs callback in a context where the server is running.

The context passed to callback will be cancelled if the server has stopped due to critical errors or Close being invoked. The context also has the server's information exported into it. If callback finishes, Run will stop the server and return the error callback returned.
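
The contract described above can be sketched with the standard library alone. This is not the package's implementation: `run` is a hypothetical stand-in with a simplified signature, and the `stopped` channel is an assumption that simulates the server terminating on its own.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errCallback = errors.New("callback failed")

// run is a hypothetical stand-in that mimics the documented contract of Run.
// The stopped channel simulates the server terminating by itself, for example
// after a critical error or a call to Close.
func run(ctx context.Context, stopped <-chan struct{}, callback func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stands in for stopping the server once callback returns

	// Cancel the callback's context as soon as the simulated server stops.
	go func() {
		select {
		case <-stopped:
			cancel()
		case <-ctx.Done():
		}
	}()
	return callback(ctx)
}

// demo runs a callback that returns cbErr immediately.
func demo(cbErr error) error {
	return run(context.Background(), make(chan struct{}), func(context.Context) error {
		return cbErr
	})
}

// demoServerStops runs a callback that blocks until its context is cancelled
// while the simulated server has already stopped.
func demoServerStops() error {
	stopped := make(chan struct{})
	close(stopped)
	return run(context.Background(), stopped, func(ctx context.Context) error {
		<-ctx.Done()
		return ctx.Err()
	})
}

func main() {
	fmt.Println(demo(nil))         // the callback's nil error is returned
	fmt.Println(demo(errCallback)) // the callback's error is returned unchanged
	fmt.Println(demoServerStops()) // the callback observed the cancellation
}
```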


type ArtifactUploader

type ArtifactUploader struct {
	// Client is an HTTP client used for uploading artifacts to ResultDB.
	Client *http.Client
	// Host is the host of a ResultDB instance to upload artifacts to.
	Host string

	// MaxInMemoryFileSize is the maximum size of an artifact file that can be loaded into
	// memory.
	// ArtifactUploader reads the entire contents of a file twice; one read for calculating
	// the hash and another read for transmitting the contents over network. If the size
	// is equal to or smaller than MaxInMemoryFileSize, ArtifactUploader loads the entire
	// contents into a memory buffer to minimize disk I/O.
	// If this field is set to 0, DefaultMaxInMemoryFileSize is used.
	MaxInMemoryFileSize int64
}

ArtifactUploader provides functions for uploading artifacts to ResultDB.
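
The two-read strategy described for MaxInMemoryFileSize can be sketched as follows. This is an illustration under stated assumptions, not the uploader's code: SHA-256 is assumed as the hash, an `io.Writer` stands in for the network transfer, and `hashAndSend` and `demo` are hypothetical names.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

const defaultMaxInMemoryFileSize = 64 * 1024

// hashAndSend hashes the file at path and then copies its contents to dst.
// Files no larger than maxInMemory are read from disk once and buffered;
// larger files are read from disk twice. It returns the hex digest and
// whether the in-memory buffer was used.
func hashAndSend(path string, maxInMemory int64, dst io.Writer) (string, bool, error) {
	if maxInMemory == 0 {
		maxInMemory = defaultMaxInMemoryFileSize
	}
	f, err := os.Open(path)
	if err != nil {
		return "", false, err
	}
	defer f.Close()
	st, err := f.Stat()
	if err != nil {
		return "", false, err
	}

	var src io.ReadSeeker = f
	buffered := false
	if st.Size() <= maxInMemory {
		data, err := io.ReadAll(f)
		if err != nil {
			return "", false, err
		}
		src, buffered = bytes.NewReader(data), true
	}

	h := sha256.New() // assumption: the actual hash algorithm is not stated
	if _, err := io.Copy(h, src); err != nil { // first read: hash
		return "", buffered, err
	}
	if _, err := src.Seek(0, io.SeekStart); err != nil {
		return "", buffered, err
	}
	if _, err := io.Copy(dst, src); err != nil { // second read: transmit
		return "", buffered, err
	}
	return hex.EncodeToString(h.Sum(nil)), buffered, nil
}

// demo writes a temporary file of the given size and reports whether
// hashAndSend buffered it in memory.
func demo(size int, maxInMemory int64) (bool, error) {
	f, err := os.CreateTemp("", "artifact-*")
	if err != nil {
		return false, err
	}
	defer os.Remove(f.Name())
	if _, err := f.Write(bytes.Repeat([]byte("x"), size)); err != nil {
		f.Close()
		return false, err
	}
	if err := f.Close(); err != nil {
		return false, err
	}
	_, buffered, err := hashAndSend(f.Name(), maxInMemory, io.Discard)
	return buffered, err
}

func main() {
	for _, size := range []int{1024, 128 * 1024} {
		buffered, err := demo(size, 0)
		fmt.Println(size, buffered, err)
	}
}
```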

func (*ArtifactUploader) Upload

func (u *ArtifactUploader) Upload(ctx context.Context, name, contentType string, contents []byte, updateToken string) error

Upload uploads an artifact from a given slice of bytes.

`name` is the artifact name, which uniquely identifies the artifact globally. It must conform to the format described in the name property of message Artifact. A utility package provides functions to generate an Artifact name for test-result-level and invocation-level artifacts.

func (*ArtifactUploader) UploadFromFile

func (u *ArtifactUploader) UploadFromFile(ctx context.Context, name, contentType, path, updateToken string) error

UploadFromFile uploads an artifact from a given file path.

`name` is the artifact name, which uniquely identifies the artifact globally. It must conform to the format described in the name property of message Artifact.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server contains state relevant to the server itself. It should always be created by a call to NewServer. After a call to Start(), Server will accept connections on its Port and gather test results to send to its Recorder.

func NewServer

func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error)

NewServer creates a Server value and populates optional values with defaults.

func (*Server) Close

func (s *Server) Close(ctx context.Context) (err error)

Close stops the server.

Server processes sinkRequests asynchronously, and Close doesn't guarantee completion of all the ongoing requests. After Close returns, wait for Done to be closed to ensure that all the pending results have been uploaded.

It's recommended to use Shutdown() instead of Close with Done.

func (*Server) Config

func (s *Server) Config() ServerConfig

Config retrieves the ServerConfig of a previously created Server.

Use this to retrieve the resolved values of unset optional fields in the original ServerConfig.
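
A minimal sketch of how unset optional fields might be resolved, based on the documented defaults. The `serverConfig` type is a trimmed, hypothetical copy of ServerConfig. Treating a zero lease count as "unset" and using a 32-byte token are assumptions.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Local copies of the documented default values.
const (
	defaultAddr                       = "localhost:62115"
	defaultArtChannelMaxLeases        = 16
	defaultTestResultChannelMaxLeases = 4
)

// serverConfig is a trimmed, hypothetical copy of ServerConfig.
type serverConfig struct {
	AuthToken                  string
	Address                    string
	ArtChannelMaxLeases        uint
	TestResultChannelMaxLeases uint
}

// withDefaults returns a copy of cfg with unset optional fields filled in.
func withDefaults(cfg serverConfig) (serverConfig, error) {
	if cfg.AuthToken == "" {
		buf := make([]byte, 32) // assumption: the real token length is not documented
		if _, err := rand.Read(buf); err != nil {
			return cfg, err
		}
		cfg.AuthToken = hex.EncodeToString(buf)
	}
	if cfg.Address == "" {
		cfg.Address = defaultAddr
	}
	if cfg.ArtChannelMaxLeases == 0 { // assumption: zero means unset
		cfg.ArtChannelMaxLeases = defaultArtChannelMaxLeases
	}
	if cfg.TestResultChannelMaxLeases == 0 { // assumption: zero means unset
		cfg.TestResultChannelMaxLeases = defaultTestResultChannelMaxLeases
	}
	return cfg, nil
}

func main() {
	cfg, err := withDefaults(serverConfig{Address: "localhost:9000"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cfg.Address, cfg.ArtChannelMaxLeases, cfg.TestResultChannelMaxLeases, len(cfg.AuthToken))
}
```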

func (*Server) Done

func (s *Server) Done() <-chan struct{}

Done returns a channel that is closed when the server has terminated and finished processing all ongoing requests.

func (*Server) Err

func (s *Server) Err() error

Err returns the server error that explains why the sink server closed.

If Done is not yet closed, Err returns nil. If Done is closed, Err returns a non-nil error explaining why.
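
The interplay of Close, Done, and Err can be sketched from the caller's side. The `sinkServer` interface below only restates the three documented method signatures. `fakeServer` is a hypothetical stand-in that simulates pending uploads, and its error message is invented for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sinkServer lists the documented methods that this sketch relies on.
type sinkServer interface {
	Close(ctx context.Context) error
	Done() <-chan struct{}
	Err() error
}

// closeAndWait calls Close and then blocks until Done is closed or ctx expires.
func closeAndWait(ctx context.Context, s sinkServer) error {
	if err := s.Close(ctx); err != nil {
		return err
	}
	select {
	case <-s.Done():
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// fakeServer is a hypothetical stand-in; it is safe to Close only once.
type fakeServer struct {
	done chan struct{}
	err  error
}

func (s *fakeServer) Close(ctx context.Context) error {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate pending uploads
		s.err = errors.New("server closed") // written before done is closed
		close(s.done)
	}()
	return nil
}

func (s *fakeServer) Done() <-chan struct{} { return s.done }

// Err returns nil until Done is closed, and the closing reason afterwards.
func (s *fakeServer) Err() error {
	select {
	case <-s.done:
		return s.err
	default:
		return nil
	}
}

// demo reports whether Err was nil before Close and non-nil after Done.
func demo() (bool, bool, error) {
	s := &fakeServer{done: make(chan struct{})}
	nilBefore := s.Err() == nil
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err := closeAndWait(ctx, s)
	return nilBefore, s.Err() != nil, err
}

func main() {
	fmt.Println(demo())
}
```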

func (*Server) Export

func (s *Server) Export(ctx context.Context) context.Context

Export exports lucictx.ResultSink derived from the server configuration into the context.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) (err error)

Shutdown gracefully shuts down the server without interrupting any ongoing requests.

Shutdown works by first closing the listener for incoming requests, and then waiting for all the ongoing requests to be processed and pending results to be uploaded. If the provided context expires before the shutdown is complete, Shutdown returns the context's error.
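
The behaviour described here closely resembles `(*http.Server).Shutdown` from the standard library, so the sketch below uses net/http to show both outcomes: a shutdown that waits for an in-flight request, and one whose context expires first. Whether this package relies on net/http internally is an assumption, and `demo` is a hypothetical helper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// demo starts an HTTP server whose handler takes delayMs to respond, sends one
// request, and calls Shutdown with a timeout of timeoutMs while the request is
// in flight. It reports whether the request succeeded and whether Shutdown
// returned because its context expired.
func demo(delayMs, timeoutMs int) (bool, bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, false, err
	}
	started := make(chan struct{}, 1)
	srv := &http.Server{Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		select {
		case started <- struct{}{}:
		default:
		}
		time.Sleep(time.Duration(delayMs) * time.Millisecond) // an ongoing request
		fmt.Fprintln(w, "ok")
	})}
	go srv.Serve(ln)

	result := make(chan error, 1)
	go func() {
		resp, err := http.Get("http://" + ln.Addr().String())
		if err == nil {
			resp.Body.Close()
		}
		result <- err
	}()

	// Wait until the request is being handled.
	select {
	case <-started:
	case err := <-result:
		srv.Close()
		return false, false, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	shutdownErr := srv.Shutdown(ctx)
	if shutdownErr != nil {
		srv.Close() // give up on the in-flight request
	}
	requestOK := <-result == nil
	return requestOK, errors.Is(shutdownErr, context.DeadlineExceeded), nil
}

func main() {
	fmt.Println(demo(20, 2000)) // Shutdown waits for the request to finish
	fmt.Println(demo(500, 20))  // the context expires before the request finishes
}
```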

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start runs the server.

On success, Start will return nil, and a subsequent error can be obtained from Err.

type ServerConfig

type ServerConfig struct {
	// Recorder is the gRPC client to the Recorder service exposed by ResultDB.
	Recorder pb.RecorderClient
	// ArtifactUploader implements methods for uploading artifacts to ResultDB.
	ArtifactUploader *ArtifactUploader

	// AuthToken is a secret token to expect from clients. If it is "" then it
	// will be randomly generated in a secure way.
	AuthToken string
	// Address is the HTTP address to listen on. If empty, the server will use
	// the default address.
	Address string

	// Invocation is the name of the invocation that test results should append to.
	Invocation string

	// UpdateToken is the token that allows writes to Invocation.
	UpdateToken string

	// TestIDPrefix will be prepended to the test_id of each TestResult.
	TestIDPrefix string

	// BaseVariant will be added to the variant of each TestResult. If there are duplicate
	// keys, the variant value given by the test command always wins.
	BaseVariant *pb.Variant

	// TestLocationBase will be prepended to the Location.FileName of each TestResult.
	TestLocationBase string

	// ArtChannelMaxLeases specifies the maximum number of leases of the Artifact upload channel.
	ArtChannelMaxLeases uint

	// TestResultChannelMaxLeases specifies the maximum number of leases of the TestResult upload channel.
	TestResultChannelMaxLeases uint

	// BaseTags will be added to each TestResult in addition to the original tags that
	// the tests were reported with.
	BaseTags []*pb.StringPair

	// CoerceNegativeDuration specifies whether ReportTestResults() should coerce
	// the negative duration of a test result.
	// If true, the API will coerce negative durations to 0.
	// If false, the API will return an error for negative durations.
	CoerceNegativeDuration bool

	// LocationTags is a map from a directory to the tags of this directory.
	// For each test with test location, look for the location's directory (or
	// ancestor directory) in the map and append the directory's tags to the
	// test results' tags.
	LocationTags *sinkpb.LocationTags
	// contains filtered or unexported fields
}

ServerConfig defines the parameters of the server.
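
Three of the per-result rules described in the field comments (BaseVariant precedence, CoerceNegativeDuration, and the LocationTags directory lookup) can be sketched with plain Go types. The maps stand in for the protobuf messages, all function names are hypothetical, and stopping at the nearest matching directory is an assumption about how ancestors are handled.

```go
package main

import (
	"fmt"
	"path"
	"time"
)

// mergeVariant combines the base variant with the variant reported by the
// test. On duplicate keys, the reported value wins.
func mergeVariant(base, reported map[string]string) map[string]string {
	out := make(map[string]string, len(base)+len(reported))
	for k, v := range base {
		out[k] = v
	}
	for k, v := range reported {
		out[k] = v
	}
	return out
}

// normalizeDuration coerces a negative duration to 0 when coerce is true and
// returns an error otherwise.
func normalizeDuration(d time.Duration, coerce bool) (time.Duration, error) {
	if d >= 0 {
		return d, nil
	}
	if coerce {
		return 0, nil
	}
	return 0, fmt.Errorf("negative duration: %v", d)
}

// tagsForLocation looks up the directory of fileName, then each ancestor
// directory, and returns the tags of the nearest directory found in dirTags.
func tagsForLocation(dirTags map[string][]string, fileName string) []string {
	for dir := path.Dir(fileName); ; dir = path.Dir(dir) {
		if tags, ok := dirTags[dir]; ok {
			return tags
		}
		if dir == "/" || dir == "." {
			return nil // reached the root without a match
		}
	}
}

func main() {
	v := mergeVariant(
		map[string]string{"os": "linux", "builder": "ci"},
		map[string]string{"os": "mac"},
	)
	fmt.Println(v["os"], v["builder"])

	fmt.Println(normalizeDuration(-time.Second, true))
	fmt.Println(normalizeDuration(-time.Second, false))

	dirTags := map[string][]string{"src/chrome": {"team:chrome"}}
	fmt.Println(tagsForLocation(dirTags, "src/chrome/browser/foo_test.cc"))
}
```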

func (*ServerConfig) Validate

func (c *ServerConfig) Validate() error

Validate validates all the config fields.

