Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scanpipe/management/commands/check-compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def check_vulnerabilities(self):
if self.verbosity > 0:
if vulnerabilities_count:
self.stderr.write(f"{vulnerabilities_count} vulnerabilities found:")
for vulnerability_id, vulnerability_data in all_vulnerabilities.items():
self.stderr.write(str(vulnerability_id))
for advisory_id, vulnerability_data in all_vulnerabilities.items():
self.stderr.write(str(advisory_id))
for affected_obj in vulnerability_data.get("affects", []):
self.stderr.write(f" > {affected_obj}")
else:
Expand Down
34 changes: 34 additions & 0 deletions scanpipe/migrations/0080_vulnerablecode_v3_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Generated by Django 6.0.3 on 2026-04-14 11:10

from django.db import migrations
from django.db.models import Q


def add_advisory_id(apps, schema_editor):
"""Copy vulnerability_id to advisory_id in affected_by_vulnerabilities entries."""
DiscoveredPackage = apps.get_model("scanpipe", "DiscoveredPackage")
DiscoveredDependency = apps.get_model("scanpipe", "DiscoveredDependency")
EMPTY_VALUES = [None, [], ""]

vulnerable = ~Q(affected_by_vulnerabilities__in=EMPTY_VALUES)

for model in [DiscoveredPackage, DiscoveredDependency]:
for instance in model.objects.filter(vulnerable):
for entry in instance.affected_by_vulnerabilities:
if "advisory_id" not in entry:
entry["advisory_id"] = entry.get("vulnerability_id", "")
instance.save(update_fields=["affected_by_vulnerabilities"])


class Migration(migrations.Migration):

dependencies = [
('scanpipe', '0079_apitoken_data'),
]

operations = [
migrations.RunPython(
add_advisory_id,
reverse_code=migrations.RunPython.noop,
),
]
36 changes: 18 additions & 18 deletions scanpipe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1573,7 +1573,7 @@ def vulnerabilities(self):
"""
Return a dict of all vulnerabilities affecting this project.

Combines package and dependency vulnerabilities, keyed by vulnerability_id.
Combines package and dependency vulnerabilities, keyed by advisory_id.
Each vulnerability includes an "affects" list of all affected packages
and dependencies.
"""
Expand All @@ -1583,11 +1583,13 @@ def vulnerabilities(self):

for queryset in querysets:
vulnerabilities = queryset.get_vulnerabilities_dict()
for vcid, vuln_data in vulnerabilities.items():
if vcid in vulnerabilities_dict:
vulnerabilities_dict[vcid]["affects"].extend(vuln_data["affects"])
for advisory_id, vuln_data in vulnerabilities.items():
if advisory_id in vulnerabilities_dict:
vulnerabilities_dict[advisory_id]["affects"].extend(
vuln_data["affects"]
)
else:
vulnerabilities_dict[vcid] = vuln_data
vulnerabilities_dict[advisory_id] = vuln_data

return vulnerabilities_dict

Expand Down Expand Up @@ -3361,32 +3363,30 @@ def get_vulnerabilities_list(self):
queryset.

Extracts and flattens the affected_by_vulnerabilities field from
all objects in the queryset. Removes duplicates based on vulnerability_id
all objects in the queryset. Removes duplicates based on advisory_id
while preserving the first occurrence of each unique vulnerability.
"""
vulnerabilities_lists = self.values_list(self.AFFECTED_BY_FIELD, flat=True)
flatten_vulnerabilities = chain.from_iterable(vulnerabilities_lists)

# Deduplicate by vulnerability_id while preserving order
# Deduplicate by advisory_id while preserving order
unique_vulnerabilities = {
vuln["vulnerability_id"]: vuln for vuln in flatten_vulnerabilities
vuln["advisory_id"]: vuln for vuln in flatten_vulnerabilities
}

return sorted(
unique_vulnerabilities.values(), key=itemgetter("vulnerability_id")
)
return sorted(unique_vulnerabilities.values(), key=itemgetter("advisory_id"))

def get_vulnerabilities_dict(self):
"""
Return a dict of vulnerabilities keyed by vulnerability_id.
Return a dict of vulnerabilities keyed by advisory_id.

Each vulnerability includes an "affects" list containing all
objects from this queryset affected by that vulnerability.

