workflow

package
v0.41.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2024 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Overview

Package workflow contains an experimental workflow for Archivemica transfers.

It's not generalized since it contains client-specific activities. However, the long-term goal is to build a system where workflows and activities are dynamically set up based on user input.

Index

Constants

This section is empty.

Variables

View Source
var AsyncCompletionActivityName = "async-completion-activity"
View Source
var ErrAsyncCompletionAbandoned = errors.New("user abandoned")

Functions

This section is empty.

Types

type AsyncCompletionActivity added in v0.22.0

type AsyncCompletionActivity struct {
	// contains filtered or unexported fields
}

func NewAsyncCompletionActivity added in v0.22.0

func NewAsyncCompletionActivity(colsvc collection.Service) *AsyncCompletionActivity

func (*AsyncCompletionActivity) Execute added in v0.22.0

func (a *AsyncCompletionActivity) Execute(ctx context.Context, colID uint) (string, error)

type ProcessingWorkflow

type ProcessingWorkflow struct {
	// contains filtered or unexported fields
}

func NewProcessingWorkflow

func NewProcessingWorkflow(h *hooks.Hooks, colsvc collection.Service, pipelineRegistry *pipeline.Registry, l logr.Logger) *ProcessingWorkflow

func (*ProcessingWorkflow) Execute

ProcessingWorkflow orchestrates all the activities related to the processing of a SIP in Archivematica, including is retrieval, creation of transfer, etc...

Retrying this workflow would result in a new Archivematica transfer. We do not have a retry policy in place. The user could trigger a new instance via the API.

func (*ProcessingWorkflow) SessionHandler

func (w *ProcessingWorkflow) SessionHandler(sessCtx temporalsdk_workflow.Context, attempt int, tinfo *TransferInfo, nameInfo nha.NameInfo, validationConfig validation.Config, timer *Timer) error

SessionHandler runs activities that belong to the same session.

type Timer added in v0.33.0

type Timer struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewTimer added in v0.33.0

func NewTimer() *Timer

func (*Timer) Exceeded added in v0.33.0

func (t *Timer) Exceeded() bool

func (*Timer) WithTimeout added in v0.33.0

type TransferInfo

type TransferInfo struct {
	// TempFile is the temporary location where the blob is downloaded.
	//
	// It is populated by the workflow with the result of DownloadActivity.
	TempFile string

	// TransferID given by Archivematica.
	//
	// It is populated by TransferActivity.
	TransferID string

	// SIPID given by Archivematica.
	//
	// It is populated by PollTransferActivity.
	SIPID string

	// Enduro internal collection ID.
	// The zero value represents a new collection. It can be used to indicate
	// an existing collection in retries.
	//
	// It is populated via the workflow request or createPackageLocalActivity.
	CollectionID uint

	// Name of the watcher that received this blob.
	//
	// It is populated via the workflow request. Expect an empty string when
	// the workflow was started by a batch.
	WatcherName string

	// Name of the pipeline to be used for processing.
	//
	// It is populated by this workflow after the list provided by the user or
	// the list of configured pipelines in the system.
	PipelineName string

	// Retention period.
	// Period of time to schedule the deletion of the original blob from the
	// watched data source. nil means no deletion.
	//
	// It is populated via the workflow request.
	RetentionPeriod *time.Duration

	// Directory where the transfer is moved to once processing has completed
	// successfully.
	//
	// It is populated via the workflow request.
	CompletedDir string

	// Whether the top-level directory is meant to be stripped.
	//
	// It is populated via the workflow request.
	StripTopLevelDir bool

	// Whether hidden files are excluded from the transfer
	//
	// It is populated via the workflow request.
	ExcludeHiddenFiles bool

	// Key of the blob.
	//
	// It is populated via the workflow request.
	Key string

	// Whether the blob is a directory (fs watcher)
	//
	// It is populated via the workflow request.
	IsDir bool

	// Batch directory that contains the blob.
	//
	// It is populated via the workflow request.
	BatchDir string

	// StoredAt is the time when the AIP is stored.
	//
	// It is populated by PollIngestActivity as long as Ingest completes.
	StoredAt time.Time

	// PipelineConfig is the configuration of the pipeline that this workflow
	// uses to provide access to its activities.
	//
	// It is populated by loadConfigLocalActivity.
	PipelineConfig *pipeline.Config

	// Processing configuration name.
	//
	// It is populated via the workflow request.
	ProcessingConfig string

	// PipelineID is the UUID of the Archivematica pipeline. Extracted from
	// the API response header when the transfer is submitted.
	//
	// It is populated by transferActivity.
	PipelineID string

	// Hooks is the hook config store.
	//
	// It is populated by loadConfigLocalActivity.
	Hooks map[string]map[string]interface{}

	// Information about the bundle (transfer) that we submit to Archivematica.
	// Full path, relative path...
	//
	// It is populated by BundleActivity.
	Bundle activities.BundleActivityResult

	// Archivematica transfer type.
	//
	// It is populated via the workflow request.
	TransferType string

	MetadataConfig metadata.Config
}

TransferInfo is shared state that is passed down to activities. It can be useful for hooks that may require quick access to processing state. TODO: clean this up, e.g.: it can embed a collection.Collection.

func (TransferInfo) ProcessingConfiguration added in v0.34.0

func (tinfo TransferInfo) ProcessingConfiguration() string

Directories

Path Synopsis
Package activities implements Enduro's workflow activities.
Package activities implements Enduro's workflow activities.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL