Documentation ¶
Index ¶
- func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
- func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, ...) *core.CatalogMetadata
- func GenerateArtifactTagName(ctx context.Context, inputs *core.LiteralMap) (string, error)
- func GenerateDatasetIDForTask(ctx context.Context, k catalog.Key) (*datacatalog.DatasetID, error)
- func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, ...) (*core.LiteralMap, error)
- func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
- func GetOrDefault(m map[string]string, key, defaultValue string) string
- func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) (*core.TaskExecutionIdentifier, error)
- type CatalogClient
- func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, ...) (*datacatalog.Artifact, error)
- func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
- func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry, error)
- func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
- func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
- func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, ...) (*datacatalog.Reservation, error)
- func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, ...) (catalog.Status, error)
- func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DatasetIDToIdentifier ¶ added in v0.7.0
func DatasetIDToIdentifier(id *datacatalog.DatasetID) *core.Identifier
func EventCatalogMetadata ¶ added in v0.7.0
func EventCatalogMetadata(datasetID *datacatalog.DatasetID, tag *datacatalog.Tag, sourceID *core.TaskExecutionIdentifier) *core.CatalogMetadata
Given the Catalog Information (returned from a Catalog call), returns the CatalogMetadata that is populated in the event.
func GenerateArtifactTagName ¶
Generate a tag by hashing the input values
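The idea of deriving a tag from input values can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: a plain `map[string]string` stands in for `*core.LiteralMap`, SHA-256 and base64 are assumed choices, and the helper name `tagForInputs` and the `cached-` prefix are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"sort"
)

// tagForInputs is a hypothetical helper: it hashes input name/value pairs in
// sorted key order so that equal inputs always map to the same tag.
func tagForInputs(inputs map[string]string) string {
	keys := make([]string, 0, len(inputs))
	for k := range inputs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	h := sha256.New()
	for _, k := range keys {
		// Length-prefix each field so ("ab","c") and ("a","bc") hash differently.
		fmt.Fprintf(h, "%d:%s=%d:%s;", len(k), k, len(inputs[k]), inputs[k])
	}
	// The "cached-" prefix is an assumption of this sketch.
	return "cached-" + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
}

func main() {
	a := tagForInputs(map[string]string{"x": "1", "y": "2"})
	b := tagForInputs(map[string]string{"y": "2", "x": "1"})
	fmt.Println(a == b) // same inputs give the same tag regardless of map order
}
```

Because the tag depends only on the input values, a later execution with identical inputs can find the previously cached artifact by recomputing the same tag.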
func GenerateDatasetIDForTask ¶
Get the DatasetID for a task. NOTE: the version of the task is a combination of both the discoverable_version and the task signature, because the interface may have changed even if the discoverable_version has not.
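A minimal sketch of why the dataset version combines both parts, assuming the interface is reduced to a signature string and hashed with SHA-256. The helper name `datasetVersion`, the `-` separator, and the truncated hash are hypothetical and chosen only for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// datasetVersion is a hypothetical helper that folds the user-supplied cache
// version and a hash of the task's interface signature into one version string.
func datasetVersion(cacheVersion, interfaceSignature string) string {
	sum := sha256.Sum256([]byte(interfaceSignature))
	// Keeping only the first 8 bytes is an assumption made for readability.
	return cacheVersion + "-" + hex.EncodeToString(sum[:8])
}

func main() {
	v1 := datasetVersion("1.0", "in(x:int) out(y:int)")
	v2 := datasetVersion("1.0", "in(x:int) out(y:string)")
	fmt.Println(v1 != v2) // the interface changed, so the dataset version changes too
}
```

With this scheme, a task whose output type changes gets a new dataset even when its author forgot to bump the cache version, so stale cached outputs of the old type are never returned.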
func GenerateTaskOutputsFromArtifact ¶
func GenerateTaskOutputsFromArtifact(id core.Identifier, taskInterface core.TypedInterface, artifact *datacatalog.Artifact) (*core.LiteralMap, error)
Transform the artifact Data into task execution outputs as a literal map
func GetArtifactMetadataForSource ¶ added in v0.7.0
func GetArtifactMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
func GetDatasetMetadataForSource ¶ added in v0.7.0
func GetDatasetMetadataForSource(taskExecutionID *core.TaskExecutionIdentifier) *datacatalog.Metadata
Understanding Catalog Identifiers
- DatasetID represents the ID of the dataset. For Flyte this represents the ID of the generating task and the version, calculated as the hash of the interface and cache version. Refer to `GenerateDatasetIDForTask`.
- TaskID is the same as the DatasetID + name: (DatasetID - namespace) + task version, which is stored in the metadata.
- ExecutionID is stored only in the metadata (project and domain available after Jul-2020).
- NodeExecID = Execution ID + Node ID (available after Jul-2020).
- TaskExecID is the same as the NodeExecutionID + attempt (attempt is available in Metadata) after Jul-2020.
func GetOrDefault ¶ added in v0.7.0
Returns the value stored in the map for the given key, or the default value if the key is not found in the map.
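The documented behaviour can be sketched as below. The lowercase `getOrDefault` is a stand-in written for this example, and the metadata key names are hypothetical. Treating a key that is present with an empty value as "found" is an assumption of the sketch.

```go
package main

import "fmt"

// getOrDefault returns m[key] when the key is present, otherwise defaultValue.
// Assumption: a key stored with an empty string still counts as present.
func getOrDefault(m map[string]string, key, defaultValue string) string {
	if v, ok := m[key]; ok {
		return v
	}
	return defaultValue
}

func main() {
	// Hypothetical metadata keys, used only for illustration.
	md := map[string]string{"exec-name": "abc123"}
	fmt.Println(getOrDefault(md, "exec-name", "unknown"))    // key present
	fmt.Println(getOrDefault(md, "exec-project", "unknown")) // key missing
}
```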
func GetSourceFromMetadata ¶ added in v0.7.0
func GetSourceFromMetadata(datasetMd, artifactMd *datacatalog.Metadata, currentID core.Identifier) (*core.TaskExecutionIdentifier, error)
GetSourceFromMetadata returns the source TaskExecutionIdentifier from the catalog metadata. For any information that is not available it returns Unknown, because as of July 2020 Catalog does not have all the information. After the first deployment of this code Catalog will have it, and the "unknown" values can be phased out.
Types ¶
type CatalogClient ¶
type CatalogClient struct {
// contains filtered or unexported fields
}
This is the client that caches task executions in the DataCatalog service.
func NewDataCatalog ¶
func NewDataCatalog(ctx context.Context, endpoint string, insecureConnection bool, maxCacheAge time.Duration, useAdminAuth bool, authOpt grpc.DialOption) (*CatalogClient, error)
Create a new Datacatalog client for task execution caching
func (*CatalogClient) CreateArtifact ¶
func (m *CatalogClient) CreateArtifact(ctx context.Context, datasetID *datacatalog.DatasetID, outputs *core.LiteralMap, md *datacatalog.Metadata) (*datacatalog.Artifact, error)
func (*CatalogClient) CreateDataset ¶
func (m *CatalogClient) CreateDataset(ctx context.Context, key catalog.Key, metadata *datacatalog.Metadata) (*datacatalog.DatasetID, error)
func (*CatalogClient) Get ¶
Get the cached task execution from Catalog. These are the steps taken:
- Verify there is a Dataset created for the Task
- Lookup the Artifact that is tagged with the hash of the input values
- The artifactData contains the literal values that serve as the task outputs
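The lookup steps above can be sketched against an in-memory stand-in. Everything here is hypothetical (the `fakeCatalog` and `dataset` types, the string keys, and the `errNotFound` value); the real client talks to the DataCatalog service over gRPC and works with `catalog.Key` and `catalog.Entry`.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical sentinel for a cache miss.
var errNotFound = errors.New("not found")

// dataset is an in-memory stand-in for a catalog Dataset:
// it maps artifact tags to the cached output values.
type dataset struct {
	artifactsByTag map[string]map[string]string
}

// fakeCatalog maps a dataset key (task identity plus version) to its dataset.
type fakeCatalog struct {
	datasets map[string]*dataset
}

// get follows the three documented steps of a cache lookup.
func (c *fakeCatalog) get(datasetKey, inputTag string) (map[string]string, error) {
	// 1. Verify a dataset exists for the task.
	ds, ok := c.datasets[datasetKey]
	if !ok {
		return nil, fmt.Errorf("dataset %q: %w", datasetKey, errNotFound)
	}
	// 2. Look up the artifact tagged with the hash of the input values.
	outputs, ok := ds.artifactsByTag[inputTag]
	if !ok {
		return nil, fmt.Errorf("tag %q: %w", inputTag, errNotFound)
	}
	// 3. The artifact data holds the values that serve as the task outputs.
	return outputs, nil
}

func main() {
	c := &fakeCatalog{datasets: map[string]*dataset{
		"proj/dev/my_task/v1": {artifactsByTag: map[string]map[string]string{
			"tag-abc": {"y": "42"},
		}},
	}}
	out, err := c.get("proj/dev/my_task/v1", "tag-abc")
	fmt.Println(out["y"], err) // cache hit

	_, err = c.get("proj/dev/my_task/v1", "tag-zzz")
	fmt.Println(errors.Is(err, errNotFound)) // cache miss on an unknown tag
}
```

A miss at either step means the task has to run; only a hit at both steps yields reusable outputs.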
func (*CatalogClient) GetArtifactByTag ¶
func (m *CatalogClient) GetArtifactByTag(ctx context.Context, tagName string, dataset *datacatalog.Dataset) (*datacatalog.Artifact, error)
Helper method to retrieve an artifact by the tag
func (*CatalogClient) GetDataset ¶
func (m *CatalogClient) GetDataset(ctx context.Context, key catalog.Key) (*datacatalog.Dataset, error)
Helper method to retrieve a dataset that is associated with the task
func (*CatalogClient) GetOrExtendReservation ¶ added in v0.16.1
func (m *CatalogClient) GetOrExtendReservation(ctx context.Context, key catalog.Key, ownerID string, heartbeatInterval time.Duration) (*datacatalog.Reservation, error)
GetOrExtendReservation attempts to get a reservation for the cacheable task. If you have previously acquired a reservation, it will be extended. If another entity holds the reservation, that reservation is returned.
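The get-or-extend semantics can be sketched with an in-memory table. This is an illustration only: the `reservations` type, the explicit `now` parameter, expiry handling, and the rule that a reservation lasts three heartbeat intervals are all assumptions of the sketch, not documented behaviour of the service.

```go
package main

import (
	"fmt"
	"time"
)

// heartbeat is an arbitrary interval used by the example.
const heartbeat = 10 * time.Second

// reservation records who holds a key and until when.
type reservation struct {
	ownerID   string
	expiresAt time.Time
}

// reservations is a hypothetical in-memory reservation table.
type reservations struct {
	byKey map[string]reservation
}

// getOrExtend returns the reservation in effect for key after the call.
func (r *reservations) getOrExtend(key, ownerID string, interval time.Duration, now time.Time) reservation {
	cur, held := r.byKey[key]
	if held && cur.ownerID != ownerID && now.Before(cur.expiresAt) {
		// Another owner holds a live reservation: return it unchanged.
		return cur
	}
	// Free, expired, or already ours: acquire or extend it.
	// Expiring after 3 intervals is an assumption of this sketch.
	next := reservation{ownerID: ownerID, expiresAt: now.Add(3 * interval)}
	r.byKey[key] = next
	return next
}

func main() {
	r := &reservations{byKey: map[string]reservation{}}
	now := time.Now()
	first := r.getOrExtend("task-a", "owner-1", heartbeat, now)
	second := r.getOrExtend("task-a", "owner-2", heartbeat, now.Add(heartbeat))
	fmt.Println(first.ownerID, second.ownerID) // owner-2 is told that owner-1 holds it
}
```

A caller can compare the returned owner with its own ID to decide whether to run the task or wait for the holder to populate the cache.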
func (*CatalogClient) Put ¶
func (m *CatalogClient) Put(ctx context.Context, key catalog.Key, reader io.OutputReader, metadata catalog.Metadata) (catalog.Status, error)
Catalog the task execution as a cached Artifact. We associate an Artifact as the cached data by tagging the Artifact with the hash of the input values.
The steps taken to cache an execution:
- Ensure a Dataset exists for the Artifact. The Dataset represents the proj/domain/name/version of the task
- Create an Artifact with the execution data that belongs to the dataset
- Tag the Artifact with a hash generated by the input values
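The caching steps above can be sketched with an in-memory stand-in. The `fakeCatalog`, `dataset`, and `artifact` types and the string keys are hypothetical; the real client issues DataCatalog requests and works with `catalog.Key`, `io.OutputReader`, and `catalog.Metadata`.

```go
package main

import "fmt"

// artifact is a stand-in for a stored set of task outputs.
type artifact struct {
	id      int
	outputs map[string]string
}

// dataset groups the artifacts of one task version and indexes them by tag.
type dataset struct {
	artifacts []*artifact
	tags      map[string]*artifact
}

// fakeCatalog maps a dataset key (proj/domain/name/version) to its dataset.
type fakeCatalog struct {
	datasets map[string]*dataset
}

// put follows the three documented steps for caching an execution.
func (c *fakeCatalog) put(datasetKey, inputTag string, outputs map[string]string) *artifact {
	// 1. Ensure a dataset exists for the task, creating it on first use.
	ds, ok := c.datasets[datasetKey]
	if !ok {
		ds = &dataset{tags: map[string]*artifact{}}
		c.datasets[datasetKey] = ds
	}
	// 2. Create an artifact holding the execution's output data.
	a := &artifact{id: len(ds.artifacts) + 1, outputs: outputs}
	ds.artifacts = append(ds.artifacts, a)
	// 3. Tag the artifact with the hash generated from the input values.
	ds.tags[inputTag] = a
	return a
}

func main() {
	c := &fakeCatalog{datasets: map[string]*dataset{}}
	c.put("proj/dev/my_task/v1", "tag-abc", map[string]string{"y": "42"})
	cached := c.datasets["proj/dev/my_task/v1"].tags["tag-abc"]
	fmt.Println(cached.outputs["y"]) // the outputs are now reachable through the tag
}
```

The tag is what makes the artifact discoverable later: a lookup recomputes the same hash from the same inputs and resolves it to this artifact.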
func (*CatalogClient) ReleaseReservation ¶ added in v0.16.1
func (m *CatalogClient) ReleaseReservation(ctx context.Context, key catalog.Key, ownerID string) error
ReleaseReservation attempts to release a reservation for a cacheable task. If the reservation does not exist (e.g. it never existed or has been acquired by another owner), this call still succeeds.
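The "release always succeeds" behaviour can be sketched as follows. The `reservations` type and its map are hypothetical stand-ins for the service's state, and the sketch assumes owner IDs are non-empty.

```go
package main

import "fmt"

// reservations is a hypothetical table of key -> current owner.
type reservations struct {
	ownerByKey map[string]string
}

// release removes the reservation only when ownerID currently holds it.
// A missing reservation, or one held by another owner, is not an error.
func (r *reservations) release(key, ownerID string) error {
	if r.ownerByKey[key] == ownerID {
		delete(r.ownerByKey, key)
	}
	return nil
}

func main() {
	r := &reservations{ownerByKey: map[string]string{"task-a": "owner-2"}}

	// owner-1 no longer holds task-a: the call succeeds and owner-2 keeps it.
	fmt.Println(r.release("task-a", "owner-1"), r.ownerByKey["task-a"])

	// task-b was never reserved: the call still succeeds.
	fmt.Println(r.release("task-b", "owner-1"))
}
```

Making release tolerant of missing reservations lets callers invoke it unconditionally during cleanup without first checking who holds the reservation.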