Returns:
dict: {
'VCID-1': {
'vulnerability_id': 'VCID-1',
'advisory_id': 'VCID-1',
'affects': [obj1, obj2, ...]
},
...
Expand All @@ -3397,13 +3397,13 @@ def get_vulnerabilities_dict(self):

for obj in self.vulnerable_ordered():
for vulnerability in obj.affected_by_vulnerabilities:
vcid = vulnerability.get("vulnerability_id")
if not vcid:
advisory_id = vulnerability.get("advisory_id")
if not advisory_id:
continue

if vcid not in vulnerabilities_dict:
vulnerabilities_dict[vcid] = {**vulnerability, "affects": []}
vulnerabilities_dict[vcid]["affects"].append(obj)
if advisory_id not in vulnerabilities_dict:
vulnerabilities_dict[advisory_id] = {**vulnerability, "affects": []}
vulnerabilities_dict[advisory_id]["affects"].append(obj)

return vulnerabilities_dict

Expand Down
2 changes: 1 addition & 1 deletion scanpipe/pipes/cyclonedx.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def cyclonedx_component_to_package_data(
cdx_vulnerability_json = cdx_vulnerability.as_json(view_=BaseSchemaVersion)
affected_by_vulnerabilities.append(
{
"vulnerability_id": str(cdx_vulnerability.id),
"advisory_id": str(cdx_vulnerability.id),
"summary": cdx_vulnerability.description,
"cdx_vulnerability_data": json.loads(cdx_vulnerability_json),
}
Expand Down
4 changes: 2 additions & 2 deletions scanpipe/pipes/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,7 @@ def add_vulnerabilities_sheet(workbook, project):
]

vulnerability_fields = [
"vulnerability_id",
"advisory_id",
"aliases",
"summary",
"risk_score",
Expand Down Expand Up @@ -863,7 +863,7 @@ def vulnerability_as_cyclonedx(vulnerability_data, component_bom_ref):
]

return cdx_vulnerability.Vulnerability(
id=vulnerability_data.get("vulnerability_id"),
id=vulnerability_data.get("advisory_id"),
source=source,
description=vulnerability_data.get("summary"),
affects=affects,
Expand Down
98 changes: 10 additions & 88 deletions scanpipe/pipes/vulnerablecode.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
VULNERABLECODE_API_URL = None
VULNERABLECODE_URL = settings.VULNERABLECODE_URL
if VULNERABLECODE_URL:
VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL}/api/"
VULNERABLECODE_API_URL = f"{VULNERABLECODE_URL.rstrip('/')}/api/v3"

# Basic Authentication
VULNERABLECODE_USER = settings.VULNERABLECODE_USER
Expand Down Expand Up @@ -63,7 +63,7 @@ def is_available():
return False

try:
response = session.head(VULNERABLECODE_API_URL)
response = session.head(VULNERABLECODE_API_URL, allow_redirects=True)
response.raise_for_status()
except requests.exceptions.RequestException as request_exception:
logger.debug(f"{label} is_available() error: {request_exception}")
Expand Down Expand Up @@ -91,28 +91,6 @@ def get_purls(packages):
return [package_url for package in packages if (package_url := package.package_url)]


def request_get(
url,
payload=None,
timeout=None,
):
"""Send a GET request to `url` with optional `payload` and return the response."""
if not url:
return

params = {"format": "json"}
if payload:
params.update(payload)

logger.debug(f"{label}: url={url} params={params}")
try:
response = session.get(url, params=params, timeout=timeout)
response.raise_for_status()
return response.json()
except (requests.RequestException, ValueError, TypeError) as exception:
logger.debug(f"{label} [Exception] {exception}")


def request_post(
url,
data,
Expand All @@ -127,88 +105,29 @@ def request_post(
logger.debug(f"{label} [Exception] {exception}")


def _get_vulnerabilities(
url,
field_name,
field_value,
timeout=None,
):
"""Get the list of vulnerabilities."""
payload = {field_name: field_value}

response = request_get(url=url, payload=payload, timeout=timeout)
if response and response.get("count"):
results = response["results"]
return results


def get_vulnerabilities_by_purl(
purl,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
"""Get the list of vulnerabilities providing a package `purl`."""
return _get_vulnerabilities(
url=f"{api_url}packages/",
field_name="purl",
field_value=purl,
timeout=timeout,
)


def get_vulnerabilities_by_cpe(
cpe,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
"""Get the list of vulnerabilities providing a package or component `cpe`."""
return _get_vulnerabilities(
url=f"{api_url}cpes/",
field_name="cpe",
field_value=cpe,
timeout=timeout,
)


def bulk_search_by_purl(
purls,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
"""Bulk search of vulnerabilities using the provided list of `purls`."""
url = f"{api_url}packages/bulk_search"
url = f"{api_url.rstrip('/')}/packages"

data = {
"purls": purls,
"vulnerabilities_only": True,
"details": True,
}

logger.debug(f"VulnerableCode: url={url} purls_count={len(purls)}")
return request_post(url, data, timeout)


def bulk_search_by_cpes(
cpes,
timeout=None,
api_url=VULNERABLECODE_API_URL,
):
"""Bulk search of vulnerabilities using the provided list of `cpes`."""
url = f"{api_url}cpes/bulk_search"

data = {
"cpes": cpes,
}

logger.debug(f"VulnerableCode: url={url} cpes_count={len(cpes)}")
return request_post(url, data, timeout)


def filter_vulnerabilities(vulnerabilities, ignore_set):
"""Filter out vulnerabilities based on a list of ignored IDs and aliases."""
return [
vulnerability
for vulnerability in vulnerabilities
if vulnerability.get("vulnerability_id") not in ignore_set
if vulnerability.get("advisory_id") not in ignore_set
and not any(alias in ignore_set for alias in vulnerability.get("aliases", []))
]

Expand All @@ -223,9 +142,12 @@ def fetch_vulnerabilities(
vulnerabilities_by_purl = {}

for purls_batch in chunked(get_purls(packages), chunk_size):
# Add support for pagination
# {'count': 17, 'next': None, 'previous': None, 'results': [....]
response_data = bulk_search_by_purl(purls_batch)
for vulnerability_data in response_data:
vulnerabilities_by_purl[vulnerability_data["purl"]] = vulnerability_data
for vulnerability_data in response_data["results"]:
purl = vulnerability_data["purl"]
vulnerabilities_by_purl[purl] = vulnerability_data

unsaved_objects = []
for package in packages:
Expand Down
13 changes: 3 additions & 10 deletions scanpipe/templates/scanpipe/includes/vulnerability_id.html
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
{% if vulnerability.vulnerability_id|slice:":4" == "VCID" and VULNERABLECODE_URL %}
<a href="{{ VULNERABLECODE_URL }}/vulnerabilities/{{ vulnerability.vulnerability_id }}" target="_blank">
{{ vulnerability.vulnerability_id }}
<i class="fa-solid fa-up-right-from-square is-small"></i>
</a>
{% else %}
{{ vulnerability.vulnerability_id }}
{% endif %}
{{ vulnerability.advisory_id }}

<ul class="list-unstyled mb-0">
{% for alias in aliases %}
<ul class="mb-0">
{% for alias in vulnerability.aliases %}
<li>
{% if alias|slice:":3" == "CVE" %}
<a href="https://nvd.nist.gov/vuln/detail/{{ alias }}" target="_blank">{{ alias }}
Expand Down
62 changes: 36 additions & 26 deletions scanpipe/templates/scanpipe/tabset/tab_vulnerabilities.html
Original file line number Diff line number Diff line change
@@ -1,28 +1,38 @@
<div class="content">
<table class="table is-bordered is-striped is-narrow is-hoverable is-fullwidth">
<thead>
<table class="table is-bordered is-striped is-narrow is-hoverable is-fullwidth">
<thead>
<tr>
<th style="width: 220px;">Affected by</th>
<th>Summary</th>
<th>Exploitability</th>
<th>Severity</th>
<th>Risk</th>
<th>Analysis</th>
</tr>
</thead>
<tbody>
{% for vulnerability in tab_data.fields.affected_by_vulnerabilities.value %}
<tr>
<th style="width: 220px;">Affected by</th>
<th>Summary</th>
<th>Analysis</th>
<td>
{% include 'scanpipe/includes/vulnerability_id.html' with vulnerability=vulnerability VULNERABLECODE_URL=tab_data.VULNERABLECODE_URL %}
</td>
<td>
{% include 'scanpipe/includes/vulnerability_summary.html' with vulnerability=vulnerability only %}
</td>
<td>
{{ vulnerability.exploitability|default_if_none:"" }}
</td>
<td>
{{ vulnerability.weighted_severity|default_if_none:"" }}
</td>
<td>
{{ vulnerability.risk_score|default_if_none:"" }}
</td>
<td>
{% for key, value in vulnerability.cdx_vulnerability.analysis.items %}
<strong>{{ key }}:</strong> {{ value }}{% if not forloop.last %}<br>{% endif %}
{% endfor %}
</td>
</tr>
</thead>
<tbody>
{% for vulnerability in tab_data.fields.affected_by_vulnerabilities.value %}
<tr>
<td>
{% include 'scanpipe/includes/vulnerability_id.html' with vulnerability=vulnerability VULNERABLECODE_URL=tab_data.VULNERABLECODE_URL %}
</td>
<td>
{% include 'scanpipe/includes/vulnerability_summary.html' with vulnerability=vulnerability only %}
</td>
<td>
{% for key, value in vulnerability.cdx_vulnerability.analysis.items %}
<strong>{{ key }}:</strong> {{ value }}{% if not forloop.last %}<br>{% endif %}
{% endfor %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% endfor %}
</tbody>
</table>
Loading
Loading