Documentation ¶
Overview ¶
Package ppsutil contains utilities for various PPS-related tasks, which are shared by both the PPS API and the worker binary. These utilities include:
- Getting the RC name and querying k8s regarding pipelines
- Reading and writing pipeline resource requests and limits
- Reading and writing PipelineInfos[1]
[1] Note that PipelineInfo in particular is complicated because it contains fields that are not always set or are stored in multiple places. The 'Details' field is not stored in the database and must be fetched from the PFS spec commit, and a few fields in 'Details' may depend on current kubernetes state.
Index ¶
- func ContainsS3Inputs(in *pps.Input) bool
- func CrashingPipeline(ctx context.Context, db *pachsql.DB, ...) error
- func ErrorState(s pps.PipelineState) bool
- func FailPipeline(ctx context.Context, db *pachsql.DB, ...) error
- func FilterLogLines(request *pps.GetLogsRequest, r io.Reader, plainText bool, ...) error
- func FindPipelineSpecCommit(ctx context.Context, pfsServer pfsServer.APIServer, ...) (*pfs.Commit, error)
- func FindPipelineSpecCommitInTransaction(txnCtx *txncontext.TransactionContext, pfsServer pfsServer.APIServer, ...) (*pfs.Commit, error)
- func FinishJob(pachClient *client.APIClient, jobInfo *pps.JobInfo, state pps.JobState, ...) (retErr error)
- func GetLimitsResourceList(ctx context.Context, limits *pps.ResourceSpec) (*v1.ResourceList, error)
- func GetRequestsResourceListFromPipeline(ctx context.Context, pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
- func GetWorkerPipelineInfo(pachClient *client.APIClient, db *pachsql.DB, l col.PostgresListener, ...) (*pps.PipelineInfo, error)
- func JobInput(pipelineInfo *pps.PipelineInfo, outputCommit *pfs.Commit) *pps.Input
- func ListPipelineInfo(ctx context.Context, pipelines col.PostgresCollection, filter *pps.Pipeline, ...) error
- func MetaCommit(commit *pfs.Commit) *pfs.Commit
- func PipelineRcName(pi *pps.PipelineInfo) string
- func PipelineReqFromInfo(pipelineInfo *pps.PipelineInfo) *pps.CreatePipelineRequest
- func SetPipelineState(ctx context.Context, db *pachsql.DB, ...) (retErr error)
- func SidecarS3GatewayService(pipeline *pps.Pipeline, commitSetId string) string
- func UpdateJobState(pipelines col.PostgresReadWriteCollection, jobs col.ReadWriteCollection, ...) error
- func WriteJobInfo(pachClient *client.APIClient, jobInfo *pps.JobInfo) error
- type PipelineManifestReader
- type PipelineTransitionError
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContainsS3Inputs ¶
func ContainsS3Inputs(in *pps.Input) bool
ContainsS3Inputs returns 'true' if 'in' is or contains any PFS inputs with 'S3' set to true. Any pipelines with s3 inputs ...
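The "is or contains" wording suggests a recursive walk over the input tree. A minimal sketch of that traversal follows. `Input` and `PFSInput` are simplified, hypothetical stand-ins for the `pps` protobuf types, keeping only the fields the illustration needs; this is not the package's implementation.

```go
package main

import "fmt"

// PFSInput and Input are hypothetical, simplified stand-ins for the pps
// protobuf types. Only the fields needed for the illustration are kept.
type PFSInput struct {
	Repo string
	S3   bool
}

type Input struct {
	Pfs   *PFSInput
	Cross []*Input
	Union []*Input
}

// containsS3 reports whether in is, or transitively contains, a PFS input
// with S3 set to true.
func containsS3(in *Input) bool {
	if in == nil {
		return false
	}
	if in.Pfs != nil && in.Pfs.S3 {
		return true
	}
	for _, child := range in.Cross {
		if containsS3(child) {
			return true
		}
	}
	for _, child := range in.Union {
		if containsS3(child) {
			return true
		}
	}
	return false
}

func main() {
	// A cross of a plain PFS input and a union that hides an S3 input.
	in := &Input{Cross: []*Input{
		{Pfs: &PFSInput{Repo: "images"}},
		{Union: []*Input{{Pfs: &PFSInput{Repo: "models", S3: true}}}},
	}}
	fmt.Println(containsS3(in))
}
```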
func CrashingPipeline ¶
func CrashingPipeline(ctx context.Context, db *pachsql.DB, pipelinesCollection col.PostgresCollection, specCommit *pfs.Commit, reason string) error
CrashingPipeline updates the pipeline's state to crashing and sets the reason.
func ErrorState ¶
func ErrorState(s pps.PipelineState) bool
ErrorState returns true if s is an error state for a pipeline, that is, a state that users should be aware of and one which will have a "Reason" set for why it's in this state.
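A predicate like this is typically a small switch over the state enum. The sketch below uses a hypothetical `PipelineState` stand-in, and it assumes that the crashing and failed states are the ones that carry a reason (the two states that CrashingPipeline and FailPipeline set). The real set of error states may differ.

```go
package main

import "fmt"

// PipelineState is a hypothetical stand-in for pps.PipelineState.
type PipelineState int

const (
	Starting PipelineState = iota
	Running
	Crashing
	Failure
)

// errorState reports whether s is treated as an error state.
// Assumption: only Crashing and Failure have a reason attached.
func errorState(s PipelineState) bool {
	switch s {
	case Crashing, Failure:
		return true
	}
	return false
}

func main() {
	for _, s := range []PipelineState{Starting, Running, Crashing, Failure} {
		fmt.Println(s, errorState(s))
	}
}
```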
func FailPipeline ¶
func FailPipeline(ctx context.Context, db *pachsql.DB, pipelinesCollection col.PostgresCollection, specCommit *pfs.Commit, reason string) error
FailPipeline updates the pipeline's state to failed and sets the failure reason.
func FilterLogLines ¶
func FilterLogLines(request *pps.GetLogsRequest, r io.Reader, plainText bool, send func(*pps.LogMessage) error) error
func FindPipelineSpecCommit ¶
func FindPipelineSpecCommitInTransaction ¶
func FindPipelineSpecCommitInTransaction(txnCtx *txncontext.TransactionContext, pfsServer pfsServer.APIServer, pipeline *pps.Pipeline, startID string) (*pfs.Commit, error)
FindPipelineSpecCommitInTransaction finds the spec commit corresponding to the pipeline version present in the commit given by startID. If startID is blank, it finds the current pipeline version.
func GetLimitsResourceList ¶
func GetLimitsResourceList(ctx context.Context, limits *pps.ResourceSpec) (*v1.ResourceList, error)
GetLimitsResourceList returns the list of resource limits, that is, the maximum resources a pipeline may use, from a pipeline ResourceSpec.
func GetRequestsResourceListFromPipeline ¶
func GetRequestsResourceListFromPipeline(ctx context.Context, pipelineInfo *pps.PipelineInfo) (*v1.ResourceList, error)
GetRequestsResourceListFromPipeline returns a list of resources that the pipeline minimally requires.
func GetWorkerPipelineInfo ¶
func GetWorkerPipelineInfo(pachClient *client.APIClient, db *pachsql.DB, l col.PostgresListener, pipeline *pps.Pipeline, specCommitID string) (*pps.PipelineInfo, error)
GetWorkerPipelineInfo gets the PipelineInfo proto describing the pipeline that this worker is part of. It has the side effect of adding auth to the passed pachClient.
func ListPipelineInfo ¶
func ListPipelineInfo(ctx context.Context, pipelines col.PostgresCollection, filter *pps.Pipeline, history int64, f func(*pps.PipelineInfo) error) error
ListPipelineInfo calls f on each pipeline in the database matching filter (on all pipelines, if filter is nil).
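The callback-with-optional-filter pattern described above can be sketched as follows. An in-memory slice stands in for the database collection. `Pipeline`, `PipelineInfo`, and `listPipelineInfo` are hypothetical simplifications, and stopping at the first error returned by `f` is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// Pipeline and PipelineInfo are hypothetical stand-ins for the pps types.
type Pipeline struct{ Name string }

type PipelineInfo struct {
	Pipeline *Pipeline
	Version  uint64
}

// errStop is a sentinel used in this sketch to end iteration early.
var errStop = errors.New("stop")

// listPipelineInfo calls f on every element of all that matches filter
// (on every element if filter is nil). Assumption: iteration stops at the
// first error f returns, and that error is passed back to the caller.
func listPipelineInfo(all []*PipelineInfo, filter *Pipeline, f func(*PipelineInfo) error) error {
	for _, pi := range all {
		if filter != nil && pi.Pipeline.Name != filter.Name {
			continue
		}
		if err := f(pi); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	all := []*PipelineInfo{
		{Pipeline: &Pipeline{Name: "edges"}, Version: 1},
		{Pipeline: &Pipeline{Name: "montage"}, Version: 2},
	}
	// With a filter, only the matching pipeline is visited.
	_ = listPipelineInfo(all, &Pipeline{Name: "edges"}, func(pi *PipelineInfo) error {
		fmt.Println(pi.Pipeline.Name, pi.Version)
		return nil
	})
	// With a nil filter, every pipeline is visited until f returns an error.
	err := listPipelineInfo(all, nil, func(*PipelineInfo) error { return errStop })
	fmt.Println(errors.Is(err, errStop))
}
```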
func PipelineRcName ¶
func PipelineRcName(pi *pps.PipelineInfo) string
PipelineRcName generates the name of the k8s replication controller that manages a pipeline's workers.
func PipelineReqFromInfo ¶
func PipelineReqFromInfo(pipelineInfo *pps.PipelineInfo) *pps.CreatePipelineRequest
PipelineReqFromInfo converts a PipelineInfo into a CreatePipelineRequest.
func SetPipelineState ¶
func SetPipelineState(ctx context.Context, db *pachsql.DB, pipelinesCollection col.PostgresCollection, specCommit *pfs.Commit, from []pps.PipelineState, to pps.PipelineState, reason string) (retErr error)
SetPipelineState is a helper that moves the state of 'pipeline' from any of the states in 'from' (if not nil) to 'to'. It will annotate any trace in 'ctx' with information about 'pipeline' that it reads.
This function logs a lot for a library function, but it's mostly (maybe exclusively?) called by the PPS master.
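The "from any of the states in 'from' (if not nil) to 'to'" behavior is a guarded, compare-and-set style transition. The sketch below illustrates only that guard on an in-memory record. `record`, `PipelineState`, and `setState` are hypothetical, and the database transaction, tracing, and logging mentioned above are deliberately left out.

```go
package main

import "fmt"

// PipelineState is a hypothetical stand-in for pps.PipelineState.
type PipelineState int

const (
	Starting PipelineState = iota
	Running
	Paused
	Failure
)

// record is a hypothetical in-memory stand-in for a stored pipeline.
type record struct {
	State  PipelineState
	Reason string
}

// setState moves rec to 'to' only if its current state is one of 'from'.
// A nil 'from' skips the check, following the "if not nil" wording above.
func setState(rec *record, from []PipelineState, to PipelineState, reason string) error {
	if from != nil {
		allowed := false
		for _, s := range from {
			if rec.State == s {
				allowed = true
				break
			}
		}
		if !allowed {
			return fmt.Errorf("cannot move pipeline from state %d to %d: expected one of %v",
				rec.State, to, from)
		}
	}
	rec.State = to
	rec.Reason = reason
	return nil
}

func main() {
	rec := &record{State: Running}
	// Rejected: the record is Running, not Starting.
	fmt.Println(setState(rec, []PipelineState{Starting}, Paused, ""))
	// Accepted: Running is in the allowed set.
	fmt.Println(setState(rec, []PipelineState{Running}, Failure, "image pull failed"))
	fmt.Println(rec.State == Failure, rec.Reason)
}
```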
func SidecarS3GatewayService ¶
func SidecarS3GatewayService(pipeline *pps.Pipeline, commitSetId string) string
SidecarS3GatewayService returns the name of the kubernetes service created for the job 'jobID' to handle sidecar s3 gateway requests. This helper is in ppsutil because both PPS (which creates the service, in the s3 gateway sidecar server) and the worker (which passes the endpoint to the user code) need to know it.
func UpdateJobState ¶
func UpdateJobState(pipelines col.PostgresReadWriteCollection, jobs col.ReadWriteCollection, jobInfo *pps.JobInfo, state pps.JobState, reason string) error
UpdateJobState performs the operations involved with a job state transition.
Types ¶
type PipelineManifestReader ¶
type PipelineManifestReader struct {
// contains filtered or unexported fields
}
PipelineManifestReader helps with unmarshalling pipeline configs from JSON. It's used by 'create pipeline' and 'update pipeline'.
func NewPipelineManifestReader ¶
func NewPipelineManifestReader(pipelineBytes []byte) (result *PipelineManifestReader, retErr error)
NewPipelineManifestReader creates a new manifest reader from the raw bytes of a pipeline manifest.
func (*PipelineManifestReader) NextCreatePipelineRequest ¶
func (r *PipelineManifestReader) NextCreatePipelineRequest() (*ppsclient.CreatePipelineRequest, error)
NextCreatePipelineRequest gets the next request from the manifest reader.
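A reader with a "next request" method is commonly built on a streaming JSON decoder. The sketch below shows that pattern with the standard library only. It assumes, without confirmation from the source, that a manifest may hold several concatenated JSON documents and that the end of input is signalled with `io.EOF`. `request`, `manifestReader`, and its methods are hypothetical stand-ins, not the package's types.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// request is a hypothetical, heavily simplified stand-in for
// CreatePipelineRequest.
type request struct {
	Pipeline struct {
		Name string `json:"name"`
	} `json:"pipeline"`
}

// manifestReader is a hypothetical stand-in for PipelineManifestReader.
type manifestReader struct {
	dec *json.Decoder
}

func newManifestReader(manifest []byte) *manifestReader {
	return &manifestReader{dec: json.NewDecoder(bytes.NewReader(manifest))}
}

// next decodes the next JSON document from the manifest. json.Decoder
// returns io.EOF once no further documents remain.
func (r *manifestReader) next() (*request, error) {
	var req request
	if err := r.dec.Decode(&req); err != nil {
		return nil, err
	}
	return &req, nil
}

func main() {
	manifest := []byte(`{"pipeline": {"name": "edges"}}
{"pipeline": {"name": "montage"}}`)
	r := newManifestReader(manifest)
	for {
		req, err := r.next()
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("bad manifest:", err)
			return
		}
		fmt.Println(req.Pipeline.Name)
	}
}
```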
type PipelineTransitionError ¶
type PipelineTransitionError struct {
	Pipeline        *pps.Pipeline
	Expected        []pps.PipelineState
	Target, Current pps.PipelineState
}
PipelineTransitionError represents an error transitioning a pipeline from one state to another.
func (PipelineTransitionError) Error ¶
func (p PipelineTransitionError) Error() string
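Because the error is a struct with exported fields, callers can recover the details with `errors.As` even when the error has been wrapped. The sketch below mirrors the field layout with hypothetical stand-in types (`transitionError`, `Pipeline`, `PipelineState`). The message format and the `tryTransition` and `blockedState` helpers are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// PipelineState and Pipeline are hypothetical stand-ins for the pps types.
type PipelineState int

const (
	Starting PipelineState = iota
	Running
	Paused
	Failure
)

type Pipeline struct{ Name string }

// transitionError mirrors the fields of PipelineTransitionError.
type transitionError struct {
	Pipeline        *Pipeline
	Expected        []PipelineState
	Target, Current PipelineState
}

// Error uses a made-up message format; the real one may differ.
func (e transitionError) Error() string {
	return fmt.Sprintf("could not transition pipeline %q to state %d: current state %d is not one of %v",
		e.Pipeline.Name, e.Target, e.Current, e.Expected)
}

// tryTransition is a hypothetical helper that returns a wrapped
// transitionError when current is not one of the expected states.
func tryTransition(p *Pipeline, current PipelineState, expected []PipelineState, target PipelineState) error {
	for _, s := range expected {
		if s == current {
			return nil
		}
	}
	return fmt.Errorf("updating pipeline: %w", transitionError{
		Pipeline: p, Expected: expected, Target: target, Current: current,
	})
}

// blockedState reports the current state recorded in a transitionError
// found anywhere in err's chain.
func blockedState(err error) (PipelineState, bool) {
	var te transitionError
	if errors.As(err, &te) {
		return te.Current, true
	}
	return 0, false
}

func main() {
	err := tryTransition(&Pipeline{Name: "edges"}, Failure, []PipelineState{Starting, Running}, Paused)
	fmt.Println(err)
	if cur, ok := blockedState(err); ok {
		fmt.Println("transition blocked; current state:", cur)
	}
}
```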