Source code for beat.core.stats

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################


"""
=====
stats
=====

A class that can read, validate and update statistical information

Forward imported from :py:mod:`beat.backend.python.stats`:
:py:func:`beat.backend.python.stats.io_statistics`
:py:func:`beat.backend.python.stats.update`
"""
import copy
import os

import simplejson as json

from beat.backend.python.stats import io_statistics  # noqa
from beat.backend.python.stats import update  # noqa

from . import prototypes
from . import schema


class Statistics(object):
    """Statistics define resource usage for algorithmic code runs

    Parameters:

      data (:py:class:`object`, Optional): The piece of data representing the
        statistics to be read, it must validate against our pre-defined
        execution schema. If the input is ``None`` or empty, then start a new
        statistics from scratch.

    Attributes:

      errors (list): A list of strings containing errors found while loading
        this statistics information.

    """

    def __init__(self, data=None):
        self.errors = []

        if data:
            self._load(data)  # also runs validation
        else:
            # no input: start from the "statistics" prototype (also validates)
            self._data, self.errors = prototypes.load("statistics")

    def _load(self, data):
        """Loads the statistics

        Parameters:

          data (object, str, file): The piece of data to load. A ``dict`` is
            validated as is; any other value is first checked with
            :py:func:`os.path.exists` and reported in ``errors`` if no such
            path exists. See :py:func:`schema.validate` for more details.

        """

        # reset
        self._data = None
        self.errors = []

        # anything that is not a dictionary is treated as a filesystem path
        if not isinstance(data, dict) and not os.path.exists(data):
            self.errors.append("File not found: %s" % data)
            return

        # this runs basic validation, including JSON loading if required
        self._data, self.errors = schema.validate("statistics", data)

    @property
    def schema_version(self):
        """Returns the schema version (defaults to 1 when not declared)"""

        # ``schema_version`` lives at the root of the document; ``self.data``
        # is only the I/O sub-section, so the root dictionary is read here
        return self._data.get("schema_version", 1)

    @property
    def cpu(self):
        """Returns only CPU information"""

        return self._data["cpu"]

    @cpu.setter
    def cpu(self, data):
        """Sets the CPU information"""

        for key in ("user", "system", "total"):
            self._data["cpu"][key] = data[key]

        for key in ("voluntary", "involuntary"):
            self._data["cpu"]["context_switches"][key] = data["context_switches"][key]

    @property
    def memory(self):
        """Returns only memory information"""

        return self._data["memory"]

    @memory.setter
    def memory(self, data):
        """Sets only the memory information"""

        for key in ("rss",):
            self._data["memory"][key] = data[key]

    @property
    def data(self):
        """Returns only I/O information"""

        return self._data["data"]

    @data.setter
    def data(self, data):
        """Sets only the I/O information"""

        for key in ("volume", "blocks", "time"):
            self._data["data"][key]["read"] = data[key]["read"]
            self._data["data"][key]["write"] = data[key]["write"]

        self._data["data"]["files"] = list(data["files"])

        # network information belongs to the I/O section: this is where the
        # ``data`` getter exposes it and where ``__iadd__`` accumulates it
        self._data["data"]["network"] = data["network"]

    @property
    def valid(self):
        """A boolean that indicates if this executor is valid or not"""

        return not bool(self.errors)

    def __add__(self, other):
        """Adds two statistics data blocks"""

        # Bail out before delegating to ``+=``: when ``__iadd__`` returns
        # ``NotImplemented`` Python falls back to ``__add__``, which would
        # otherwise recurse without end for unsupported operands.
        if not isinstance(other, Statistics):
            return NotImplemented

        retval = Statistics(copy.deepcopy(self._data))
        retval += other
        return retval

    def __iadd__(self, other):
        """Self-add statistics from another block"""

        if not isinstance(other, Statistics):
            return NotImplemented

        for key in ("user", "system", "total"):
            self._data["cpu"][key] += other._data["cpu"][key]

        for key in ("voluntary", "involuntary"):
            self._data["cpu"]["context_switches"][key] += other._data["cpu"][
                "context_switches"
            ][key]

        for key in ("rss",):  # gets the maximum between the two
            self._data["memory"][key] = max(
                other._data["memory"][key], self._data["memory"][key]
            )

        for key in ("volume", "blocks", "time"):
            self._data["data"][key]["read"] += other._data["data"][key]["read"]
            self._data["data"][key]["write"] += other._data["data"][key]["write"]

        self._data["data"]["files"] += other._data["data"]["files"]

        self._data["data"]["network"]["wait_time"] += other._data["data"]["network"][
            "wait_time"
        ]

        return self

    def __str__(self):
        return self.as_json(2)

    def as_json(self, indent=None):
        """Returns self as JSON

        Parameters:

          indent (int): Indentation to use for the JSON generation

        Returns:

          str: JSON representation

        """

        return json.dumps(self._data, indent=indent)

    def as_dict(self):
        """Returns self as a dictionary (the internal one, not a copy)"""

        return self._data

    def write(self, f):
        """Writes contents to a file-like object, or to a file given its path

        Parameters:

          f (object, str): An object with a ``write`` method, or else a value
            passed to :py:func:`open` in text-write mode.

        """

        if hasattr(f, "write"):
            f.write(str(self))
        else:
            with open(f, "wt") as fobj:
                fobj.write(str(self))
