diff --git a/scanpipe/management/commands/check-compliance.py b/scanpipe/management/commands/check-compliance.py index 0016177a01..84ada60341 100644 --- a/scanpipe/management/commands/check-compliance.py +++ b/scanpipe/management/commands/check-compliance.py @@ -110,8 +110,8 @@ def check_vulnerabilities(self): if self.verbosity > 0: if vulnerabilities_count: self.stderr.write(f"{vulnerabilities_count} vulnerabilities found:") - for vulnerability_id, vulnerability_data in all_vulnerabilities.items(): - self.stderr.write(str(vulnerability_id)) + for advisory_id, vulnerability_data in all_vulnerabilities.items(): + self.stderr.write(str(advisory_id)) for affected_obj in vulnerability_data.get("affects", []): self.stderr.write(f" > {affected_obj}") else: diff --git a/scanpipe/migrations/0080_vulnerablecode_v3_data.py b/scanpipe/migrations/0080_vulnerablecode_v3_data.py new file mode 100644 index 0000000000..f89b0dd8c6 --- /dev/null +++ b/scanpipe/migrations/0080_vulnerablecode_v3_data.py @@ -0,0 +1,34 @@ +# Generated by Django 6.0.3 on 2026-04-14 11:10 + +from django.db import migrations +from django.db.models import Q + + +def add_advisory_id(apps, schema_editor): + """Copy vulnerability_id to advisory_id in affected_by_vulnerabilities entries.""" + DiscoveredPackage = apps.get_model("scanpipe", "DiscoveredPackage") + DiscoveredDependency = apps.get_model("scanpipe", "DiscoveredDependency") + EMPTY_VALUES = [None, [], ""] + + vulnerable = ~Q(affected_by_vulnerabilities__in=EMPTY_VALUES) + + for model in [DiscoveredPackage, DiscoveredDependency]: + for instance in model.objects.filter(vulnerable): + for entry in instance.affected_by_vulnerabilities: + if "advisory_id" not in entry: + entry["advisory_id"] = entry.get("vulnerability_id", "") + instance.save(update_fields=["affected_by_vulnerabilities"]) + + +class Migration(migrations.Migration): + + dependencies = [ + ('scanpipe', '0079_apitoken_data'), + ] + + operations = [ + migrations.RunPython( + add_advisory_id, + reverse_code=migrations.RunPython.noop, + ), + ] diff --git a/scanpipe/models.py b/scanpipe/models.py index f941240dd9..8680205e2f 100644 --- a/scanpipe/models.py +++ b/scanpipe/models.py @@ -1573,7 +1573,7 @@ def vulnerabilities(self): """ Return a dict of all vulnerabilities affecting this project. - Combines package and dependency vulnerabilities, keyed by vulnerability_id. + Combines package and dependency vulnerabilities, keyed by advisory_id. Each vulnerability includes an "affects" list of all affected packages and dependencies. """ @@ -1583,11 +1583,13 @@ def vulnerabilities(self): for queryset in querysets: vulnerabilities = queryset.get_vulnerabilities_dict() - for vcid, vuln_data in vulnerabilities.items(): - if vcid in vulnerabilities_dict: - vulnerabilities_dict[vcid]["affects"].extend(vuln_data["affects"]) + for advisory_id, vuln_data in vulnerabilities.items(): + if advisory_id in vulnerabilities_dict: + vulnerabilities_dict[advisory_id]["affects"].extend( + vuln_data["affects"] + ) else: - vulnerabilities_dict[vcid] = vuln_data + vulnerabilities_dict[advisory_id] = vuln_data return vulnerabilities_dict @@ -3361,24 +3363,22 @@ def get_vulnerabilities_list(self): queryset. Extracts and flattens the affected_by_vulnerabilities field from - all objects in the queryset. Removes duplicates based on vulnerability_id + all objects in the queryset. Removes duplicates based on advisory_id while preserving the first occurrence of each unique vulnerability. """ vulnerabilities_lists = self.values_list(self.AFFECTED_BY_FIELD, flat=True) flatten_vulnerabilities = chain.from_iterable(vulnerabilities_lists) - # Deduplicate by vulnerability_id while preserving order + # Deduplicate by advisory_id while preserving order unique_vulnerabilities = { - vuln["vulnerability_id"]: vuln for vuln in flatten_vulnerabilities + vuln["advisory_id"]: vuln for vuln in flatten_vulnerabilities } - return sorted( - unique_vulnerabilities.values(), key=itemgetter("vulnerability_id") - ) + return sorted(unique_vulnerabilities.values(), key=itemgetter("advisory_id")) def get_vulnerabilities_dict(self): """ - Return a dict of vulnerabilities keyed by vulnerability_id. + Return a dict of vulnerabilities keyed by advisory_id. Each vulnerability includes an "affects" list containing all objects from this queryset affected by that vulnerability. @@ -3386,7 +3386,7 @@ def get_vulnerabilities_dict(self): Returns: dict: { 'VCID-1': { - 'vulnerability_id': 'VCID-1', + 'advisory_id': 'VCID-1', 'affects': [obj1, obj2, ...] }, ... @@ -3397,13 +3397,13 @@ def get_vulnerabilities_dict(self): for obj in self.vulnerable_ordered(): for vulnerability in obj.affected_by_vulnerabilities: - vcid = vulnerability.get("vulnerability_id") - if not vcid: + advisory_id = vulnerability.get("advisory_id") + if not advisory_id: continue - if vcid not in vulnerabilities_dict: - vulnerabilities_dict[vcid] = {**vulnerability, "affects": []} - vulnerabilities_dict[vcid]["affects"].append(obj) + if advisory_id not in vulnerabilities_dict: + vulnerabilities_dict[advisory_id] = {**vulnerability, "affects": []} + vulnerabilities_dict[advisory_id]["affects"].append(obj) return vulnerabilities_dict diff --git a/scanpipe/pipes/cyclonedx.py b/scanpipe/pipes/cyclonedx.py index 00a59bd4e2..08b21ad55c 100644 --- a/scanpipe/pipes/cyclonedx.py +++ b/scanpipe/pipes/cyclonedx.py @@ -188,7 +188,7 @@ def cyclonedx_component_to_package_data( cdx_vulnerability_json = cdx_vulnerability.as_json(view_=BaseSchemaVersion) affected_by_vulnerabilities.append( { - "vulnerability_id": str(cdx_vulnerability.id), + "advisory_id": str(cdx_vulnerability.id), "summary": cdx_vulnerability.description, "cdx_vulnerability_data": json.loads(cdx_vulnerability_json), } diff --git a/scanpipe/pipes/output.py b/scanpipe/pipes/output.py index 03fc4ca51c..ce4fb9908a 100644 --- a/scanpipe/pipes/output.py +++ b/scanpipe/pipes/output.py @@ -579,7 +579,7 @@ def add_vulnerabilities_sheet(workbook, project): ] vulnerability_fields = [ - "vulnerability_id", + "advisory_id", "aliases", "summary", "risk_score", @@ -863,7 +863,7 @@ def vulnerability_as_cyclonedx(vulnerability_data, component_bom_ref): ] return cdx_vulnerability.Vulnerability( - id=vulnerability_data.get("vulnerability_id"), + id=vulnerability_data.get("advisory_id"), source=source, description=vulnerability_data.get("summary"), affects=affects, diff --git a/scanpipe/pipes/vulnerablecode.py b/scanpipe/pipes/vulnerablecode.py index 061c8e2028..06fbe5a644 100644 --- a/scanpipe/pipes/vulnerablecode.py +++ b/scanpipe/pipes/vulnerablecode.py @@ -35,7 +35,7 @@ VULNERABLECODE_API_URL = None VULNERABLECODE_URL = settings.VULNERABLECODE_URL if VULNERABLECODE_URL: - VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL}/api/" + VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL.rstrip('/')}/api/v3" # Basic Authentication VULNERABLECODE_USER = settings.VULNERABLECODE_USER @@ -63,7 +63,7 @@ def is_available(): return False try: - response = session.head(VULNERABLECODE_API_URL) + response = session.head(VULNERABLECODE_API_URL, allow_redirects=True) response.raise_for_status() except requests.exceptions.RequestException as request_exception: logger.debug(f"{label} is_available() error: {request_exception}") @@ -91,28 +91,6 @@ def get_purls(packages): return [package_url for package in packages if (package_url := package.package_url)] -def request_get( - url, - payload=None, - timeout=None, -): - """Send a GET request to `url` with optional `payload` and return the response.""" - if not url: - return - - params = {"format": "json"} - if payload: - params.update(payload) - - logger.debug(f"{label}: url={url} params={params}") - try: - response = session.get(url, params=params, timeout=timeout) - response.raise_for_status() - return response.json() - except (requests.RequestException, ValueError, TypeError) as exception: - logger.debug(f"{label} [Exception] {exception}") - - def request_post( url, data, @@ -127,88 +105,29 @@ def request_post( logger.debug(f"{label} [Exception] {exception}") -def _get_vulnerabilities( - url, - field_name, - field_value, - timeout=None, -): - """Get the list of vulnerabilities.""" - payload = {field_name: field_value} - - response = request_get(url=url, payload=payload, timeout=timeout) - if response and response.get("count"): - results = response["results"] - return results - - -def get_vulnerabilities_by_purl( - purl, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Get the list of vulnerabilities providing a package `purl`.""" - return _get_vulnerabilities( - url=f"{api_url}packages/", - field_name="purl", - field_value=purl, - timeout=timeout, - ) - - -def get_vulnerabilities_by_cpe( - cpe, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Get the list of vulnerabilities providing a package or component `cpe`.""" - return _get_vulnerabilities( - url=f"{api_url}cpes/", - field_name="cpe", - field_value=cpe, - timeout=timeout, - ) - - def bulk_search_by_purl( purls, timeout=None, api_url=VULNERABLECODE_API_URL, ): """Bulk search of vulnerabilities using the provided list of `purls`.""" - url = f"{api_url}packages/bulk_search" + url = f"{api_url.rstrip('/')}/packages" data = { "purls": purls, - "vulnerabilities_only": True, + "details": True, } logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}") return request_post(url, data, timeout) -def bulk_search_by_cpes( - cpes, - timeout=None, - api_url=VULNERABLECODE_API_URL, -): - """Bulk search of vulnerabilities using the provided list of `cpes`.""" - url = f"{api_url}cpes/bulk_search" - - data = { - "cpes": cpes, - } - - logger.debug(f"VulnerableCode: url={url} cpes_count={len(cpes)}") - return request_post(url, data, timeout) - - def filter_vulnerabilities(vulnerabilities, ignore_set): """Filter out vulnerabilities based on a list of ignored IDs and aliases.""" return [ vulnerability for vulnerability in vulnerabilities - if vulnerability.get("vulnerability_id") not in ignore_set + if vulnerability.get("advisory_id") not in ignore_set and not any(alias in ignore_set for alias in vulnerability.get("aliases", [])) ] @@ -223,9 +142,12 @@ def fetch_vulnerabilities( vulnerabilities_by_purl = {} for purls_batch in chunked(get_purls(packages), chunk_size): + # Add support for pagination + # {'count': 17, 'next': None, 'previous': None, 'results': [....] response_data = bulk_search_by_purl(purls_batch) - for vulnerability_data in response_data: - vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data + for vulnerability_data in response_data["results"]: + purl = vulnerability_data["purl"] + vulnerabilities_by_purl[purl] = vulnerability_data unsaved_objects = [] for package in packages: diff --git a/scanpipe/templates/scanpipe/includes/vulnerability_id.html b/scanpipe/templates/scanpipe/includes/vulnerability_id.html index 51b3b18163..272397af2e 100644 --- a/scanpipe/templates/scanpipe/includes/vulnerability_id.html +++ b/scanpipe/templates/scanpipe/includes/vulnerability_id.html @@ -1,14 +1,7 @@ -{% if vulnerability.vulnerability_id|slice:":4" == "VCID" and VULNERABLECODE_URL %} - - {{ vulnerability.vulnerability_id }} - - -{% else %} - {{ vulnerability.vulnerability_id }} -{% endif %} +{{ vulnerability.advisory_id }} -