import argparse
import os
import re
import shutil
from itertools import chain
from pathlib import Path
from typing import List, Optional
from ..context import Context
from ..instance import Instance
from ..parallel import Pool
from ..target import Target
from ..util import download, run
class Juliet(Target):
    """
    The `Juliet Test Suite for C/C++
    <https://samate.nist.gov/SARD/documentation#juliet>`_.

    This test suite contains a large amount of programs, categorized by
    vulnerability type (CWE). Most programs include both a "good" and "bad"
    version, where the good version should succeed (no bug) whereas the bad
    version should be detected by the applied mitigation. In other words, the
    good version tests for false positives, and the bad version for false
    negatives.

    The ``--cwe`` command-line argument specifies which CWEs to build and/or
    run, and can be a CWE-ID (``416`` or ``CWE416``) or an alias (e.g.,
    ``uaf``). A mix of CWE-IDs and aliases is allowed.

    The Juliet suite contains multiple *flow variants* per test case. These are
    different control-flows in the program, that in the end all arrive at the
    same bug. This is only relevant for static analysis tools, and for run-time
    mitigations these are unsuitable. In particular, some flow variants (e.g.,
    12) do not (always) trigger or reach the bug at runtime. Therefore, by
    default only flow variant 01 is used, but others can be specified with the
    ``--variants`` command-line argument.

    By default, a good test is counted as successful (true negative) if its
    returncode is 0, and a bad test is counted as successful (true positive) if
    its returncode is non-zero. The latter behavior can be fine-tuned via the
    ``mitigation_return_code`` argument to this class, which can be set to match
    the returncode of the mitigation.

    Each test receives a fixed string to stdin.

    Tests that are based on sockets are currently not supported, as this
    requires running two tests at the same time (a client and a server).

    Tests can be built in parallel (using ``--parallel=proc``), since this
    process might take a while when multiple CWEs or variants are selected.
    Running tests in parallel is not supported (yet).

    :name: juliet
    :param mitigation_return_code: Return code the mitigation exits with, to
                                   distinguish true positives for the bad
                                   version of testcases. If ``None``, any
                                   non-zero value is considered a success.
    """

    name = "juliet"

    # Results are reported/aggregated per CWE.
    aggregation_field = "cwe"

    # Name of the official v1.3 distribution archive fetched by fetch().
    zip_name = "Juliet_Test_Suite_v1.3_for_C_Cpp.zip"

    def __init__(self, mitigation_return_code: Optional[int] = None):
        # Expected exit code of the mitigation for "bad" testcases.
        # None means any non-zero exit code counts as a detection.
        self.mitigation_return_code = mitigation_return_code

    def add_build_args(self, parser: argparse.ArgumentParser) -> None:
        """Register the ``--cwe`` and ``--variants`` build arguments."""
        parser.add_argument(
            "--cwe", required=True, nargs="+", help="which CWE to build"
        )
        parser.add_argument(
            "--variants",
            nargs="+",
            type=int,
            default=[1],
            help="which flow variants to build",
        )

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        """Register the ``--cwe`` and ``--variants`` run arguments."""
        parser.add_argument("--cwe", required=True, nargs="+", help="which CWE to run")
        parser.add_argument(
            "--variants",
            nargs="+",
            type=int,
            default=[1],
            # NOTE: was "to build" — copy-paste error from add_build_args.
            help="which flow variants to run",
        )

    @staticmethod
    def parse_cwe_list(cwe_list: List[str]) -> List[str]:
        """
        Expand a mixed list of CWE specifiers into unique ``CWE<number>`` IDs.

        :param cwe_list: entries in the form ``CWE<number>``, a bare
                         ``<number>``, or one of the known aliases
                         (e.g., ``uaf``, ``spatial``, ``memory-error``).
        :returns: deduplicated list of ``CWE<number>`` strings (order is
                  unspecified, since a set is used for deduplication).
        :raises ValueError: if an entry matches neither format nor any alias.
        """
        aliases = {}
        aliases["buffer-overflow"] = [
            "CWE121",
            "CWE122",
            "CWE124",
            "CWE126",
            "CWE127",
            "CWE680",
        ]
        aliases["spatial"] = aliases["buffer-overflow"] + ["CWE123"]
        aliases["double-free"] = ["CWE415"]
        aliases["uaf"] = ["CWE416"]
        aliases["stack-uaf"] = ["CWE562"]
        aliases["invalid-free"] = ["CWE590", "CWE761"]
        # "memory-error" is the union of all aliases defined above.
        aliases["memory-error"] = list(chain(*aliases.values()))

        ret = set()
        for cwe in cwe_list:
            if re.match(r"^CWE\d+$", cwe):
                ret.add(cwe)
            elif re.match(r"^\d+$", cwe):
                ret.add(f"CWE{cwe}")
            elif cwe in aliases:
                ret.update(aliases[cwe])
            else:
                raise ValueError(
                    "CWE must be in format 'CWE<number>' or one of"
                    f" {','.join(aliases)}, not {cwe}"
                )
        return list(ret)

    def is_fetched(self, ctx: Context) -> bool:
        """Return whether the suite's zip archive has already been downloaded."""
        return os.path.exists(self.zip_name)

    def fetch(self, ctx: Context) -> None:
        """Download the Juliet 1.3 archive from its Zenodo mirror."""
        url = f"https://zenodo.org/record/4701387/files/{self.zip_name}?download=1"
        download(ctx, url)

    def build(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """Build all testcases of every CWE selected via ``--cwe``."""
        for cwe in self.parse_cwe_list(ctx.args.cwe):
            self.build_cwe(ctx, instance, pool, cwe)

    def build_cwe(
        self, ctx: Context, instance: Instance, pool: Optional[Pool], cwe: str
    ) -> None:
        """
        Build the good and bad binaries of all selected testcases of one CWE.

        Unpacks the suite on first use, then compiles every testcase of the
        selected flow variants into ``obj/<instance>/<cwe>/{good,bad}/``.
        Windows-only and socket-based testcases are skipped. When ``pool`` is
        given, compile jobs are submitted to it instead of run synchronously.
        """
        bdir = Path(self.path(ctx))

        # Unpack sources (once) into the src/ dir.
        srcrootdir = bdir / "src"
        os.makedirs(srcrootdir, exist_ok=True)
        testcasedir = srcrootdir / "C" / "testcases"
        incdir = srcrootdir / "C" / "testcasesupport"
        if not testcasedir.is_dir():
            run(ctx, ["unzip", self.zip_name, "-d", str(srcrootdir)])

        # Directories are named like "CWE416_Use_After_Free".
        cwedirs = list(testcasedir.glob(f"{cwe}_*"))
        if not cwedirs:
            raise Exception(f"Could not find {cwe}")
        assert len(cwedirs) == 1
        cwedir = cwedirs[0]

        # Start from a clean per-instance object dir so stale binaries of a
        # previous build cannot leak into binary_paths()/run_cwe().
        objdir = bdir / "obj" / instance.name / cwe
        gooddir = objdir / "good"
        baddir = objdir / "bad"
        if objdir.exists():
            shutil.rmtree(objdir)
        os.makedirs(gooddir, exist_ok=True)
        os.makedirs(baddir, exist_ok=True)

        # Some CWEs split their tests up in subdirs
        cwesrcdirs = [cwedir]
        if (cwedir / "s01").exists():
            cwesrcdirs = list(cwedir.glob("s*"))
            assert len(cwesrcdirs) > 1

        for cwesrcdir in cwesrcdirs:
            for testpath in chain(cwesrcdir.glob("*.c"), cwesrcdir.glob("*.cpp")):
                testname = testpath.stem
                # Names end in the flow variant number, optionally followed by
                # a part letter ("a") or a suffix like "_bad"/"_good1".
                m = re.match(r".*_(\d+)([a-z]|_[a-zA-Z0-9]+)?", testname)
                if not m:
                    continue
                variant = int(m.group(1))
                part = m.group(2)

                # Only run selected flow-variants (normally only 01)
                if variant not in ctx.args.variants:
                    continue

                # Skip windows-only tests
                if "w32" in testname or "wchar_t" in testname:
                    continue

                # Skip socket tests since we cannot run them (multi-program)
                if "socket" in testname:
                    continue

                # Handle multi-file test-cases: only part "a" triggers the
                # build, globbing in all sibling parts at once.
                testfiles = [str(testpath)]
                if part:
                    if part != "a":
                        continue
                    testname = testname[:-1]
                    pattern = f"{testname}*{testpath.suffix}"
                    testfiles = [str(f) for f in cwesrcdir.glob(pattern)]

                ctx.log.info(f"building {testname}")
                goodbin = gooddir / testname
                badbin = baddir / testname

                # janky way to support 'cc' and 'cxx' with spaces
                cc, cc_args = ctx.cc, []
                if " " in cc:
                    cc, *cc_args = cc.split(" ")
                cxx, cxx_args = ctx.cxx, []
                if " " in cxx:
                    cxx, *cxx_args = cxx.split(" ")

                if testpath.suffix == ".c":
                    compiler = [cc, *cc_args, *ctx.cflags]
                else:
                    compiler = [cxx, *cxx_args, *ctx.cxxflags]
                # INCLUDEMAIN makes each testcase a standalone program; io.c
                # provides the shared printLine/etc. support routines.
                compiler += ["-DINCLUDEMAIN"]
                compiler += ["-I", str(incdir)]
                testfiles += [str(incdir / "io.c")]

                # OMITBAD/OMITGOOD select which half of the testcase to build.
                cmd_good = [
                    *compiler,
                    *testfiles,
                    "-o",
                    str(goodbin),
                    "-DOMITBAD",
                    *ctx.ldflags,
                ]
                cmd_bad = [
                    *compiler,
                    *testfiles,
                    "-o",
                    str(badbin),
                    "-DOMITGOOD",
                    *ctx.ldflags,
                ]

                resdir = Path(ctx.paths.pool_results)
                outdir = resdir / "build" / self.name / instance.name

                # Testcases whose name contains "bad"/"good" only exist in
                # that one version; skip building the missing counterpart.
                if "bad" not in testname:
                    if pool:
                        pool.run(
                            ctx,
                            cmd_good,
                            jobid=f"build-{testname}-good",
                            outfile=f"{outdir}/{testname}-good",
                            nnodes=1,
                        )
                    else:
                        run(ctx, cmd_good)
                if "good" not in testname:
                    if pool:
                        pool.run(
                            ctx,
                            cmd_bad,
                            jobid=f"build-{testname}-bad",
                            outfile=f"{outdir}/{testname}-bad",
                            nnodes=1,
                        )
                    else:
                        run(ctx, cmd_bad)

    def binary_paths(self, ctx: Context, instance: Instance) -> List[str]:
        """Return the paths of all built good and bad binaries."""
        paths = []
        for cwe in self.parse_cwe_list(ctx.args.cwe):
            bdir = Path(self.path(ctx))
            objdir = bdir / "obj" / instance.name / cwe
            paths.extend(objdir / "good" / p.name for p in (objdir / "good").iterdir())
            paths.extend(objdir / "bad" / p.name for p in (objdir / "bad").iterdir())
        return [str(p) for p in paths]

    def run(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """Run all testcases of every CWE selected via ``--cwe``."""
        # NOTE: pool is accepted for interface compatibility but intentionally
        # unused: running tests in parallel is not supported (yet).
        for cwe in self.parse_cwe_list(ctx.args.cwe):
            self.run_cwe(ctx, instance, cwe)

    def run_cwe(self, ctx: Context, instance: Instance, cwe: str) -> None:
        """
        Run all built testcases of one CWE and log pass/fail counts.

        Good binaries pass when they exit 0; bad binaries pass when they exit
        with ``mitigation_return_code`` (or any non-zero code if that is
        ``None``). Each binary gets a fixed 8-byte stdin string.
        """
        bdir = Path(self.path(ctx))
        objdir = bdir / "obj" / instance.name / cwe

        # Fixed stdin for testcases that read input.
        stdin = b"A" * 8

        good_ok_cnt, good_total_cnt = 0, 0
        gooddir = objdir / "good"
        for testpath in gooddir.iterdir():
            testname = testpath.stem
            good_total_cnt += 1
            # allow_error=True: a failing good test must be *counted* as a
            # false positive below, not abort the whole run inside run().
            proc = run(
                ctx,
                [str(testpath)],
                env=ctx.runenv,
                silent=True,
                allow_error=True,
                input=stdin,
                universal_newlines=False,
            )
            if proc.returncode:
                ctx.log.error(f"GOOD {testname} returned error")
            else:
                good_ok_cnt += 1

        bad_ok_cnt, bad_total_cnt = 0, 0
        baddir = objdir / "bad"
        for testpath in baddir.iterdir():
            testname = testpath.stem
            if "good" in testname:
                continue
            bad_total_cnt += 1
            proc = run(
                ctx,
                [str(testpath)],
                env=ctx.runenv,
                silent=True,
                allow_error=True,
                input=stdin,
                universal_newlines=False,
            )
            if (
                self.mitigation_return_code is not None
                and self.mitigation_return_code != proc.returncode
            ):
                ctx.log.error(
                    f"BAD {testname} did not return correct error: "
                    f"returned {proc.returncode}, expected "
                    f"{self.mitigation_return_code}"
                )
            elif self.mitigation_return_code is None and not proc.returncode:
                ctx.log.error(f"BAD {testname} did not return error")
            else:
                bad_ok_cnt += 1

        ctx.log.info(f"{cwe}: Passed {good_ok_cnt}/{good_total_cnt} GOOD tests")
        ctx.log.info(f"{cwe}: Passed {bad_ok_cnt}/{bad_total_cnt} BAD tests")