From 7369247dc8ab69e3d1c5787cb7fb6f039da56f72 Mon Sep 17 00:00:00 2001 From: tdruez Date: Mon, 13 Apr 2026 19:15:08 +0400 Subject: [PATCH 1/5] Migrate VulnerableCode integration to API v3 Signed-off-by: tdruez --- scanpipe/pipes/vulnerablecode.py | 98 +- .../scanpipe/includes/vulnerability_id.html | 22 +- .../scanpipe/tabset/tab_vulnerabilities.html | 12 + .../django-5.0_package_data.json | 1079 ++++++----------- scanpipe/tests/pipes/test_vulnerablecode.py | 15 +- scanpipe/tests/test_pipelines.py | 7 +- 6 files changed, 436 insertions(+), 797 deletions(-) diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 061c8e2028..06fbe5a644 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -35,7 +35,7 @@ VULNERABLECODE_API_URL = None VULNERABLECODE_URL = settings.VULNERABLECODE_URL if VULNERABLECODE_URL: - VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL}/api/" + VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL.rstrip('/')}/api/v3" # Basic Authentication VULNERABLECODE_USER = settings.VULNERABLECODE_USER @@ -63,7 +63,7 @@ def is_available(): return False try: - response = session.head(VULNERABLECODE_API_URL) + response = session.head(VULNERABLECODE_API_URL, allow_redirects=True) response.raise_for_status() except requests.exceptions.RequestException as request_exception: logger.debug(f"{label} is_available() error: {request_exception}") @@ -91,28 +91,6 @@ def get_purls(packages): return [package_url for package in packages if (package_url := package.package_url)] -def request_get( - url, - payload=None, - timeout=None, -): - """Send a GET request to `url` with optional `payload` and return the response.""" - if not url: - return - - params = {"format": "json"} - if payload: - params.update(payload) - - logger.debug(f"{label}: url={url} params={params}") - try: - response = session.get(url, params=params, timeout=timeout) - response.raise_for_status() - return response.json() - except (requests.RequestException, ValueError, TypeError) as exception: - logger.debug(f"{label} [Exception] {exception}") - - def request_post( url, data, @@ -127,88 +105,29 @@ def request_post( logger.debug(f"{label} [Exception] {exception}") -def _get_vulnerabilities( - url, - field_name, - field_value, - timeout=None, -): - """Get the list of vulnerabilities.""" - payload = {field_name: field_value} - - response = request_get(url=url, payload=payload, timeout=timeout) - if response and response.get("count"): - results = response["results"] - return results - - -def get_vulnerabilities_by_purl( - purl, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Get the list of vulnerabilities providing a package `purl`.""" - return _get_vulnerabilities( - url=f"{api_url}packages/", - field_name="purl", - field_value=purl, - timeout=timeout, - ) - - -def get_vulnerabilities_by_cpe( - cpe, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Get the list of vulnerabilities providing a package or component `cpe`.""" - return _get_vulnerabilities( - url=f"{api_url}cpes/", - field_name="cpe", - field_value=cpe, - timeout=timeout, - ) - - def bulk_search_by_purl( purls, timeout=None, api_url=VULNERABLECODE_API_URL, ): """Bulk search of vulnerabilities using the provided list of `purls`.""" - url = f"{api_url}packages/bulk_search" + url = f"{api_url.rstrip('/')}/packages" data = { "purls": purls, - "vulnerabilities_only": True, + "details": True, } logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}") return request_post(url, data, timeout) -def bulk_search_by_cpes( - cpes, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Bulk search of vulnerabilities using the provided list of `cpes`.""" - url = f"{api_url}cpes/bulk_search" - - data = { - "cpes": cpes, - } - - logger.debug(f"VulnerableCode: url={url} cpes_count={len(cpes)}") - return request_post(url, data, timeout) - - def filter_vulnerabilities(vulnerabilities, ignore_set): """Filter out vulnerabilities based on a list of ignored IDs and aliases.""" return [ vulnerability for vulnerability in vulnerabilities - if vulnerability.get("vulnerability_id") not in ignore_set + if vulnerability.get("advisory_id") not in ignore_set and not any(alias in ignore_set for alias in vulnerability.get("aliases", [])) ] @@ -223,9 +142,12 @@ def fetch_vulnerabilities( vulnerabilities_by_purl = {} for purls_batch in chunked(get_purls(packages), chunk_size): + # Add support for pagination + # {'count': 17, 'next': None, 'previous': None, 'results': [....] response_data = bulk_search_by_purl(purls_batch) - for vulnerability_data in response_data: - vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data + for vulnerability_data in response_data["results"]: + purl = vulnerability_data["purl"] + vulnerabilities_by_purl[purl] = vulnerability_data unsaved_objects = [] for package in packages: diff --git a/scanpipe/templates/scanpipe/includes/vulnerability_id.html b/scanpipe/templates/scanpipe/includes/vulnerability_id.html index 51b3b18163..9d9e7d5d0b 100644 --- a/scanpipe/templates/scanpipe/includes/vulnerability_id.html +++ b/scanpipe/templates/scanpipe/includes/vulnerability_id.html @@ -1,14 +1,16 @@ -{% if vulnerability.vulnerability_id|slice:":4" == "VCID" and VULNERABLECODE_URL %} - - {{ vulnerability.vulnerability_id }} - - -{% else %} - {{ vulnerability.vulnerability_id }} -{% endif %} + + + + + + + + -