processor

package
v0.1.0-alpha.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package processor provides streaming ingestion of SRA metadata from tar.gz archives, supporting both HTTP URLs and local files.

Index

Constants

This section is empty.

Variables

View Source
var OptimizationRecommendations = []string{
	"1. Increase batch size from 1000 to 5000 for better database throughput",
	"2. Use database transactions to wrap batch inserts",
	"3. Implement connection pooling with max connections = CPU cores",
	"4. Use sync.Pool for buffer reuse to reduce GC pressure",
	"5. Enable HTTP/2 for better network efficiency",
	"6. Consider parallel processing for independent record types",
	"7. Use prepared statements for repeated queries",
	"8. Implement write-ahead logging (WAL) for SQLite",
	"9. Add indexes on frequently queried fields",
	"10. Use PRAGMA optimizations for SQLite performance",
}

OptimizationRecommendations lists performance-tuning recommendations derived from testing.

View Source
var SQLiteOptimizations = []string{
	"PRAGMA journal_mode = WAL",
	"PRAGMA synchronous = NORMAL",
	"PRAGMA cache_size = 100000",
	"PRAGMA temp_store = MEMORY",
	"PRAGMA mmap_size = 1073741824",
	"PRAGMA page_size = 32768",
	"PRAGMA wal_checkpoint = PASSIVE",
	"PRAGMA wal_autocheckpoint = 10000",
	"PRAGMA busy_timeout = 10000",
	"PRAGMA foreign_keys = OFF",
}

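SQLiteOptimizations lists PRAGMA statements for tuning SQLite performance. Note that SQLite applies a new page_size only when the database file is first created, or on a VACUUM run outside WAL mode. So "PRAGMA page_size" has no effect on an existing database once "PRAGMA journal_mode = WAL" has been issued. Also, a positive cache_size value is a number of pages, not bytes.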

View Source
var SupportedAnalysisTypes = []string{
	"REFERENCE_ALIGNMENT", "SEQUENCE_VARIATION", "SEQUENCE_ASSEMBLY",
	"SEQUENCE_ANNOTATION", "REFERENCE_SEQUENCE", "SAMPLE_PHENOTYPE",
	"TRANSCRIPTOME_ASSEMBLY", "TAXONOMIC_REFERENCE_SET", "DE_NOVO_ASSEMBLY",
	"GENOME_MAP", "AMR_ANTIBIOGRAM", "PATHOGEN_ANALYSIS",
	"PROCESSED_READS", "SEQUENCE_FLATFILE",
}

SupportedAnalysisTypes lists the recognized analysis type values. It is a package-level variable, not a constant.

View Source
var SupportedFileTypes = []string{
	"sra", "srf", "sff", "fastq", "fasta", "tab", "bam", "bai", "cram", "crai",
	"vcf", "bcf", "vcf_aggregate", "bcf_aggregate", "gff", "gtf", "bed", "bigwig",
	"wiggle", "454_native", "Illumina_native", "Illumina_native_qseq",
	"Illumina_native_scarf", "Illumina_native_fastq", "SOLiD_native",
	"SOLiD_native_csfasta", "SOLiD_native_qual", "PacBio_HDF5",
	"CompleteGenomics_native", "OxfordNanopore_native", "agp", "unlocalised_list",
	"info", "manifest", "readme", "phenotype_file", "BioNano_native",
	"Bionano_native", "chromosome_list", "sample_list", "other",
}

SupportedFileTypes lists the recognized file type values. It is a package-level variable, not a constant.

View Source
var SupportedLibrarySelections = []string{
	"RANDOM", "PCR", "RANDOM PCR", "RT-PCR", "HMPR", "MF", "CF-S", "CF-M",
	"CF-H", "CF-T", "MDA", "MSLL", "cDNA", "cDNA_randomPriming",
	"cDNA_oligo_dT", "PolyA", "Oligo-dT", "Inverse rRNA", "Inverse rRNA selection",
	"ChIP", "ChIP-Seq", "MNase", "DNase", "Hybrid Selection",
	"Reduced Representation", "Restriction Digest", "5-methylcytidine antibody",
	"MBD2 protein methyl-CpG binding domain", "CAGE", "RACE", "size fractionation",
	"Padlock probes capture method", "other", "unspecified",
}

SupportedLibrarySelections lists the recognized library selection values. It is a package-level variable, not a constant.

View Source
var SupportedLibrarySources = []string{
	"GENOMIC", "GENOMIC SINGLE CELL", "TRANSCRIPTOMIC",
	"TRANSCRIPTOMIC SINGLE CELL", "METAGENOMIC", "METATRANSCRIPTOMIC",
	"SYNTHETIC", "VIRAL RNA", "OTHER",
}

SupportedLibrarySources lists the recognized library source values. It is a package-level variable, not a constant.

View Source
var SupportedLibraryStrategies = []string{
	"WGS", "WGA", "WXS", "RNA-Seq", "ssRNA-seq", "miRNA-Seq", "ncRNA-Seq",
	"FL-cDNA", "EST", "Hi-C", "ATAC-seq", "WCS", "RAD-Seq", "CLONE",
	"POOLCLONE", "AMPLICON", "CLONEEND", "FINISHING", "ChIP-Seq",
	"MNase-Seq", "DNase-Hypersensitivity", "Bisulfite-Seq", "CTS",
	"MRE-Seq", "MeDIP-Seq", "MBD-Seq", "Tn-Seq", "VALIDATION", "FAIRE-seq",
	"SELEX", "RIP-Seq", "ChIA-PET", "Synthetic-Long-Read", "Targeted-Capture",
	"Tethered Chromatin Conformation Capture", "OTHER",
}

SupportedLibraryStrategies lists the recognized library strategy values. It is a package-level variable, not a constant.

View Source
var SupportedPlatforms = []string{
	"LS454", "ILLUMINA", "HELICOS", "ABI_SOLID", "COMPLETE_GENOMICS",
	"BGISEQ", "OXFORD_NANOPORE", "PACBIO_SMRT", "ION_TORRENT",
	"VELA_DIAGNOSTICS", "CAPILLARY", "GENAPSYS", "DNBSEQ", "ELEMENT",
	"GENEMIND", "ULTIMA", "TAPESTRI", "SALUS", "GENEUS_TECH",
	"SINGULAR_GENOMICS", "GENEXUS", "REVOLOCITY",
}

SupportedPlatforms lists the recognized sequencing platform values. It is a package-level variable, not a constant.

Functions

func GetOptimizationSuggestions

func GetOptimizationSuggestions(metrics PerformanceMetrics) []string

GetOptimizationSuggestions returns optimization suggestions based on the supplied performance metrics.

Types

type ComprehensiveExtractor

type ComprehensiveExtractor struct {
	// contains filtered or unexported fields
}

ComprehensiveExtractor extracts study, experiment, sample, run, analysis, and submission records from SRA XML data into a Database.

func NewComprehensiveExtractor

func NewComprehensiveExtractor(db Database, options ExtractionOptions) *ComprehensiveExtractor

NewComprehensiveExtractor creates a new comprehensive extractor

func (*ComprehensiveExtractor) ExtractAnalyses

func (ce *ComprehensiveExtractor) ExtractAnalyses(ctx context.Context, reader io.Reader) error

ExtractAnalyses extracts analysis data

func (*ComprehensiveExtractor) ExtractExperiments

func (ce *ComprehensiveExtractor) ExtractExperiments(ctx context.Context, reader io.Reader) error

ExtractExperiments extracts comprehensive experiment data

func (*ComprehensiveExtractor) ExtractRuns

func (ce *ComprehensiveExtractor) ExtractRuns(ctx context.Context, reader io.Reader) error

ExtractRuns extracts comprehensive run data

func (*ComprehensiveExtractor) ExtractSamples

func (ce *ComprehensiveExtractor) ExtractSamples(ctx context.Context, reader io.Reader) error

ExtractSamples extracts comprehensive sample data

func (*ComprehensiveExtractor) ExtractStudies

func (ce *ComprehensiveExtractor) ExtractStudies(ctx context.Context, reader io.Reader) error

ExtractStudies extracts comprehensive study data

func (*ComprehensiveExtractor) ExtractSubmission

func (ce *ComprehensiveExtractor) ExtractSubmission(ctx context.Context, submission parser.Submission) error

ExtractSubmission extracts a single submission

func (*ComprehensiveExtractor) GetStats

func (ce *ComprehensiveExtractor) GetStats() ExtractionStats

GetStats returns the extraction statistics

func (*ComprehensiveExtractor) PrintStats

func (ce *ComprehensiveExtractor) PrintStats()

PrintStats prints extraction statistics

type Database

type Database interface {
	InsertStudy(study *database.Study) error
	InsertExperiment(exp *database.Experiment) error
	InsertSample(sample *database.Sample) error
	InsertRun(run *database.Run) error
	InsertSubmission(submission *database.Submission) error
	InsertAnalysis(analysis *database.Analysis) error
	BatchInsertExperiments(experiments []database.Experiment) error

	// Pool/multiplex support
	InsertSamplePool(pool *database.SamplePool) error
	GetSamplePools(parentSample string) ([]database.SamplePool, error)
	CountSamplePools() (int, error)
	GetAveragePoolSize() (float64, error)
	GetMaxPoolSize() (int, error)

	// Identifier and link support
	InsertIdentifier(identifier *database.Identifier) error
	GetIdentifiers(recordType, recordAccession string) ([]database.Identifier, error)
	FindRecordsByIdentifier(idValue string) ([]database.Identifier, error)
	InsertLink(link *database.Link) error
	GetLinks(recordType, recordAccession string) ([]database.Link, error)
}

Database defines the storage operations required by the stream processor.

type ExtractionOptions

type ExtractionOptions struct {
	ExtractAttributes     bool // Extract all custom attributes
	ExtractLinks          bool // Extract all external links
	NormalizeOrganism     bool // Normalize organism names
	ExtractFromAttributes bool // Extract known fields from attributes
	BatchSize             int  // Batch size for database operations
	ValidateXML           bool // Validate XML against schemas
	StrictValidation      bool // Fail on validation errors (vs warnings only)
	LogValidationIssues   bool // Log validation issues
}

ExtractionOptions configures extraction behavior

func DefaultExtractionOptions

func DefaultExtractionOptions() ExtractionOptions

DefaultExtractionOptions returns default extraction options

type ExtractionStats

type ExtractionStats struct {
	StudiesProcessed     int
	ExperimentsProcessed int
	SamplesProcessed     int
	RunsProcessed        int
	AnalysesProcessed    int
	SubmissionsProcessed int
	StudiesExtracted     int
	ExperimentsExtracted int
	SamplesExtracted     int
	RunsExtracted        int
	AnalysesExtracted    int
	SubmissionsExtracted int
	ValidationErrors     int
	ValidationWarnings   int
	Errors               []string
	StartTime            time.Time
}

ExtractionStats tracks extraction statistics

type FilterOptions

type FilterOptions struct {
	// Taxonomy filters
	TaxonomyIDs   []int // List of NCBI taxonomy IDs to include
	ExcludeTaxIDs []int // Taxonomy IDs to exclude

	// Date filters
	DateFrom  time.Time // Start date (submission/published date)
	DateTo    time.Time // End date
	DateField string    // "submission", "published", or "last_updated"

	// Organism filters
	Organisms        []string // Scientific names to include
	ExcludeOrganisms []string // Scientific names to exclude

	// Technical filters
	Platforms        []string // Sequencing platforms (ILLUMINA, OXFORD_NANOPORE, etc.)
	Strategies       []string // Library strategies (RNA-Seq, WGS, WES, etc.)
	StudyTypes       []string // Study types
	InstrumentModels []string // Specific instrument models

	// Quality filters
	MinReads int64 // Minimum read count (total_spots)
	MaxReads int64 // Maximum read count
	MinBases int64 // Minimum base count (total_bases)
	MaxBases int64 // Maximum base count

	// Source filters
	Centers   []string // Submission centers
	Countries []string // Geographic origin (from attributes)

	// Control flags
	SkipIfNoMatch bool // Skip entire file if no matches
	StatsOnly     bool // Just count matches without inserting
	Verbose       bool // Print detailed filtering information
}

FilterOptions defines the filtering criteria for processing SRA metadata

func (*FilterOptions) HasFilters

func (f *FilterOptions) HasFilters() bool

HasFilters returns true if any filters are set

func (*FilterOptions) String

func (f *FilterOptions) String() string

String returns a human-readable description of the filters

func (*FilterOptions) Validate

func (f *FilterOptions) Validate() error

Validate checks if the filter options are valid

type FilterStats

type FilterStats struct {
	// Overall counts
	TotalProcessed int64
	TotalMatched   int64
	TotalSkipped   int64

	// Skip reasons
	SkippedByTaxonomy int64
	SkippedByDate     int64
	SkippedByOrganism int64
	SkippedByPlatform int64
	SkippedByStrategy int64
	SkippedByReads    int64
	SkippedByCenter   int64

	// Unique record tracking
	UniqueStudies     map[string]bool
	UniqueExperiments map[string]bool
	UniqueSamples     map[string]bool
	UniqueRuns        map[string]bool

	// Performance metrics
	StartTime      time.Time
	ProcessingTime time.Duration
}

FilterStats tracks statistics about filtered records

func NewFilterStats

func NewFilterStats() *FilterStats

NewFilterStats creates a new FilterStats instance

func (*FilterStats) GetSummary

func (s *FilterStats) GetSummary() string

GetSummary returns a summary of the filter statistics

type FilteredProcessor

type FilteredProcessor struct {
	*StreamProcessor
	// contains filtered or unexported fields
}

FilteredProcessor extends StreamProcessor with filtering capabilities

func NewFilteredProcessor

func NewFilteredProcessor(db Database, filters FilterOptions) (*FilteredProcessor, error)

NewFilteredProcessor creates a new processor with filtering capabilities

func (*FilteredProcessor) GetStats

func (fp *FilteredProcessor) GetStats() *FilterStats

GetStats returns the current filter statistics

func (*FilteredProcessor) ProcessExperiment

func (fp *FilteredProcessor) ProcessExperiment(exp *parser.Experiment) error

ProcessExperiment applies filters to an experiment record

func (*FilteredProcessor) ProcessRun

func (fp *FilteredProcessor) ProcessRun(run *parser.Run) error

ProcessRun applies filters to a run record

func (*FilteredProcessor) ProcessSample

func (fp *FilteredProcessor) ProcessSample(sample *parser.Sample) error

ProcessSample applies filters to a sample record

func (*FilteredProcessor) ProcessStudy

func (fp *FilteredProcessor) ProcessStudy(study *parser.Study) error

ProcessStudy applies filters to a study record

func (*FilteredProcessor) ProcessWithFilters

func (fp *FilteredProcessor) ProcessWithFilters(ctx context.Context, source string) error

ProcessWithFilters processes data with filtering applied

type IdentifierHandler

type IdentifierHandler struct {
	// contains filtered or unexported fields
}

IdentifierHandler manages structured identifier and link storage

func NewIdentifierHandler

func NewIdentifierHandler(db Database) *IdentifierHandler

NewIdentifierHandler creates a new identifier handler

func (*IdentifierHandler) ExtractIdentifiers

func (ih *IdentifierHandler) ExtractIdentifiers(identifiers *parser.Identifiers, recordType, recordAccession string) []StructuredIdentifier

ExtractIdentifiers extracts all identifiers from a record

func (*IdentifierHandler) ExtractLinks

func (ih *IdentifierHandler) ExtractLinks(links []parser.Link, recordType, recordAccession string) []StructuredLink

ExtractLinks extracts all links from a record

func (*IdentifierHandler) FindRecordsByIdentifier

func (ih *IdentifierHandler) FindRecordsByIdentifier(idValue string) ([]StructuredIdentifier, error)

FindRecordsByIdentifier finds all records with a specific identifier value

func (*IdentifierHandler) GetCrossReferences

func (ih *IdentifierHandler) GetCrossReferences(recordType, recordAccession string) (map[string][]string, error)

GetCrossReferences gets all cross-references for a record

func (*IdentifierHandler) GetIdentifiers

func (ih *IdentifierHandler) GetIdentifiers(recordType, recordAccession string) ([]StructuredIdentifier, error)

GetIdentifiers retrieves all identifiers for a record

func (*IdentifierHandler) GetLinks

func (ih *IdentifierHandler) GetLinks(recordType, recordAccession string) ([]StructuredLink, error)

GetLinks retrieves all links for a record

func (*IdentifierHandler) StoreIdentifiers

func (ih *IdentifierHandler) StoreIdentifiers(identifiers []StructuredIdentifier) error

StoreIdentifiers stores structured identifiers in the database

func (*IdentifierHandler) StoreLinks

func (ih *IdentifierHandler) StoreLinks(links []StructuredLink) error

StoreLinks stores structured links in the database

type OptimizedStreamProcessor

type OptimizedStreamProcessor struct {
	*StreamProcessor
	// contains filtered or unexported fields
}

OptimizedStreamProcessor extends StreamProcessor with performance optimizations

func NewOptimizedStreamProcessor

func NewOptimizedStreamProcessor(db Database, opts ProcessorOptions) *OptimizedStreamProcessor

NewOptimizedStreamProcessor creates an optimized processor

type PerformanceMetrics

type PerformanceMetrics struct {
	RecordsPerSecond  float64
	BytesPerSecond    float64
	AvgBatchTime      float64
	DatabaseWriteTime float64
	XMLParseTime      float64
	NetworkTime       float64
	TotalMemoryUsed   int64
	GCPauses          int
	CPUUtilization    float64
}

PerformanceMetrics tracks detailed performance metrics

type PlatformHandler

type PlatformHandler struct {
	// contains filtered or unexported fields
}

PlatformHandler handles platform enumeration extraction and validation

func NewPlatformHandler

func NewPlatformHandler() *PlatformHandler

NewPlatformHandler creates a new platform handler with initialized maps

func (*PlatformHandler) ExtractPlatformDetails

func (ph *PlatformHandler) ExtractPlatformDetails(platform parser.Platform) (string, string)

ExtractPlatformDetails extracts the platform name and instrument model from an experiment's platform element (a parser.Platform value).

func (*PlatformHandler) GetAllPlatforms

func (ph *PlatformHandler) GetAllPlatforms() []string

GetAllPlatforms returns list of all supported platforms

func (*PlatformHandler) GetModelsForPlatform

func (ph *PlatformHandler) GetModelsForPlatform(platform string) []string

GetModelsForPlatform returns list of valid models for a platform

func (*PlatformHandler) GetPlatformStatistics

func (ph *PlatformHandler) GetPlatformStatistics() map[string]int

GetPlatformStatistics returns counts of each platform type in experiments

func (*PlatformHandler) ValidateLibrarySelection

func (ph *PlatformHandler) ValidateLibrarySelection(selection string) error

ValidateLibrarySelection validates a library selection method

func (*PlatformHandler) ValidateLibrarySource

func (ph *PlatformHandler) ValidateLibrarySource(source string) error

ValidateLibrarySource validates a library source

func (*PlatformHandler) ValidateLibraryStrategy

func (ph *PlatformHandler) ValidateLibraryStrategy(strategy string) error

ValidateLibraryStrategy validates a library strategy

func (*PlatformHandler) ValidatePlatform

func (ph *PlatformHandler) ValidatePlatform(platform, model string) error

ValidatePlatform checks if platform and model are valid

type PoolHandler

type PoolHandler struct {
	// contains filtered or unexported fields
}

PoolHandler manages sample pooling and multiplexing relationships

func NewPoolHandler

func NewPoolHandler(db Database) *PoolHandler

NewPoolHandler creates a new pool handler

func (*PoolHandler) ExtractBarcodeInfo

func (ph *PoolHandler) ExtractBarcodeInfo(attrs []parser.Attribute) map[string]string

ExtractBarcodeInfo extracts barcode information from experiment attributes

func (*PoolHandler) ExtractPoolRelationships

func (ph *PoolHandler) ExtractPoolRelationships(exp parser.Experiment) ([]SamplePool, error)

ExtractPoolRelationships extracts pool relationships from an experiment

func (*PoolHandler) GetPoolMembers

func (ph *PoolHandler) GetPoolMembers(parentSample string) ([]SamplePool, error)

GetPoolMembers retrieves all members of a pool

func (*PoolHandler) GetPoolStatistics

func (ph *PoolHandler) GetPoolStatistics() (*PoolStats, error)

GetPoolStatistics returns pool statistics

func (*PoolHandler) StorePoolRelationships

func (ph *PoolHandler) StorePoolRelationships(relationships []SamplePool) error

StorePoolRelationships stores pool relationships in the database

func (*PoolHandler) ValidatePoolProportions

func (ph *PoolHandler) ValidatePoolProportions(relationships []SamplePool) error

ValidatePoolProportions validates that pool member proportions sum to 1.0

type PoolStats

type PoolStats struct {
	TotalPools        int
	AveragePoolSize   float64
	MaxPoolSize       int
	PoolsWithBarcodes int
}

PoolStats contains pool statistics

type ProcessorOptions

type ProcessorOptions struct {
	BatchSize         int  // Records per batch (default: 5000)
	UseTransactions   bool // Use DB transactions (default: true)
	EnableCompression bool // Enable response compression (default: true)
	WorkerCount       int  // Parallel workers (default: 4)
	BufferSize        int  // Read buffer size (default: 64KB)
}

ProcessorOptions contains configuration for optimized processing

func DefaultProcessorOptions

func DefaultProcessorOptions() ProcessorOptions

DefaultProcessorOptions returns optimized default options

type Progress

type Progress struct {
	BytesProcessed         int64
	TotalBytes             int64
	RecordsProcessed       int64
	CurrentFile            string
	PercentComplete        float64
	BytesPerSecond         float64
	TimeElapsed            time.Duration
	EstimatedTimeRemaining time.Duration
}

Progress contains information about the current processing progress

type ProgressFunc

type ProgressFunc func(progress Progress)

ProgressFunc is called periodically with progress updates

type ResumableProcessor

type ResumableProcessor struct {
	*StreamProcessor
	// contains filtered or unexported fields
}

ResumableProcessor extends StreamProcessor with resume capabilities

func NewResumableProcessor

func NewResumableProcessor(db Database) (*ResumableProcessor, error)

NewResumableProcessor creates a processor with resume capabilities

func (*ResumableProcessor) ProcessFileWithResume

func (rp *ResumableProcessor) ProcessFileWithResume(ctx context.Context, filePath string, opts ResumeOptions) error

ProcessFileWithResume processes a local file with resume support

func (*ResumableProcessor) ProcessURLWithResume

func (rp *ResumableProcessor) ProcessURLWithResume(ctx context.Context, url string, opts ResumeOptions) error

ProcessURLWithResume processes a URL with resume support

type ResumeOptions

type ResumeOptions struct {
	ForceRestart    bool          // Force fresh start even if progress exists
	Interactive     bool          // Ask user about resume
	CheckpointEvery time.Duration // How often to checkpoint
	MaxRetries      int           // Maximum retry attempts
	RetryDelay      time.Duration // Initial retry delay
}

ResumeOptions configures resume behavior

type SamplePool

type SamplePool struct {
	PoolID       int
	ParentSample string
	MemberSample string
	MemberName   string
	Proportion   float64
	ReadLabel    string
	Barcode      string
	BarcodeRead  int
}

SamplePool represents a pool relationship

type StreamProcessor

type StreamProcessor struct {
	// contains filtered or unexported fields
}

StreamProcessor handles streaming processing of tar.gz archives, fetched from HTTP URLs or read from local files.

func NewStreamProcessor

func NewStreamProcessor(db Database) *StreamProcessor

NewStreamProcessor creates a new stream processor

func (*StreamProcessor) GetStats

func (sp *StreamProcessor) GetStats() map[string]interface{}

GetStats returns processing statistics

func (*StreamProcessor) ProcessFile

func (sp *StreamProcessor) ProcessFile(ctx context.Context, filePath string) error

ProcessFile streams and processes a local tar.gz file

func (*StreamProcessor) ProcessURL

func (sp *StreamProcessor) ProcessURL(ctx context.Context, url string) error

ProcessURL streams and processes a tar.gz file from the given URL

func (*StreamProcessor) SetProgressFunc

func (sp *StreamProcessor) SetProgressFunc(f ProgressFunc)

SetProgressFunc sets the progress callback function

type StructuredIdentifier

type StructuredIdentifier struct {
	RecordType      string // study, sample, experiment, run, analysis
	RecordAccession string
	IDType          string // primary, secondary, external, submitter, uuid
	IDNamespace     string
	IDValue         string
	IDLabel         string
}

StructuredIdentifier represents a normalized identifier

type StructuredLink

type StructuredLink struct {
	RecordType      string
	RecordAccession string
	LinkType        string // url, xref, entrez
	DB              string
	ID              string
	Label           string
	URL             string
	Query           string
}

StructuredLink represents a normalized link

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL