# Source code for infra.targets.webservers

import argparse
import os
import random
import re
import shutil
import string
import time
from abc import ABCMeta, abstractmethod
from contextlib import redirect_stdout
from hashlib import md5
from multiprocessing import cpu_count
from statistics import mean, median, pstdev
from typing import Dict, Iterable, Iterator, List, Mapping, Optional, Sequence, Union
from urllib.request import urlretrieve

from ..commands.report import outfile_path
from ..context import Context
from ..instance import Instance
from ..package import Package
from ..packages import Bash, Netcat, Scons, Wrk
from ..parallel import Job, Pool, ProcessPool, PrunPool, SSHJob, SSHPool
from ..target import Target
from ..util import FatalError, ResultDict, download, join_env_paths, qjoin, run, untar
from .remote_runner import RemoteRunner, RemoteRunnerError


class WebServer(Target, metaclass=ABCMeta):
    aggregation_field = "connections"

    def reportable_fields(self) -> Mapping[str, str]:
        """
        List the result fields produced for this target.

        :returns: mapping from field name to a human-readable description
        """
        described_fields = (
            ("connections", "concurrent client connections"),
            ("threads", "number of client threads making connections"),
            ("throughput", "attained throughput (reqs/s)"),
            ("avg_latency", "average latency (ms)"),
            ("50p_latency", "50th percentile latency (ms)"),
            ("75p_latency", "75th percentile latency (ms)"),
            ("90p_latency", "90th percentile latency (ms)"),
            ("99p_latency", "99th percentile latency (ms)"),
            ("transferrate", "network traffic (KB/s)"),
            ("duration", "benchmark duration (s)"),
            ("cpu", "median server CPU load during benchmark (%%)"),
        )
        return dict(described_fields)

    def dependencies(self) -> Iterator[Package]:
        """
        Yield the packages every web server target depends on.

        :returns: iterator over Bash 4.3, wrk and Netcat 0.7.1, in that order
        """
        required = (Bash("4.3"), Wrk(), Netcat("0.7.1"))
        yield from required

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        """
        Add the web server run options to `parser`.

        The options are grouped as: the mandatory run type (``-t``), common
        options (port, index file size), benchmark options, and the
        ``bench-client`` specific ``--server-ip``.

        :param parser: the argument parser to extend
        """
        parser.add_argument(
            "-t",
            "-type",
            dest="run_type",
            required=True,
            choices=("serve", "test", "bench", "bench-server", "bench-client"),
            help=(
                "serve: just run the web server until it is killed\n"
                "test: test a single fetch of randomized index.html\n"
                "bench: run server and wrk client on separate nodes "
                "(needs prun)\n"
                "bench-server: only run the server side of a benchmark "
                "(another machine runs a matching bench-client)\n"
                "bench-client: only run the wrk client against the "
                "bench-server at --server-ip"
            ),
        )

        # common options
        parser.add_argument(
            "--port",
            type=int,
            # Drawn once, when the parser is built.
            default=random.randint(10000, 30000),
            help="web server port (random by default)",
        )
        parser.add_argument(
            "--filesize",
            type=str,
            default="64",
            help=(
                "filesize for generated index.html in bytes "
                "(supports suffixes compatible with dd, default 64)"
            ),
        )

        # bench options
        parser.add_argument(
            "--duration",
            metavar="SECONDS",
            default=10,
            type=int,
            help="benchmark duration in seconds (default 10)",
        )
        parser.add_argument(
            "--threads",
            type=int,
            default=1,
            help="concurrent wrk threads (distributes client load)",
        )
        parser.add_argument(
            "--connections",
            nargs="+",
            type=int,
            help=(
                "a list of concurrent wrk connections; "
                "start low and increment until the server is saturated"
            ),
        )
        parser.add_argument(
            "--cleanup-time",
            metavar="SECONDS",
            # Keep the default in sync with the documented value below.
            default=3,
            type=int,
            help="time to wait between benchmarks (default 3)",
        )

        parser.add_argument(
            "--restart-server-between-runs",
            default=False,
            action="store_true",
            help=(
                "terminate and restart the server between each "
                "benchmarking run (e.g., when benchmarking multiple "
                "connection configurations or doing multiple iterations)\n"
                "NOTE: only supported for --parallel=ssh!"
            ),
        )
        parser.add_argument(
            "--disable-warmup",
            default=False,
            action="store_true",
            help=(
                "disable the warmup run of the server before doing actual "
                "benchmarks. This can be useful for measuring statistics\n"
                "NOTE: only supported for --parallel=ssh!"
            ),
        )
        parser.add_argument(
            "--collect-stats",
            nargs="+",
            choices=("cpu", "cpu-proc", "rss", "vms"),
            help=(
                "Statistics to collect of server while running benchmarks "
                "(disabled if not specified)\n"
                "NOTE: only supported for --parallel=ssh!\n"
                "cpu: CPU utilization of entire server (0..100%%)\n"
                "cpu-proc: sum of CPU utilization of all server processes "
                "(0..nproc*100%%)\n"
                "rss: sum of Resident Set Size of all server processes\n"
                "vms: sum of Virtual Memory Size of all server processes\n"
            ),
        )
        parser.add_argument(
            "--collect-stats-interval",
            type=float,
            default=1,
            help=(
                "seconds between measurements of statistics provided in "
                "the --collect-stats argument. Has no effect if no "
                "statistics are specified.\n"
                "NOTE: only supported for --parallel=ssh!"
            ),
        )
        parser.add_argument(
            "--remote-client-host",
            type=str,
            default="",
            help=(
                "if specified, connect to the remote client runner via "
                "this host instead of setting up an SSH tunnel.\n"
                "NOTE: only supported for --parallel=ssh!"
            ),
        )
        parser.add_argument(
            "--remote-server-host",
            type=str,
            default="",
            help=(
                "if specified, connect to the remote server runner via "
                "this host instead of setting up an SSH tunnel.\n"
                "NOTE: only supported for --parallel=ssh!"
            ),
        )
        parser.add_argument(
            "--nofork",
            action="store_true",
            help=(
                "if specified, run the server without any forking "
                "(1 worker max)\n"
                "NOTE: only supported for --parallel=ssh and nginx!"
            ),
        )
        parser.add_argument(
            "--config",
            type=str,
            default="",
            help=(
                "Config file to be used instead of the default template\n"
                "NOTE: use absolute path"
            ),
        )

        # bench-client options
        parser.add_argument(
            "--server-ip", help="IP of machine running matching bench-server"
        )

    def run(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """
        Dispatch to the runner method selected by the ``run_type`` argument.

        An unrecognised run type is silently ignored.

        :param ctx: the configuration context
        :param instance: the instance the web server is run for
        :param pool: optional pool that the jobs are scheduled on
        """
        runner = WebServerRunner(self, ctx, instance, pool)

        handlers = {
            "serve": runner.run_serve,
            "test": runner.run_test,
            "bench": runner.run_bench,
            "bench-server": runner.run_bench_server,
            "bench-client": runner.run_bench_client,
        }
        handler = handlers.get(ctx.args.run_type)
        if handler is not None:
            handler()

    @abstractmethod
    def populate_stagedir(self, runner: "WebServerRunner") -> None:
        """
        Fill the staging directory (`runner.stagedir`) with server-specific
        files, e.g. the server configuration file.

        The staging directory is later copied to (or mounted on) both the
        server and the client, where it becomes their run directory
        (`runner.rundir`). Since the rundir is private to each host in the
        run pool, the configuration should place temporary files such as
        access logs there.

        :param runner: the web server runner instance calling this function
        """

    @abstractmethod
    def server_bin(self, ctx: Context, instance: Instance) -> str:
        """
        Retrieve path to the server binary file.

        :param instance: the instance for which this webserver is used
        :returns: the path to the server binary
        """
        pass

    @abstractmethod
    def pid_file(self, runner: "WebServerRunner") -> str:
        """
        Locate the PID file, i.e. the file that holds the process id of the
        running web server instance.

        :param runner: the web server runner instance calling this function
        :returns: the path to the pid file
        """

    @abstractmethod
    def start_cmd(self, runner: "WebServerRunner", foreground: bool = False) -> str:
        """
        Build the command that launches the web server.

        :param runner: the web server runner instance calling this function
        :param foreground: run the server in the foreground when true; by
                           default it is started in the background
                           (daemonized)
        :returns: the command that starts the server
        """

    @abstractmethod
    def stop_cmd(self, runner: "WebServerRunner") -> str:
        """
        Build the command that shuts the running web server down.

        :param runner: the web server runner instance calling this function
        :returns: the command that stops the server
        """

    @staticmethod
    @abstractmethod
    def kill_cmd(runner: "WebServerRunner") -> str:
        """
        Generate command to forcefully kill the running webserver.

        :param runner: the web server runner instance calling this function
        :returns: the command that kills the server
        """
        pass

    def start_script(self, runner: "WebServerRunner") -> str:
        """
        Generate a bash script that starts the server daemon.

        :param runner: the web server runner instance calling this function
        :returns: a bash script that starts the server daemon
        """
        start_cmd = self.start_cmd(runner)
        pid_file = self.pid_file(runner)
        libs = runner.ctx.runenv.get("LD_PRELOAD")
        preload = ""
        if libs is not None:
            preload = f"LD_PRELOAD={':'.join(libs)}"
            runner.ctx.runenv["LD_PRELOAD"] = ""

        return f"""
        {preload} {start_cmd}
        echo -n "=== started server on port {runner.ctx.args.port}, "
        echo "pid $(cat "{pid_file}")"
        """

    def stop_script(self, runner: "WebServerRunner") -> str:
        """
        Build the bash snippet that stops the server daemon after
        benchmarking; this is the plain stop command.

        :param runner: the web server runner instance calling this function
        :returns: a bash script that stops the server daemon
        """
        script = self.stop_cmd(runner)
        return script

    def parse_outfile(self, ctx: Context, outfile: str) -> Iterator[ResultDict]:
        dirname, filename = os.path.split(outfile)
        if not filename.startswith("bench."):
            ctx.log.debug("ignoring non-benchmark file")
            return

        with open(outfile) as f:
            outfile_contents = f.read()

        def search(regex: str) -> str:
            m = re.search(regex, outfile_contents, re.M)
            assert m, "regex not found in outfile " + outfile
            return m.group(1)

        def parse_latency(s: str) -> float:
            m = re.match(r"(\d+\.\d+)([mun]?s)", s)
            assert m, "invalid latency"
            latency = float(m.group(1))
            unit = m.group(2)
            if unit == "us":
                latency /= 1000
            elif unit == "ns":
                latency /= 1000000
            elif unit == "s":
                latency *= 1000
            return latency

        def parse_bytesize(s: str) -> float:
            m = re.match(r"(\d+\.\d+)([KMGTP]?B)", s)
            assert m, "invalid bytesize"
            size = float(m.group(1))
            unit = m.group(2)
            factors = {
                "B": 1.0 / 1024,
                "KB": 1,
                "MB": 1024,
                "GB": 1024 * 1024,
                "TB": 1024 * 1024 * 1024,
                "PB": 1024 * 1024 * 1024 * 1024,
            }
            return size * factors[unit]

        cpu_outfile = os.path.join(dirname, filename.replace("bench", "cpu"))
        with open(cpu_outfile) as f:
            try:
                cpu_usages = [float(line) for line in f]
            except ValueError:
                raise FatalError(f"{cpu_outfile} contains invalid lines")

        yield {
            "threads": int(search(r"(\d+) threads and \d+ connections")),
            "connections": int(search(r"\d+ threads and (\d+) connections")),
            "avg_latency": parse_latency(search(r"^    Latency\s+([^ ]+)")),
            "50p_latency": parse_latency(search(r"^\s+50%\s+(.+)")),
            "75p_latency": parse_latency(search(r"^\s+75%\s+(.+)")),
            "90p_latency": parse_latency(search(r"^\s+90%\s+(.+)")),
            "99p_latency": parse_latency(search(r"^\s+99%\s+(.+)")),
            "throughput": float(search(r"^Requests/sec:\s+([0-9.]+)")),
            "transferrate": parse_bytesize(search(r"^Transfer/sec:\s+(.+)")),
            "duration": float(search(r"\d+ requests in ([\d.]+)s,")),
            "cpu": median(sorted(cpu_usages)),
        }


class WebServerRunner:
    comm_port = 40000

    server: WebServer
    ctx: Context
    instance: Instance
    pool: Optional[Pool]

    def __init__(
        self, server: WebServer, ctx: Context, instance: Instance, pool: Optional[Pool]
    ):
        """
        Bind the runner to a server/instance pair and derive its directories.

        :param server: the web server target being run
        :param ctx: the configuration context
        :param instance: the instance the web server is run for
        :param pool: pool to schedule jobs on, or None to run locally
        """
        self.server = server
        self.ctx = ctx
        self.instance = instance
        self.pool = pool

        ident = f"{server.name}-{instance.name}"

        # The run directory is staged here first and copied to the
        # (node-local) rundir afterwards.
        self.stagedir = os.path.join(ctx.paths.buildroot, "run-staging", ident)

        basedir = f"/tmp/infra-{ident}"
        if self.pool:
            # An SSH pool brings its own temporary directory.
            if isinstance(self.pool, SSHPool):
                basedir = self.pool.tempdir
            self.logdir = outfile_path(ctx, server, instance)
        else:
            self.logdir = os.path.join(basedir, "log")
        self.rundir = os.path.join(basedir, "run")

    def logfile(self, outfile: str) -> str:
        return os.path.join(self.logdir, outfile)

    def run_serve(self) -> None:
        """
        Serve until stopped.

        Without a pool, the server is started locally and kept running until
        ctrl-C is pressed, after which it is stopped. With a pool, a
        standalone server script is submitted as a single-node job, which
        requires the duration argument.

        :raises FatalError: if a pool is used without a duration
        """
        if not self.pool:
            self.create_logdir()
            self.populate_stagedir()
            self.start_server()

            try:
                self.ctx.log.info("press ctrl-C to kill the server")
                # Block until the user interrupts us.
                while True:
                    time.sleep(100000)
            except KeyboardInterrupt:
                pass

            self.stop_server()
            return

        if not self.ctx.args.duration:
            raise FatalError("need --duration argument")

        self.populate_stagedir()

        command = self.bash_command(self.standalone_server_script())
        server_log = self.logfile("server.out")
        self.ctx.log.debug("server will log to " + server_log)
        self.pool.run(
            self.ctx, command, jobid="server", nnodes=1, outfile=server_log
        )

    def run_test(self) -> None:
        """
        Test a single fetch of the generated index.html.

        Without a pool, the server is started locally, the index page is
        requested and compared, and the server is stopped again. With a pool,
        a test server script and a test client script are each submitted as
        a single-node job.
        """
        if not self.pool:
            self.create_logdir()
            self.populate_stagedir()
            self.start_server()
            self.request_and_check_index()
            self.stop_server()
            return

        self.populate_stagedir()

        # Server job first, then the client job that talks to it.
        server_job = self.bash_command(self.test_server_script())
        server_log = self.logfile("server.out")
        self.ctx.log.debug("server will log to " + server_log)
        self.pool.run(
            self.ctx, server_job, jobid="server", nnodes=1, outfile=server_log
        )

        client_job = self.bash_command(self.test_client_script())
        client_log = self.logfile("client.out")
        self.ctx.log.debug("client will log to " + client_log)
        self.pool.run(
            self.ctx, client_job, jobid="client", nnodes=1, outfile=client_log
        )

    def _run_bench_over_ssh(self) -> None:
        """
        Run the benchmark with client and server on two separate SSH nodes.

        The staging directory and the remote runner script are synced to the
        nodes, a remote runner is launched on each node, and for every
        ``--connections`` value and every iteration wrk is run on the client
        against the server. The wrk output and any collected server
        statistics are written to the log directory as
        ``<name>.<connections>.<iteration>``.

        A `RemoteRunnerError` or `KeyboardInterrupt` aborts the benchmark
        loop; the remote runners are still closed and the debug logs synced
        back before the error is re-raised.

        :raises RemoteRunnerError: if the server binary is missing, the
                                   server does not come up, or it serves the
                                   wrong content
        :raises KeyboardInterrupt: if the run was interrupted
        """
        assert isinstance(self.pool, SSHPool)

        def _start_server() -> None:
            """Start the server for benchmarking, verify it is behaving
            correctly and perform warmup run."""

            server_cmd = self.server.start_cmd(self, foreground=True)
            server.run(server_cmd, wait=False, env=self.ctx.runenv)

            # Wait for server to come up
            starttime = time.time()
            while time.time() - starttime < 5:
                test_cmd = f"curl -s {url}"
                ret = client.run(test_cmd, allow_error=True)
                if ret["rv"] == 0:
                    break
                time.sleep(0.1)
            else:
                raise RemoteRunnerError("server did not come up")

            server.poll(expect_alive=True)
            # The page fetched by the client must equal the staged index page.
            with open(os.path.join(self.stagedir, "www/index.html")) as f:
                if ret["stdout"] != f.read():
                    raise RemoteRunnerError("contents of " + url + " do not match")

            # Do a warmup run
            if not self.ctx.args.disable_warmup:
                client.run(
                    f"{wrk_path} --duration 1s --threads {wrk_threads} "
                    f'--connections 400 "{url}"'
                )

            server.poll(expect_alive=True)

        def _run_bench_client(cons: int, it: int) -> None:
            """Run workload on client, and write back the results. Optionally
            monitor statistics of the server and write back those as well."""

            self.ctx.log.info(
                f"Benchmarking {self.server.name} with {cons} connections, #{it}"
            )

            if collect_stats:
                server.start_monitoring(
                    stats=collect_stats, interval=self.ctx.args.collect_stats_interval
                )

            # Allow wrk to return non-zero values, which it does when (some) of
            # the requests are errors. We just go on benchmarking, and let the
            # report command worry about this.
            ret = client.run(
                (
                    f"{wrk_path} "
                    "--latency "
                    f"--duration {wrk_duration}s "
                    f"--connections {cons} "
                    f"--threads {wrk_threads} "
                    f'"{url}"'
                ),
                allow_error=True,
            )

            stats: Dict[str, List[Union[int, float]]] = {}
            if collect_stats:
                stats = server.stop_monitoring()

            # Write results: wrk output and all of our collected stats
            def resfile(base: str) -> str:
                return self.logfile(f"{base}.{cons}.{it}")

            with open(resfile("bench"), "w") as f:
                f.write(ret["stdout"])
            for stat in collect_stats:
                with open(resfile(stat), "w") as f:
                    measurements = stats[stat]
                    # Decide on the formatting from the raw values: float
                    # series get three decimals, anything else is written
                    # as-is. An empty series yields an empty file.
                    if measurements and isinstance(measurements[0], float):
                        vals = ["%.3f" % v for v in measurements]
                    else:
                        vals = [str(v) for v in measurements]
                    # The trailing "" terminates the last line with a newline.
                    f.write("\n".join(vals + [""]))

        def _kill_server() -> None:
            """Really really kills the running server."""
            server.kill()
            server.wait(timeout=1, allow_error=True)
            forcekillcmd = self.server.kill_cmd(self)
            server.run(forcekillcmd, allow_error=True)

        assert self.rundir.startswith(self.pool.tempdir)

        def tempfile(*p: str) -> str:
            # Path inside the pool's temporary directory.
            assert isinstance(self.pool, SSHPool)
            return os.path.join(self.pool.tempdir, *p)

        client_node, server_node = self.ctx.args.ssh_nodes
        client_outfile = self.logfile("client_runner.out")
        server_outfile = self.logfile("server_runner.out")
        client_debug_file = "client_runner_debug.out"
        server_debug_file = "server_runner_debug.out"
        rrunner_port_client, rrunner_port_server = 20010, 20011
        rrunner_script = "remote_runner.py"
        rrunner_script_path = tempfile(rrunner_script)
        client_cmd = [
            "python3",
            rrunner_script_path,
            "-p",
            str(rrunner_port_client),
            "-o",
            tempfile(client_debug_file),
        ]
        server_cmd = [
            "python3",
            rrunner_script_path,
            "-p",
            str(rrunner_port_server),
            "-o",
            tempfile(server_debug_file),
        ]
        curdir = os.path.dirname(os.path.abspath(__file__))

        client_host = self.ctx.args.remote_client_host or "localhost"
        server_host = self.ctx.args.remote_server_host or "localhost"

        # Only request a tunnel for a side whose remote host was not given
        # explicitly.
        client_tunnel_dest, server_tunnel_dest = None, None
        if not self.ctx.args.remote_client_host:
            client_tunnel_dest = rrunner_port_client
        if not self.ctx.args.remote_server_host:
            server_tunnel_dest = rrunner_port_server

        url = f"http://{self.ctx.args.server_ip}:{self.ctx.args.port}/index.html"
        wrk_path = Wrk().get_binary_path(self.ctx)
        wrk_threads = self.ctx.args.threads
        wrk_duration = self.ctx.args.duration

        # "time" is always collected alongside the requested statistics.
        collect_stats = []
        if self.ctx.args.collect_stats:
            collect_stats = ["time"] + self.ctx.args.collect_stats

        has_started_server = False

        # Create local stagedir and transfer files to other nodes.
        self.ctx.log.info("Setting up local and remote files")
        self.populate_stagedir()
        self.pool.sync_to_nodes(self.stagedir, "run")
        self.pool.sync_to_nodes(os.path.join(curdir, rrunner_script))

        # Launch the remote runners so we can easily control each node.
        client_job: Job = list(
            self.pool.run(
                self.ctx,
                client_cmd,
                jobid="client",
                nnodes=1,
                outfile=client_outfile,
                nodes=client_node,
                tunnel_to_nodes_dest=client_tunnel_dest,
            )
        )[0]
        server_job: Job = list(
            self.pool.run(
                self.ctx,
                server_cmd,
                jobid="server",
                nnodes=1,
                outfile=server_outfile,
                nodes=server_node,
                tunnel_to_nodes_dest=server_tunnel_dest,
            )
        )[0]

        assert isinstance(client_job, SSHJob)
        assert isinstance(server_job, SSHJob)

        # Connect to the remote runners. SSH can be slow, so give generous
        # timeout (retry window) so we don't end up with a ConnectionRefused.
        # Client here means "connect to the remote runner server", not the
        # client/server of our webserver setup.
        self.ctx.log.info("Connecting to remote nodes")
        client_port = (
            client_job.tunnel_src if client_tunnel_dest else rrunner_port_client
        )
        server_port = (
            server_job.tunnel_src if server_tunnel_dest else rrunner_port_server
        )
        client = RemoteRunner(
            self.ctx.log, side="client", host=client_host, port=client_port, timeout=10
        )
        server = RemoteRunner(
            self.ctx.log, side="client", host=server_host, port=server_port, timeout=10
        )

        # Remember the error so cleanup below still runs before re-raising.
        _err: Optional[BaseException] = None
        try:
            # Do some minor sanity checks on the remote file system of server
            server_bin = self.server.server_bin(self.ctx, self.instance)
            if not server.has_file(server_bin):
                raise RemoteRunnerError(
                    "server binary " + server_bin + " not present on server"
                )

            # Copy wrk binary only as needed
            if not client.has_file(wrk_path):
                self.ctx.log.info("wrk binary not found on client, syncing...")
                self.pool.sync_to_nodes(wrk_path)
                wrk_path = tempfile("wrk")

            # Clean up any lingering server. # XXX hacky
            for s in (Nginx, ApacheHttpd, Lighttpd):
                assert hasattr(s, "kill_cmd")
                kill_cmd = s.kill_cmd(self)
                server.run(kill_cmd, allow_error=True)

            # Start actual server and benchmarking!
            for cons in self.ctx.args.connections:
                for it in range(self.ctx.args.iterations):
                    # Start the server on the first run, and again before
                    # every run when restarts were requested.
                    if (
                        not has_started_server
                        or self.ctx.args.restart_server_between_runs
                    ):
                        if has_started_server:
                            _kill_server()
                        _start_server()
                        has_started_server = True

                    _run_bench_client(cons, it)

            _kill_server()

        except RemoteRunnerError as e:
            _err = e
            self.ctx.log.error(f"aborting tests due to error:\n {e}")
        except KeyboardInterrupt as e:
            self.ctx.log.error(
                "Received KeyboardInterrupt, aborting "
                "gracefully...\n"
                "Note that this will wait for the last "
                "benchmark to finish, which may take up to "
                f"{wrk_duration} seconds."
            )
            _err = e

        # Terminate the remote runners and clean up.
        client.close()
        server.close()
        self.pool.wait_all()

        self.ctx.log.info("Done, syncing results to " + self.logdir)
        self.pool.sync_from_nodes(
            client_debug_file, self.logfile(client_debug_file), client_node
        )
        self.pool.sync_from_nodes(
            server_debug_file, self.logfile(server_debug_file), server_node
        )

        self.pool.cleanup_tempdirs()

        if _err:
            raise _err

    def run_bench(self) -> None:
        """
        Benchmark the server with a wrk client, both scheduled on the pool.

        After validating the arguments, the log directory is created and the
        configuration is logged. With an SSH pool the benchmark is driven
        over SSH; otherwise a server job and a wrk client job are each
        submitted as a single-node job.

        :raises FatalError: if no pool is used, if the SSH pool does not have
                            exactly two nodes or lacks a server IP, if the
                            duration or connections are missing, or if a
                            connection count is lower than the thread count
        """
        if not self.pool:
            raise FatalError("need --parallel= argument to run benchmark")
        elif isinstance(self.pool, SSHPool):
            if len(self.ctx.args.ssh_nodes) != 2:
                raise FatalError("need exactly 2 nodes (via --ssh-nodes)")
            if not self.ctx.args.server_ip:
                raise FatalError("need --server-ip")
        elif isinstance(self.pool, ProcessPool):
            # Logger.warn is a deprecated alias of Logger.warning.
            self.ctx.log.warning(
                "the client should not run on the same machine "
                "as the server, use prun for benchmarking"
            )

        if not self.ctx.args.duration:
            raise FatalError("need --duration")

        if not self.ctx.args.connections:
            raise FatalError("need --connections")

        for conn in self.ctx.args.connections:
            if conn < self.ctx.args.threads:
                raise FatalError(
                    "#connections must be >= #threads "
                    f"({conn} < {self.ctx.args.threads})"
                )

        # Set up directory for results
        os.makedirs(self.logdir, exist_ok=True)
        self.write_log_of_config()

        if isinstance(self.pool, SSHPool):
            self._run_bench_over_ssh()
        else:
            client_outfile = self.logfile("client.out")
            server_outfile = self.logfile("server.out")

            self.populate_stagedir()

            server_script = self.wrk_server_script()
            server_command = self.bash_command(server_script)
            self.ctx.log.debug("server will log to " + server_outfile)
            self.pool.run(
                self.ctx,
                server_command,
                outfile=server_outfile,
                jobid="server",
                nnodes=1,
            )

            client_command = self.bash_command(self.wrk_client_script())
            self.ctx.log.debug("client will log to " + client_outfile)
            self.pool.run(
                self.ctx,
                client_command,
                outfile=client_outfile,
                jobid="wrk-client",
                nnodes=1,
            )

    def run_bench_server(self) -> None:
        """
        Run only the server side of a benchmark on this machine.

        The staging directory is populated, the configuration is logged and
        the wrk server script is run locally.

        :raises FatalError: if a pool is in use
        """
        if self.pool:
            raise FatalError("cannot run this command with --parallel")

        # Logger.warn is a deprecated alias of Logger.warning.
        self.ctx.log.warning("another machine should run a matching bench-client")
        self.ctx.log.info(f"will log to {self.logdir} (merge with client log)")

        self.populate_stagedir()
        self.write_log_of_config()
        run(self.ctx, self.bash_command(self.wrk_server_script()), teeout=True)

    def run_bench_client(self) -> None:
        """
        Run only the wrk client side of a benchmark on this machine.

        After validating the arguments, the log directory is created and
        made the working directory, the server IP is recorded in
        ``server_host``, the configuration is logged and the wrk client
        script is run locally.

        :raises FatalError: if a pool is in use, if the duration,
                            connections or server IP are missing, or if a
                            connection count is lower than the thread count
        """
        if self.pool:
            raise FatalError("cannot run this command with --parallel")

        if not self.ctx.args.duration:
            raise FatalError("need --duration")

        if not self.ctx.args.connections:
            raise FatalError("need --connections")

        if not self.ctx.args.server_ip:
            raise FatalError("need --server-ip and --port")

        for conn in self.ctx.args.connections:
            if conn < self.ctx.args.threads:
                raise FatalError(
                    "#connections must be >= #threads "
                    f"({conn} < {self.ctx.args.threads})"
                )

        # Logger.warn is a deprecated alias of Logger.warning.
        self.ctx.log.warning(
            f"matching bench-server should be running at {self.ctx.args.server_ip}"
        )
        self.ctx.log.info(f"will log to {self.logdir} (merge with server log)")

        self.ctx.log.debug("creating log directory")
        os.makedirs(self.logdir, exist_ok=True)
        os.chdir(self.logdir)

        with open(self.logfile("server_host"), "w") as f:
            f.write(self.ctx.args.server_ip + "\n")

        self.write_log_of_config()
        run(self.ctx, self.bash_command(self.wrk_client_script()), teeout=True)

    def write_log_of_config(self) -> None:
        with open(self.logfile("config.txt"), "w") as f:
            with redirect_stdout(f):
                print("server workers:    ", self.ctx.args.workers)
                print("client threads:    ", self.ctx.args.threads)
                print("client connections:", self.ctx.args.connections)
                print("benchmark duration:", self.ctx.args.duration, "seconds")

    def start_server(self) -> None:
        """Run the wrapped start script through bash with `teeout` enabled."""
        self.ctx.log.info("starting server")
        command = self.bash_command(self.wrap_start_script())
        run(self.ctx, command, teeout=True)

    def stop_server(self) -> None:
        """Run the wrapped stop script through bash with `teeout` enabled."""
        self.ctx.log.info("stopping server")
        command = self.bash_command(self.wrap_stop_script())
        run(self.ctx, command, teeout=True)

    def bash_command(self, script: str) -> Iterable[str]:
        """
        Wrap a bash script in a ``bash -c`` command that aborts on the first
        error and runs inside the log directory.

        :param script: the bash script to run
        :returns: the command as an argument list
        """
        if isinstance(self.pool, PrunPool):
            # escape for passing as: prun ... bash -c '<script>'
            for special in ("$", '"'):
                script = script.replace(special, "\\" + special)

        prologue = f"set -e; cd {self.logdir}; "
        return ["bash", "-c", prologue + script]

    def create_logdir(self) -> None:
        assert not self.pool
        if os.path.exists(self.logdir):
            self.ctx.log.debug("removing old log directory " + self.logdir)
            shutil.rmtree(self.logdir)
        self.ctx.log.debug("creating log directory " + self.logdir)
        os.makedirs(self.logdir)

    def populate_stagedir(self) -> None:
        """
        Recreate the staging directory and fill it.

        Any existing staging directory is removed first. The process then
        changes its working directory to the staging directory, writes
        ``www/index.html`` with random printable characters of the size given
        by the filesize argument, and lets the server add its own files.
        """
        stagedir = self.stagedir
        log = self.ctx.log
        if os.path.exists(stagedir):
            log.debug("removing old staging run directory " + stagedir)
            shutil.rmtree(stagedir)

        log.debug("populating local staging run directory")
        os.makedirs(stagedir, exist_ok=True)
        os.chdir(stagedir)

        os.makedirs("www", exist_ok=True)
        with open("www/index.html", "w") as index:
            alphabet = string.printable
            nchars = parse_filesize(self.ctx.args.filesize)
            payload = "".join(random.choice(alphabet) for _ in range(nchars))
            index.write(payload)

        # Server-specific files, e.g. its configuration.
        self.server.populate_stagedir(self)

    def request_and_check_index(self) -> None:
        """Fetch ``index.html`` from the local server and verify its contents.

        The page is downloaded to ``requested_index.html`` in the current
        working directory and compared byte-for-byte with the generated
        ``www/index.html`` in the run directory. Only valid when no pool is
        set (asserted).

        :raises FatalError: if the contents differ (the server is stopped
                            first)
        """
        assert not self.pool
        url = f"http://localhost:{self.ctx.args.port}/index.html"
        self.ctx.log.info("requesting " + url)
        urlretrieve(url, "requested_index.html")

        def read_bytes(path: str) -> bytes:
            with open(path, "rb") as stream:
                return stream.read()

        expected = read_bytes(os.path.join(self.rundir, "www", "index.html"))
        got = read_bytes("requested_index.html")

        if got == expected:
            self.ctx.log.info("contents of index.html are correct")
            return

        self.stop_server()
        raise FatalError("content does not match generated index.html")

    def wrap_start_script(self) -> str:
        """Return the bash snippet that prepares the run directory and starts
        the server.

        The snippet replaces the run directory with a copy of the staging
        directory, runs the server-specific start script and stores the
        address the server is reachable at in the shell variable
        ``server_host``.

        :returns: bash source, to be embedded in a larger script
        """
        start_script = self.server.start_script(self)
        host_command = "echo localhost"
        if isinstance(self.pool, PrunPool):
            # get the infiniband network IP
            host_command = 'ifconfig ib0 2>/dev/null | grep -Po "(?<=inet )[^ ]+"'
        # Both cp operands are quoted, like every other use of these paths in
        # the generated scripts, so that directories containing whitespace or
        # glob characters are copied correctly.
        return f"""
        echo "=== creating local run directory"
        rm -rf "{self.rundir}"
        cp -r "{self.stagedir}" "{self.rundir}"

        echo "=== starting web server"
        {start_script}
        server_host="$({host_command})"
        echo "=== serving at $server_host:{self.ctx.args.port}"
        """

    def wrap_stop_script(self) -> str:
        stop_script = self.server.stop_script(self)
        return f"""
        echo "=== received stop signal, stopping web server"
        {stop_script}

        if [ -s "{self.rundir}/error.log" ]; then
            echo "=== there were errors, copying log to {self.logdir}/error.log"
            cp "{self.rundir}/error.log" .
        fi

        echo "=== removing local run directory"
        rm -rf "{self.rundir}"
        """

    def server_script(self, body_template: str) -> str:
        start_script = self.wrap_start_script()
        stop_script = self.wrap_stop_script()
        return f"""
        comm_recv() {{ netcat --close -l -p {self.comm_port} || true; }}

        {start_script}

        echo "=== writing hostname to file"
        echo "$server_host" > server_host
        sync

        {body_template}

        {stop_script}
        """

    def client_script(self, body_template: str) -> str:
        """Wrap *body_template* into a complete client-side bash script.

        The result defines the ``comm_send`` helper (retries a netcat
        connection to the server's communication port until it succeeds),
        waits for the ``server_host`` file to appear, loads it into
        ``$server_host``, runs the body and finally sends ``stop`` to the
        server.

        :param body_template: bash source to run once the server is known
        :returns: the full bash script
        """
        port = self.comm_port
        return f"""
        comm_send() {{
            read msg
            while ! netcat --close "$server_host" {port} \\
                    <<< "$msg" 2>/dev/null; do :; done
        }}

        echo "=== waiting for server to write its IP to file"
        while [ ! -e server_host ]; do sleep 0.1; sync; done
        server_host="$(cat server_host)"

        {body_template}

        echo "=== sending stop signal to server"
        comm_send <<< stop
        """

    def test_server_script(self) -> str:
        return self.server_script(f"""
        echo "=== copying index.html to log directory for client"
        cp "{self.rundir}/www/index.html" .

        echo "=== waiting for stop signal from client"
        test "$(comm_recv)" = stop
        """)

    def test_client_script(self) -> str:
        return (
            self.client_script(f"""
        url="http://$server_host:{self.ctx.args.port}/index.html"
        echo "=== requesting $url"
        wget -q -O requested_index.html "$url"
        """)
            + """
        if diff -q index.html requested_index.html; then
            echo "=== contents of index.html are correct"
        else
            echo "=== ERROR: content mismatch:"
            echo "  $(pwd)/requested_index.html"
            echo "does not match:"
            echo "  $(pwd)/index.html"
            exit 1
        fi
        """
        )

    def wrk_server_script(self) -> str:
        """Return the server-side script for a wrk benchmark.

        For every work rate received from the client, CPU usage is sampled
        with mpstat for the benchmark duration and written as ``100 - %idle``
        per sample to ``cpu.<rate>``. The ``%idle`` column is located from
        the mpstat header, defaulting to column 13. The loop ends when the
        client sends ``stop``.
        """
        secs = self.ctx.args.duration
        body = f"""
        echo "=== waiting for first work rate"
        rate="$(comm_recv)"
        while [ "$rate" != stop ]; do
            echo "=== logging cpu usage to cpu.$rate for {secs} seconds"
            {{ timeout {secs} mpstat 1 {secs} || true; }} | \\
                    awk 'BEGIN {{idle=13}}
                         /%idle/ {{for(i=1;i<=NF;i++) if($i == "%idle") idle=i}}
                         /^[0-9].+all/ {{print 100-$idle; fflush()}}' \\
                    > "cpu.$rate"

            echo "=== waiting for next work rate"
            rate="$(comm_recv)"
        done
        """
        return self.server_script(body)

    def wrk_client_script(self) -> str:
        """Return the client-side script for a wrk benchmark.

        After a 3 second warmup run, every connection count is benchmarked
        once per iteration. Before each run the client optionally sleeps for
        the cleanup time and announces the work rate
        ``<connections>.<iteration>`` to the server; wrk output goes to
        ``bench.<connections>.<iteration>``.
        """
        conns = " ".join(map(str, self.ctx.args.connections))
        opts = self.ctx.args
        body = f"""
        url="http://$server_host:{opts.port}/index.html"
        echo "=== will benchmark $url for {opts.duration} seconds for each work rate"

        echo "=== 3 second warmup run"
        wrk --duration 3s --threads {opts.threads} --connections 400 "$url"

        for i in $(seq 1 1 {opts.iterations}); do
            for connections in {conns}; do
                if [ {opts.cleanup_time} -gt 0 ]; then
                    echo "=== waiting {opts.cleanup_time} seconds for server to clean up"
                    sleep {opts.cleanup_time}
                fi

                echo "=== sending work rate $connections.$i to server"
                comm_send <<< "$connections.$i"

                echo "=== starting benchmark"
                set -x
                wrk --duration {opts.duration}s --connections $connections \\
                        --threads {opts.threads} --latency "$url" \\
                        > bench.$connections.$i
                set +x
            done
        done
        """
        return self.client_script(body)

    def standalone_server_script(self) -> str:
        """Return the server-side script for a standalone (client-less) run.

        While the server is up, CPU usage is sampled with mpstat for the
        benchmark duration and written as ``100 - %idle`` per sample to the
        file ``cpu``.

        The position of the ``%idle`` column is read from the mpstat header
        (falling back to column 13), the same way ``wrk_server_script`` does
        it, instead of assuming a fixed column.
        """
        duration = self.ctx.args.duration
        return self.server_script(f"""
        echo "=== logging cpu usage to cpu for {duration} seconds"
        {{ timeout {duration} mpstat 1 {duration} || true; }} | \\
                awk 'BEGIN {{idle=13}}
                     /%idle/ {{for(i=1;i<=NF;i++) if($i == "%idle") idle=i}}
                     /^[0-9].+all/ {{print 100-$idle; fflush()}}' \\
                > cpu
        """)


class Nginx(WebServer):
    """
    The Nginx web server.

    :name: nginx
    :param version: which (open source) version to download
    :param build_flags: extra arguments appended to the ``./configure``
                        command line (default: none)
    """

    #: :class:`list` Command line arguments for the built-in ``-allocs`` pass;
    #: Registers custom allocation function wrappers in Nginx.
    custom_allocs_flags = [
        "-allocs-custom-funcs="
        + ".".join(
            (
                "ngx_alloc:malloc:0",
                "ngx_palloc:malloc:1",
                "ngx_palloc_small:malloc:1",
                "ngx_palloc_large:malloc:1",
            )
        )
    ]

    version: str

    def __init__(self, version: str, build_flags: Optional[List[str]] = None):
        super().__init__()
        # A fresh list per instance; a literal ``[]`` default would be shared
        # between all instances created without explicit flags.
        self.build_flags = build_flags if build_flags is not None else []
        self.version = version
        self.name = "nginx-" + version

    def fetch(self, ctx: Context) -> None:
        """Download the source tarball from nginx.org."""
        download(ctx, "https://nginx.org/download/" + self.tar_name())

    def is_fetched(self, ctx: Context) -> bool:
        """Return whether the source tarball exists in the current directory."""
        return os.path.exists(self.tar_name())

    def tar_name(self) -> str:
        """Return the file name of the source tarball for this version."""
        return "nginx-" + self.version + ".tar.gz"

    def build(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """Unpack (if needed), configure (if needed) and build nginx.

        The sources are unpacked into a directory named after the instance.
        ``./configure`` is only rerun when :func:`should_configure` says so;
        ``make`` is always run with ``--always-make``.
        """
        if not os.path.exists(instance.name):
            ctx.log.debug("unpacking nginx-" + self.version)
            shutil.rmtree("nginx-" + self.version, ignore_errors=True)
            untar(ctx, self.tar_name(), instance.name, remove=False)

        # Configure if there is no Makefile or if flags changed
        os.chdir(instance.name)
        if self.should_configure(ctx):
            ctx.log.debug("no Makefile or flags changed, reconfiguring")
            run(
                ctx,
                [
                    "./configure",
                    "--with-cc=" + ctx.cc,
                    "--with-cc-opt=" + qjoin(ctx.cflags),
                    "--with-ld-opt=" + qjoin(ctx.ldflags),
                    *self.build_flags,
                ],
            )
        else:
            ctx.log.debug("same flags as before, skip reconfigure")

        run(ctx, ["make", f"-j{ctx.jobs}", "--always-make"])

    def should_configure(self, ctx: Context) -> bool:
        """Decide whether ``./configure`` must be (re)run.

        Must be called from the source directory. Configuration is needed
        when there is no Makefile or when the hash stored in ``flags_hash``
        differs from the hash of the current flags.

        Whenever configuration is needed the current hash is written to
        ``flags_hash``, including when the Makefile is missing. Otherwise the
        file would stay absent or stale after the first configure and a later
        call would compare against the wrong flags.

        :returns: True if configure should run
        """
        new_hash = self.hash_flags(ctx)
        try:
            with open("flags_hash") as f:
                old_hash: Optional[str] = f.read()
        except FileNotFoundError:
            old_hash = None

        if os.path.exists("Makefile") and new_hash == old_hash:
            return False

        with open("flags_hash", "w") as f:
            f.write(new_hash)
        return True

    def hash_flags(self, ctx: Context) -> str:
        """Return an MD5 hex digest over everything passed to ``./configure``.

        Covers the compiler, compiler flags, linker flags and the extra
        ``build_flags`` of this target, so that changing any of them triggers
        a reconfigure.
        """
        h = md5()
        # UTF-8 yields the same bytes as ASCII for ASCII-only flags, but does
        # not fail on e.g. non-ASCII paths.
        h.update(b"CC=" + ctx.cc.encode("utf-8"))
        h.update(b"\nCFLAGS=" + qjoin(ctx.cflags).encode("utf-8"))
        h.update(b"\nLDFLAGS=" + qjoin(ctx.ldflags).encode("utf-8"))
        h.update(b"\nBUILD_FLAGS=" + qjoin(self.build_flags).encode("utf-8"))
        return h.hexdigest()

    def server_bin(self, ctx: Context, instance: Instance) -> str:
        """Return the path of the built nginx binary for *instance*."""
        return self.path(ctx, instance.name, "objs", "nginx")

    def binary_paths(self, ctx: Context, instance: Instance) -> Iterator[str]:
        yield self.server_bin(ctx, instance)

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        super().add_run_args(parser)
        parser.add_argument(
            "--workers",
            type=int,
            default=1,
            help="number of worker processes (default 1)",
        )
        parser.add_argument(
            "--worker-connections",
            type=int,
            default=1024,
            help="number of connections per worker process (default 1024)",
        )

    def populate_stagedir(self, runner: WebServerRunner) -> None:
        """Write ``nginx.conf`` (and ``logs/``) into the current directory.

        If the file named by the ``config`` run argument exists it is copied
        verbatim; otherwise a configuration is generated from the run
        arguments.
        """
        # Nginx needs the logs/ dir to create the default error log before
        # processing the error_logs directive
        os.makedirs("logs", exist_ok=True)

        runner.ctx.log.debug("creating nginx.conf")
        a = runner.ctx.args
        if os.path.exists(a.config):
            runner.ctx.log.debug(f"Found configuration file: {a.config}")
            shutil.copyfile(a.config, "nginx.conf")
            return

        config_template = f"""
        error_log {runner.rundir}/error.log error;
        lock_file {runner.rundir}/nginx.lock;
        pid {runner.rundir}/nginx.pid;
        worker_processes {a.workers};
        worker_cpu_affinity auto;
        events {{
            worker_connections {a.worker_connections};
            use epoll;
        }}
        http {{
            server {{
                listen {a.port};
                server_name localhost;
                sendfile on;
                access_log off;
                keepalive_requests 500;
                keepalive_timeout 500ms;
                location / {{
                    root {runner.rundir}/www;
                }}
            }}
        }}
        """
        with open("nginx.conf", "w") as f:
            f.write(config_template)

    def pid_file(self, runner: WebServerRunner) -> str:
        return f"{runner.rundir}/nginx.pid"

    def start_cmd(self, runner: WebServerRunner, foreground: bool = False) -> str:
        """Return the shell command that starts nginx in the run directory.

        :param foreground: disable daemonizing
        """
        nginx = self.server_bin(runner.ctx, runner.instance)
        runopt = '-g "daemon off;"' if foreground else ""
        if runner.ctx.args.nofork:
            # nofork overrides the foreground setting and also disables the
            # master process
            runopt = '-g "daemon off; master_process off;"'
        return f'{nginx} -p "{runner.rundir}" -c nginx.conf {runopt}'

    def stop_cmd(self, runner: WebServerRunner) -> str:
        """Return the shell command that asks nginx to quit."""
        nginx = self.server_bin(runner.ctx, runner.instance)
        return f'{nginx} -p "{runner.rundir}" -c nginx.conf -s quit'

    @staticmethod
    def kill_cmd(runner: WebServerRunner) -> str:
        """Return the shell command that forcibly kills all nginx processes."""
        return "pkill -9 nginx"


class ApacheHttpd(WebServer):
    """
    Apache web server. Builds APR and APR Util libraries as binary dependencies.

    :name: apache
    :param version: apache httpd version
    :param apr_version: APR version
    :param apr_util_version: APR Util version
    :param modules: a list of modules to enable (default: "few", any
                    modules will be statically linked)
    :param build_flags: extra arguments appended to the httpd ``configure``
                        command line (default: none)
    """

    #: :class:`list` Command line arguments for the built-in ``-allocs`` pass;
    #: Registers custom allocation function wrappers in Apache.
    custom_allocs_flags = [
        "-allocs-custom-funcs="
        + ".".join(
            (
                "apr_palloc:malloc:1",
                "apr_palloc_debug:malloc:1",
                "apr_pcalloc:calloc:1",
                "apr_pcalloc_debug:calloc:1",
            )
        )
    ]

    def __init__(
        self,
        version: str,
        apr_version: str,
        apr_util_version: str,
        modules: Optional[Iterable[str]] = None,
        build_flags: Optional[List[str]] = None,
    ):
        self.version = version
        self.apr_version = apr_version
        self.apr_util_version = apr_util_version
        # Materialize into a fresh list: the modules are joined once per
        # build, which would exhaust a one-shot iterable after the first
        # instance, and a literal list default would be shared by all objects.
        self.modules = ["few"] if modules is None else list(modules)
        self.name = "apache-" + version
        self.build_flags = build_flags if build_flags is not None else []
        super().__init__()

    def fetch(self, ctx: Context) -> None:
        """Download and unpack httpd, APR and APR Util under ``src``."""
        _fetch_apache(ctx, "httpd", "httpd-" + self.version, "src")
        _fetch_apache(ctx, "apr", "apr-" + self.apr_version, "src/srclib/apr")
        _fetch_apache(
            ctx, "apr", "apr-util-" + self.apr_util_version, "src/srclib/apr-util"
        )

    def is_fetched(self, ctx: Context) -> bool:
        """Return whether the ``src`` directory exists."""
        return os.path.exists("src")

    def build(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """Build APR, APR Util and httpd into ``<instance>/install``.

        The object directory ``<instance>/obj`` is recreated from scratch on
        every build. The process working directory ends up in that object
        directory.
        """
        # create build directory
        objdir = os.path.join(instance.name, "obj")
        if os.path.exists(objdir):
            ctx.log.debug("removing old object directory " + objdir)
            shutil.rmtree(objdir)
        ctx.log.debug("creating object directory " + objdir)
        os.makedirs(objdir)
        os.chdir(objdir)

        # set environment for configure scripts
        prefix = self.path(ctx, instance.name, "install")
        env = {
            "CC": ctx.cc,
            "CFLAGS": qjoin(ctx.cflags),
            "LDFLAGS": qjoin(ctx.lib_ldflags),
            "HTTPD_LDFLAGS": qjoin(ctx.ldflags),
            "AR": ctx.ar,
            "RANLIB": ctx.ranlib,
        }

        # build APR
        ctx.log.info(f"building {self.name}-{instance.name}-apr")
        os.mkdir("apr")
        os.chdir("apr")
        run(
            ctx,
            [
                "../../../src/srclib/apr/configure",
                "--prefix=" + prefix,
                "--enable-static",
                "--enable-shared=no",
            ],
            env=env,
        )
        run(ctx, f"make -j{ctx.jobs}")
        run(ctx, "make install")
        os.chdir("..")

        # build APR-Util
        ctx.log.info(f"building {self.name}-{instance.name}-apr-util")
        os.mkdir("apr-util")
        os.chdir("apr-util")
        run(
            ctx,
            [
                "../../../src/srclib/apr-util/configure",
                "--prefix=" + prefix,
                "--with-apr=" + prefix,
            ],
            env=env,
        )
        run(ctx, f"make -j{ctx.jobs}")
        run(ctx, "make install")
        os.chdir("..")

        # build httpd web server
        ctx.log.info(f"building {self.name}-{instance.name}-httpd")
        os.mkdir("httpd")
        os.chdir("httpd")
        run(
            ctx,
            [
                "../../../src/configure",
                "--prefix=" + prefix,
                "--with-apr=" + prefix,
                "--with-apr-util=" + prefix,
                "--enable-modules=none",  # only build static
                "--enable-mods-static=" + qjoin(self.modules),
                *self.build_flags,
            ],
            env=env,
        )
        run(ctx, f"make -j{ctx.jobs}")
        run(ctx, "make install")
        os.chdir("..")

    def server_bin(self, ctx: Context, instance: Instance) -> str:
        """Return the path of the installed httpd binary for *instance*."""
        return self.path(ctx, instance.name, "install", "bin", "httpd")

    def binary_paths(self, ctx: Context, instance: Instance) -> Iterator[str]:
        yield self.server_bin(ctx, instance)

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        super().add_run_args(parser)
        nproc = cpu_count()
        parser.add_argument(
            "--workers",
            type=int,
            default=nproc,
            help=f"number of worker processes (ServerLimit, default {nproc})",
        )
        parser.add_argument(
            "--worker-threads",
            type=int,
            default=25,
            help=(
                "number of connection threads per worker process "
                "(ThreadsPerChild, default 25)"
            ),
        )

    def populate_stagedir(self, runner: WebServerRunner) -> None:
        """Copy the installation into the staging directory and write
        ``conf/httpd.conf`` relative to the current directory.

        If the file named by the ``config`` run argument exists it is copied
        verbatim; otherwise a configuration is generated from the run
        arguments.
        """
        runner.ctx.log.debug("copying base config")
        rootdir = self.path(runner.ctx, runner.instance.name, "install")
        copytree(rootdir, runner.stagedir)

        a = runner.ctx.args
        if os.path.exists(a.config):
            runner.ctx.log.debug(f"Found configuration file: {a.config}")
            shutil.copyfile(a.config, "conf/httpd.conf")
            return

        runner.ctx.log.debug("creating httpd.conf from template")
        total_threads = a.workers * a.worker_threads
        config_template = f"""
        Listen {a.port}
        ErrorLog error.log
        PidFile apache.pid
        ServerName localhost
        DocumentRoot www
        ServerLimit {a.workers}
        StartServers {a.workers}
        ThreadsPerChild {a.worker_threads}
        ThreadLimit {a.worker_threads}
        MaxRequestWorkers {total_threads}
        MaxSpareThreads {total_threads}
        KeepAlive On
        KeepAliveTimeout 500ms
        MaxKeepAliveRequests 500
        EnableSendfile On
        Timeout 1
        """
        with open("conf/httpd.conf", "w") as f:
            f.write(config_template)

    def pid_file(self, runner: WebServerRunner) -> str:
        return f"{runner.rundir}/apache.pid"

    def start_cmd(self, runner: WebServerRunner, foreground: bool = False) -> str:
        """Return the shell command that starts httpd in the run directory.

        :param foreground: run in the foreground instead of ``-k start``
        """
        httpd = self.server_bin(runner.ctx, runner.instance)
        runopt = "-D FOREGROUND" if foreground else "-k start"
        return f'{httpd} -d "{runner.rundir}" {runopt}'

    def stop_cmd(self, runner: WebServerRunner) -> str:
        """Return the shell command that stops httpd."""
        httpd = self.server_bin(runner.ctx, runner.instance)
        return f'{httpd} -d "{runner.rundir}" -k stop'

    @staticmethod
    def kill_cmd(runner: WebServerRunner) -> str:
        """Return the shell command that forcibly kills all httpd processes."""
        return "pkill -9 httpd"


class Lighttpd(WebServer):
    """
    The Lighttpd web server. Built statically with SCons.

    :name: lighttpd
    :param version: which version to download
    """

    def __init__(self, version: str):
        self.version = version
        # Plain assignment, like the other web servers: ``+=`` would read
        # ``self.name`` before this constructor has given it a value.
        self.name = "lighttpd-" + version
        super().__init__()

    def dependencies(self) -> Iterator[Package]:
        yield from super().dependencies()
        yield Scons.default()

    def fetch(self, ctx: Context) -> None:
        """Download the source tarball from download.lighttpd.net.

        The release directory on the server is named after the
        ``major.minor`` part of the version.
        """
        m = re.match(r"(\d+\.\d+)\.\d+", self.version)
        assert m
        minor_version = m.group(1)
        download(
            ctx,
            (
                "https://download.lighttpd.net/lighttpd/"
                f"releases-{minor_version}.x/{self.tar_name()}"
            ),
        )

    def is_fetched(self, ctx: Context) -> bool:
        """Return whether the source tarball exists in the current directory."""
        return os.path.exists(self.tar_name())

    def tar_name(self) -> str:
        """Return the file name of the source tarball for this version."""
        return "lighttpd-" + self.version + ".tar.gz"

    def build(
        self, ctx: Context, instance: Instance, pool: Optional[Pool] = None
    ) -> None:
        """Unpack (if needed) and build a static lighttpd with SCons.

        The sources are unpacked into a directory named after the instance;
        an existing ``sconsbuild`` directory is removed to force a rebuild.
        """
        if not os.path.exists(instance.name):
            ctx.log.debug("unpacking lighttpd-" + self.version)
            shutil.rmtree("lighttpd-" + self.version, ignore_errors=True)
            untar(ctx, self.tar_name(), instance.name, remove=False)

        os.chdir(instance.name)

        # remove old build directory to force a rebuild
        if os.path.exists("sconsbuild"):
            ctx.log.debug("removing old sconsbuild directory")
            shutil.rmtree("sconsbuild")

        # resolve the compiler against the PATH of the run environment
        path = join_env_paths(ctx.runenv).get("PATH", "")
        cc = shutil.which(ctx.cc, path=path)
        assert cc
        env: Dict[str, Union[str, List[str]]] = {
            "CFLAGS": qjoin(ctx.cflags),
            "LDFLAGS": qjoin(ctx.ldflags),
            "ASAN_OPTIONS": "detect_leaks=0",  # Lighttpd suffers from memory leaks
        }
        run(
            ctx,
            [
                "scons",
                "-j",
                ctx.jobs,
                "CC=" + cc,
                "with_pcre=no",
                "build_static=yes",
                "build_dynamic=no",
            ],
            env=env,
        )

    def server_bin(self, ctx: Context, instance: Instance) -> str:
        """Return the path of the built lighttpd binary for *instance*."""
        return self.path(
            ctx, instance.name, "sconsbuild", "static", "build", "lighttpd"
        )

    def binary_paths(self, ctx: Context, instance: Instance) -> Iterator[str]:
        yield self.server_bin(ctx, instance)

    def add_run_args(self, parser: argparse.ArgumentParser) -> None:
        super().add_run_args(parser)
        parser.add_argument(
            "--workers",
            type=int,
            default=1,
            help="number of worker processes (default 1)",
        )
        parser.add_argument(
            "--server-connections",
            type=int,
            default=2048,
            help="number of concurrent connections to the server (default 2048)",
        )

    def populate_stagedir(self, runner: WebServerRunner) -> None:
        """Write ``lighttpd.conf`` into the current directory.

        If the file named by the ``config`` run argument exists it is copied
        verbatim; otherwise a configuration is generated from the run
        arguments.
        """
        a = runner.ctx.args
        if os.path.exists(a.config):
            runner.ctx.log.debug(f"Found configuration file: {a.config}")
            runner.ctx.log.debug(f"Port: {a.port}")
            shutil.copyfile(a.config, "lighttpd.conf")
            return

        runner.ctx.log.debug("creating lighttpd.conf from template")
        max_fds = 2 * a.server_connections
        config_template = f"""
        var.rundir = "{runner.rundir}"
        server.port = {a.port}
        server.document-root = var.rundir + "/www"
        server.errorlog = var.rundir + "/error.log"
        server.pid-file = var.rundir + "/lighttpd.pid"
        server.event-handler = "linux-sysepoll"
        server.network-backend = "sendfile"
        server.max-worker = {a.workers}
        server.max-connections = {a.server_connections}
        server.max-fds = {max_fds}
        server.max-keep-alive-requests = 500
        server.max-keep-alive-idle = 1
        server.max-read-idle = 1
        server.max-write-idle = 1
        """
        with open("lighttpd.conf", "w") as f:
            f.write(config_template)

    def stop_script(self, runner: WebServerRunner) -> str:
        """Return a bash snippet that kills the process named in the pid file."""
        return f"""
        kill $(cat "{runner.rundir}/lighttpd.pid")
        """

    def pid_file(self, runner: WebServerRunner) -> str:
        return f"{runner.rundir}/lighttpd.pid"

    def start_cmd(self, runner: WebServerRunner, foreground: bool = False) -> str:
        """Return the shell command that starts lighttpd.

        :param foreground: pass ``-D`` so the server does not daemonize
        """
        lighttpd = self.server_bin(runner.ctx, runner.instance)
        runopt = "-D" if foreground else ""
        return f'{lighttpd} -f "{runner.rundir}/lighttpd.conf" {runopt}'

    def stop_cmd(self, runner: WebServerRunner) -> str:
        # TODO better to read pidfile
        return "pkill lighttpd"

    @staticmethod
    def kill_cmd(runner: WebServerRunner) -> str:
        """Return the shell command that forcibly kills all lighttpd processes."""
        return "pkill -9 lighttpd"


def median_absolute_deviation(numbers: Sequence[float]) -> float:
    """Return the median of the absolute deviations from the median.

    :param numbers: non-empty sequence of values
    """
    assert len(numbers) > 0
    med = median(numbers)
    return median(abs(x - med) for x in numbers)


def stdev_percent(numbers: Sequence[float]) -> float:
    """Return the population standard deviation as a percentage of the mean.

    :raises ZeroDivisionError: if the mean of *numbers* is zero
    """
    return 100 * pstdev(numbers) / mean(numbers)


def _fetch_apache(ctx: Context, repo: str, basename: str, dest: str) -> None:
    """Download ``<basename>.tar.bz2`` from the Apache archive and unpack it.

    :param repo: directory below ``https://archive.apache.org/dist/``
    :param basename: tarball name without the ``.tar.bz2`` extension
    :param dest: destination passed on to :func:`untar`
    """
    tarname = basename + ".tar.bz2"
    download(ctx, f"https://archive.apache.org/dist/{repo}/{tarname}")
    untar(ctx, tarname, dest)


def copytree(src: str, dst: str) -> None:
    """Copy the contents of *src* into *dst*, merging with existing
    directories.

    Wrapper for shutil.copytree, which does not have dirs_exist_ok until
    python 3.8. Directories that already exist in *dst* (at any depth) are
    merged into rather than causing an error, and *dst* itself is created
    if missing. Files are copied with :func:`shutil.copy2` and overwrite
    existing files.
    """
    os.makedirs(dst, exist_ok=True)
    for item in os.listdir(src):
        s = os.path.join(src, item)
        d = os.path.join(dst, item)
        if os.path.isdir(s):
            # Recurse instead of calling shutil.copytree, which refuses to
            # copy into a directory that already exists.
            copytree(s, d)
            shutil.copystat(s, d)
        else:
            shutil.copy2(s, d)


def parse_filesize(filesize: Union[str, int]) -> int:
    """Convert a size in human-readable form to bytes (e.g., 4K, 2G).

    Integers are returned unchanged. Strings consist of an integer optionally
    followed by one of the binary unit suffixes ``K``, ``M`` or ``G``
    (case-insensitive); surrounding whitespace is ignored.

    :raises FatalError: if *filesize* is neither int nor str, is empty, is
                        not a number or has an unknown unit suffix
    """
    if isinstance(filesize, int):
        return filesize
    if not isinstance(filesize, str):
        raise FatalError("unsupported filesize type " + repr(filesize))

    factors = {"": 1, "K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024}
    number = filesize.strip().upper()
    unit = ""
    if number and number[-1] not in string.digits:
        number, unit = number[:-1], number[-1]

    try:
        return int(number) * factors[unit]
    except (KeyError, ValueError):
        # report malformed user input uniformly instead of leaking an
        # IndexError/KeyError/ValueError from the parsing internals
        raise FatalError("invalid filesize " + repr(filesize)) from None