Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/dcd_mapping/align.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ def _get_blat_output(
output = parse_blat(out_file, "blat-psl")

except ValueError:
_logger.debug(
"Initial BLAT parse failed for %s, retrying with -q=dnax -t=dnax",
metadata.urn,
exc_info=True,
)
target_args = "-q=dnax -t=dnax"
process_result = _run_blat(
target_args, query_file, reference_genome_file, "/dev/stdout", silent
Expand All @@ -209,6 +214,7 @@ def _get_blat_output(
output = parse_blat(out_file, "blat-psl")
except ValueError as e:
msg = f"Unable to run successful BLAT on {metadata.urn}"
_logger.exception(msg=msg, exc_info=e)
raise AlignmentError(msg) from e

return output
Expand Down
43 changes: 41 additions & 2 deletions src/dcd_mapping/lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,39 @@ def cdot_rest() -> RESTDataProvider:
# ---------------------------------- Global ---------------------------------- #


class _TimeoutUtaDatabase(UtaDatabase):
    """``UtaDatabase`` variant that raises the asyncpg command timeout.

    Upstream hard-codes a 60-second command timeout, which range queries
    against the public UTA server's ``tx_exon_aln_v`` view can exceed; this
    subclass rebuilds the pool with a 5-minute limit instead.
    """

    async def create_pool(self) -> None:
        """Lazily create the asyncpg connection pool (command_timeout=300s).

        No-op if a pool already exists. Raises ``Exception`` (chained from
        ``asyncpg.InterfaceError``) when the pool cannot be created, matching
        the upstream error contract.
        """
        if self._connection_pool:
            # Pool already initialized -- nothing to do.
            return
        import asyncpg

        # NOTE(review): relies on the parent class's private helper for
        # connection parameters; self.args is set for parity with upstream.
        self.args = self._get_conn_args()
        conn = self.args
        try:
            pool = await asyncpg.create_pool(
                host=conn["host"],
                port=conn["port"],
                user=conn["user"],
                password=conn["password"],
                database=conn["database"],
                min_size=1,
                max_size=10,
                max_inactive_connection_lifetime=3,
                # The one deviation from upstream: 300s instead of 60s.
                command_timeout=300,
            )
        except asyncpg.InterfaceError as e:
            _logger.error(
                "While creating connection pool, encountered exception %s", e
            )
            msg = "Could not create connection pool"
            raise Exception(msg) from e
        self._connection_pool = pool


class CoolSeqToolBuilder:
"""Singleton constructor for ``cool-seq-tool`` instance."""

Expand All @@ -126,7 +159,7 @@ def derive_refget_accession(self, ac: str) -> str | None:
try:
aliases = self.translate_sequence_identifier(ac, namespace="ga4gh")
except KeyError:
_logger.error("KeyError when getting refget accession: %s", ac)
_logger.exception("KeyError when getting refget accession: %s", ac)
else:
if aliases:
refget_accession = aliases[0].split("ga4gh:")[-1]
Expand All @@ -152,7 +185,7 @@ def __init__(
self.mane_transcript_mappings = ManeTranscriptMappings(
mane_data_path=mane_data_path
)
self.uta_db = UtaDatabase(db_url=db_url)
self.uta_db = _TimeoutUtaDatabase(db_url=db_url)
self.alignment_mapper = AlignmentMapper(
self.seqrepo_access, self.transcript_mappings, self.uta_db
)
Expand Down Expand Up @@ -263,6 +296,9 @@ async def get_protein_accession(transcript: str) -> str | None:
""" # noqa: S608
result = await uta.execute_query(query)
except Exception as e:
_logger.exception(
"Failed to get protein accession for transcript %s", transcript
)
raise DataLookupError from e
if result:
return result[0]["pro_ac"]
Expand Down Expand Up @@ -291,6 +327,9 @@ async def get_transcripts(
""" # noqa: S608
result = await uta.execute_query(query)
except Exception as e:
_logger.exception(
"Failed to get transcripts for %s:%d-%d", chromosome_ac, start, end
)
raise DataLookupError from e

return [(row["tx_ac"], row["hgnc"]) for row in result]
Expand Down
43 changes: 30 additions & 13 deletions src/dcd_mapping/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async def _check_data_prereqs(silent: bool) -> None:
try:
await check_uta()
except Exception:
_logger.exception("UTA check failed")
success = False
_emit_info(
"* UTA appears to be unavailable. Check the logs for more information. For troubleshooting, we recommend checking the UTA readme (https://github.com/biocommons/uta?tab=readme-ov-file#installing-uta-locally) and the Cool-Seq-Tool installation instructions (https://coolseqtool.readthedocs.io/0.4.0-dev3/install.html#set-up-uta). Remember that the UTA connection is configurable via a libpq URI provided under the environment variable UTA_DB_URL (see Cool-Seq-Tool docs: https://coolseqtool.readthedocs.io/0.4.0-dev3/usage.html#environment-configuration) -- otherwise, by default it attempts a connection to `postgresql://uta_admin:uta@localhost:5433/uta/uta_20210129b`.",
Expand All @@ -77,6 +78,7 @@ async def _check_data_prereqs(silent: bool) -> None:
try:
check_seqrepo()
except Exception:
_logger.exception("SeqRepo check failed")
success = False
_emit_info(
"* SeqRepo appears inaccessible or unusable. Check the logs for more information. Ensure that a local SeqRepo snapshot has been downloaded (it should've taken a while -- see https://github.com/biocommons/biocommons.seqrepo?tab=readme-ov-file#requirements), that it's located either at `/usr/local/share/seqrepo/latest` or at the location designated by the `SEQREPO_ROOT_DIR` environment variable, and that it's writeable (see https://github.com/biocommons/biocommons.seqrepo/blob/main/docs/store.rst).",
Expand All @@ -86,6 +88,7 @@ async def _check_data_prereqs(silent: bool) -> None:
try:
check_gene_normalizer()
except Exception:
_logger.exception("Gene Normalizer check failed")
success = False
_emit_info(
"* Gene Normalizer appears to be unavailable. Check the logs for more information. Note that a data snapshot needs to be acquired, or the data update routine must be run (this should've taken at least a few seconds, if not several minutes). For troubleshooting, review the Gene Normalizer installation instructions and documentation: https://gene-normalizer.readthedocs.io/0.3.0-dev1/install.html",
Expand Down Expand Up @@ -168,20 +171,22 @@ async def map_scoreset(
try:
# dictionary where keys are target gene labels or accession ids, and values are alignment result objects
alignment_results = build_alignment_result(metadata, silent)
except BlatNotFoundError as e:
except BlatNotFoundError:
msg = "BLAT command appears missing. Ensure it is available on the $PATH or use the environment variable BLAT_BIN_PATH to point to it. See instructions in the README prerequisites section for more."
_emit_info(msg, silent, logging.ERROR)
raise e
raise
except ResourceAcquisitionError as e:
_emit_info(f"BLAT resource could not be acquired: {e}", silent, logging.ERROR)
raise e
raise
except AlignmentError as e:
_emit_info(
f"Alignment failed for scoreset {metadata.urn} {e}", silent, logging.ERROR
)
final_output = write_scoreset_mapping_to_json(
metadata.urn,
ScoresetMapping(metadata=metadata, error_message=str(e).strip("'")),
ScoresetMapping(
metadata=metadata, error_message=f"{type(e).__name__}: {e}"
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
Expand All @@ -190,7 +195,9 @@ async def map_scoreset(
_emit_info(f"Score set not supported: {e}", silent, logging.ERROR)
final_output = write_scoreset_mapping_to_json(
metadata.urn,
ScoresetMapping(metadata=metadata, error_message=str(e).strip("'")),
ScoresetMapping(
metadata=metadata, error_message=f"{type(e).__name__}: {e}"
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
Expand All @@ -211,14 +218,14 @@ async def map_scoreset(
silent,
logging.ERROR,
)
raise e
raise
except DataLookupError as e:
_emit_info(
f"Data lookup error occurred during transcript selection: {e}",
silent,
logging.ERROR,
)
raise e
raise
_emit_info("Reference selection complete.", silent)

_emit_info("Mapping to VRS...", silent)
Expand Down Expand Up @@ -252,7 +259,9 @@ async def map_scoreset(
)
final_output = write_scoreset_mapping_to_json(
metadata.urn,
ScoresetMapping(metadata=metadata, error_message=str(e).strip("'")),
ScoresetMapping(
metadata=metadata, error_message=f"{type(e).__name__}: {e}"
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
Expand Down Expand Up @@ -286,14 +295,17 @@ async def map_scoreset(
vrs_version,
)
except Exception as e:
_logger.exception("VRS annotation failed for scoreset %s", metadata.urn)
_emit_info(
f"VRS annotation failed for scoreset {metadata.urn}",
silent,
logging.ERROR,
)
final_output = write_scoreset_mapping_to_json(
metadata.urn,
ScoresetMapping(metadata=metadata, error_message=str(e).strip("'")),
ScoresetMapping(
metadata=metadata, error_message=f"{type(e).__name__}: {e}"
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
Expand Down Expand Up @@ -323,14 +335,19 @@ async def map_scoreset(
output_path,
)
except Exception as e:
_logger.exception(
"Error in creating or saving final score set mapping for %s", metadata.urn
)
_emit_info(
f"Error in creating or saving final score set mapping for {metadata.urn} {e}",
silent,
logging.ERROR,
)
final_output = write_scoreset_mapping_to_json(
metadata.urn,
ScoresetMapping(metadata=metadata, error_message=str(e).strip("'")),
ScoresetMapping(
metadata=metadata, error_message=f"{type(e).__name__}: {e}"
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
Expand Down Expand Up @@ -364,17 +381,17 @@ async def map_scoreset_urn(
urn,
ScoresetMapping(
metadata=None,
error_message=str(e).strip("'"),
error_message=f"{type(e).__name__}: {e}",
),
output_path,
)
_emit_info(f"Score set mapping output saved to: {final_output}.", silent)
return
except ResourceAcquisitionError as e:
msg = f"Unable to acquire resource from MaveDB: {e}"
_logger.critical(msg)
_logger.critical(msg, exc_info=True)
click.echo(f"Error: {msg}")
raise e
raise

if not records:
_emit_info("Score set contains no variants to map", silent, logging.ERROR)
Expand Down
26 changes: 24 additions & 2 deletions src/dcd_mapping/resource_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import httpx
from tqdm import tqdm

from dcd_mapping.exceptions import ResourceAcquisitionError

_logger = logging.getLogger(__name__)

# Common representations of missing/null data in CSV files
Expand Down Expand Up @@ -111,14 +113,23 @@ def request_with_backoff(
Immediately raises on other HTTP errors (e.g., 4xx client errors).
"""
attempt = 0
last_exception: Exception | None = None
while attempt < max_retries:
try:
kwargs.setdefault("timeout", 60)
response = httpx.get(url, **kwargs)
except (httpx.TimeoutException, httpx.ConnectError):
except (httpx.TimeoutException, httpx.ConnectError) as e:
last_exception = e
# Retry on transient network failures
if attempt == max_retries - 1:
raise
_logger.debug(
"Transient network error fetching %s (attempt %d/%d): %s",
url,
attempt + 1,
max_retries,
e,
)
sleep_time = backoff_factor * (2**attempt)
time.sleep(sleep_time)
attempt += 1
Expand All @@ -141,6 +152,10 @@ def request_with_backoff(
else backoff_factor * (2**attempt)
)
except ValueError:
_logger.debug(
"Invalid Retry-After header value: %s, using exponential backoff",
retry_after,
)
sleep_time = backoff_factor * (2**attempt)
time.sleep(sleep_time)
attempt += 1
Expand All @@ -150,6 +165,13 @@ def request_with_backoff(
if 500 <= status < 600:
if attempt == max_retries - 1:
response.raise_for_status()
_logger.debug(
"Server error %d fetching %s (attempt %d/%d)",
status,
url,
attempt + 1,
max_retries,
)
sleep_time = backoff_factor * (2**attempt)
time.sleep(sleep_time)
attempt += 1
Expand All @@ -160,4 +182,4 @@ def request_with_backoff(

# Exhausted retries without success
msg = f"Failed to fetch {url} after {max_retries} attempts"
raise Exception(msg)
raise ResourceAcquisitionError(msg) from last_exception
Loading
Loading