Coverage for bob/ip/common/utils/resources.py: 63% (135 statements)

coverage.py v7.0.5, created at 2023-01-17 15:03 +0000

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

"""Tools for interacting with the running computer or GPU"""

import logging
import multiprocessing
import os
import queue
import shutil
import subprocess
import time

import numpy
import psutil

logger = logging.getLogger(__name__)

_nvidia_smi = shutil.which("nvidia-smi")
"""Location of the nvidia-smi program, if one exists"""


GB = float(2**30)
"""The number of bytes in a gigabyte"""


def run_nvidia_smi(query, rename=None):
    """Returns GPU information from query

    For a comprehensive list of options and help, execute ``nvidia-smi
    --help-query-gpu`` on a host with a GPU


    Parameters
    ----------

    query : list
        A list of query strings as defined by ``nvidia-smi --help-query-gpu``

    rename : :py:class:`list`, Optional
        A list of keys to yield in the return value for each entry above.  It
        gives you the opportunity to rewrite some key names for convenience.
        This list, if provided, must be of the same length as ``query``.


    Returns
    -------

    data : :py:class:`tuple`, None
        An ordered dictionary (organized as 2-tuples) containing the queried
        parameters (``rename`` versions).  If ``nvidia-smi`` is not available,
        returns ``None``.  Percentage information is left alone; memory
        information is transformed to gigabytes (floating-point).

    """

    if _nvidia_smi is not None:

        if rename is None:
            rename = query
        else:
            assert len(rename) == len(query)

        # Get GPU information based on GPU ID.
        values = subprocess.getoutput(
            "%s --query-gpu=%s --format=csv,noheader --id=%s"
            % (
                _nvidia_smi,
                ",".join(query),
                os.environ.get("CUDA_VISIBLE_DEVICES"),
            )
        )
        values = [k.strip() for k in values.split(",")]
        t_values = []
        for k in values:
            if k.endswith("%"):
                t_values.append(float(k[:-1].strip()))
            elif k.endswith("MiB"):
                t_values.append(float(k[:-3].strip()) / 1024)
            else:
                t_values.append(k)  # unchanged
        return tuple(zip(rename, t_values))
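
# Illustrative usage sketch (not part of the original module): it shows how
# ``run_nvidia_smi`` pairs renamed keys with the parsed ``nvidia-smi`` CSV
# output.  The helper name ``_example_run_nvidia_smi`` and the chosen queries
# are assumptions for demonstration only; the sketch also assumes
# ``CUDA_VISIBLE_DEVICES`` points at the GPU to inspect, as the query above
# requires.  On a host without ``nvidia-smi`` the call simply returns ``None``.
def _example_run_nvidia_smi():
    # ask for the board name and its total memory, renaming the second key
    info = run_nvidia_smi(
        ("gpu_name", "memory.total"),
        ("gpu_name", "gpu_memory_total"),
    )
    if info is None:
        print("nvidia-smi is not available on this host")
        return None
    # ``info`` is a tuple of 2-tuples, e.g.
    # (("gpu_name", "Tesla V100-SXM2"), ("gpu_memory_total", 31.75))
    return dict(info)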


def gpu_constants():
    """Returns GPU (static) information using nvidia-smi

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return an ordered dictionary (organized as 2-tuples) containing the
        following ``nvidia-smi`` query information:

        * ``gpu_name``, as ``gpu_name`` (:py:class:`str`)
        * ``driver_version``, as ``gpu_driver_version`` (:py:class:`str`)
        * ``memory.total``, as ``gpu_memory_total`` (transformed to gigabytes,
          :py:class:`float`)

    """

    return run_nvidia_smi(
        ("gpu_name", "driver_version", "memory.total"),
        ("gpu_name", "gpu_driver_version", "gpu_memory_total"),
    )


def gpu_log():
    """Returns GPU information about current non-static status using nvidia-smi

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return an ordered dictionary (organized as 2-tuples) containing the
        following ``nvidia-smi`` query information:

        * ``memory.used``, as ``gpu_memory_used`` (transformed to gigabytes,
          :py:class:`float`)
        * ``memory.free``, as ``gpu_memory_free`` (transformed to gigabytes,
          :py:class:`float`)
        * ``100*memory.used/memory.total``, as ``gpu_memory_percent``
          (:py:class:`float`, in percent)
        * ``utilization.gpu``, as ``gpu_percent``
          (:py:class:`float`, in percent)

    """

    retval = run_nvidia_smi(
        (
            "memory.total",
            "memory.used",
            "memory.free",
            "utilization.gpu",
        ),
        (
            "gpu_memory_total",
            "gpu_memory_used",
            "gpu_memory_free",
            "gpu_percent",
        ),
    )

    if retval is None:
        # nvidia-smi is not available on this host
        return None

    # re-compose the output to generate expected values
    return (
        retval[1],  # gpu_memory_used
        retval[2],  # gpu_memory_free
        ("gpu_memory_percent", 100 * (retval[1][1] / retval[0][1])),
        retval[3],  # gpu_percent
    )
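
# Illustrative sketch (an assumption, not part of the original module): how a
# caller might combine ``gpu_constants()`` and ``gpu_log()``.  Both return
# ``None`` when ``nvidia-smi`` is missing, so the guard below is needed on
# CPU-only hosts.  The helper name is hypothetical.
def _example_gpu_snapshot():
    constants = gpu_constants()  # name, driver version, total memory
    status = gpu_log()  # used/free memory and utilization percentages
    if constants is None or status is None:
        return None
    # merge the two tuples of 2-tuples into a single flat dictionary
    return dict(constants + status)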


def cpu_constants():
    """Returns static CPU information about the current system.


    Returns
    -------

    data : tuple
        An ordered dictionary (organized as 2-tuples) containing these entries:

        0. ``cpu_memory_total`` (:py:class:`float`): total memory available,
           in gigabytes
        1. ``cpu_count`` (:py:class:`int`): number of logical CPUs available

    """

    return (
        ("cpu_memory_total", psutil.virtual_memory().total / GB),
        ("cpu_count", psutil.cpu_count(logical=True)),
    )
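
# A minimal sketch (illustration only, not part of the original module): the
# 2-tuple layout returned by ``cpu_constants()`` converts directly into a
# dictionary, which is often more convenient than indexing by position.
def _example_cpu_constants_as_dict():
    constants = dict(cpu_constants())
    # e.g. {"cpu_memory_total": 15.5, "cpu_count": 8} -- values depend on the host
    return constants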


class CPULogger:
    """Logs CPU information using :py:mod:`psutil`


    Parameters
    ----------

    pid : :py:class:`int`, Optional
        Process identifier of the main process (parent process) to observe

    """

    def __init__(self, pid=None):
        this = psutil.Process(pid=pid)
        self.cluster = [this] + this.children(recursive=True)
        # touch cpu_percent() at least once for all processes in the cluster
        [k.cpu_percent(interval=None) for k in self.cluster]

    def log(self):
        """Returns current process cluster information

        Returns
        -------

        data : tuple
            An ordered dictionary (organized as 2-tuples) containing these
            entries:

            0. ``cpu_memory_used`` (:py:class:`float`): total memory used from
               the system, in gigabytes
            1. ``cpu_rss`` (:py:class:`float`): RAM currently used by
               process and children, in gigabytes
            2. ``cpu_vms`` (:py:class:`float`): total memory (RAM + swap)
               currently used by process and children, in gigabytes
            3. ``cpu_percent`` (:py:class:`float`): percentage of the total CPU
               used by this process and children (recursively) since last call
               (first time called should be ignored).  This number depends on
               the number of CPUs in the system and can be greater than 100%
            4. ``cpu_processes`` (:py:class:`int`): total number of processes
               including self and children (recursively)
            5. ``cpu_open_files`` (:py:class:`int`): total number of open files
               by self and children

        """

        # check all cluster components and update process list
        # done so we can keep the cpu_percent() initialization
        stored_children = set(self.cluster[1:])
        current_children = set(self.cluster[0].children(recursive=True))
        # children that are still alive keep their cpu_percent() initialization
        keep_children = stored_children & current_children
        new_children = current_children - stored_children
        gone = set()
        for k in new_children:
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile
                # update the intermediate list for this time
                gone.add(k)
        new_children = new_children - gone
        self.cluster = (
            self.cluster[:1] + list(keep_children) + list(new_children)
        )

        memory_info = []
        cpu_percent = []
        open_files = []
        gone = set()
        for k in self.cluster:
            try:
                memory_info.append(k.memory_info())
                cpu_percent.append(k.cpu_percent(interval=None))
                open_files.append(len(k.open_files()))
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile, just ignore it
                # it is too late to update any intermediate list
                # at this point, but ensures to update counts later on
                gone.add(k)

        return (
            ("cpu_memory_used", psutil.virtual_memory().used / GB),
            ("cpu_rss", sum([k.rss for k in memory_info]) / GB),
            ("cpu_vms", sum([k.vms for k in memory_info]) / GB),
            ("cpu_percent", sum(cpu_percent)),
            ("cpu_processes", len(self.cluster) - len(gone)),
            ("cpu_open_files", sum(open_files)),
        )
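
# Hedged usage sketch (not part of the original module): ``CPULogger`` tracks
# the current process and its children.  Because the ``cpu_percent`` entry
# measures usage since the previous call, the first reading should be
# discarded, as the ``log()`` docstring notes.  The helper name below is
# hypothetical.
def _example_cpu_logger():
    monitor = CPULogger()  # observe the current process (pid=None)
    monitor.log()  # first reading: cpu_percent not yet meaningful
    time.sleep(1.0)  # ... the actual work would happen here ...
    sample = dict(monitor.log())
    return sample["cpu_rss"], sample["cpu_percent"]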


class _InformationGatherer:
    """A container to store monitoring information

    Parameters
    ----------

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logger : logging.Logger
        A logger to be used for logging messages

    """

    def __init__(self, has_gpu, main_pid, logger):
        self.cpu_logger = CPULogger(main_pid)
        self.keys = [k[0] for k in self.cpu_logger.log()]
        self.cpu_keys_len = len(self.keys)
        self.has_gpu = has_gpu
        self.logger = logger
        if self.has_gpu:
            self.keys += [k[0] for k in gpu_log()]
        self.data = [[] for _ in self.keys]

    def acc(self):
        """Accumulates another measurement"""
        for i, k in enumerate(self.cpu_logger.log()):
            self.data[i].append(k[1])
        if self.has_gpu:
            for i, k in enumerate(gpu_log()):
                self.data[i + self.cpu_keys_len].append(k[1])

    def summary(self):
        """Returns the current data"""

        if len(self.data[0]) == 0:
            self.logger.error("CPU/GPU logger was not able to collect any data")
        retval = []
        for k, values in zip(self.keys, self.data):
            retval.append((k, values))
        return tuple(retval)
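
# Illustrative sketch (assumption, CPU-only): ``_InformationGatherer`` keeps
# one list of samples per key, so ``summary()`` yields ``(key, [sample, ...])``
# pairs that can be reduced later (as ``ResourceMonitor.__exit__`` does with
# means and maxima).  The helper name is hypothetical.
def _example_information_gatherer():
    gatherer = _InformationGatherer(
        has_gpu=False, main_pid=None, logger=logging.getLogger(__name__)
    )
    for _ in range(3):  # accumulate a few measurements
        gatherer.acc()
        time.sleep(0.1)
    # e.g. (("cpu_memory_used", [3.2, 3.2, 3.3]), ("cpu_rss", [...]), ...)
    return gatherer.summary()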


def _monitor_worker(interval, has_gpu, main_pid, stop, queue, logging_level):
    """A monitoring worker that measures resources and returns lists

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    stop : :py:class:`multiprocessing.Event`
        Indicates if we should continue running or stop

    queue : :py:class:`multiprocessing.Queue`
        A queue, to send monitoring information back to the spawner

    logging_level: int
        The logging level to use for logging from launched processes

    """

    logger = multiprocessing.log_to_stderr(level=logging_level)
    ra = _InformationGatherer(has_gpu, main_pid, logger)

    while not stop.is_set():
        try:
            ra.acc()  # guarantees at least an entry will be available
            time.sleep(interval)
        except Exception:
            logger.warning(
                "Iterative CPU/GPU logging did not work properly this once",
                exc_info=True,
            )
            time.sleep(0.5)  # wait half a second, and try again!

    queue.put(ra.summary())


class ResourceMonitor:
    """An external, non-blocking CPU/GPU resource monitor

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logging_level: int
        The logging level to use for logging from launched processes

    """

    def __init__(self, interval, has_gpu, main_pid, logging_level):
        self.interval = interval
        self.has_gpu = has_gpu
        self.main_pid = main_pid
        self.event = multiprocessing.Event()
        self.q = multiprocessing.Queue()
        self.logging_level = logging_level

        self.monitor = multiprocessing.Process(
            target=_monitor_worker,
            name="ResourceMonitorProcess",
            args=(
                self.interval,
                self.has_gpu,
                self.main_pid,
                self.event,
                self.q,
                self.logging_level,
            ),
        )

        self.data = None

    @staticmethod
    def monitored_keys(has_gpu):
        return _InformationGatherer(has_gpu, None, logger).keys

    def __enter__(self):
        """Starts the monitoring process"""

        self.monitor.start()
        return self

    def __exit__(self, *exc):
        """Stops the monitoring process and returns the summary of observations"""

        self.event.set()
        self.monitor.join()
        if self.monitor.exitcode != 0:
            logger.error(
                f"CPU/GPU resource monitor process exited with code "
                f"{self.monitor.exitcode}.  Check logs for errors!"
            )

        try:
            data = self.q.get(timeout=2 * self.interval)
        except queue.Empty:
            logger.warning(
                f"CPU/GPU resource monitor did not provide anything when "
                f"joined (even after a {2*self.interval}-second timeout - "
                f"this is normally due to exceptions on the monitoring process. "
                f"Check above for other exceptions."
            )
            self.data = None
        else:
            # summarize the returned data by creating means
            summary = []
            for k, values in data:
                if values:
                    if k in ("cpu_processes", "cpu_open_files"):
                        summary.append((k, numpy.max(values)))
                    else:
                        summary.append((k, numpy.mean(values)))
                else:
                    summary.append((k, 0.0))
            self.data = tuple(summary)
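
# Hedged end-to-end sketch (not part of the original module): how the monitor
# is typically driven from client code.  The interval, the ``time.sleep``
# stand-in for real work, and the use of ``_nvidia_smi`` as a rough GPU-presence
# proxy are all assumptions for illustration.
def _example_resource_monitor():
    monitor = ResourceMonitor(
        interval=0.5,
        has_gpu=bool(_nvidia_smi),
        main_pid=os.getpid(),
        logging_level=logging.WARNING,
    )
    with monitor:
        time.sleep(2.0)  # ... training or inference would happen here ...
    # after the ``with`` block, ``monitor.data`` holds (key, mean-or-max) pairs
    return dict(monitor.data) if monitor.data is not None else None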