# Source code for beat.core.dock

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################


"""
====
dock
====

Docker helper classes
"""
import ast
import logging
import os
import socket
import subprocess as sp  # nosec
import tempfile
import time

import docker
import simplejson as json

from packaging import version

from beat.core import stats

from .utils import build_env_name

logger = logging.getLogger(__name__)


class Host(object):
    """An object of this class can connect to the docker host and resolve stuff"""

    # Class-level cache of image descriptions, shared by all Host instances
    # and optionally persisted to disk via the ``images_cache`` constructor
    # argument.
    images_cache = {}

    def __init__(self, images_cache=None, raise_on_errors=True, discover=True):
        """Build a docker host helper.

        Parameters:

        :param str images_cache: Optional path of a JSON file used to load
          and persist the shared ``Host.images_cache``

        :param bool raise_on_errors: When True, discovery problems raise
          exceptions; otherwise they are only logged

        :param bool discover: When True, scan the docker daemon for BEAT
          processing and database environments at construction time
        """

        # Initialisations
        self.raise_on_errors = raise_on_errors
        self.images_cache_filename = images_cache
        self.base_url = None
        self.containers = []
        self.processing_environments = {}
        self.db_environments = {}

        # (If necessary) Load the known infos about the images
        if (self.images_cache_filename is not None) and os.path.exists(
            self.images_cache_filename
        ):
            with open(self.images_cache_filename, "r") as f:
                Host.images_cache = json.load(f)

        # Discover the environments: the label-based scan is preferred; the
        # (slower) describe-based scan is only used as a fallback when the
        # labels yielded nothing at all
        if discover:
            (
                self.processing_environments,
                self.db_environments,
            ) = self._discover_environments_using_labels()

            if not self.db_environments and not self.processing_environments:
                (
                    self.processing_environments,
                    self.db_environments,
                ) = self._discover_environments_using_describe()

        # (If necessary) Save the known infos about the images
        if self.images_cache_filename is not None:
            with open(self.images_cache_filename, "w") as f:
                json.dump(Host.images_cache, f, indent=4)

    def __contains__(self, key):
        """True when ``key`` names a known processing or database environment"""
        return (key in self.processing_environments) or (key in self.db_environments)

    def __str__(self):
        """Human-readable description, including the base URL when set"""
        s = "Docker host"
        if self.base_url is not None:
            s += " (%s)" % self.base_url
        return s