# ----------------------------------------------------------
def cpu_statistics(start, end):
    """Summarizes current CPU usage

    This method should be used when the currently set algorithm is the only one
    executed through the whole process. It is done for collecting resource
    statistics on separate processing environments. It follows the recipe in:
    http://stackoverflow.com/questions/30271942/get-docker-container-cpu-usage-as-percentage

    Parameters:

      start (dict, None): CPU statistics sampled at the beginning of the
        measured interval. If ``None``, the absolute counters in ``end`` are
        used instead of a difference.

      end (dict): CPU statistics sampled at the end of the measured interval.
        It is expected to provide ``system_cpu_usage`` and
        ``cpu_usage.total_usage`` counters (divided by 1e9 here to obtain
        seconds).

    Returns:

      dict: A dictionary summarizing current CPU usage

    """

    if "system_cpu_usage" not in end:
        # nothing to measure against: report an idle, single-processor setup
        return {
            "user": 0.0,
            "system": 0.0,
            "total": 0.0,
            "percent": 0.0,
            "processors": 1,
        }

    end_cpu_usage = end["cpu_usage"]

    user_cpu = end_cpu_usage["total_usage"]
    total_cpu = end["system_cpu_usage"]
    if start is not None:
        user_cpu -= start["cpu_usage"]["total_usage"]
        total_cpu -= start["system_cpu_usage"]

    user_cpu /= 1000000000.0  # in seconds
    total_cpu /= 1000000000.0  # in seconds

    # The per-CPU breakdown may be missing (or null); in that case fall back
    # to the ``online_cpus`` counter so the processor count and the
    # percentage do not silently collapse to zero.
    processors = len(end_cpu_usage.get("percpu_usage") or [])
    if not processors:
        processors = end.get("online_cpus") or 0

    return {
        "user": user_cpu,
        "system": 0.0,
        "total": total_cpu,
        "percent": 100.0 * processors * user_cpu / total_cpu if total_cpu else 0.0,
        "processors": processors,
    }
# ----------------------------------------------------------
def memory_statistics(data):
    """Summarizes current memory usage

    This method should be used when the currently set algorithm is the only one
    executed through the whole process. It is done for collecting resource
    statistics on separate processing environments.

    Parameters:

      data (dict): Memory counters; ``limit`` is mandatory, and the usage is
        taken from ``max_usage`` when that key exists, otherwise from
        ``usage``.

    Returns:

      dict: A dictionary summarizing current memory usage

    """

    ceiling = float(data["limit"])

    # peak usage wins whenever the key is present, otherwise current usage
    if "max_usage" in data:
        raw_usage = data["max_usage"]
    else:
        raw_usage = data.get("usage")
    used = float(raw_usage)

    if ceiling:
        ratio = 100.0 * used / ceiling
    else:
        ratio = 0.0  # no limit reported: avoid dividing by zero

    summary = {}
    summary["rss"] = used
    summary["limit"] = ceiling
    summary["percent"] = ratio
    return summary