From 7323f675635c63c8db0ec87f55457d37f2fcc8a0 Mon Sep 17 00:00:00 2001 From: Madhava Jay Date: Fri, 17 Apr 2026 10:31:30 +1000 Subject: [PATCH] adding vcf --- rust/bioscript-formats/src/genotype.rs | 423 +++++++++++++++++-- rust/bioscript-formats/tests/file_formats.rs | 71 ++++ tools/fetch_test_data.sh | 46 +- tools/sources.yaml | 12 + 4 files changed, 515 insertions(+), 37 deletions(-) diff --git a/rust/bioscript-formats/src/genotype.rs b/rust/bioscript-formats/src/genotype.rs index 794f5f8..7571e83 100644 --- a/rust/bioscript-formats/src/genotype.rs +++ b/rust/bioscript-formats/src/genotype.rs @@ -57,6 +57,7 @@ pub struct GenotypeStore { enum QueryBackend { RsidMap(RsidMapBackend), Delimited(DelimitedBackend), + Vcf(VcfBackend), Cram(CramBackend), } @@ -73,6 +74,11 @@ struct DelimitedBackend { zip_entry_name: Option, } +#[derive(Debug, Clone)] +struct VcfBackend { + path: PathBuf, +} + #[derive(Debug, Clone)] struct CramBackend { path: PathBuf, @@ -137,23 +143,17 @@ impl GenotypeStore { None, )), GenotypeSourceFormat::Zip => Self::from_zip_file(path), - GenotypeSourceFormat::Vcf => Self::from_vcf_file(path), + GenotypeSourceFormat::Vcf => Ok(Self::from_vcf_file(path)), GenotypeSourceFormat::Cram => Self::from_cram_file(path, options), } } - fn from_vcf_file(path: &Path) -> Result { - let lines = if path - .extension() - .and_then(|ext| ext.to_str()) - .is_some_and(|ext| ext.eq_ignore_ascii_case("gz")) - { - read_bgzf_lines(path)? - } else { - read_plain_lines(path)? - }; - - Self::from_vcf_lines(lines) + fn from_vcf_file(path: &Path) -> Self { + Self { + backend: QueryBackend::Vcf(VcfBackend { + path: path.to_path_buf(), + }), + } } fn from_zip_file(path: &Path) -> Result { @@ -265,6 +265,10 @@ impl GenotypeStore { rsid_lookup: true, locus_lookup: true, }, + QueryBackend::Vcf(_) => BackendCapabilities { + rsid_lookup: true, + locus_lookup: true, + }, QueryBackend::Cram(_) => BackendCapabilities { rsid_lookup: false, locus_lookup: true, @@ -284,6 +288,7 @@ impl GenotypeStore { match &self.backend { QueryBackend::RsidMap(map) => map.backend_name(), QueryBackend::Delimited(backend) => backend.backend_name(), + QueryBackend::Vcf(backend) => backend.backend_name(), QueryBackend::Cram(backend) => backend.backend_name(), } } @@ -292,6 +297,7 @@ impl GenotypeStore { match &self.backend { QueryBackend::RsidMap(map) => Ok(map.values.get(rsid).cloned()), QueryBackend::Delimited(backend) => backend.get(rsid), + QueryBackend::Vcf(backend) => backend.get(rsid), QueryBackend::Cram(backend) => backend .lookup_variant(&VariantSpec { rsids: vec![rsid.to_owned()], @@ -308,6 +314,7 @@ impl GenotypeStore { match &self.backend { QueryBackend::RsidMap(map) => map.lookup_variant(variant), QueryBackend::Delimited(backend) => backend.lookup_variant(variant), + QueryBackend::Vcf(backend) => backend.lookup_variant(variant), QueryBackend::Cram(backend) => backend.lookup_variant(variant), } } @@ -319,6 +326,9 @@ impl GenotypeStore { if let QueryBackend::Delimited(backend) = &self.backend { return backend.lookup_variants(variants); } + if let QueryBackend::Vcf(backend) = &self.backend { + return backend.lookup_variants(variants); + } let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect(); indexed.sort_by_cached_key(|(_, variant)| variant_sort_key(variant)); @@ -400,6 +410,32 @@ impl DelimitedBackend { } } +impl VcfBackend { + fn backend_name(&self) -> &'static str { + "vcf" + } + + fn get(&self, rsid: &str) -> Result, RuntimeError> { + let results = self.lookup_variants(&[VariantSpec { + rsids: vec![rsid.to_owned()], + ..VariantSpec::default() + }])?; + Ok(results.into_iter().next().and_then(|obs| obs.genotype)) + } + + fn lookup_variant(&self, variant: &VariantSpec) -> Result { + let mut results = self.lookup_variants(std::slice::from_ref(variant))?; + Ok(results.pop().unwrap_or_default()) + } + + fn lookup_variants( + &self, + variants: &[VariantSpec], + ) -> Result, RuntimeError> { + scan_vcf_variants(self, variants) + } +} + impl CramBackend { fn backend_name(&self) -> &'static str { "cram" @@ -1388,6 +1424,356 @@ fn split_csv_line(line: &str) -> Vec { fields } +#[derive(Debug, Clone)] +struct ParsedVcfRow { + rsid: Option, + chrom: String, + position: i64, + reference: String, + alternates: Vec, + genotype: String, +} + +fn scan_vcf_variants( + backend: &VcfBackend, + variants: &[VariantSpec], +) -> Result, RuntimeError> { + let mut indexed: Vec<(usize, &VariantSpec)> = variants.iter().enumerate().collect(); + indexed.sort_by_cached_key(|(_, variant)| variant_sort_key(variant)); + + let mut probe_lines = Vec::new(); + let detected_assembly = { + let file = File::open(&backend.path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open VCF file {}: {err}", + backend.path.display() + )) + })?; + let mut reader: Box = if is_bgzf_path(&backend.path) { + Box::new(BufReader::new(bgzf::io::Reader::new(file))) + } else { + Box::new(BufReader::new(file)) + }; + + let mut buf = String::new(); + for _ in 0..256 { + buf.clear(); + let bytes = reader.read_line(&mut buf).map_err(|err| { + RuntimeError::Io(format!( + "failed to read VCF file {}: {err}", + backend.path.display() + )) + })?; + if bytes == 0 { + break; + } + let line = buf.trim_end_matches(['\n', '\r']).to_owned(); + let stop = line.starts_with("#CHROM\t"); + probe_lines.push(line); + if stop { + break; + } + } + + detect_vcf_assembly(&backend.path, &probe_lines) + }; + + let mut rsid_targets: HashMap> = HashMap::new(); + let mut coord_targets: HashMap<(String, i64), Vec> = HashMap::new(); + let mut results = vec![VariantObservation::default(); variants.len()]; + let mut unresolved = variants.len(); + + for (idx, variant) in &indexed { + for rsid in &variant.rsids { + rsid_targets.entry(rsid.clone()).or_default().push(*idx); + } + + if let Some(locus) = choose_variant_locus_for_assembly(variant, detected_assembly) { + let chrom = normalize_chromosome_name(&locus.chrom); + coord_targets + .entry((chrom.clone(), locus.start)) + .or_default() + .push(*idx); + if matches!( + variant.kind, + Some(VariantKind::Deletion | VariantKind::Insertion | VariantKind::Indel) + ) { + let anchor = locus.start.saturating_sub(1); + coord_targets.entry((chrom, anchor)).or_default().push(*idx); + } + } + } + + let file = File::open(&backend.path).map_err(|err| { + RuntimeError::Io(format!( + "failed to open VCF file {}: {err}", + backend.path.display() + )) + })?; + let mut reader: Box = if is_bgzf_path(&backend.path) { + Box::new(BufReader::new(bgzf::io::Reader::new(file))) + } else { + Box::new(BufReader::new(file)) + }; + + let mut buf = String::new(); + loop { + buf.clear(); + let bytes = reader.read_line(&mut buf).map_err(|err| { + RuntimeError::Io(format!( + "failed to read VCF file {}: {err}", + backend.path.display() + )) + })?; + if bytes == 0 || unresolved == 0 { + break; + } + if let Some(row) = parse_vcf_record(buf.trim_end_matches(['\n', '\r']))? { + resolve_vcf_row( + backend, + &row, + variants, + detected_assembly, + &rsid_targets, + &coord_targets, + &mut results, + &mut unresolved, + ); + } + } + + for (idx, variant) in indexed { + if results[idx].genotype.is_none() { + results[idx] = VariantObservation { + backend: backend.backend_name().to_owned(), + assembly: detected_assembly, + evidence: vec![format!( + "no matching rsid or locus found for {}", + describe_query(variant) + )], + ..VariantObservation::default() + }; + } + } + + Ok(results) +} + +fn resolve_vcf_row( + backend: &VcfBackend, + row: &ParsedVcfRow, + variants: &[VariantSpec], + detected_assembly: Option, + rsid_targets: &HashMap>, + coord_targets: &HashMap<(String, i64), Vec>, + results: &mut [VariantObservation], + unresolved: &mut usize, +) { + if let Some(rsid) = row.rsid.as_ref() + && let Some(target_indexes) = rsid_targets.get(rsid) + { + for &target_idx in target_indexes { + if results[target_idx].genotype.is_none() { + results[target_idx] = VariantObservation { + backend: backend.backend_name().to_owned(), + matched_rsid: Some(rsid.clone()), + assembly: detected_assembly, + genotype: Some(row.genotype.clone()), + evidence: vec![format!("resolved by rsid {rsid}")], + ..VariantObservation::default() + }; + *unresolved = (*unresolved).saturating_sub(1); + } + } + } + + if *unresolved == 0 { + return; + } + + let key = (normalize_chromosome_name(&row.chrom), row.position); + if let Some(target_indexes) = coord_targets.get(&key) { + for &target_idx in target_indexes { + if results[target_idx].genotype.is_none() + && vcf_row_matches_variant(row, &variants[target_idx], detected_assembly) + { + results[target_idx] = VariantObservation { + backend: backend.backend_name().to_owned(), + matched_rsid: row.rsid.clone(), + assembly: detected_assembly, + genotype: Some(row.genotype.clone()), + evidence: vec![format!("resolved by locus {}:{}", row.chrom, row.position)], + ..VariantObservation::default() + }; + *unresolved = (*unresolved).saturating_sub(1); + } + } + } +} + +fn parse_vcf_record(line: &str) -> Result, RuntimeError> { + let trimmed = line.trim(); + if trimmed.is_empty() || trimmed.starts_with('#') { + return Ok(None); + } + + let fields: Vec<&str> = trimmed.split('\t').collect(); + if fields.len() < 10 { + return Ok(None); + } + + let chrom = fields[0].trim(); + let position = fields[1].trim().parse::().map_err(|err| { + RuntimeError::Io(format!("failed to parse VCF position '{}': {err}", fields[1].trim())) + })?; + let rsid = { + let value = fields[2].trim(); + (!value.is_empty() && value != ".").then(|| value.to_owned()) + }; + let reference = fields[3].trim(); + if reference.is_empty() || reference == "." { + return Ok(None); + } + + let alternates: Vec = fields[4] + .split(',') + .map(str::trim) + .filter(|alt| !alt.is_empty() && *alt != ".") + .map(ToOwned::to_owned) + .collect(); + if alternates.is_empty() { + return Ok(None); + } + + let genotype = extract_vcf_sample_genotype(fields[8], fields[9], reference, &alternates) + .unwrap_or_else(|| "--".to_owned()); + + Ok(Some(ParsedVcfRow { + rsid, + chrom: chrom.to_owned(), + position, + reference: reference.to_owned(), + alternates, + genotype, + })) +} + +fn extract_vcf_sample_genotype( + format_field: &str, + sample_field: &str, + reference: &str, + alternates: &[String], +) -> Option { + let gt_index = format_field + .split(':') + .position(|field| field.eq_ignore_ascii_case("GT"))?; + let sample_parts: Vec<&str> = sample_field.split(':').collect(); + let sample_gt = sample_parts.get(gt_index).copied().unwrap_or("."); + let alternate_refs: Vec<&str> = alternates.iter().map(String::as_str).collect(); + genotype_from_vcf_gt(sample_gt, reference, &alternate_refs) +} + +fn detect_vcf_assembly(path: &Path, probe_lines: &[String]) -> Option { + let combined = probe_lines.join("\n").to_ascii_lowercase(); + if combined.contains("assembly=b37") + || combined.contains("assembly=grch37") + || combined.contains("assembly=hg19") + || combined.contains("reference=grch37") + || combined.contains("reference=hg19") + { + return Some(Assembly::Grch37); + } + if combined.contains("assembly=b38") + || combined.contains("assembly=grch38") + || combined.contains("assembly=hg38") + || combined.contains("reference=grch38") + || combined.contains("reference=hg38") + { + return Some(Assembly::Grch38); + } + + let lower = path.to_string_lossy().to_ascii_lowercase(); + if lower.contains("grch37") || lower.contains("hg19") || lower.contains("b37") { + Some(Assembly::Grch37) + } else if lower.contains("grch38") || lower.contains("hg38") || lower.contains("b38") { + Some(Assembly::Grch38) + } else { + None + } +} + +fn choose_variant_locus_for_assembly( + variant: &VariantSpec, + assembly: Option, +) -> Option { + match assembly { + Some(Assembly::Grch37) => variant + .grch37 + .clone() + .or_else(|| variant.grch38.clone()), + Some(Assembly::Grch38) => variant + .grch38 + .clone() + .or_else(|| variant.grch37.clone()), + None => variant + .grch37 + .clone() + .or_else(|| variant.grch38.clone()), + } +} + +fn normalize_chromosome_name(value: &str) -> String { + value.trim() + .trim_start_matches("chr") + .to_ascii_lowercase() +} + +fn vcf_row_matches_variant( + row: &ParsedVcfRow, + variant: &VariantSpec, + assembly: Option, +) -> bool { + let Some(locus) = choose_variant_locus_for_assembly(variant, assembly) else { + return false; + }; + + if normalize_chromosome_name(&row.chrom) != normalize_chromosome_name(&locus.chrom) { + return false; + } + + match variant.kind.unwrap_or(VariantKind::Other) { + VariantKind::Snp => { + row.position == locus.start + && variant + .reference + .as_ref() + .is_none_or(|reference| reference.eq_ignore_ascii_case(&row.reference)) + && variant.alternate.as_ref().is_none_or(|alternate| { + row.alternates + .iter() + .any(|candidate| candidate.eq_ignore_ascii_case(alternate)) + }) + } + VariantKind::Deletion => { + let expected_len = variant.deletion_length.unwrap_or(0); + row.position == locus.start.saturating_sub(1) + && row.alternates.iter().any(|alternate| { + let actual_len = row.reference.len().saturating_sub(alternate.len()); + (expected_len == 0 || actual_len == expected_len) + && alternate.len() < row.reference.len() + }) + } + VariantKind::Insertion | VariantKind::Indel => row.position == locus.start.saturating_sub(1), + VariantKind::Other => row.position == locus.start, + } +} + +fn is_bgzf_path(path: &Path) -> bool { + path.extension() + .and_then(|ext| ext.to_str()) + .is_some_and(|ext| ext.eq_ignore_ascii_case("gz")) +} + fn read_plain_lines(path: &Path) -> Result, RuntimeError> { let file = File::open(path).map_err(|err| { RuntimeError::Io(format!( @@ -1778,17 +2164,6 @@ fn find_header_index(header: &[String], aliases: &[&str]) -> Option { }) } -fn read_bgzf_lines(path: &Path) -> Result, RuntimeError> { - let file = File::open(path).map_err(|err| { - RuntimeError::Io(format!( - "failed to open genotype file {}: {err}", - path.display() - )) - })?; - let reader = bgzf::io::Reader::new(file); - read_lines_from_reader(BufReader::new(reader), path) -} - fn read_lines_from_reader( mut reader: R, path: &Path, diff --git a/rust/bioscript-formats/tests/file_formats.rs b/rust/bioscript-formats/tests/file_formats.rs index f060935..8233136 100644 --- a/rust/bioscript-formats/tests/file_formats.rs +++ b/rust/bioscript-formats/tests/file_formats.rs @@ -264,6 +264,77 @@ fn vcf_variant_lookup_ignores_symbolic_non_ref_alt_when_decoding_gt() { assert_eq!(store.get("rs9357296").unwrap().as_deref(), Some("AG")); } +#[test] +fn real_world_clean_vcf_supports_locus_lookup_without_rsids() { + let test_name = "real_world_clean_vcf_supports_locus_lookup_without_rsids"; + let Some(clean_vcf) = shared_fixture_or_skip(test_name, "1k-genomes/vcf/NA06985.clean.vcf.gz") + else { + return; + }; + let Some(original_vcf) = shared_fixture_or_skip(test_name, "1k-genomes/vcf/NA06985.vcf.gz") + else { + return; + }; + + let clean_store = GenotypeStore::from_file(&clean_vcf).expect("open cleaned VCF"); + let original_store = GenotypeStore::from_file(&original_vcf).expect("open original VCF"); + + let queries = [ + ( + VariantSpec { + grch37: Some(bioscript_core::GenomicLocus { + chrom: "1".to_owned(), + start: 12_783, + end: 12_783, + }), + reference: Some("G".to_owned()), + alternate: Some("A".to_owned()), + kind: Some(VariantKind::Snp), + ..VariantSpec::default() + }, + "GA", + ), + ( + VariantSpec { + grch37: Some(bioscript_core::GenomicLocus { + chrom: "1".to_owned(), + start: 13_110, + end: 13_110, + }), + reference: Some("G".to_owned()), + alternate: Some("A".to_owned()), + kind: Some(VariantKind::Snp), + ..VariantSpec::default() + }, + "GA", + ), + ( + VariantSpec { + rsids: vec!["rs78601809".to_owned()], + grch37: Some(bioscript_core::GenomicLocus { + chrom: "1".to_owned(), + start: 15_211, + end: 15_211, + }), + reference: Some("T".to_owned()), + alternate: Some("G".to_owned()), + kind: Some(VariantKind::Snp), + ..VariantSpec::default() + }, + "GG", + ), + ]; + + for (query, expected_genotype) in queries { + let clean = clean_store.lookup_variant(&query).expect("clean lookup"); + let original = original_store.lookup_variant(&query).expect("original lookup"); + + assert_eq!(clean.backend, "vcf"); + assert_eq!(clean.genotype.as_deref(), Some(expected_genotype)); + assert_eq!(original.genotype, clean.genotype); + } +} + struct CramFixture { cram: PathBuf, reference: PathBuf, diff --git a/tools/fetch_test_data.sh b/tools/fetch_test_data.sh index 19b7e31..2c66624 100755 --- a/tools/fetch_test_data.sh +++ b/tools/fetch_test_data.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash set -euo pipefail -# Fetch test data defined in test-data/sources.yaml. +# Fetch test data defined in tools/sources.yaml. # Only downloads files that are not already present locally. # # Usage: @@ -51,15 +51,19 @@ ensure_parent_dir() { mkdir -p "$(dirname "$1")" } +is_split_archive_part() { + case "$1" in + *.tar.gz.??) return 0 ;; + *) return 1 ;; + esac +} + reconstruct_split_archive() { local cache_dir="$1" local repo_dir="$2" local filename="$3" - case "$filename" in - *.tar.gz.??) ;; - *) return 0 ;; - esac + is_split_archive_part "$filename" || return 0 local archive_name="${filename%.[A-Za-z][A-Za-z]}" local final_name="${archive_name%.tar.gz}" @@ -90,18 +94,32 @@ reconstruct_split_archive() { local tmpdir tmpdir="$(mktemp -d "${cache_dir}/reconstruct.XXXXXX")" - if cat "${parts[@]}" | tar xzf - -C "$tmpdir"; then - local extracted - extracted="$(find "$tmpdir" -type f | head -n 1)" - if [ -n "$extracted" ] && [ -f "$extracted" ]; then - mv "$extracted" "$final_cache_path" - ensure_repo_symlink "$final_cache_path" "$final_repo_path" + if cat "${parts[@]}" | tar tzf - >/dev/null 2>&1; then + if cat "${parts[@]}" | tar xzf - -C "$tmpdir"; then + local extracted + extracted="$(find "$tmpdir" -type f | head -n 1)" + if [ -n "$extracted" ] && [ -f "$extracted" ]; then + mv "$extracted" "$final_cache_path" + ensure_repo_symlink "$final_cache_path" "$final_repo_path" + fi fi fi rm -rf "$tmpdir" } +ensure_repo_materialized() { + local cache_path="$1" + local repo_path="$2" + local filename="$3" + + if is_split_archive_part "$filename"; then + return 0 + fi + + ensure_repo_symlink "$cache_path" "$repo_path" +} + ensure_repo_symlink() { local cache_path="$1" local repo_path="$2" @@ -174,6 +192,8 @@ for sample_name, sample in cfg.get('sample_data_urls', {}).items(): ('ref_index', 'ref'), ('aligned', 'aligned'), ('aligned_index', 'aligned'), + ('vcf', 'vcf'), + ('vcf_index', 'vcf'), ('snp', 'snp'), ): value = sample.get(key) @@ -237,7 +257,7 @@ Filter examples: --dataset 23andme --only "*v5*" Only the v5 23andMe file Datasets: - 1k-genomes GRCh38 reference genome + NA06985 high-coverage CRAM + 1k-genomes GRCh38 reference genome + NA06985 CRAM + cleaned NA06985 VCF 23andme 23andMe SNP exports (v2-v5) from OpenMined biovault-data dynamicdna Dynamic DNA GSAv3-DTC synthetic export on GRCh38 @@ -330,7 +350,7 @@ while IFS=$'\t' read -r dataset subdir filename url; do echo " ${url}" if curl -fSL --progress-bar --retry 3 --retry-delay 5 -o "$cache_path" "$url"; then - ensure_repo_symlink "$cache_path" "$repo_path" + ensure_repo_materialized "$cache_path" "$repo_path" "$filename" reconstruct_split_archive "$cache_dir" "$repo_dir" "$filename" size=$(file_size "$cache_path") info "${label} ($(human_size "$size"))" diff --git a/tools/sources.yaml b/tools/sources.yaml index e9435c4..2be9d8b 100644 --- a/tools/sources.yaml +++ b/tools/sources.yaml @@ -24,6 +24,18 @@ datasets: - name: "NA06985.final.cram.crai" url: "https://ftp-trace.ncbi.nih.gov/1000genomes/ftp/1000G_2504_high_coverage/data/ERR3239276/NA06985.final.cram.crai" description: "CRAM index" + vcf: + description: "NA06985 cleaned VCF on GRCh37" + files: + - name: "NA06985.clean.vcf.gz.tar.gz.aa" + url: "https://raw.githubusercontent.com/OpenMined/biovault-data/main/vcf/NA06985/NA06985.clean.vcf.gz.tar.gz.aa" + description: "First split archive chunk containing the cleaned NA06985 VCF" + - name: "NA06985.clean.vcf.gz.tar.gz.ab" + url: "https://raw.githubusercontent.com/OpenMined/biovault-data/main/vcf/NA06985/NA06985.clean.vcf.gz.tar.gz.ab" + description: "Second split archive chunk containing the cleaned NA06985 VCF" + - name: "NA06985.clean.vcf.gz.tbi" + url: "https://raw.githubusercontent.com/OpenMined/biovault-data/main/vcf/NA06985/NA06985.clean.vcf.gz.tbi" + description: "Tabix index for the cleaned NA06985 VCF" 23andme: description: "23andMe SNP genotype files from OpenMined biovault-data (PGP participants)"