[docs] def env2docker(self, key): """Returns a nice docker image name given a BEAT environment key""" attrs = self.processing_environments[key] return attrs["image"]
[docs] def db2docker(self, db_names): """Returns a nice docker image name given a database name""" def _all_in(db_names, databases): return len([x for x in db_names if x in databases]) == len(db_names) attrs = [ x for x in self.db_environments.values() if _all_in(db_names, x["databases"]) ][0] return attrs["image"]
[docs] def dbenv2docker(self, key): """Returns a nice docker image name given a BEAT database environment key""" attrs = self.db_environments[key] return attrs["image"]
[docs] def teardown(self): for container in self.containers: self.rm(container) self.containers = []
def __exit__(self, *exc): self.teardown()
[docs] def full_environment_name(self, name): try: return list( filter( lambda x: x.startswith(name + " ("), self.processing_environments.keys(), ) )[0] except IndexError: try: return list( filter( lambda x: x.startswith(name + " ("), self.db_environments.keys() ) )[0] except IndexError: return None
@property def ip(self): """The IP address of the docker host""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 1)) # connecting to a UDP address doesn't send packets return s.getsockname()[0] def _discover_environments_using_describe(self): """Returns a dictionary containing information about docker environments Raises: RuntimeError: if you set ``raise_on_errors`` in the constructor and I found environments that override each other for their description keys (``<name>(<version>)``), which should be unique. """ def _describe(image): """Tries to run the "describe" app on the image, collect results""" if image in Host.images_cache: return Host.images_cache[image] cmd = ["docker", "run", "--rm=true", image, "describe"] (status, stdout, stderr) = self._exec(cmd) if status == 0: try: infos = json.loads(stdout) Host.images_cache[image] = infos return infos except Exception as e: logger.warning( "Ignoring potential environment at `%s' since " "`describe' output cannot be parsed: %s", image, str(e), ) else: logger.warning( "Execution failed with status {}: \n" "stdout: '{}'\n" "stderr: '{}'".format(status, stdout, stderr) ) return {} def _must_replace(image, environments, key): # this check avoids we do a new environment and, by mistake, # override it with a previous version or the contrary. if self.raise_on_errors: raise RuntimeError( "Environments at '%s' and '%s' have the " "same name ('%s'). Distinct environments must be " "uniquely named. Fix this and re-start." 
% (image, environments[key]["image"], key) ) new_version = None previous_version = None parts = image.split("/") if len(parts) > 1: parts = parts[-1].split(":") if len(parts) > 1: new_version = parts[-1] parts = environments[key]["image"].split("/") if len(parts) > 1: parts = parts[-1].split(":") if len(parts) > 1: previous_version = parts[-1] replacement = False keep = False if (new_version is not None) and (previous_version is not None): if new_version == "latest": replacement = True elif previous_version == "latest": keep = True else: try: new_version = tuple([int(x) for x in new_version.split(".")]) try: previous_version = tuple( [int(x) for x in previous_version.split(".")] ) if new_version > previous_version: replacement = True else: keep = True except Exception: replacement = True except Exception: keep = True elif new_version is not None: replacement = True elif previous_version is not None: keep = True if replacement: logger.debug( "Overriding **existing** environment '%s' in image '%s'", key, environments[key]["image"], ) elif keep: logger.debug( "Environment '%s' already existing in image '%s', we'll keep it", key, environments[key]["image"], ) return False else: logger.warning( "Overriding **existing** environment '%s' image " "with '%s'. 
To avoid this warning make " "sure your docker images do not contain environments " "with the same names", key, environments[key]["image"], ) return True environments = {} db_environments = {} cmd = ["docker", "images", "--format", "{{.Repository}}:{{.Tag}}"] (status, stdout, stderr) = self._exec(cmd) if status != 0: logger.error( "Failed to retrieve the list of docker images, reason:\n\n%s", stderr ) return (environments, db_environments) images = [x for x in stdout.split("\n") if x.find("beat.env.") >= 0] for image in images: # Call the "describe" application on each existing image description = _describe(image) if not description: logger.debug("Description not found for", image) continue key = build_env_name(description) if "databases" in description: if (key in db_environments) and not _must_replace( image, db_environments, key ): continue db_environments[key] = description db_environments[key]["image"] = image else: if (key in environments) and not _must_replace( image, environments, key ): continue environments[key] = description environments[key]["image"] = image logger.info("Registered '%s' -> '%s'", key, image) logger.debug( "Found %d environments and %d database environments", len(environments), len(db_environments), ) return (environments, db_environments) def _discover_environments_using_labels(self): """Search BEAT runtime environments using docker labels""" def _must_replace(key, image, environments): environment = environments[key] if environment["image"] not in image.tags: logger.warning( "Different images providing the same environment: {} VS {}".format( environment["image"], image.tags ) ) if self.raise_on_errors: raise RuntimeError( "Environments at '%s' and '%s' have the " "same name ('%s'). Distinct environments must be " "uniquely named. Fix this and re-start." 
% (image.tags[0], environments[key]["image"], key) ) else: logger.debug("Keeping more recent") current_version = "{}{}".format( environment["version"], environment["revision"] ) new_version = "{}{}".format( image.labels["beat.env.version"], image.labels["beat.env.revision"] ) current_version = version.parse(current_version) new_version = version.parse(new_version) return new_version > current_version def _parse_image_info(image): labels = image.labels data = { "image": image.tags[0], "name": labels["beat.env.name"], "version": labels["beat.env.version"], "revision": labels["beat.env.revision"], } database_list = labels.get("beat.env.databases") if database_list: data["databases"] = ast.literal_eval(database_list) capabilities = labels.get("beat.env.capabilities") if capabilities: data["capabilities"] = ast.literal_eval(capabilities) return data def _process_image_list(image_list): environments = {} for image in image_list: if not len(image.tags): logger.warning("Untagged image, skipping") continue image_info = _parse_image_info(image) key = build_env_name(image_info) image_name = image_info["image"] if key in environments: if _must_replace(key, image, environments): environments[key] = image_info logger.info("Updated '%s' -> '%s'", key, image_name) else: environments[key] = image_info Host.images_cache[image_name] = environments[key] logger.info("Registered '%s' -> '%s'", key, image_name) return environments try: client = docker.from_env() except Exception as e: if self.raise_on_errors: raise else: logger.error("Docker client creation failed {}".format(e)) return {}, {} try: databases = client.images.list( filters={"label": ["beat.env.type=database"]} ) except Exception as e: if self.raise_on_errors: raise else: logger.error("Docker error: {}".format(e)) return {}, {} else: db_environments = _process_image_list(databases) try: executors = client.images.list( filters={"label": ["beat.env.type=execution"]} ) except Exception as e: if self.raise_on_errors: raise else: 
logger.error("Docker error: {}".format(e)) return {}, {} else: environments = _process_image_list(executors) logger.debug( "Found %d environments and %d database environments", len(environments), len(db_environments), ) return environments, db_environments
[docs] def create_container(self, image, command): if image in self: # Replace by a real image name image = self.env2docker(image) return Container(image, command)
    def start(self, container, virtual_memory_in_megabytes=0, max_cpu_percent=0):
        """Starts the execution of a container

        The process will be run in the background, and its standard output
        and standard error will be read after it finishes, into a limited
        size circular buffer.


        Parameters:

          container (:py:class:`Container`): The container.

          virtual_memory_in_megabytes (:py:class:`int`, Optional): The maximum
            amount of memory the user process can consume on the host. If not
            specified, a memory limit is not set.

          max_cpu_percent (:py:class:`float`, Optional): The maximum amount of
            CPU the user process may consume on the host. The value ``100``
            equals to using 100% of a single core. If not specified, then a CPU
            limitation is not put in place.

        Raises:

          RuntimeError: when ``docker run`` exits with a non-zero status; the
            partially-created container (if any) is removed first.
        """

        # base invocation: detached TTY with a read-only root filesystem;
        # writable areas are provided through tmpfs/volume mounts below
        cmd = ["docker", "run", "--tty", "--interactive", "--detach", "--read-only"]

        # each of these Container properties returns either a ready-made
        # "--option=value" string or "" when unset
        network = container.network
        if network:
            cmd.append(network)

        user = container.user
        if user:
            cmd.append(user)

        name = container.name
        if name:
            cmd.append(name)

        workdir = container.workdir
        if workdir:
            cmd.append(workdir)

        entrypoint = container.entrypoint
        if entrypoint:
            cmd.append(entrypoint)

        # expose GPUs only when the image declares the "gpu" capability AND
        # the host actually has the nvidia driver loaded
        if container.image in Host.images_cache:
            image_infos = Host.images_cache[container.image]
            if ("capabilities" in image_infos) and (
                "gpu" in image_infos["capabilities"]
            ):
                if os.path.exists("/proc/driver/nvidia"):
                    cmd.append("--gpus=all")

        if virtual_memory_in_megabytes:
            # For this to work properly, memory swap limitation has to be
            # enabled on the kernel. This typically goes by setting
            # "cgroup_enable=memory" as a boot parameter to kernels which are
            # compiled with this support.
            # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
            logger.debug("Setting maximum memory to %dMB" % virtual_memory_in_megabytes)
            # setting --memory-swap to the same value disables swap usage
            cmd.append("--memory=%dm" % virtual_memory_in_megabytes)
            cmd.append("--memory-swap=%dm" % virtual_memory_in_megabytes)

        if max_cpu_percent:
            # The period corresponds to the scheduling interval for the CFS in
            # Linux. The quota corresponds to a fraction or a multiple of the
            # period, the container will get. A quota that is 2x the period
            # gets the container up to 200% cpu time (2 cores). If the quota is
            # 0.5x the period, the container gets up to 50% the cpu time. Each
            # core represents 100%. A system with 2 cores has 200% computing
            # power.
            #
            # More info:
            # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
            #
            # For this to work properly, CPU bandwidth provisioning for the
            # Linux CFS must be enabled on the kernel. More info on how to do
            # it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
            #
            # If your system is running on a virtual machine, having more cores
            # available to docker engine normally translates to more precise
            # scheduling.
            period = 100000  # microseconds
            quota = max_cpu_percent / 100.0

            logger.debug("Setting CPU quota to %d%%" % max_cpu_percent)
            cmd.append("--cpu-period=%d" % period)
            cmd.append("--cpu-quota=%d" % int(quota * period))

        # Mount the volumes
        cmd.extend(container.volumes)

        # Add tmpfs entries
        cmd.extend(container.temporary_filesystems)

        # Expose the ports
        cmd.extend(container.ports)

        # Environment variables
        cmd.extend(container.environment_variables)

        cmd.append(container.image)
        cmd.extend(container.command)

        # Instantiate the container
        logger.debug(
            'Creation and start of a container: image=%s, command="%s"',
            container.image,
            " ".join(container.command),
        )
        logger.debug("Docker command: %s", " ".join(cmd))

        (status, stdout, stderr) = self._exec(cmd)

        # "docker run --detach" prints the container id on stdout; record it
        # and track the container even if the status is non-zero, so that the
        # cleanup path below can remove it
        if stdout != "":
            container.id = stdout.replace("\n", "")
            self.containers.append(container)

        if status != 0:
            message = "Failed to create the container, reason:\n\n%s" % stderr
            logger.error(message)
            if container.id is not None:
                self.rm(container)
            raise RuntimeError(message)

        logger.debug("Container ID: '%s'", container.id)
[docs] def wait(self, container, timeout=None): """Wait for the container to finish its job Parameters: timeout (:py:class:`float`, Optional): A timeout in seconds to wait for the user process to finish. If a timeout value is not given, waits forever. """ (status, stdout, stderr) = self._exec( ["docker", "wait", container.id], timeout=timeout ) if status != 0: return None return int(stdout)
[docs] def status(self, container): """Checks the status of a given container""" logger.debug("Inspect container %s", container.id) (status, stdout, stderr) = self._exec(["docker", "inspect", container.id]) if status != 0: return None try: return json.loads(stdout)[0]["State"]["Status"] except Exception: return None
[docs] def logs(self, container): """Returns the logs of a container""" (status, stdout, stderr) = self._exec(["docker", "logs", container.id]) if status != 0: return "" return stdout.replace("\r\n", "\n")
    def statistics(self, container):
        """Returns the statistics about a container

        Queries the docker API for a single (non-streaming) stats sample,
        merges it with the previous sample stored on the container, and
        returns a dict with ``cpu`` and ``memory`` entries as produced by
        :py:mod:`beat.core.stats`.
        """

        client = docker.APIClient()
        data = client.stats(container.id, stream=False)

        # If CPU statistics can't be retrieved, fall back to the sample
        # docker took for the previous read
        if "system_cpu_usage" not in data["cpu_stats"]:
            data["cpu_stats"] = dict(data["precpu_stats"])

        # If memory statistics can't be retrieved, report zeros
        if len(data["memory_stats"]) == 0:
            data["memory_stats"] = dict(limit=0, max_usage=0)

        # previous CPU sample (if any) is needed to compute a usage delta
        previous_cpu = container._stats["cpu_stats"] if container._stats else None

        # merge statistics
        retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data["cpu_stats"]),
            memory=stats.memory_statistics(data["memory_stats"]),
        )

        # remember this sample for the next call's delta computation
        container._stats = data

        return retval
[docs] def rm(self, container): """Removes a given container. If it is not done, kill it first""" if container.id is None: return status = self.status(container) if status not in ("created", "exited"): logger.warning( "Killing container '%s' which is on state '%s'", container.id, status ) self._exec(["docker", "container", "stop", container.id]) logger.debug("Remove container %s", container.id) (status, stdout, stderr) = self._exec(["docker", "rm", container.id]) self.containers.remove(container) container.id = None
[docs] def kill(self, container): """Stop a container""" if self.status(container) == "running": self._exec(["docker", "kill", container.id])
    def run(self, image, command):
        """Runs a command and retrieves its status and output

        A container is created for ``image`` (BEAT environment keys are
        resolved by :py:meth:`create_container`), started, waited upon and
        its log collected.

        Returns:

          tuple: ``(status, output)`` on success; ``(1, None)`` when any
          step raised an exception.
        """

        container = None

        try:
            container = self.create_container(image, command)
            self.start(container)
            status = self.wait(container)
            output = self.logs(container)
        except Exception:
            # best effort: any failure (creation, start, wait or log
            # retrieval) is reported as a generic non-zero status
            return 1, None
        finally:
            # the finally clause runs before either return statement, so
            # the container is removed on both the success and error paths
            if container is not None:
                self.rm(container)

        return status, output
[docs] def get_ipaddress(self, container): """Returns the ip address of the given container""" cmd = [ "docker", "inspect", "--format", "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", container.id, ] (status, stdout, stderr) = self._exec(cmd) if status != 0: logger.error( "Failed to retrieve the ip address of the container, reason:\n\n%s", stderr, ) return None return stdout.replace("\n", "")
def _exec(self, command, timeout=None): process_stdout = tempfile.NamedTemporaryFile() process_stderr = tempfile.NamedTemporaryFile() def _read_streams(): with open(process_stdout.name, "r") as f: stdout = f.read() with open(process_stderr.name, "r") as f: stderr = f.read() return (stdout, stderr) try: process = sp.Popen(command, stdout=process_stdout, stderr=process_stderr) except IOError as e: if self.raise_on_errors: raise logger.error("Docker can not be found") return (-1, None, e) if timeout is None: process.communicate() else: start = time.time() while process.poll() is None: time.sleep(0.1) if time.time() - start >= timeout: process.kill() (stdout, stderr) = _read_streams() return (None, stdout, stderr) (stdout, stderr) = _read_streams() return (process.returncode, stdout, stderr)
# ----------------------------------------------------------
class Container:
    """This class represents a Docker container with its set of parameters

    Parameters:

    :param str image: Name of the image to use to build the container

    :param str command: Command to execute in the container.
    """

    def __init__(self, image, command):
        self.image = image
        self.command = command
        self.network_name = None
        self.uid = None
        # docker container id, set by Host.start() once the container runs
        self.id = None
        # host path -> {"bind": mount path, "mode": "ro"/"rw"}
        self._volumes = {}
        # container port -> (host address, host port) tuple or [host port]
        self._ports = {}
        self._environment_variables = {}
        # last stats sample, kept by Host.statistics() for delta computation
        self._stats = None
        self._name = None
        self._workdir = None
        self._entrypoint = None
        # default writable areas for a read-only container
        self._temporary_filesystems = {"/tmp": "500k", "/run": "500k"}  # nosec

        # honor extra tmpfs mounts declared by the image through the
        # "beat.env.custom_tmpfs" label (a JSON path -> size mapping)
        client = docker.from_env()
        docker_image = client.images.get(image)
        custom_tmpfs = docker_image.labels.get("beat.env.custom_tmpfs")
        if custom_tmpfs is not None:
            custom_tmpfs = json.loads(custom_tmpfs)
            for path, size in custom_tmpfs.items():
                self._temporary_filesystems[path] = size
[docs] def set_name(self, name): """Set the name to be used by the container in place of the docker auto generated one. """ self._name = name
[docs] def set_workdir(self, workdir): """Set the work folder to be used by the container""" self._workdir = workdir
[docs] def set_entrypoint(self, entrypoint): """Set the entry point to be used by the container""" self._entrypoint = entrypoint
[docs] def add_volume(self, path, mount_path, read_only=True): """Add a volume to be mounted on the container Parameters: :param str path: Source path of the volume on disk :param str mount_path: Path of the volume in the container :param boolean read_only: Whether the volume will be read only """ self._volumes[path] = {"bind": mount_path, "mode": "ro" if read_only else "rw"}
[docs] def add_tmpfs(self, path, size): """Add a tmpfs to be mounted on the container Parameters: :param str path: Target path for the tmpfs :param str size: Size of the tmps. Unlimited if empty """ self._temporary_filesystems[path] = size
[docs] def add_port(self, container_port, host_port, host_address=None): """Add a port binding Parameters: :param int container_port: Port to bind from the container :param int host_port: Port to bind to on the host :param str host_address: Address of the host """ if host_address is not None: value = (host_address, host_port) else: value = [host_port] self._ports[container_port] = value
[docs] def add_environment_variable(self, name, value): """Add an environment variable Parameters: :param str name: Name of the variable :param str value: Content of the variable """ self._environment_variables[name] = value
[docs] def reset_ports(self): """Empty the port bindings""" self._ports = {}
@property def name(self): name = "" if self._name: name = "--name=%s" % self._name return name @property def workdir(self): workdir = "" if self._workdir: workdir = "--workdir=%s" % self._workdir return workdir @property def entrypoint(self): entrypoint = "" if self._entrypoint: entrypoint = "--entrypoint=%s" % self._entrypoint return entrypoint @property def volumes(self): """Returns the volumes of this container in a suitable form to build a command to start the container. """ volumes = [] for k, v in self._volumes.items(): if k.startswith("nfs://"): addr, src = k[6:].split(":") volumes.append( "--mount=type=volume," "dst={dst}," "volume-driver=local," "volume-opt=type=nfs," "volume-opt=device=:{src}," "volume-opt=o=addr={addr}".format(dst=v["bind"], src=src, addr=addr) ) else: if k.startswith("file://"): k = k[6:] volumes.append("--volume=%s:%s:%s" % (k, v["bind"], v["mode"])) return volumes @property def temporary_filesystems(self): tempfs_list = [] for path, size in self._temporary_filesystems.items(): tmpfs_string = "--tmpfs={}:rw,noexec,nosuid".format(path) if size: tmpfs_string += ",size={}".format(size) tempfs_list.append(tmpfs_string) return tempfs_list @property def ports(self): """Returns the ports of this container in a suitable form to build a command to start the container. 
""" ports = [] for k, v in self._ports.items(): ports.append("-p") if isinstance(v, tuple): ports.append("%s:%d:%d" % (v[0], v[1], k)) else: ports.append("%d:%d" % (v[0], k)) return ports @property def environment_variables(self): """Returns the environment variables to set on this container.""" environment_variables = [] for k, v in self._environment_variables.items(): environment_variables.append("--env={}={}".format(k, v)) return environment_variables @property def network(self): network = "" if self.network_name: network = "--network=" + self.network_name return network @property def user(self): user = "" if self.uid: user = "--user={0}:{0}".format(self.uid) return user @property def command_line(self): """Returns the complete docker command to start the container and execute the specified command. Returns: str: Command to execute """ cmd = "docker run -ti --rm=true " cmd += "%s " % self.network cmd += "%s " % self.user cmd += " ".join(self.volumes) cmd += " ".join(self.ports) cmd += " ".join(self.environment_variables) cmd += "%s " % self.name cmd += "%s " % self.workdir cmd += "%s " % self.entrypoint cmd += "%s " % self.image cmd += '"%s"' % " ".join(self.command) return cmd