Commit eb4811e9 authored by Abderrahim Kitouni's avatar Abderrahim Kitouni Committed by Abderrahim Kitouni
parent e0e5117e
Pipeline #125682 failed with stages
in 24 minutes
......@@ -304,8 +304,6 @@ cve_report:
stage: reports
dependencies: [track]
script:
- pip3 install --user lxml
- ${BST} pull platform-manifest.bst sdk-manifest.bst
- ${BST} checkout platform-manifest.bst platform-manifest/
......
......@@ -21,34 +21,38 @@ import datetime
import itertools
import urllib.request
import urllib.parse
from contextlib import contextmanager
from lxml import etree as ET
namespaces = {
"feed": "http://scap.nist.gov/schema/feed/vulnerability/2.0",
"vuln": "http://scap.nist.gov/schema/vulnerability/0.4",
"cvss": "http://scap.nist.gov/schema/cvss-v2/0.2",
}
from contextlib import contextmanager, ExitStack
import signal
import socket
import json
def extract_vulns(tree):
for entry in tree.iterfind("feed:entry", namespaces=namespaces):
cve_id = entry.find("vuln:cve-id", namespaces=namespaces).text
summary = entry.find("vuln:summary", namespaces=namespaces).text
score = entry.find("vuln:cvss/cvss:base_metrics/cvss:score", namespaces=namespaces)
yield cve_id, summary, score.text if score is not None else None
for item in tree['CVE_Items']:
cve_id = item['cve']['CVE_data_meta']['ID']
summary = item['cve']['description']['description_data'][0]['value']
score = item['impact'].get('baseMetricV2', {}).get('cvssV2', {}).get('baseScore')
yield cve_id, summary, score
def extract_product_vulns(tree):
for entry in tree.iterfind("feed:entry", namespaces=namespaces):
cve_id = entry.find("vuln:cve-id", namespaces=namespaces).text
for vuln_software in entry.iterfind("vuln:vulnerable-software-list", namespaces=namespaces):
for product in vuln_software.iterfind("vuln:product", namespaces=namespaces):
product_name = product.text
def extract_product_vulns_sub(cve_id, node):
if "cpe_match" in node:
for vuln_software in node["cpe_match"]:
if vuln_software["vulnerable"]:
product_name = vuln_software["cpe23Uri"]
try:
vendor, name, version = product_name.split(':')[2:5]
vendor, name, version = product_name.split(':')[3:6]
except ValueError:
continue
yield cve_id, vendor, name, version
else:
for child in node.get("children", []):
yield from extract_product_vulns_sub(cve_id, child)
def extract_product_vulns(tree):
for item in tree['CVE_Items']:
cve_id = item['cve']['CVE_data_meta']['ID']
for node in item['configurations']["nodes"]:
yield from extract_product_vulns_sub(cve_id, node)
def ensure_tables(c):
c.execute("""CREATE TABLE IF NOT EXISTS etags
......@@ -59,8 +63,40 @@ def ensure_tables(c):
(cve_id TEXT, name TEXT, vendor TEXT, version TEXT,
UNIQUE(cve_id, name, vendor, version))""")
def update_year(c, year):
url = 'https://nvd.nist.gov/feeds/xml/cve/2.0/nvdcve-2.0-{}.xml.gz'.format(year)
class UrlOpenTimeout:
def __init__(self):
self._max = 120
self._min = 5
self._timeout = self._max
@contextmanager
def open(self, req):
with ExitStack() as stack:
def _timeout(signum, frame):
raise TimeoutError()
try:
signal.signal(signal.SIGALRM, _timeout)
signal.alarm(self._timeout)
resp = stack.enter_context(urllib.request.urlopen(req, timeout=self._timeout))
self._timeout = self._max
except TimeoutError:
self._timeout = max(int(self._timeout/2), self._min)
raise
except urllib.error.URLError as e:
if isinstance(e.reason, socket.timeout):
self._timeout = max(int(self._timeout/2), self._min)
raise TimeoutError()
else:
raise
finally:
signal.alarm(0)
yield resp
def update_year(c, year, url_timeout):
url = 'https://nvd.nist.gov/feeds/json/cve/1.1/nvdcve-1.1-{}.json.gz'.format(year)
c.execute("SELECT etag FROM etags WHERE year=?", (year,))
row = c.fetchone()
if row is not None:
......@@ -71,26 +107,33 @@ def update_year(c, year):
request = urllib.request.Request(url)
if etag is not None:
request.add_header('If-None-Match', etag)
url_opener = url_timeout.open
else:
url_opener = urllib.request.urlopen
try:
with urllib.request.urlopen(request) as resp:
with url_opener(request) as resp:
new_etag = resp.getheader('ETag')
assert new_etag is not None
if new_etag is not None:
c.execute("INSERT OR REPLACE INTO etags (year, etag) VALUES (?, ?)", (year, new_etag))
with open('nvdcve-2.0-{}.xml.gz'.format(year), 'wb') as f:
with open('nvdcve-1.1-{}.json.gz'.format(year), 'wb') as f:
while True:
buf = resp.read(4096)
if not buf:
print("Downloaded {}".format(f.name))
break
f.write(buf)
except TimeoutError:
if etag is None:
raise
print("Timeout, using cache for {}".format('nvdcve-1.1-{}.json.gz'.format(year)))
except urllib.error.HTTPError as error:
if error.code != 304:
raise
print("Cached {}".format('nvdcve-2.0-{}.xml.gz'.format(year)))
print("Cached {}".format('nvdcve-1.1-{}.json.gz'.format(year)))
with gzip.open('nvdcve-2.0-{}.xml.gz'.format(year)) as f:
tree = ET.parse(f)
with gzip.open('nvdcve-1.1-{}.json.gz'.format(year)) as f:
tree = json.load(f)
for cve_id, summary, score in extract_vulns(tree):
c.execute("INSERT OR REPLACE INTO cve (id, summary, score) VALUES (?, ?, ?)", (cve_id, summary, score))
......@@ -102,9 +145,10 @@ if __name__ == '__main__':
c = conn.cursor()
try:
ensure_tables(c)
url_timeout = UrlOpenTimeout()
for year in range(2002, datetime.datetime.now().year + 1):
update_year(c, str(year))
update_year(c, 'Modified')
update_year(c, str(year), url_timeout)
update_year(c, 'Modified', url_timeout)
conn.commit()
finally:
conn.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment