Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertJobToPreservationTask(job amclient.Job) datatypes.PreservationTask
- type Config
- type DeleteTransferActivity
- type DeleteTransferActivityParams
- type DeleteTransferActivityResult
- type JobTracker
- type PollIngestActivity
- type PollIngestActivityParams
- type PollIngestActivityResult
- type PollTransferActivity
- type PollTransferActivityParams
- type PollTransferActivityResult
- type StartTransferActivity
- type StartTransferActivityParams
- type StartTransferActivityResult
- type UploadTransferActivity
- type UploadTransferActivityParams
- type UploadTransferActivityResult
Constants ¶
const DeleteTransferActivityName = "DeleteTransferActivity"
const PollIngestActivityName = "poll-ingest-activity"
const PollTransferActivityName = "poll-transfer-activity"
const StartTransferActivityName = "start-transfer-activity"
const UploadTransferActivityName = "UploadTransferActivity"
Variables ¶
var ( // ErrWorkOngoing indicates work is ongoing and polling should continue. ErrWorkOngoing = errors.New("work ongoing") // ErrBadRequest represents an AM "400 Bad request" response, which can // occur while a transfer or ingest is still processing and may require // special handling. ErrBadRequest = errors.New("Archivematica response: 400 Bad request") )
Functions ¶
func ConvertJobToPreservationTask ¶
func ConvertJobToPreservationTask(job amclient.Job) datatypes.PreservationTask
ConvertJobToPreservationTask converts an amclient.Job to a datatypes.PreservationTask.
Types ¶
type Config ¶
type Config struct { // Archivematica server address. Address string // Archivematica API user. User string // Archivematica API key. APIKey string // Archivematica processing configuration to use (default: "automated"). ProcessingConfig string // SFTP configuration for uploading transfers to Archivematica. SFTP sftp.Config // TransferSourcePath is the path to an Archivematica transfer source // directory. It is used in the POST /api/v2beta/package "path" parameter // to start a transfer via the API. TransferSourcePath must be prefixed with // the UUID of an AMSS transfer source directory, optionally followed by a // relative path from the source dir (e.g. // "749ef452-fbed-4d50-9072-5f98bc01e52e:sftp_upload"). TransferSourcePath string // ZipPIP specifies whether or not a PIP should be zipped before being sent // from Enduro to Archivematica. ZipPIP bool // Capacity sets the maximum number of worker sessions the worker can // handle at one time (default: 1). Capacity int // PollInterval is the time to wait between poll requests to the AM API. PollInterval time.Duration // TransferDeadline is the maximum time to wait for a transfer to complete. // Set to zero for no deadline. TransferDeadline time.Duration }
type DeleteTransferActivity ¶
type DeleteTransferActivity struct {
// contains filtered or unexported fields
}
func NewDeleteTransferActivity ¶
func NewDeleteTransferActivity(client sftp.Client) *DeleteTransferActivity
func (*DeleteTransferActivity) Execute ¶
func (a *DeleteTransferActivity) Execute( ctx context.Context, params *DeleteTransferActivityParams, ) (*DeleteTransferActivityResult, error)
type DeleteTransferActivityParams ¶
type DeleteTransferActivityParams struct {
Destination string
}
type DeleteTransferActivityResult ¶
type DeleteTransferActivityResult struct{}
type JobTracker ¶
type JobTracker struct {
// contains filtered or unexported fields
}
func NewJobTracker ¶
func NewJobTracker( clock clockwork.Clock, jobSvc amclient.JobsService, ingestsvc ingest.Service, presActionID int, ) *JobTracker
func (*JobTracker) SavePreservationTasks ¶
SavePreservationTasks queries the Archivematica jobs list endpoint to get a list of completed jobs related to the transfer or ingest identified by unitID, then saves any new jobs as preservation tasks.
type PollIngestActivity ¶
type PollIngestActivity struct {
// contains filtered or unexported fields
}
func NewPollIngestActivity ¶
func NewPollIngestActivity( cfg *Config, clock clockwork.Clock, ingSvc amclient.IngestService, jobSvc amclient.JobsService, ingestsvc ingest.Service, ) *PollIngestActivity
func (*PollIngestActivity) Execute ¶
func (a *PollIngestActivity) Execute( ctx context.Context, params *PollIngestActivityParams, ) (*PollIngestActivityResult, error)
Execute polls Archivematica for the status of an ingest and returns when the ingest is complete or returns an error status. Execute sends an activity heartbeat after each poll.
A response status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing cannot continue.
type PollTransferActivity ¶
type PollTransferActivity struct {
// contains filtered or unexported fields
}
func NewPollTransferActivity ¶
func NewPollTransferActivity( cfg *Config, clock clockwork.Clock, tfrSvc amclient.TransferService, jobSvc amclient.JobsService, ingestsvc ingest.Service, ) *PollTransferActivity
func (*PollTransferActivity) Execute ¶
func (a *PollTransferActivity) Execute( ctx context.Context, params *PollTransferActivityParams, ) (*PollTransferActivityResult, error)
Execute polls Archivematica for the status of a transfer and returns when the transfer is complete or returns an error status. Execute sends an activity heartbeat after each poll.
On each poll, Execute requests an updated list of AM jobs performed and saves the job data to the ingest service as preservation tasks.
A transfer status of "REJECTED", "FAILED", "USER_INPUT", or "BACKLOG" returns a temporal.NonRetryableApplicationError to indicate that processing cannot continue.
type StartTransferActivity ¶
type StartTransferActivity struct {
// contains filtered or unexported fields
}
func NewStartTransferActivity ¶
func NewStartTransferActivity(cfg *Config, amps amclient.PackageService) *StartTransferActivity
func (*StartTransferActivity) Execute ¶
func (a *StartTransferActivity) Execute( ctx context.Context, opts *StartTransferActivityParams, ) (*StartTransferActivityResult, error)
Execute sends a request to the Archivematica API to start a new "auto-approved" transfer. If the request is successful a transfer UUID is returned. An error response will return a retryable or non-retryable temporal.ApplicationError, depending on the nature of the error.
type StartTransferActivityResult ¶
type StartTransferActivityResult struct {
TransferID string
}
type UploadTransferActivity ¶
type UploadTransferActivity struct {
// contains filtered or unexported fields
}
UploadTransferActivity uploads a transfer via the SFTP client, and sends a periodic Temporal heartbeat at the given heartRate.
func NewUploadTransferActivity ¶
func NewUploadTransferActivity(client sftp.Client, heartRate time.Duration) *UploadTransferActivity
NewUploadTransferActivity initializes and returns a new UploadTransferActivity.
func (*UploadTransferActivity) Execute ¶
func (a *UploadTransferActivity) Execute( ctx context.Context, params *UploadTransferActivityParams, ) (*UploadTransferActivityResult, error)
Execute copies the source transfer to the destination via SFTP.
func (*UploadTransferActivity) Heartbeat ¶
func (a *UploadTransferActivity) Heartbeat(ctx context.Context, upload sftp.AsyncUpload, fileSize int64) error
Heartbeat sends a periodic Temporal heartbeat, which includes the number of bytes uploaded, until the upload is complete, cancelled or returns an error.
type UploadTransferActivityParams ¶
type UploadTransferActivityParams struct { // Local path of the source file. SourcePath string }
type UploadTransferActivityResult ¶
type UploadTransferActivityResult struct { // Bytes copied to the remote file over the SFTP connection. BytesCopied int64 // Full path of the destination file including `remoteDir` config path. RemoteFullPath string // Path of the destination file relative to the `remoteDir` config path. RemoteRelativePath string }