# SPDX-FileCopyrightText: Copyright © 2023 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later

"""Tools for interacting with the running computer or GPU."""

import logging
import multiprocessing
import os
import queue
import shutil
import subprocess
import time

import numpy
import psutil

logger = logging.getLogger(__name__)

_nvidia_smi = shutil.which("nvidia-smi")
"""Location of the nvidia-smi program, if one exists."""


GB = float(2**30)
"""The number of bytes in a gigabyte."""

def run_nvidia_smi(query, rename=None):
    """Returns GPU information from query.

    For a comprehensive list of options and help, execute ``nvidia-smi
    --help-query-gpu`` on a host with a GPU


    Parameters
    ----------

    query : list
        A list of query strings as defined by ``nvidia-smi --help-query-gpu``

    rename : :py:class:`list`, Optional
        A list of keys to yield in the return value for each entry above.  It
        gives you the opportunity to rewrite some key names for convenience.
        This list, if provided, must be of the same length as ``query``.


    Returns
    -------

    data : :py:class:`tuple`, None
        An ordered dictionary (organized as 2-tuples) containing the queried
        parameters (``rename`` versions).  If ``nvidia-smi`` is not available,
        returns ``None``.  Percentage information is left alone, memory
        information is transformed to gigabytes (floating-point).
    """

    if _nvidia_smi is not None:
        if rename is None:
            rename = query
        else:
            assert len(rename) == len(query)

        # Get GPU information based on GPU ID.
        values = subprocess.getoutput(
            "%s --query-gpu=%s --format=csv,noheader --id=%s"
            % (
                _nvidia_smi,
                ",".join(query),
                os.environ.get("CUDA_VISIBLE_DEVICES"),
            )
        )
        values = [k.strip() for k in values.split(",")]
        t_values = []
        for k in values:
            if k.endswith("%"):
                t_values.append(float(k[:-1].strip()))
            elif k.endswith("MiB"):
                t_values.append(float(k[:-3].strip()) / 1024)
            else:
                t_values.append(k)  # unchanged
        return tuple(zip(rename, t_values))
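# Example (illustrative sketch, not part of the original module): querying the
# current GPU's utilization and temperature.  The query keys below are standard
# ``nvidia-smi --query-gpu`` fields chosen for illustration; values that are
# not percentages or MiB figures are returned as plain strings.
#
#   info = run_nvidia_smi(
#       ("utilization.gpu", "temperature.gpu"),
#       ("gpu_percent", "gpu_temperature"),
#   )
#   if info is not None:  # ``None`` when nvidia-smi is absent
#       print(dict(info))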

def gpu_constants():
    """Returns GPU (static) information using nvidia-smi.

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return an ordered dictionary (organized as 2-tuples) containing the
        following ``nvidia-smi`` query information:

        * ``gpu_name``, as ``gpu_name`` (:py:class:`str`)
        * ``driver_version``, as ``gpu_driver_version`` (:py:class:`str`)
        * ``memory.total``, as ``gpu_memory_total`` (transformed to gigabytes,
          :py:class:`float`)
    """

    return run_nvidia_smi(
        ("gpu_name", "driver_version", "memory.total"),
        ("gpu_name", "gpu_driver_version", "gpu_memory_total"),
    )
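# Example (illustrative sketch): ``gpu_constants()`` returns ``None`` on hosts
# without ``nvidia-smi``, so guard before converting the 2-tuples into a dict.
#
#   static_info = gpu_constants()
#   if static_info is not None:
#       print(dict(static_info)["gpu_name"])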

def gpu_log():
    """Returns GPU information about current non-static status using
    nvidia-smi.

    See :py:func:`run_nvidia_smi` for operational details.

    Returns
    -------

    data : :py:class:`tuple`, None
        If ``nvidia-smi`` is not available, returns ``None``, otherwise, we
        return an ordered dictionary (organized as 2-tuples) containing the
        following ``nvidia-smi`` query information:

        * ``memory.used``, as ``gpu_memory_used`` (transformed to gigabytes,
          :py:class:`float`)
        * ``memory.free``, as ``gpu_memory_free`` (transformed to gigabytes,
          :py:class:`float`)
        * ``100*memory.used/memory.total``, as ``gpu_memory_percent``
          (:py:class:`float`, in percent)
        * ``utilization.gpu``, as ``gpu_percent``
          (:py:class:`float`, in percent)
    """

    retval = run_nvidia_smi(
        (
            "memory.total",
            "memory.used",
            "memory.free",
            "utilization.gpu",
        ),
        (
            "gpu_memory_total",
            "gpu_memory_used",
            "gpu_memory_free",
            "gpu_percent",
        ),
    )

    # re-compose the output to generate expected values
    return (
        retval[1],  # gpu_memory_used
        retval[2],  # gpu_memory_free
        ("gpu_memory_percent", 100 * (retval[1][1] / retval[0][1])),
        retval[3],  # gpu_percent
    )
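# Example (illustrative sketch): one instantaneous reading, formatted as a log
# line.  Memory entries are in gigabytes and the ``*_percent`` entries are in
# percent; all four values are floats.
#
#   reading = gpu_log()
#   if reading is not None:  # ``None`` when nvidia-smi is absent
#       print(", ".join(f"{k}={v:.2f}" for k, v in reading))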

def cpu_constants():
    """Returns static CPU information about the current system.

    Returns
    -------

    data : tuple
        An ordered dictionary (organized as 2-tuples) containing these entries:

        0. ``cpu_memory_total`` (:py:class:`float`): total memory available,
           in gigabytes
        1. ``cpu_count`` (:py:class:`int`): number of logical CPUs available
    """

    return (
        ("cpu_memory_total", psutil.virtual_memory().total / GB),
        ("cpu_count", psutil.cpu_count(logical=True)),
    )
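# Example (illustrative sketch): the returned 2-tuples are ordered, so they can
# be read positionally or converted into a dictionary.
#
#   constants = dict(cpu_constants())
#   print(f"{constants['cpu_count']} logical CPUs, "
#         f"{constants['cpu_memory_total']:.1f} GB of RAM")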

class CPULogger:
    """Logs CPU information using :py:mod:`psutil`

    Parameters
    ----------

    pid : :py:class:`int`, Optional
        Process identifier of the main process (parent process) to observe
    """

    def __init__(self, pid=None):
        this = psutil.Process(pid=pid)
        self.cluster = [this] + this.children(recursive=True)
        # touch cpu_percent() at least once for all processes in the cluster
        [k.cpu_percent(interval=None) for k in self.cluster]

    def log(self):
        """Returns current process cluster information.

        Returns
        -------

        data : tuple
            An ordered dictionary (organized as 2-tuples) containing these
            entries:

            0. ``cpu_memory_used`` (:py:class:`float`): total memory used from
               the system, in gigabytes
            1. ``cpu_rss`` (:py:class:`float`): RAM currently used by
               process and children, in gigabytes
            2. ``cpu_vms`` (:py:class:`float`): total memory (RAM + swap)
               currently used by process and children, in gigabytes
            3. ``cpu_percent`` (:py:class:`float`): percentage of the total CPU
               used by this process and children (recursively) since last call
               (first time called should be ignored).  This number depends on
               the number of CPUs in the system and can be greater than 100%
            4. ``cpu_processes`` (:py:class:`int`): total number of processes
               including self and children (recursively)
            5. ``cpu_open_files`` (:py:class:`int`): total number of open files
               by self and children
        """

        # check all cluster components and update process list
        # done so we can keep the cpu_percent() initialization
        stored_children = set(self.cluster[1:])
        current_children = set(self.cluster[0].children(recursive=True))
        # keep only children that are still alive and already tracked, so
        # their cpu_percent() accounting is preserved between calls
        keep_children = stored_children & current_children
        new_children = current_children - stored_children
        gone = set()
        for k in new_children:
            try:
                k.cpu_percent(interval=None)
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile
                # update the intermediate list for this time
                gone.add(k)
        new_children = new_children - gone
        self.cluster = (
            self.cluster[:1] + list(keep_children) + list(new_children)
        )

        memory_info = []
        cpu_percent = []
        open_files = []
        gone = set()
        for k in self.cluster:
            try:
                memory_info.append(k.memory_info())
                cpu_percent.append(k.cpu_percent(interval=None))
                open_files.append(len(k.open_files()))
            except (psutil.ZombieProcess, psutil.NoSuchProcess):
                # child process is gone meanwhile, just ignore it
                # it is too late to update any intermediate list
                # at this point, but ensures to update counts later on
                gone.add(k)

        return (
            ("cpu_memory_used", psutil.virtual_memory().used / GB),
            ("cpu_rss", sum([k.rss for k in memory_info]) / GB),
            ("cpu_vms", sum([k.vms for k in memory_info]) / GB),
            ("cpu_percent", sum(cpu_percent)),
            ("cpu_processes", len(self.cluster) - len(gone)),
            ("cpu_open_files", sum(open_files)),
        )
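# Example (illustrative sketch): sampling this process and its children once
# per second.  As noted in ``CPULogger.log()``, the first ``cpu_percent``
# reading should be discarded; the 1-second pause below is arbitrary.
#
#   cpu = CPULogger()  # observes the current process and its children
#   cpu.log()          # primes cpu_percent(); ignore this first reading
#   time.sleep(1.0)
#   print(dict(cpu.log()))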

class _InformationGatherer:
    """A container to store monitoring information.

    Parameters
    ----------

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logger : logging.Logger
        A logger to be used for logging messages
    """

    def __init__(self, has_gpu, main_pid, logger):
        self.cpu_logger = CPULogger(main_pid)
        self.keys = [k[0] for k in self.cpu_logger.log()]
        self.cpu_keys_len = len(self.keys)
        self.has_gpu = has_gpu
        self.logger = logger
        if self.has_gpu:
            self.keys += [k[0] for k in gpu_log()]
        self.data = [[] for _ in self.keys]

    def acc(self):
        """Accumulates another measurement."""
        for i, k in enumerate(self.cpu_logger.log()):
            self.data[i].append(k[1])
        if self.has_gpu:
            for i, k in enumerate(gpu_log()):
                self.data[i + self.cpu_keys_len].append(k[1])

    def summary(self):
        """Returns the current data."""

        if len(self.data[0]) == 0:
            self.logger.error("CPU/GPU logger was not able to collect any data")
        retval = []
        for k, values in zip(self.keys, self.data):
            retval.append((k, values))
        return tuple(retval)

def _monitor_worker(interval, has_gpu, main_pid, stop, queue, logging_level):
    """A monitoring worker that measures resources and returns lists.

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    stop : :py:class:`multiprocessing.Event`
        Indicates if we should continue running or stop

    queue : :py:class:`queue.Queue`
        A queue, to send monitoring information back to the spawner

    logging_level: int
        The logging level to use for logging from launched processes
    """

    logger = multiprocessing.log_to_stderr(level=logging_level)
    ra = _InformationGatherer(has_gpu, main_pid, logger)

    while not stop.is_set():
        try:
            ra.acc()  # guarantees at least an entry will be available
            time.sleep(interval)
        except Exception:
            logger.warning(
                "Iterative CPU/GPU logging did not work properly this once",
                exc_info=True,
            )
            time.sleep(0.5)  # wait half a second, and try again!

    queue.put(ra.summary())

class ResourceMonitor:
    """An external, non-blocking CPU/GPU resource monitor.

    Parameters
    ----------

    interval : int, float
        Number of seconds to wait between each measurement (may be a floating
        point number as accepted by :py:func:`time.sleep`)

    has_gpu : bool
        A flag indicating if we have a GPU installed on the platform or not

    main_pid : int
        The main process identifier to monitor

    logging_level: int
        The logging level to use for logging from launched processes
    """

    def __init__(self, interval, has_gpu, main_pid, logging_level):
        self.interval = interval
        self.has_gpu = has_gpu
        self.main_pid = main_pid
        self.event = multiprocessing.Event()
        self.q = multiprocessing.Queue()
        self.logging_level = logging_level

        self.monitor = multiprocessing.Process(
            target=_monitor_worker,
            name="ResourceMonitorProcess",
            args=(
                self.interval,
                self.has_gpu,
                self.main_pid,
                self.event,
                self.q,
                self.logging_level,
            ),
        )

        self.data = None

    @staticmethod
    def monitored_keys(has_gpu):
        return _InformationGatherer(has_gpu, None, logger).keys

    def __enter__(self):
        """Starts the monitoring process."""

        self.monitor.start()
        return self

    def __exit__(self, *exc):
        """Stops the monitoring process and returns the summary of
        observations."""

        self.event.set()
        self.monitor.join()
        if self.monitor.exitcode != 0:
            logger.error(
                f"CPU/GPU resource monitor process exited with code "
                f"{self.monitor.exitcode}. Check logs for errors!"
            )

        try:
            data = self.q.get(timeout=2 * self.interval)
        except queue.Empty:
            logger.warning(
                f"CPU/GPU resource monitor did not provide anything when "
                f"joined (even after a {2*self.interval}-second timeout) - "
                f"this is normally due to exceptions in the monitoring process. "
                f"Check above for other exceptions."
            )
            self.data = None
        else:
            # summarize the returned data by creating means
            summary = []
            for k, values in data:
                if values:
                    if k in ("cpu_processes", "cpu_open_files"):
                        summary.append((k, numpy.max(values)))
                    else:
                        summary.append((k, numpy.mean(values)))
                else:
                    summary.append((k, 0.0))
            self.data = tuple(summary)
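

# Example (illustrative sketch): monitoring a block of work.  The averaged
# summary becomes available in ``monitor.data`` once the ``with`` block exits.
# The 0.5-second interval and the placeholder work are arbitrary choices for
# this sketch.
#
#   with ResourceMonitor(
#       interval=0.5,
#       has_gpu=(_nvidia_smi is not None),
#       main_pid=os.getpid(),
#       logging_level=logging.WARNING,
#   ) as monitor:
#       ...  # do some work here
#   print(dict(monitor.data))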