Documentation ¶
Index ¶
- Constants
- func CrashingMonitorSideEffect(toggle sideEffectToggle) sideEffect
- func FinishCommitsSideEffect() sideEffect
- func GetBackendSecretVolumeAndMount() (v1.Volume, v1.VolumeMount)
- func NewAPIServer(env Env) (ppsiface.APIServer, error)
- func NewAPIServerNoMaster(env Env) (ppsiface.APIServer, error)
- func NewSidecarAPIServer(env Env, namespace string, workerGrpcPort uint16, peerPort uint16) (*apiServer, error)
- func ParseLokiLine(inputLine string, msg *pps.LogMessage) error
- func PipelineMonitorSideEffect(toggle sideEffectToggle) sideEffect
- func RepoNameToEnvString(repoName string) string
- func ResourcesSideEffect(toggle sideEffectToggle) sideEffect
- func RestartSideEffect() sideEffect
- func ScaleWorkersSideEffect(toggle sideEffectToggle) sideEffect
- type APIServer
- type Env
- type InfraDriver
- type JobFinisher
- type JobStopper
- type PipelineStateDriver
- type Propagater
- type Worker
- type WorkerEnv
Constants ¶
const ( // DefaultUserImage is the image used for jobs when the user does not specify // an image. DefaultUserImage = "ubuntu:20.04" // DefaultDatumTries is the default number of times a datum will be tried // before we give up and consider the job failed. DefaultDatumTries = 3 // DefaultLogsFrom is the default duration to return logs from, i.e. by // default we return logs from up to 24 hours ago. DefaultLogsFrom = time.Hour * 24 // DefaultDatumBatchSize is the default number of datums to return to the client // per CreateDatum request DefaultDatumBatchSize = 100 )
const ( // WorkerServiceAccountEnvVar is the name of the environment variable used to tell pachd // what service account to assign to new worker RCs, for the purpose of // creating S3 gateway services. WorkerServiceAccountEnvVar = "WORKER_SERVICE_ACCOUNT" // DefaultWorkerServiceAccountName is the default value to use if WorkerServiceAccountEnvVar is // undefined (for compatibility purposes) DefaultWorkerServiceAccountName = "pachyderm-worker" // UploadConcurrencyLimitEnvVar is the environment variable for the upload concurrency limit. // EnvVar defined in src/internal/serviceenv/config.go UploadConcurrencyLimitEnvVar = "STORAGE_UPLOAD_CONCURRENCY_LIMIT" StorageCompactionShardSizeThresholdEnvVar = "STORAGE_COMPACTION_SHARD_SIZE_THRESHOLD" StorageCompactionShardCountThresholdEnvVar = "STORAGE_COMPACTION_SHARD_COUNT_THRESHOLD" StorageMemoryThresholdEnvVar = "STORAGE_MEMORY_THRESHOLD" StorageLevelFactorEnvVar = "STORAGE_LEVEL_FACTOR" StorageMaxFanInEnvVar = "STORAGE_COMPACTION_MAX_FANIN" StorageMaxOpenFileSetsEnvVar = "STORAGE_FILESETS_MAX_OPEN" StorageDiskCacheSizeEnvVar = "STORAGE_DISK_CACHE_SIZE" StorageMemoryCacheSizeEnvVar = "STORAGE_MEMORY_CACHE_SIZE" SidecarMemoryRequestEnvVar = "K8S_MEMORY_REQUEST" SidecarMemoryLimitEnvVar = "K8S_MEMORY_LIMIT" )
Variables ¶
This section is empty.
Functions ¶
func CrashingMonitorSideEffect ¶
func CrashingMonitorSideEffect(toggle sideEffectToggle) sideEffect
func FinishCommitsSideEffect ¶
func FinishCommitsSideEffect() sideEffect
func GetBackendSecretVolumeAndMount ¶
func GetBackendSecretVolumeAndMount() (v1.Volume, v1.VolumeMount)
GetBackendSecretVolumeAndMount returns a properly configured Volume and VolumeMount object
func NewAPIServer ¶
NewAPIServer creates an APIServer and runs the master loop in the background
func NewAPIServerNoMaster ¶
NewAPIServerNoMaster creates an APIServer without running the master loop in the background.
func NewSidecarAPIServer ¶
func NewSidecarAPIServer( env Env, namespace string, workerGrpcPort uint16, peerPort uint16, ) (*apiServer, error)
NewSidecarAPIServer creates an APIServer that has limited functionalities and is meant to be run as a worker sidecar. It cannot, for instance, create pipelines.
func ParseLokiLine ¶
func ParseLokiLine(inputLine string, msg *pps.LogMessage) error
func PipelineMonitorSideEffect ¶
func PipelineMonitorSideEffect(toggle sideEffectToggle) sideEffect
func RepoNameToEnvString ¶
RepoNameToEnvString is a helper which uppercases a repo name for use in environment variable names.
func ResourcesSideEffect ¶
func ResourcesSideEffect(toggle sideEffectToggle) sideEffect
func RestartSideEffect ¶
func RestartSideEffect() sideEffect
func ScaleWorkersSideEffect ¶
func ScaleWorkersSideEffect(toggle sideEffectToggle) sideEffect
Types ¶
type Env ¶
type Env struct { DB *pachsql.DB TxnEnv *txnenv.TransactionEnv Listener collection.PostgresListener KubeClient kubernetes.Interface EtcdClient *etcd.Client EtcdPrefix string TaskService task.Service // TODO: make this just a *loki.Client // This is not a circular dependency GetLokiClient func() (*loki.Client, error) PFSServer pfsserver.APIServer AuthServer authserver.APIServer // TODO: This should just be a pach client for the needed services. // serviceenv blocks until everything is done though, so we can't get it until after setup is done. GetPachClient func(context.Context) *client.APIClient Reporter *metrics.Reporter BackgroundContext context.Context Config pachconfig.Configuration PachwInSidecar bool }
Env contains the dependencies needed to create an API Server
type InfraDriver ¶
type InfraDriver interface { // Creates a pipeline's services, secrets, and replication controllers. CreatePipelineResources(ctx context.Context, pi *pps.PipelineInfo) error // Deletes a pipeline's services, secrets, and replication controllers. // NOTE: It doesn't return a stepError, leaving retry behavior to the caller DeletePipelineResources(ctx context.Context, pipeline *pps.Pipeline) error ReadReplicationController(ctx context.Context, pi *pps.PipelineInfo) (*v1.ReplicationControllerList, error) // UpdateReplicationController intends to server {scaleUp,scaleDown}Pipeline. // It includes all of the logic for writing an updated RC spec to kubernetes, // and updating/retrying if k8s rejects the write. It presents a strange API, // since the the RC being updated is already available to the caller, but update() // may be called muliple times if the k8s write fails. It may be helpful to think // of the 'old' rc passed to update() as mutable. UpdateReplicationController(ctx context.Context, old *v1.ReplicationController, update func(rc *v1.ReplicationController) bool) error ListReplicationControllers(ctx context.Context) (*v1.ReplicationControllerList, error) WatchPipelinePods(ctx context.Context) (<-chan watch.Event, func(), error) }
type JobFinisher ¶
type JobFinisher struct {
// contains filtered or unexported fields
}
func (*JobFinisher) FinishJob ¶
func (jf *JobFinisher) FinishJob(commitInfo *pfs.CommitInfo)
type JobStopper ¶
type JobStopper struct {
// contains filtered or unexported fields
}
JobStopper is an object that is used to stop jobs in response to a commitset being removed by PFS. The transactionenv package provides the interface for this and will call the Run function at the end of a transaction.
func (*JobStopper) Run ¶
func (t *JobStopper) Run(ctx context.Context) error
Run stops any jobs for the removed commitsets or commits.
func (*JobStopper) StopJob ¶ added in v2.8.0
func (t *JobStopper) StopJob(commit *pfs.Commit)
StopJob notifies PPS that a commit has been deleted in the transaction, and any jobs will be stopped for it at the end of the transaction. This will be performed by the Run function.
func (*JobStopper) StopJobSet ¶ added in v2.8.0
func (t *JobStopper) StopJobSet(commitset *pfs.CommitSet)
StopJobSet notifies PPS that a commitset has been deleted in the transaction, and any jobs will be stopped for it at the end of the transaction. This will be performed by the Run function.
type PipelineStateDriver ¶
type PipelineStateDriver interface { // returns PipelineInfo corresponding to the latest pipeline state, a context loaded with the pipeline's auth info, and error // NOTE: returns nil, nil, nil if the step is found to be a delete operation FetchState(ctx context.Context, pipeline *pps.Pipeline) (*pps.PipelineInfo, context.Context, error) // setPipelineState set's pc's state in the collection to 'state'. This will trigger a // collection watch event and cause step() to eventually run again. SetState(ctx context.Context, specCommit *pfs.Commit, state pps.PipelineState, reason string) error // TransitionState is similar to SetState, except that it checks whether pipelineInfo @ specCommit // is in one of the 'from' states TransitionState(ctx context.Context, specCommit *pfs.Commit, from []pps.PipelineState, to pps.PipelineState, reason string) error // wraps a Watcher on the pipelines collection Watch(ctx context.Context) (<-chan *watch.Event, func(), error) // list all PipelineInfos ListPipelineInfo(ctx context.Context, f func(*pps.PipelineInfo) error) error GetPipelineInfo(ctx context.Context, pipeline *pps.Pipeline, version int) (*pps.PipelineInfo, error) }
type Propagater ¶
type Propagater struct {
// contains filtered or unexported fields
}
Propagater is an object that is used to create jobs in response to a new commitset in PFS. The transactionenv package provides the interface for this and will call the Run function at the end of a transaction.
func (*Propagater) PropagateJobs ¶
func (t *Propagater) PropagateJobs()
PropagateJobs notifies PPS that a commitset has been modified in the transaction, and any jobs will be created for it at the end of the transaction. This will be performed by the Run function.