Coverage for src/deepdraw/utils/resources.py: 77% (135 statements)

# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Tools for interacting with the running computer or GPU."""

import logging
import multiprocessing
import os
import queue
import shutil
import subprocess
import time

import numpy
import psutil

logger = logging.getLogger(__name__)

_nvidia_smi = shutil.which("nvidia-smi")
"""Location of the nvidia-smi program, if one exists."""

GB = float(2**30)
"""The number of bytes in a gigabyte (binary convention: 2**30 bytes)."""


def run_nvidia_smi(query, rename=None):
    """Returns GPU information from query.

    For a comprehensive list of options and help, execute ``nvidia-smi
    --help-query-gpu`` on a host with a GPU.

    Parameters
    ----------

    query : list
        A list of query strings as defined by ``nvidia-smi --help-query-gpu``

    rename : :py:class:`list`, Optional
        A list of keys to yield in the return value for each entry above.  It
        gives you the opportunity to rewrite some key names for convenience.
        This list, if provided, must be of the same length as ``query``.

    Returns
    -------

    data : :py:class:`tuple`, None
        A tuple of 2-tuples ``(name, value)`` containing the queried
        parameters, keyed by their ``rename`` versions.  If ``nvidia-smi`` is
        not available, returns ``None``.  Percentage information is left
        alone, memory information is transformed to gigabytes
        (floating-point).
    """

    if _nvidia_smi is not None:
        if rename is None:
            rename = query
        else:
            assert len(rename) == len(query)

        # get GPU information based on GPU ID (relies on CUDA_VISIBLE_DEVICES
        # being set in the environment)
        values = subprocess.getoutput(
            "%s --query-gpu=%s --format=csv,noheader --id=%s"
            % (
                _nvidia_smi,
                ",".join(query),
                os.environ.get("CUDA_VISIBLE_DEVICES"),
            )
        )
        values = [k.strip() for k in values.split(",")]
        t_values = []
        for k in values:
            if k.endswith("%"):
                t_values.append(float(k[:-1].strip()))
            elif k.endswith("MiB"):
                t_values.append(float(k[:-3].strip()) / 1024)  # MiB -> GiB
            else:
                t_values.append(k)  # unchanged
        return tuple(zip(rename, t_values))
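

# Illustrative usage (a sketch, not part of the module API): on a host with
# an NVIDIA GPU and ``CUDA_VISIBLE_DEVICES=0`` exported, one may see
# something like the following (actual values depend on the device):
#
#     >>> run_nvidia_smi(("memory.total", "utilization.gpu"))
#     (('memory.total', 11.91), ('utilization.gpu', 3.0))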


def gpu_constants():
    """Returns GPU (static) information using nvidia-smi.

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return a tuple of 2-tuples containing the following ``nvidia-smi``
        query information:

        * ``gpu_name``, as ``gpu_name`` (:py:class:`str`)
        * ``driver_version``, as ``gpu_driver_version`` (:py:class:`str`)
        * ``memory.total``, as ``gpu_memory_total`` (transformed to
          gigabytes, :py:class:`float`)
    """

    return run_nvidia_smi(
        ("gpu_name", "driver_version", "memory.total"),
        ("gpu_name", "gpu_driver_version", "gpu_memory_total"),
    )
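

# For instance (a sketch; values depend on the installed device and driver):
#
#     >>> dict(gpu_constants())
#     {'gpu_name': 'GeForce GTX 1080 Ti', 'gpu_driver_version': '455.23.05',
#      'gpu_memory_total': 10.9}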


def gpu_log():
    """Returns GPU information about current non-static status using
    nvidia-smi.

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return a tuple of 2-tuples containing the following ``nvidia-smi``
        query information:

        * ``memory.used``, as ``gpu_memory_used`` (transformed to gigabytes,
          :py:class:`float`)
        * ``memory.free``, as ``gpu_memory_free`` (transformed to gigabytes,
          :py:class:`float`)
        * ``100*memory.used/memory.total``, as ``gpu_memory_percent``,
          (:py:class:`float`, in percent)
        * ``utilization.gpu``, as ``gpu_percent``,
          (:py:class:`float`, in percent)
    """

    retval = run_nvidia_smi(
        (
            "memory.total",
            "memory.used",
            "memory.free",
            "utilization.gpu",
        ),
        (
            "gpu_memory_total",
            "gpu_memory_used",
            "gpu_memory_free",
            "gpu_percent",
        ),
    )

    # re-compose the output to generate the values advertised above; this
    # indexing assumes nvidia-smi is available, so callers should check for
    # a GPU first
    return (
        retval[1],  # gpu_memory_used
        retval[2],  # gpu_memory_free
        ("gpu_memory_percent", 100 * (retval[1][1] / retval[0][1])),
        retval[3],  # gpu_percent
    )
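

# Illustrative output on a host with one GPU (a sketch; values vary over
# time):
#
#     >>> dict(gpu_log())
#     {'gpu_memory_used': 1.6, 'gpu_memory_free': 10.2,
#      'gpu_memory_percent': 13.5, 'gpu_percent': 35.0}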


def cpu_constants():
    """Returns static CPU information about the current system.

    Returns
    -------

    data : tuple
        A tuple of 2-tuples containing these entries:

        0. ``cpu_memory_total`` (:py:class:`float`): total memory available,
           in gigabytes
        1. ``cpu_count`` (:py:class:`int`): number of logical CPUs available
    """

    return (
        ("cpu_memory_total", psutil.virtual_memory().total / GB),
        ("cpu_count", psutil.cpu_count(logical=True)),
    )
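

# A quick check (a sketch; values depend on the host):
#
#     >>> dict(cpu_constants())
#     {'cpu_memory_total': 15.6, 'cpu_count': 8}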


class CPULogger:
    """Logs CPU information using :py:mod:`psutil`.

    Parameters
    ----------

    pid : :py:class:`int`, Optional
        Process identifier of the main process (parent process) to observe
    """

    def __init__(self, pid=None):
        this = psutil.Process(pid=pid)
        self.cluster = [this] + this.children(recursive=True)
        # touch cpu_percent() at least once for all processes in the cluster,
        # so the next call measures usage since this point
        for k in self.cluster:
            k.cpu_percent(interval=None)

    def log(self):
        """Returns current process cluster information.

        Returns
        -------

        data : tuple
            A tuple of 2-tuples containing these entries:

            0. ``cpu_memory_used`` (:py:class:`float`): total memory used from
               the system, in gigabytes
            1. ``cpu_rss`` (:py:class:`float`): RAM currently used by
               process and children, in gigabytes
            2. ``cpu_vms`` (:py:class:`float`): total memory (RAM + swap)
               currently used by process and children, in gigabytes
            3. ``cpu_percent`` (:py:class:`float`): percentage of the total
               CPU used by this process and children (recursively) since the
               last call (the first call should be ignored).  This number
               depends on the number of CPUs in the system and can be greater
               than 100%
            4. ``cpu_processes`` (:py:class:`int`): total number of processes
               including self and children (recursively)
            5. ``cpu_open_files`` (:py:class:`int`): total number of open
               files by self and children
        """

        # check all cluster components and update the process list, so that
        # children surviving from the last call keep their cpu_percent()
        # initialization
        stored_children = set(self.cluster[1:])
        current_children = set(self.cluster[0].children(recursive=True))
        keep_children = stored_children & current_children
        new_children = current_children - stored_children
        gone = set()
        for k in new_children:
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile
                # update the intermediate list for this time
                gone.add(k)
        new_children = new_children - gone
        self.cluster = (
            self.cluster[:1] + list(keep_children) + list(new_children)
        )

        memory_info = []
        cpu_percent = []
        open_files = []
        gone = set()
        for k in self.cluster:
            try:
                memory_info.append(k.memory_info())
                cpu_percent.append(k.cpu_percent(interval=None))
                open_files.append(len(k.open_files()))
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile, just ignore it
                # it is too late to update any intermediate list
                # at this point, but ensures to update counts later on
                gone.add(k)

        return (
            ("cpu_memory_used", psutil.virtual_memory().used / GB),
            ("cpu_rss", sum([k.rss for k in memory_info]) / GB),
            ("cpu_vms", sum([k.vms for k in memory_info]) / GB),
            ("cpu_percent", sum(cpu_percent)),
            ("cpu_processes", len(self.cluster) - len(gone)),
            ("cpu_open_files", sum(open_files)),
        )
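

# Example usage (a sketch): as noted above, the ``cpu_percent`` entry of the
# first log() call should be ignored, since it only measures usage since the
# construction of the logger:
#
#     >>> cpu = CPULogger()  # monitors the current process and its children
#     >>> _ = cpu.log()  # first call: cpu_percent not yet meaningful
#     >>> time.sleep(1.0)
#     >>> dict(cpu.log())["cpu_percent"]  # e.g. 7.5; may exceed 100.0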


class _InformationGatherer:
    """A container to store monitoring information.

    Parameters
    ----------

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logger : logging.Logger
        A logger to be used for logging messages
    """

    def __init__(self, has_gpu, main_pid, logger):
        self.cpu_logger = CPULogger(main_pid)
        self.keys = [k[0] for k in self.cpu_logger.log()]
        self.cpu_keys_len = len(self.keys)
        self.has_gpu = has_gpu
        self.logger = logger
        if self.has_gpu:
            self.keys += [k[0] for k in gpu_log()]
        self.data = [[] for _ in self.keys]

    def acc(self):
        """Accumulates another measurement."""
        for i, k in enumerate(self.cpu_logger.log()):
            self.data[i].append(k[1])
        if self.has_gpu:
            for i, k in enumerate(gpu_log()):
                self.data[i + self.cpu_keys_len].append(k[1])

    def summary(self):
        """Returns the current data."""

        if len(self.data[0]) == 0:
            self.logger.error("CPU/GPU logger was not able to collect any data")
        retval = []
        for k, values in zip(self.keys, self.data):
            retval.append((k, values))
        return tuple(retval)
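

# A sketch of how this container behaves: keys are fixed at construction,
# and each acc() call appends one sample per key:
#
#     >>> g = _InformationGatherer(has_gpu=False, main_pid=None, logger=logger)
#     >>> g.acc(); g.acc()
#     >>> dict(g.summary())["cpu_rss"]  # one value per acc() call
#     [0.52, 0.52]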


def _monitor_worker(interval, has_gpu, main_pid, stop, queue, logging_level):
    """A monitoring worker that measures resources and returns lists.

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    stop : :py:class:`multiprocessing.Event`
        Indicates if we should continue running or stop

    queue : :py:class:`multiprocessing.Queue`
        A queue, to send monitoring information back to the spawner

    logging_level: int
        The logging level to use for logging from launched processes
    """

    logger = multiprocessing.log_to_stderr(level=logging_level)
    ra = _InformationGatherer(has_gpu, main_pid, logger)

    while not stop.is_set():
        try:
            ra.acc()  # guarantees at least an entry will be available
            time.sleep(interval)
        except Exception:
            logger.warning(
                "Iterative CPU/GPU logging did not work properly this once",
                exc_info=True,
            )
            time.sleep(0.5)  # wait half a second, and try again!

    queue.put(ra.summary())
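

# How the pieces fit together (a sketch; ResourceMonitor below wires this up
# for you):
#
#     >>> stop = multiprocessing.Event()
#     >>> q = multiprocessing.Queue()
#     >>> p = multiprocessing.Process(target=_monitor_worker,
#     ...     args=(0.1, False, os.getpid(), stop, q, logging.WARNING))
#     >>> p.start(); time.sleep(0.5); stop.set(); p.join()
#     >>> data = q.get()  # tuple of (key, list-of-samples) pairs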


class ResourceMonitor:
    """An external, non-blocking CPU/GPU resource monitor.

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logging_level: int
        The logging level to use for logging from launched processes
    """

    def __init__(self, interval, has_gpu, main_pid, logging_level):
        self.interval = interval
        self.has_gpu = has_gpu
        self.main_pid = main_pid
        self.event = multiprocessing.Event()
        self.q = multiprocessing.Queue()
        self.logging_level = logging_level

        self.monitor = multiprocessing.Process(
            target=_monitor_worker,
            name="ResourceMonitorProcess",
            args=(
                self.interval,
                self.has_gpu,
                self.main_pid,
                self.event,
                self.q,
                self.logging_level,
            ),
        )

        self.data = None

    @staticmethod
    def monitored_keys(has_gpu):
        return _InformationGatherer(has_gpu, None, logger).keys

    def __enter__(self):
        """Starts the monitoring process."""

        self.monitor.start()
        return self

    def __exit__(self, *exc):
        """Stops the monitoring process and returns the summary of
        observations."""

        self.event.set()
        self.monitor.join()
        if self.monitor.exitcode != 0:
            logger.error(
                f"CPU/GPU resource monitor process exited with code "
                f"{self.monitor.exitcode}. Check logs for errors!"
            )

        try:
            data = self.q.get(timeout=2 * self.interval)
        except queue.Empty:
            logger.warning(
                f"CPU/GPU resource monitor did not provide anything when "
                f"joined (even after a {2*self.interval}-second timeout); "
                f"this is normally due to exceptions on the monitoring "
                f"process.  Check above for other exceptions."
            )
            self.data = None
        else:
            # summarize the returned data by creating means (and maxima, for
            # process and open-file counts)
            summary = []
            for k, values in data:
                if values:
                    if k in ("cpu_processes", "cpu_open_files"):
                        summary.append((k, numpy.max(values)))
                    else:
                        summary.append((k, numpy.mean(values)))
                else:
                    summary.append((k, 0.0))
            self.data = tuple(summary)
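

# Typical usage (a sketch): sample the current process tree every second
# while some workload runs, then read the summarized figures from ``.data``:
#
#     >>> monitor = ResourceMonitor(interval=1.0, has_gpu=False,
#     ...     main_pid=os.getpid(), logging_level=logging.WARNING)
#     >>> with monitor:
#     ...     train_model()  # hypothetical workload
#     >>> dict(monitor.data)["cpu_rss"]  # mean resident-set size, in GB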