Package sink provides a server for aggregating test results and sending them to the ResultDB backend.



View Source
const (
	// AuthTokenKey is the key of the HTTP request header where the auth token should
	// be specified. Note that the an HTTP request must have the auth token in the header
	// with the following format.
	// Authorization: ResultSink <auth_token>
	AuthTokenKey = "Authorization"
	// AuthTokenPrefix is embedded into the value of the Authorization HTTP request header,
	// where the auth_token must be present. For the details about the value format of
	// the Authoization HTTP request header, find the description of `AuthTokenKey`.
	AuthTokenPrefix = "ResultSink"

	// DefaultArtChannelMaxLeases is the default value of ServerConfig.ArtChannelMaxLeases
	DefaultArtChannelMaxLeases = 16

	// DefaultTestResultChannelMaxLeases is the default value of ServerConfig.TestResultChannelMaxLeases
	DefaultTestResultChannelMaxLeases = 4


View Source
var ErrCloseBeforeStart error = errors.Reason("the server is not started yet").Err()

ErrCloseBeforeStart is returned by Close(), when it was invoked before the server started.


func Run

func Run(ctx context.Context, cfg ServerConfig, callback func(context.Context, ServerConfig) error) (err error)

Run starts a server and runs callback in a context where the server is running.

The context passed to callback will be cancelled if the server has stopped due to critical errors or Close being invoked. The context also has the server's information exported into it. If callback finishes, Run will stop the server and return the error callback returned.


type Server

type Server struct {
	// contains filtered or unexported fields

Server contains state relevant to the server itself. It should always be created by a call to NewServer. After a call to Serve(), Server will accept connections on its Port and gather test results to send to its Recorder.

func NewServer

func NewServer(ctx context.Context, cfg ServerConfig) (*Server, error)

NewServer creates a Server value and populates optional values with defaults.

func (*Server) Close

func (s *Server) Close(ctx context.Context) (err error)

Close stops the server.

Server processes sinkRequests asynchronously, and Close doesn't guarantee completion of all the ongoing requests. After Close returns, wait for Done to be closed to ensure that all the pending results have been uploaded.

It's recommended to use Shutdown() instead of Close with Done.

func (*Server) Config

func (s *Server) Config() ServerConfig

Config retrieves the ServerConfig of a previously created Server.

Use this to retrieve the resolved values of unset optional fields in the original ServerConfig.

func (*Server) Done

func (s *Server) Done() <-chan struct{}

Done returns a channel that is closed when the server terminated and finished processing all the ongoing requests.

func (*Server) Err

func (s *Server) Err() error

Err returns a server error, explaining the reason of the sink server closed.

If Done is not yet closed, Err returns nil. If Done is closed, Err returns a non-nil error explaining why.

func (*Server) Export

func (s *Server) Export(ctx context.Context) context.Context

Export exports lucictx.ResultSink derived from the server configuration into the context.

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) (err error)

Shutdown gracefully shuts down the server without interrupting any ongoing requests.

Shutdown works by first closing the listener for incoming requests, and then waiting for all the ongoing requests to be processed and pending results to be uploaded. If the provided context expires before the shutdown is complete, Shutdown returns the context's error.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start runs the server.

On success, Start will return nil, and a subsequent error can be obtained from Err.

type ServerConfig

type ServerConfig struct {
	// Recorder is the gRPC client to the Recorder service exposed by ResultDB.
	Recorder pb.RecorderClient
	// ArtifactStreamClient is an HTTP client to be used for streaming artifacts larger than
	// MaxBatchableArtifactSize.
	ArtifactStreamClient *http.Client
	// ArtifactStreamHost is the hostname of an ResultDB service instance to which
	// artifacts are streamed.
	ArtifactStreamHost string

	// AuthToken is a secret token to expect from clients. If it is "" then it
	// will be randomly generated in a secure way.
	AuthToken string
	// Address is the HTTP address to listen on.
	// If empty, the server will use "localhost" with a random port available at
	// the time of Server.Start call, and the generated address can be found at
	// Server.Config().Address.
	Address string

	// Invocation is the name of the invocation that test results should append to.
	Invocation string

	// UpdateToken is the token that allows writes to Invocation.
	UpdateToken string

	// TestIDPrefix will be prepended to the test_id of each TestResult.
	TestIDPrefix string

	// BaseVariant will be added to the variant of each TestResult. If there are duplicate
	// keys, the variant value given by the test command always wins.
	BaseVariant *pb.Variant

	// TestLocationBase will be prepended to the Location.FileName of each TestResult.
	TestLocationBase string

	// ArtChannelMaxLeases specifies that the max lease of the Artifact upload channel.
	ArtChannelMaxLeases uint

	// TestResultChannelMaxLeases specifies that the max lease of the TestResult upload channel.
	TestResultChannelMaxLeases uint

	// BaseTags will be added to each TestResult in addition to the original tags that
	// the tests were reported with.
	BaseTags []*pb.StringPair

	// CoerceNegativeDuration specifies whether ReportTestResults() should coerece
	// the negative duration of a test result.
	// If true, the API will coerce negative durations to 0.
	// If false, the API will return an error for negative durations.
	CoerceNegativeDuration bool

	// LocationTags is a map from a directory to the tags of this directory.
	// For each test with test location, look for the location's directory (or
	// ancestor directory) in the map and append the directory's tags to the
	// test results' tags.
	LocationTags *sinkpb.LocationTags

	// MaxBatchableArtifactSize is the maximum size of an artifact that can be uploaded
	// in a batch.
	// Artifacts smaller or equal to this size will be uploaded in a batch, whereas
	// greater artifacts will be uploaded in a stream manner.
	// Must be < 10MiB, and NewServer panics, otherwise.
	MaxBatchableArtifactSize int64

	// ExonerateUnexpectedPass is a flag to control if an unexpected pass should
	// be exonerated.
	ExonerateUnexpectedPass bool
	// contains filtered or unexported fields

ServerConfig defines the parameters of the server.

func (*ServerConfig) Validate

func (c *ServerConfig) Validate() error

Validate validates all the config fields.


Path Synopsis