dataimport

package
v4.9.3
Published: Apr 26, 2024 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Implements importer triggering based on SNS queues. It decodes incoming SNS messages and extracts the files so that they are ready for the importer code to run.

Exposes the interface of the dataset importer (also called the converter) and selects a converter automatically based on which files are in the folder being imported. The converter supports various formats as delivered by GDS or test instruments, and it is intended to be extended to other lab instruments and devices in future.

Example (DecodeImportTrigger_Manual)

Trigger for a manual dataset regeneration (user clicks save button on dataset edit page)

trigger := `{
	"datasetID": "189137412",
	"jobID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, jobId, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobId, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: "189137412"
Job: "dataimport-zmzddoytch2krd7n"
Err: "<nil>"
Example (DecodeImportTrigger_ManualBadDatasetID)
trigger := `{
	"datasetID": "",
	"jobID": "dataimport-zmzddoytch2krd7n"
}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Job: ""
Err: "Failed to find dataset ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadLogID)
trigger := `{
		"datasetID": "qwerty"
	}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Job: ""
Err: "Failed to find job ID in reprocess trigger"
Example (DecodeImportTrigger_ManualBadMsg)
trigger := `{
	"weird": "message"
}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Job: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
Example (DecodeImportTrigger_OCS)

Trigger from when a new zip arrives from the pipeline

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "189137412-07-09-2022-10-07-57.zip",
                    "size": 54237908,
                    "eTag": "b21ebca14f67255be1cd28c01d494508-7",
                    "sequencer": "0063243D6858D568F0"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, logID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the log ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nLog Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(logID), err)
Output:

Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "189137412-07-09-2022-10-07-57.zip"
Dataset: "189137412"
Log Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS2)

Trigger from when a new zip arrives from the pipeline

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-25T14:33:49.456Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "3.12.95.94"
            },
            "responseElements": {
                "x-amz-request-id": "K811ZDJ52EYBJ8P2",
                "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "197329413-25-09-2022-14-33-39.zip",
                    "size": 1388,
                    "eTag": "932bda7d32c05d90ecc550d061862994",
                    "sequencer": "00633066CD68A4BF43"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the job ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(jobID), err)
Output:

Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "197329413-25-09-2022-14-33-39.zip"
Dataset: "197329413"
Job Str Len: "43"
Err: "<nil>"
Example (DecodeImportTrigger_OCS3)

Trigger from when a new zip arrives from the pipeline but pipeline stores it in a subdir of the bucket

trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:s3",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-25T14:33:49.456Z",
            "eventName": "ObjectCreated:Put",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "3.12.95.94"
            },
            "responseElements": {
                "x-amz-request-id": "K811ZDJ52EYBJ8P2",
                "x-amz-id-2": "R7bGQ2fOjvSZHkHez700w3wRVpn32nmr6jVPVYhKtNE2c2KYOmgm9hjmOA5WSQFh8faLRe6fHAmANKSTNRhwCq7Xgol0DgX4"
            },
            "s3": {
                "s3SchemaVersion": "1.0",
                "configurationId": "OTBjMjZmYzAtYThlOC00OWRmLWIwMzUtODkyZDk0YmRhNzkz",
                "bucket": {
                    "name": "prodpipeline-rawdata202c7bd0-o40ktu17o2oj",
                    "ownerIdentity": {
                        "principalId": "AP902Y0PI20DF"
                    },
                    "arn": "arn:aws:s3:::prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
                },
                "object": {
                    "key": "data/197329413-25-09-2022-14-33-39.zip",
                    "size": 1388,
                    "eTag": "932bda7d32c05d90ecc550d061862994",
                    "sequencer": "00633066CD68A4BF43"
                }
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))

// NOTE: we're only checking the length of the job ID because it's a timestamp+random chars. Other code has this stubbed out but here it's probably sufficient
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob Str Len: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, len(jobID), err)
Output:

Source Bucket: "prodpipeline-rawdata202c7bd0-o40ktu17o2oj"
Source file: "data/197329413-25-09-2022-14-33-39.zip"
Dataset: "197329413"
Job Str Len: "43"
Err: "<nil>"
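
The OCS examples above all report a dataset ID that matches the leading part of the zip file name, even when the key includes a sub-directory. A minimal sketch of that mapping follows. It assumes keys of the form "<datasetID>-<DD-MM-YYYY-hh-mm-ss>.zip" as seen in the examples; datasetIDFromKey is a hypothetical helper, not a function of this package, and the real decodeImportTrigger may work differently.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// datasetIDFromKey is a hypothetical helper (not part of dataimport). It
// assumes the file name looks like "<datasetID>-<DD-MM-YYYY-hh-mm-ss>.zip",
// as the S3 keys in the examples do, and returns the text before the first "-".
func datasetIDFromKey(key string) (string, error) {
	name := path.Base(key) // drop any sub-directory prefix such as "data/"
	if !strings.HasSuffix(name, ".zip") {
		return "", fmt.Errorf("expected a .zip file, got %q", name)
	}
	id, _, found := strings.Cut(name, "-")
	if !found || id == "" {
		return "", fmt.Errorf("no dataset ID found in %q", name)
	}
	return id, nil
}

func main() {
	keys := []string{
		"189137412-07-09-2022-10-07-57.zip",
		"data/197329413-25-09-2022-14-33-39.zip",
	}
	for _, key := range keys {
		id, err := datasetIDFromKey(key)
		fmt.Printf("%q -> %q, %v\n", key, id, err)
	}
}
```

Using path.Base first is what lets the sub-directory case (OCS3) yield the same ID as the top-level case (OCS2).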
Example (DecodeImportTrigger_OCS_BadEventType)
trigger := `{
    "Records": [
        {
            "eventVersion": "2.1",
            "eventSource": "aws:sqs",
            "awsRegion": "us-east-1",
            "eventTime": "2022-09-16T09:10:28.417Z",
            "eventName": "ObjectCreated:CompleteMultipartUpload",
            "userIdentity": {
                "principalId": "AWS:AIDA6AOWGDOHF37MOKWLS"
            },
            "requestParameters": {
                "sourceIPAddress": "81.154.57.137"
            },
            "responseElements": {
                "x-amz-request-id": "G3QWWT0BAYKP81QK",
                "x-amz-id-2": "qExUWHHDE1nL+UP3zim1XA7FIXRUoKxlIrJt/7ULAtn08/+EvRCt4sChLhCGEqMo7ny4CU/KufMNmOcyZsDPKGWHT2ukMbo+"
            }
        }
    ]
}`

sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Job: ""
Err: "Failed to decode dataset import trigger: Failed to decode sqs body to an S3 event: unexpected end of JSON input"
Example (DecodeImportTrigger_OCS_Error)
trigger := `{
		"Records": []
}`
sourceBucket, sourceFilePath, datasetID, jobID, err := decodeImportTrigger([]byte(trigger))
fmt.Printf("Source Bucket: \"%v\"\nSource file: \"%v\"\nDataset: \"%v\"\nJob: \"%v\"\nErr: \"%v\"\n", sourceBucket, sourceFilePath, datasetID, jobID, err)
Output:

Source Bucket: ""
Source file: ""
Dataset: ""
Job: ""
Err: "Unexpected or no message type embedded in triggering SNS message"
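
Taken together, the DecodeImportTrigger examples suggest two trigger shapes: an S3 event carrying a non-empty "Records" array, and a manual reprocess message carrying "datasetID" and "jobID". The sketch below shows one way such a message could be classified. triggerProbe and classifyTrigger are hypothetical names, the error strings are copied from the example outputs, and the order of the checks is an assumption chosen to match those outputs. It does not validate the contents of the records, so it would not reproduce the BadEventType error.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// triggerProbe is a hypothetical type that picks out only the fields needed
// to tell the two trigger shapes apart.
type triggerProbe struct {
	DatasetID string            `json:"datasetID"`
	JobID     string            `json:"jobID"`
	Records   []json.RawMessage `json:"Records"`
}

// classifyTrigger returns "s3-event" or "manual", or an error whose text is
// copied from the example outputs above. The check order is an assumption.
func classifyTrigger(msg []byte) (string, error) {
	var p triggerProbe
	if err := json.Unmarshal(msg, &p); err != nil {
		return "", fmt.Errorf("failed to parse trigger: %w", err)
	}
	switch {
	case len(p.Records) > 0:
		return "s3-event", nil
	case p.DatasetID == "" && p.JobID == "":
		return "", errors.New("Unexpected or no message type embedded in triggering SNS message")
	case p.DatasetID == "":
		return "", errors.New("Failed to find dataset ID in reprocess trigger")
	case p.JobID == "":
		return "", errors.New("Failed to find job ID in reprocess trigger")
	}
	return "manual", nil
}

func main() {
	triggers := []string{
		`{"datasetID": "189137412", "jobID": "dataimport-zmzddoytch2krd7n"}`,
		`{"datasetID": "qwerty"}`,
		`{"weird": "message"}`,
		`{"Records": []}`,
	}
	for _, t := range triggers {
		kind, err := classifyTrigger([]byte(t))
		fmt.Printf("kind=%q err=%v\n", kind, err)
	}
}
```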
Example (GetUpdateType_Drive)
newSummary := protos.ScanItem{
	Meta: map[string]string{"DriveID": "997"},
}
oldSummary := protos.ScanItem{
	Meta: map[string]string{"DriveID": "0"},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

housekeeping|<nil>
Example (GetUpdateType_LessContextImages)
newSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    3,
		},
	},
}
oldSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    5,
		},
	},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

image|<nil>
Example (GetUpdateType_MoreContextImages)
newSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    3,
		},
	},
}
oldSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    0,
		},
	},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

image|<nil>
Example (GetUpdateType_NormalSpectra)
newSummary := protos.ScanItem{
	ContentCounts: map[string]int32{"NormalSpectra": 100},
}
oldSummary := protos.ScanItem{
	ContentCounts: map[string]int32{"NormalSpectra": 10},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

spectra|<nil>
Example (GetUpdateType_RTT)
newSummary := protos.ScanItem{
	Meta: map[string]string{"RTT": "1234"},
}
oldSummary := protos.ScanItem{
	Meta: map[string]string{"RTT": "123"},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

unknown|<nil>
Example (GetUpdateType_SameContextImages)
newSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    3,
		},
	},
}
oldSummary := protos.ScanItem{
	DataTypes: []*protos.ScanItem_ScanTypeCount{
		&protos.ScanItem_ScanTypeCount{
			DataType: protos.ScanDataType_SD_IMAGE,
			Count:    3,
		},
	},
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

unknown|<nil>
Example (GetUpdateType_Title)
newSummary := protos.ScanItem{
	Title: "Analysed rock",
}
oldSummary := protos.ScanItem{
	Title: "Freshly downloaded rock",
}

upd, err := getUpdateType(&newSummary, &oldSummary)

fmt.Printf("%v|%v\n", upd, err)
Output:

housekeeping|<nil>
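
The GetUpdateType examples above can be read as a small decision table: a changed image count gives "image", a changed NormalSpectra count gives "spectra", a changed DriveID or title gives "housekeeping", and anything else (including a changed RTT) gives "unknown". The sketch below reproduces only those outcomes. scanSummary is a hypothetical, cut-down stand-in for protos.ScanItem, and the order of the checks is an assumption; the real getUpdateType may compare more fields or prioritise them differently.

```go
package main

import "fmt"

// scanSummary is a hypothetical, cut-down stand-in for protos.ScanItem that
// keeps only the fields the examples above compare.
type scanSummary struct {
	Title         string
	Meta          map[string]string
	ImageCount    int32 // count for the SD_IMAGE data type
	ContentCounts map[string]int32
}

// classifyUpdate reproduces the outcomes of the examples above. The order of
// the checks is an assumption; the real getUpdateType may differ.
func classifyUpdate(newS, oldS scanSummary) string {
	switch {
	case newS.ImageCount != oldS.ImageCount:
		return "image"
	case newS.ContentCounts["NormalSpectra"] != oldS.ContentCounts["NormalSpectra"]:
		return "spectra"
	case newS.Meta["DriveID"] != oldS.Meta["DriveID"] || newS.Title != oldS.Title:
		return "housekeeping"
	default:
		return "unknown"
	}
}

func main() {
	// A changed RTT alone is not one of the tracked changes.
	newS := scanSummary{Meta: map[string]string{"RTT": "1234"}}
	oldS := scanSummary{Meta: map[string]string{"RTT": "123"}}
	fmt.Println(classifyUpdate(newS, oldS))

	// Fewer images than before still counts as an image update.
	newS = scanSummary{ImageCount: 3}
	oldS = scanSummary{ImageCount: 5}
	fmt.Println(classifyUpdate(newS, oldS))
}
```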
Example (ImportForTrigger_Manual_EM)
Didn't get this working when the above was changed. The problem is that this still generates the user name SBUImport, so the
  premise of the test fails because it doesn't end up with no user ID at that point!
func Test_ImportForTrigger_Manual_SBU_NoAutoShare_FailForPipeline(t *testing.T) {
	remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Manual_OK2", "", "")

	trigger := `{
	"datasetID": "test1234sbu",
	"jobID": "dataimport-unittest123sbu"
}`
	_, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

	// Make sure we got the error
	if !strings.HasSuffix(err.Error(), "Cannot work out groups to auto-share imported dataset with") {
		t.Errorf("ImportForTrigger didnt return expected error")
	}
}

Import a breadboard dataset from a manually uploaded zip file

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("ManualEM_OK", specialUserIds.PIXLISESystemUserId, "PIXLFMGroupId")

trigger := `{
	"datasetID": "048300551",
	"jobID": "dataimport-unittest048300551"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

printManualOKLogOutput(log, db, "048300551", 3)
Output:

Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 0 zip files, unzipped 0 files": true
Logged "No zip files found in archive, dataset may have been manually uploaded. Trying to download...": true
Logged "Dataset 048300551 downloaded 3 files from manual upload area": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "Reading 1261 files from spectrum directory...": false
Logged "Reading spectrum [1135/1260] 90%": false
Logged "PMC 1261 has 4 MSA/spectrum entries": false
Logged "WARNING: No main context image determined": false
Logged "Diffraction db saved successfully": true
Logged "Warning: No import.json found, defaults will be used": false
Logged "No auto-share destination found, so only importing user will be able to access this dataset.": false
<nil>|{"id":"048300551","title":"048300551","dataTypes":[{"dataType":"SD_IMAGE","count":4},{"dataType":"SD_XRF","count":242}],"instrument":"PIXL_EM","instrumentConfig":"PIXL-EM-E2E","meta":{"DriveId":"1712","RTT":"048300551","SCLK":"678031418","Site":"","SiteId":"4","Sol":"0125","Target":"","TargetId":"?"},"contentCounts":{"BulkSpectra":2,"DwellSpectra":0,"MaxSpectra":2,"NormalSpectra":242,"PseudoIntensities":121},"creatorUserId":"PIXLISEImport"}
Example (ImportForTrigger_Manual_JPL)

Import a breadboard dataset from a manually uploaded zip file

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Manual_OK", specialUserIds.JPLImport, "JPLTestUserGroupId")

trigger := `{
	"datasetID": "test1234",
	"jobID": "dataimport-unittest123"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

printManualOKLogOutput(log, db, "test1234", 3)
Output:

Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 0 zip files, unzipped 0 files": true
Logged "No zip files found in archive, dataset may have been manually uploaded. Trying to download...": true
Logged "Dataset test1234 downloaded 3 files from manual upload area": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "Reading 1261 files from spectrum directory...": true
Logged "Reading spectrum [1135/1260] 90%": true
Logged "PMC 1261 has 4 MSA/spectrum entries": true
Logged "WARNING: No main context image determined": true
Logged "Diffraction db saved successfully": true
Logged "Warning: No import.json found, defaults will be used": true
Logged "No auto-share destination found, so only importing user will be able to access this dataset.": false
<nil>|{"id":"test1234","title":"test1234","dataTypes":[{"dataType":"SD_XRF","count":2520}],"instrument":"JPL_BREADBOARD","instrumentConfig":"Breadboard","meta":{"DriveId":"0","RTT":"","SCLK":"0","Site":"","SiteId":"0","Sol":"","Target":"","TargetId":"0"},"contentCounts":{"BulkSpectra":2,"DwellSpectra":0,"MaxSpectra":2,"NormalSpectra":2520,"PseudoIntensities":0},"creatorUserId":"JPLImport"}
Example (ImportForTrigger_Manual_SBU)

Import a breadboard dataset from a manually uploaded zip file

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Manual_OK2", specialUserIds.SBUImport, "SBUTestUserGroupId")

trigger := `{
	"datasetID": "test1234sbu",
	"jobID": "dataimport-unittest123sbu"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

printManualOKLogOutput(log, db, "test1234sbu", 4)
Output:

Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 0 zip files, unzipped 0 files": true
Logged "No zip files found in archive, dataset may have been manually uploaded. Trying to download...": true
Logged "Dataset test1234sbu downloaded 4 files from manual upload area": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "Reading 1261 files from spectrum directory...": true
Logged "Reading spectrum [1135/1260] 90%": true
Logged "PMC 1261 has 4 MSA/spectrum entries": true
Logged "WARNING: No main context image determined": true
Logged "Diffraction db saved successfully": true
Logged "Warning: No import.json found, defaults will be used": false
Logged "No auto-share destination found, so only importing user will be able to access this dataset.": false
<nil>|{"id":"test1234sbu","title":"test1234sbu","dataTypes":[{"dataType":"SD_XRF","count":2520}],"instrument":"SBU_BREADBOARD","instrumentConfig":"StonyBrookBreadboard","meta":{"DriveId":"0","RTT":"","SCLK":"0","Site":"","SiteId":"0","Sol":"","Target":"","TargetId":"0"},"contentCounts":{"BulkSpectra":2,"DwellSpectra":0,"MaxSpectra":2,"NormalSpectra":2520,"PseudoIntensities":0},"creatorUserId":"SBUImport"}
Example (ImportForTrigger_OCS_Archive_BadData)
func startTestWithMockMongo(name string, t *testing.T, testFunc func(mt *mtest.T)) {
	mt := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock))
	defer mt.Close()

	mt.Run(name, testFunc)
}

Import unknown dataset (simulate trigger by OCS pipeline), file goes to archive, then all files downloaded from archive, dataset create fails due to unknown data type

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Archive_BadData", specialUserIds.PIXLISESystemUserId, "PIXLFMGroupId")

// In case it ran before, delete the file from dataset bucket, otherwise we will end for the wrong reason
os.Remove(datasetBucket + "/Archive/70000_069-02-09-2021-06-25-13.zip")

trigger := `{
	"Records": [
		{
			"eventVersion": "2.1",
			"eventSource": "aws:s3",
			"awsRegion": "us-east-1",
			"eventTime": "2022-10-16T22:07:40.929Z",
			"eventName": "ObjectCreated:CompleteMultipartUpload",
			"userIdentity": {
				"principalId": "AWS:123"
			},
			"requestParameters": {
				"sourceIPAddress": "3.213.168.4"
			},
			"responseElements": {
				"x-amz-request-id": "234",
				"x-amz-id-2": "345+678"
			},
			"s3": {
				"s3SchemaVersion": "1.0",
				"configurationId": "id1234",
				"bucket": {
					"name": "./test-data/Archive_BadData/raw-data-bucket",
					"ownerIdentity": {
						"principalId": "AP902Y0PI20DF"
					},
					"arn": "arn:aws:s3:::raw-data-bucket"
				},
				"object": {
					"key": "70000_069-02-09-2021-06-25-13.zip",
					"size": 602000,
					"eTag": "1234567890",
					"sequencer": "00112233445566"
				}
			}
		}
	]
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

// Ensure these log msgs appeared...
requiredLogs := []string{
	"Downloading archived zip files...",
	"Downloaded 2 zip files, unzipped 6 files",
	"Downloading pseudo-intensity ranges...",
	"Downloading user customisation files...",
	"SelectDataConverter: Path contains 3 files...",
	"Failed to open detector.json when determining dataset type",
}

for _, msg := range requiredLogs {
	fmt.Printf("Logged \"%v\": %v\n", msg, log.LogContains(msg))
}
Output:

Errors: Failed to determine dataset type to import., changes: , isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 2 zip files, unzipped 6 files": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "SelectDataConverter: Path contains 3 files...": true
Logged "Failed to open detector.json when determining dataset type": true
Example (ImportForTrigger_OCS_Archive_Exists)

Import FM-style (simulate trigger by OCS pipeline), file already in archive, so should do nothing

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Archive_Exists", specialUserIds.PIXLISESystemUserId, "PIXLFMGroupId")
trigger := `{
	"Records": [
		{
			"eventVersion": "2.1",
			"eventSource": "aws:s3",
			"awsRegion": "us-east-1",
			"eventTime": "2022-10-16T22:07:40.929Z",
			"eventName": "ObjectCreated:CompleteMultipartUpload",
			"userIdentity": {
				"principalId": "AWS:123"
			},
			"requestParameters": {
				"sourceIPAddress": "3.213.168.4"
			},
			"responseElements": {
				"x-amz-request-id": "234",
				"x-amz-id-2": "345+678"
			},
			"s3": {
				"s3SchemaVersion": "1.0",
				"configurationId": "id1234",
				"bucket": {
					"name": "./test-data/Archive_Exists/raw-data-bucket",
					"ownerIdentity": {
						"principalId": "AP902Y0PI20DF"
					},
					"arn": "arn:aws:s3:::raw-data-bucket"
				},
				"object": {
					"key": "70000_069-02-09-2021-06-25-13.zip",
					"size": 602000,
					"eTag": "1234567890",
					"sequencer": "00112233445566"
				}
			}
		}
	]
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

msg := "Archiving source file: \"s3://./test-data/Archive_Exists/raw-data-bucket/70000_069-02-09-2021-06-25-13.zip\""
fmt.Printf("Logged \"%v\": %v\n", msg, log.LogContains(msg))

fmt.Printf("Log shows exists in archive: %v\n", strings.Contains(log.LastLogLine(), "File already exists in archive, processing stopped. File was: \"70000_069-02-09-2021-06-25-13.zip\""))
Output:

Errors: <nil>, changes: , isUpdate: false
Logged "Archiving source file: "s3://./test-data/Archive_Exists/raw-data-bucket/70000_069-02-09-2021-06-25-13.zip"": true
Log shows exists in archive: true
Example (ImportForTrigger_OCS_Archive_OK)

Import FM-style (simulate trigger by OCS pipeline), file goes to archive, then all files downloaded from archive and dataset created

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Archive_OK", specialUserIds.PIXLISESystemUserId, "PIXLFMGroupId")
// In case it ran before, delete the file from dataset bucket, otherwise we will end for the wrong reason
os.Remove(datasetBucket + "/Archive/048300551-27-06-2021-09-52-25.zip")

trigger := `{
	"Records": [
		{
			"eventVersion": "2.1",
			"eventSource": "aws:s3",
			"awsRegion": "us-east-1",
			"eventTime": "2022-10-16T22:07:40.929Z",
			"eventName": "ObjectCreated:CompleteMultipartUpload",
			"userIdentity": {
				"principalId": "AWS:123"
			},
			"requestParameters": {
				"sourceIPAddress": "3.213.168.4"
			},
			"responseElements": {
				"x-amz-request-id": "234",
				"x-amz-id-2": "345+678"
			},
			"s3": {
				"s3SchemaVersion": "1.0",
				"configurationId": "id1234",
				"bucket": {
					"name": "./test-data/Archive_OK/raw-data-bucket",
					"ownerIdentity": {
						"principalId": "AP902Y0PI20DF"
					},
					"arn": "arn:aws:s3:::raw-data-bucket"
				},
				"object": {
					"key": "048300551-27-06-2021-09-52-25.zip",
					"size": 602000,
					"eTag": "1234567890",
					"sequencer": "00112233445566"
				}
			}
		}
	]
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

printArchiveOKLogOutput(log, db)
Output:

Errors: <nil>, changes: unknown, isUpdate: false
Logged "Downloading archived zip files...": true
Logged "Downloaded 20 zip files, unzipped 364 files": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "This dataset's detector config is PIXL": true
Logged "PMC 218 has 4 MSA/spectrum entries": true
Logged "Main context image: PCW_0125_0678031992_000RCM_N00417120483005510091075J02.png": true
Logged "Diffraction db saved successfully": true
Logged "Applying custom title: Naltsos": true
Logged "Matched aligned image: PCCR0577_0718181212_000MSA_N029000020073728500030LUD01.tif, offset(0, 0), scale(1, 1). Match for aligned index: 0": true
<nil>|{"contentCounts": {"BulkSpectra": 2,"DwellSpectra": 0,"MaxSpectra": 2,"NormalSpectra": 242,"PseudoIntensities": 121},"creatorUserId": "PIXLISEImport","dataTypes": [{"count": 5,"dataType": "SD_IMAGE"},{"count": 1,"dataType": "SD_RGBU"},{"count": 242,"dataType": "SD_XRF"}],"id": "048300551","instrument": "PIXL_FM","instrumentConfig": "PIXL","meta": {"DriveId": "1712","RTT": "048300551","SCLK": "678031418","Site": "","SiteId": "4","Sol": "0125","Target": "","TargetId": "?"},"title": "Naltsos"}
Example (ImportForTrigger_OCS_DatasetEdit)

Import FM-style (simulate trigger by dataset edit screen), should create dataset with custom name+image

remoteFS, log, envName, configBucket, datasetBucket, manualBucket, db := initTest("Archive_OK", specialUserIds.PIXLISESystemUserId, "PIXLFMGroupId")

// To save from checking in 2 sets of the same zip files for this and Example_ImportForTrigger_OCS_Archive_OK, here we copy
// the archive files from the Archive_OK test to here.
// NOTE: This test doesn't get triggered by the arrival of a new archive file, so we have to copy the "new" file from
// the Archive_OK raw bucket separately
err := fileaccess.CopyFileLocally("./test-data/Archive_OK/raw-data-bucket/048300551-27-06-2021-09-52-25.zip", datasetBucket+"/Archive/048300551-27-06-2021-09-52-25.zip")
if err != nil {
	fmt.Println("Failed to copy new archive file")
}

localFS := fileaccess.FSAccess{}
archiveFiles, err := localFS.ListObjects("./test-data/Archive_OK/dataset-bucket/Archive/", "")
if err != nil {
	fmt.Println("Failed to copy archive from OK test to Edit test")
}
for _, fileName := range archiveFiles {
	if strings.HasSuffix(fileName, ".zip") { // Guard from .DS_Store and other garbage
		err = fileaccess.CopyFileLocally("./test-data/Archive_OK/dataset-bucket/Archive/"+fileName, datasetBucket+"/Archive/"+fileName)
		if err != nil {
			fmt.Println("Failed to copy archive from OK test to Edit test")
		}
	}
}

trigger := `{
	"datasetID": "048300551",
	"jobID": "dataimport-unittest123"
}`

result, err := ImportForTrigger([]byte(trigger), envName, configBucket, datasetBucket, manualBucket, db, log, remoteFS)

fmt.Printf("Errors: %v, changes: %v, isUpdate: %v\n", err, result.WhatChanged, result.IsUpdate)

printArchiveOKLogOutput(log, db)
Output:

Errors: <nil>, changes: unknown, isUpdate: true
Logged "Downloading archived zip files...": true
Logged "Downloaded 20 zip files, unzipped 364 files": true
Logged "Downloading pseudo-intensity ranges...": true
Logged "Downloading user customisation files...": true
Logged "This dataset's detector config is PIXL": true
Logged "PMC 218 has 4 MSA/spectrum entries": true
Logged "Main context image: PCW_0125_0678031992_000RCM_N00417120483005510091075J02.png": true
Logged "Diffraction db saved successfully": true
Logged "Applying custom title: Naltsos": true
Logged "Matched aligned image: PCCR0577_0718181212_000MSA_N029000020073728500030LUD01.tif, offset(0, 0), scale(1, 1). Match for aligned index: 0": true
<nil>|{"contentCounts": {"BulkSpectra": 2,"DwellSpectra": 0,"MaxSpectra": 2,"NormalSpectra": 242,"PseudoIntensities": 121},"creatorUserId": "PIXLISEImport","dataTypes": [{"count": 5,"dataType": "SD_IMAGE"},{"count": 1,"dataType": "SD_RGBU"},{"count": 242,"dataType": "SD_XRF"}],"id": "048300551","instrument": "PIXL_FM","instrumentConfig": "PIXL","meta": {"DriveId": "1712","RTT": "048300551","SCLK": "678031418","Site": "","SiteId": "4","Sol": "0125","Target": "","TargetId": "?"},"title": "Naltsos"}

Variables

var JobIDAutoImportPrefix = "auto-import-"

Functions

func ImportDataset

func ImportDataset(
	localFS fileaccess.FileAccess,
	remoteFS fileaccess.FileAccess,
	configBucket string,
	manualUploadBucket string,
	datasetBucket string,
	db *mongo.Database,
	datasetID string,
	log logger.ILogger,
	justArchived bool,
) (string, *protos.ScanItem, string, bool, error)

ImportDataset - Imports from the dataset archive area. Calls ImportFromLocalFileSystem.
Returns:
WorkingDir
Saved dataset summary structure
What changed (as a string), so the caller knows what kind of notification to send (if any)
IsUpdate flag
Error (if any)

func ImportFromLocalFileSystem

func ImportFromLocalFileSystem(
	localFS fileaccess.FileAccess,
	remoteFS fileaccess.FileAccess,
	db *mongo.Database,
	workingDir string,
	localImportPath string,
	localPseudoIntensityRangesPath string,
	datasetBucket string,
	datasetID string,
	log logger.ILogger) (string, error)

ImportFromLocalFileSystem - As the name says, imports from a directory on the local file system.
Returns:
Dataset ID (in case it was modified during conversion)
Error (if there was one)

func TriggerDatasetReprocessViaSNS

func TriggerDatasetReprocessViaSNS(snsSvc awsutil.SNSInterface, jobId string, scanId string, snsTopic string) (*sns.PublishOutput, error)

Fires a trigger message. Anything calling this triggers a dataset reimport via a Lambda function.
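
The source does not show the body that TriggerDatasetReprocessViaSNS publishes. Assuming it is the manual trigger form decoded in the examples (a JSON object with "datasetID" and "jobID"), building such a message could look like the sketch below. reprocessTrigger and buildReprocessTrigger are hypothetical names, and the actual SNS publish call is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// reprocessTrigger is a hypothetical type mirroring the manual trigger JSON
// shown in the DecodeImportTrigger examples.
type reprocessTrigger struct {
	DatasetID string `json:"datasetID"`
	JobID     string `json:"jobID"`
}

// buildReprocessTrigger is a hypothetical helper that serialises a trigger.
// It rejects empty IDs because the decode examples fail on them.
func buildReprocessTrigger(scanID, jobID string) (string, error) {
	if scanID == "" || jobID == "" {
		return "", fmt.Errorf("scan ID and job ID must both be set")
	}
	body, err := json.Marshal(reprocessTrigger{DatasetID: scanID, JobID: jobID})
	if err != nil {
		return "", err
	}
	return string(body), nil
}

func main() {
	msg, err := buildReprocessTrigger("189137412", "dataimport-zmzddoytch2krd7n")
	fmt.Println(msg, err)
}
```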

Types

type DatasetCustomMeta

type DatasetCustomMeta struct {
	Title               string `json:"title"`
	DefaultContextImage string `json:"defaultContextImage"`
}
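
As a rough illustration of how the JSON tags on DatasetCustomMeta map to a file, the sketch below decodes a small document into a copy of the struct. The title "Naltsos" comes from the log output in the examples; the image file name "context.png" and the helper parseCustomMeta are made up for illustration, and where the package actually reads this file from is not shown in the source.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DatasetCustomMeta is copied from the documentation above so that this
// sketch compiles on its own.
type DatasetCustomMeta struct {
	Title               string `json:"title"`
	DefaultContextImage string `json:"defaultContextImage"`
}

// parseCustomMeta is a hypothetical helper that decodes custom meta JSON.
func parseCustomMeta(data []byte) (DatasetCustomMeta, error) {
	var meta DatasetCustomMeta
	err := json.Unmarshal(data, &meta)
	return meta, err
}

func main() {
	// "context.png" is a placeholder file name, not one from the test data.
	raw := []byte(`{"title": "Naltsos", "defaultContextImage": "context.png"}`)
	meta, err := parseCustomMeta(raw)
	if err != nil {
		fmt.Println("failed to parse custom meta:", err)
		return
	}
	fmt.Printf("title=%q image=%q\n", meta.Title, meta.DefaultContextImage)
}
```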

type ImportResult

type ImportResult struct {
	WorkingDir   string         // so it can be cleaned up by caller if needed
	WhatChanged  string         // what changed between this import and a previous one, for notification reasons
	IsUpdate     bool           // IsUpdate flag
	DatasetTitle string         // Name of the dataset that was imported
	DatasetID    string         // ID of the dataset that was imported
	Logger       logger.ILogger // Caller must call Close() on it, otherwise we may lose the last few log events
}

Structure returned after importing.
NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events.

func ImportForTrigger

func ImportForTrigger(
	triggerMessage []byte,
	envName string,
	configBucket string,
	datasetBucket string,
	manualBucket string,
	db *mongo.Database,
	log logger.ILogger,
	remoteFS fileaccess.FileAccess) (ImportResult, error)

ImportForTrigger - Parses a trigger message (from SNS) and decides what to import.
Returns:
Result struct (NOTE: the logger must have Close() called on it, otherwise we may lose the last few log events)
Error (or nil)

Directories

Path	Synopsis
internal/datasetArchive	Implements archiving/retrieval of dataset source zip files as delivered by GDS.
internal/output	Allows outputting (in PIXLISE protobuf dataset format) of in-memory representation of PIXL data that importer has read.