#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

"""Tools for monitoring resource usage (CPU and GPU) of the running machine"""

import logging
import multiprocessing
import os
import queue
import shutil
import subprocess
import time

import numpy
import psutil

logger = logging.getLogger(__name__)

_nvidia_smi = shutil.which("nvidia-smi")
"""Location of the nvidia-smi program, if one exists"""


GB = float(2**30)
"""The number of bytes in a gigabyte (2**30, i.e. one gibibyte)"""


def run_nvidia_smi(query, rename=None):
    """Returns GPU information from query

    For a comprehensive list of options and help, execute ``nvidia-smi
    --help-query-gpu`` on a host with a GPU


    Parameters
    ----------

    query : list
        A list of query strings as defined by ``nvidia-smi --help-query-gpu``

    rename : :py:class:`list`, Optional
        A list of keys to yield in the return value for each entry above.  It
        gives you the opportunity to rewrite some key names for convenience.
        This list, if provided, must be of the same length as ``query``.


    Returns
    -------

    data : :py:class:`tuple`, None
        A tuple of ``(key, value)`` pairs containing the queried parameters,
        keyed by their ``rename`` versions.  If ``nvidia-smi`` is not
        available, returns ``None``.  Percentage values are converted to
        floats (still expressed in percent), memory values are converted to
        gigabytes (floating-point), and all other values are kept as strings.

    """

    if _nvidia_smi is not None:

        if rename is None:
            rename = query
        else:
            assert len(rename) == len(query)

        # Get GPU information for the GPU(s) selected by CUDA_VISIBLE_DEVICES
        # (this assumes that environment variable is set).
        values = subprocess.getoutput(
            "%s --query-gpu=%s --format=csv,noheader --id=%s"
            % (
                _nvidia_smi,
                ",".join(query),
                os.environ.get("CUDA_VISIBLE_DEVICES"),
            )
        )
        values = [k.strip() for k in values.split(",")]
        t_values = []
        for k in values:
            if k.endswith("%"):
                t_values.append(float(k[:-1].strip()))
            elif k.endswith("MiB"):
                t_values.append(float(k[:-3].strip()) / 1024)  # MiB -> GiB
            else:
                t_values.append(k)  # unchanged
        return tuple(zip(rename, t_values))
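
# Example for run_nvidia_smi() (illustrative sketch only; requires a host with
# ``nvidia-smi`` installed and ``CUDA_VISIBLE_DEVICES`` set; the GPU model and
# numbers below are made up):
#
#   info = run_nvidia_smi(
#       ("gpu_name", "utilization.gpu"), ("name", "percent")
#   )
#   # info == (("name", "Tesla V100"), ("percent", 35.0))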


def gpu_constants():
    """Returns GPU (static) information using nvidia-smi

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return a tuple of ``(key, value)`` pairs containing the following
        ``nvidia-smi`` query information:

        * ``gpu_name``, as ``gpu_name`` (:py:class:`str`)
        * ``driver_version``, as ``gpu_driver_version`` (:py:class:`str`)
        * ``memory.total``, as ``gpu_memory_total`` (transformed to gigabytes,
          :py:class:`float`)

    """

    return run_nvidia_smi(
        ("gpu_name", "driver_version", "memory.total"),
        ("gpu_name", "gpu_driver_version", "gpu_memory_total"),
    )
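
# Example for gpu_constants() (sketch; values are illustrative and assume an
# NVIDIA GPU host; the ``or ()`` guards against ``None`` when no GPU exists):
#
#   constants = dict(gpu_constants() or ())
#   # e.g. {"gpu_name": "Tesla V100", "gpu_driver_version": "510.47.03",
#   #       "gpu_memory_total": 15.78}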


def gpu_log():
    """Returns current GPU usage (dynamic) information using nvidia-smi

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return a tuple of ``(key, value)`` pairs containing the following
        ``nvidia-smi`` query information:

        * ``memory.used``, as ``gpu_memory_used`` (transformed to gigabytes,
          :py:class:`float`)
        * ``memory.free``, as ``gpu_memory_free`` (transformed to gigabytes,
          :py:class:`float`)
        * ``100*memory.used/memory.total``, as ``gpu_memory_percent``,
          (:py:class:`float`, in percent)
        * ``utilization.gpu``, as ``gpu_percent``,
          (:py:class:`float`, in percent)

    """

    retval = run_nvidia_smi(
        (
            "memory.total",
            "memory.used",
            "memory.free",
            "utilization.gpu",
        ),
        (
            "gpu_memory_total",
            "gpu_memory_used",
            "gpu_memory_free",
            "gpu_percent",
        ),
    )

    if retval is None:  # nvidia-smi is not available on this host
        return None

    # re-compose the output to generate expected values
    return (
        retval[1],  # gpu_memory_used
        retval[2],  # gpu_memory_free
        ("gpu_memory_percent", 100 * (retval[1][1] / retval[0][1])),
        retval[3],  # gpu_percent
    )
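
# Example for gpu_log() (sketch; numbers are illustrative and assume an NVIDIA
# GPU host):
#
#   usage = dict(gpu_log() or ())
#   # e.g. {"gpu_memory_used": 1.2, "gpu_memory_free": 14.5,
#   #       "gpu_memory_percent": 7.6, "gpu_percent": 35.0}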


def cpu_constants():
    """Returns static CPU information about the current system.


    Returns
    -------

    data : tuple
        A tuple of ``(key, value)`` pairs containing these entries:

        0. ``cpu_memory_total`` (:py:class:`float`): total memory available,
           in gigabytes
        1. ``cpu_count`` (:py:class:`int`): number of logical CPUs available

    """

    return (
        ("cpu_memory_total", psutil.virtual_memory().total / GB),
        ("cpu_count", psutil.cpu_count(logical=True)),
    )
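
# Example for cpu_constants(): the returned pairs can be consumed directly as
# a dictionary (values are machine-dependent):
#
#   constants = dict(cpu_constants())
#   # e.g. {"cpu_memory_total": 15.6, "cpu_count": 8}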


class CPULogger:
    """Logs CPU information using :py:mod:`psutil`


    Parameters
    ----------

    pid : :py:class:`int`, Optional
        Process identifier of the main process (parent process) to observe

    """

    def __init__(self, pid=None):
        this = psutil.Process(pid=pid)
        self.cluster = [this] + this.children(recursive=True)
        # touch cpu_percent() at least once for all processes in the cluster,
        # so subsequent calls return a meaningful (interval-based) value
        for k in self.cluster:
            k.cpu_percent(interval=None)

    def log(self):
        """Returns current process cluster information

        Returns
        -------

        data : tuple
            A tuple of ``(key, value)`` pairs containing these entries:

            0. ``cpu_memory_used`` (:py:class:`float`): total memory used from
               the system, in gigabytes
            1. ``cpu_rss`` (:py:class:`float`): RAM currently used by
               process and children, in gigabytes
            2. ``cpu_vms`` (:py:class:`float`): total memory (RAM + swap)
               currently used by process and children, in gigabytes
            3. ``cpu_percent`` (:py:class:`float`): percentage of the total CPU
               used by this process and children (recursively) since last call
               (first time called should be ignored).  This number depends on
               the number of CPUs in the system and can be greater than 100%
            4. ``cpu_processes`` (:py:class:`int`): total number of processes
               including self and children (recursively)
            5. ``cpu_open_files`` (:py:class:`int`): total number of open files
               by self and children

        """

        # check all cluster components and update the process list, so we can
        # keep the cpu_percent() initialization of children we already know
        stored_children = set(self.cluster[1:])
        current_children = set(self.cluster[0].children(recursive=True))
        # children that are still alive keep their cpu_percent() state;
        # genuinely new children are primed below
        keep_children = stored_children & current_children
        new_children = current_children - stored_children
        gone = set()
        for k in new_children:
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # the child process is gone in the meantime; drop it from
                # this round's list
                gone.add(k)
        new_children = new_children - gone
        self.cluster = (
            self.cluster[:1] + list(keep_children) + list(new_children)
        )

        memory_info = []
        cpu_percent = []
        open_files = []
        gone = set()
        for k in self.cluster:
            try:
                memory_info.append(k.memory_info())
                cpu_percent.append(k.cpu_percent(interval=None))
                open_files.append(len(k.open_files()))
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # the child process is gone in the meantime; it is too late to
                # update the cluster list, but the counts below are adjusted
                gone.add(k)

        return (
            ("cpu_memory_used", psutil.virtual_memory().used / GB),
            ("cpu_rss", sum([k.rss for k in memory_info]) / GB),
            ("cpu_vms", sum([k.vms for k in memory_info]) / GB),
            ("cpu_percent", sum(cpu_percent)),
            ("cpu_processes", len(self.cluster) - len(gone)),
            ("cpu_open_files", sum(open_files)),
        )
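
# Example for CPULogger: sampling the current process tree a couple of times
# (as documented above, the first ``cpu_percent`` reading should be ignored):
#
#   monitor = CPULogger()          # observes this process and its children
#   monitor.log()                  # first call: cpu_percent is meaningless
#   time.sleep(1.0)
#   sample = dict(monitor.log())   # e.g. sample["cpu_percent"] -> 42.0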


class _InformationGatherer:
    """A container to store monitoring information

    Parameters
    ----------

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logger : logging.Logger
        A logger to be used for logging messages

    """

    def __init__(self, has_gpu, main_pid, logger):
        self.cpu_logger = CPULogger(main_pid)
        self.keys = [k[0] for k in self.cpu_logger.log()]
        self.cpu_keys_len = len(self.keys)
        self.has_gpu = has_gpu
        self.logger = logger
        if self.has_gpu:
            self.keys += [k[0] for k in gpu_log()]
        self.data = [[] for _ in self.keys]

    def acc(self):
        """Accumulates another measurement"""
        for i, k in enumerate(self.cpu_logger.log()):
            self.data[i].append(k[1])
        if self.has_gpu:
            for i, k in enumerate(gpu_log()):
                self.data[i + self.cpu_keys_len].append(k[1])

    def summary(self):
        """Returns the current data"""

        if len(self.data[0]) == 0:
            self.logger.error("CPU/GPU logger was not able to collect any data")
        retval = []
        for k, values in zip(self.keys, self.data):
            retval.append((k, values))
        return tuple(retval)


def _monitor_worker(interval, has_gpu, main_pid, stop, queue, logging_level):
    """A monitoring worker that measures resources and returns lists

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number, as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    stop : :py:class:`multiprocessing.Event`
        Indicates if we should continue running or stop

    queue : :py:class:`queue.Queue`
        A queue, to send monitoring information back to the spawner

    logging_level: int
        The logging level to use for logging from launched processes

    """

    logger = multiprocessing.log_to_stderr(level=logging_level)
    ra = _InformationGatherer(has_gpu, main_pid, logger)

    while not stop.is_set():
        try:
            ra.acc()  # guarantees at least an entry will be available
            time.sleep(interval)
        except Exception:
            logger.warning(
                "Iterative CPU/GPU logging did not work properly this once",
                exc_info=True,
            )
            time.sleep(0.5)  # wait half a second, and try again!

    queue.put(ra.summary())


class ResourceMonitor:
    """An external, non-blocking CPU/GPU resource monitor

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number, as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logging_level: int
        The logging level to use for logging from launched processes

    """

    def __init__(self, interval, has_gpu, main_pid, logging_level):

        self.interval = interval
        self.has_gpu = has_gpu
        self.main_pid = main_pid
        self.event = multiprocessing.Event()
        self.q = multiprocessing.Queue()
        self.logging_level = logging_level

        self.monitor = multiprocessing.Process(
            target=_monitor_worker,
            name="ResourceMonitorProcess",
            args=(
                self.interval,
                self.has_gpu,
                self.main_pid,
                self.event,
                self.q,
                self.logging_level,
            ),
        )

        self.data = None

    @staticmethod
    def monitored_keys(has_gpu):
        """Returns the list of keys that will be monitored"""

        return _InformationGatherer(has_gpu, None, logger).keys

    def __enter__(self):
        """Starts the monitoring process"""

        self.monitor.start()
        return self

    def __exit__(self, *exc):
        """Stops the monitoring process and stores the summary of observations"""

        self.event.set()
        self.monitor.join()
        if self.monitor.exitcode != 0:
            logger.error(
                f"CPU/GPU resource monitor process exited with code "
                f"{self.monitor.exitcode}.  Check logs for errors!"
            )

        try:
            data = self.q.get(timeout=2 * self.interval)
        except queue.Empty:
            logger.warning(
                f"CPU/GPU resource monitor did not provide anything when "
                f"joined (even after a {2*self.interval}-second timeout).  "
                f"This is normally due to exceptions in the monitoring "
                f"process; check the logs above for details."
            )
            self.data = None
        else:
            # summarize the returned data: counters (number of processes and
            # open files) are reported as maxima, all other values as means
            summary = []
            for k, values in data:
                if values:
                    if k in ("cpu_processes", "cpu_open_files"):
                        summary.append((k, numpy.max(values)))
                    else:
                        summary.append((k, numpy.mean(values)))
                else:
                    summary.append((k, 0.0))
            self.data = tuple(summary)
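

# Minimal usage sketch (not part of the library API): run this module directly
# to monitor a short, CPU-bound loop.  The interval, duration and logging
# level below are arbitrary examples, and ``gpu_constants()`` returning
# ``None`` is used as a cheap "do we have a GPU?" test.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    with ResourceMonitor(
        interval=0.5,
        has_gpu=(gpu_constants() is not None),
        main_pid=os.getpid(),
        logging_level=logging.WARNING,
    ) as monitor:
        # some work to observe: a small CPU-bound busy loop
        deadline = time.time() + 3.0
        while time.time() < deadline:
            sum(i * i for i in range(10000))

    # monitor.data holds the summarized (key, value) pairs after __exit__()
    for key, value in monitor.data or ():
        logger.info("%s: %s", key, value)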