Coverage for src/gridtk/setshell.py: 22%

79 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Wrappers for Idiap's SETSHELL functionality.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import os 

10import signal 

11import subprocess 

12import sys 

13 

14from .tools import str_ 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def _merge_env_output(lines, target):
    """Store the ``KEY=VALUE`` pairs found in ``env``-style output in *target*.

    Each line is converted with ``str_`` and split on its first ``=``; the
    key and the value are stripped of surrounding whitespace.  Lines with no
    ``=`` or with an empty key (blank lines, continuation lines of multi-line
    values) are skipped: they are not variables, and ``os.environ`` rejects
    an empty variable name.
    """
    for line in lines:
        key, sep, value = str_(line).partition("=")
        key = key.strip()
        if not sep or not key:
            continue
        target[key] = value.strip()


def environ(context: str) -> dict[str, str]:
    """Retrieve the environment for a particular SETSHELL context.

    When ``BASEDIRSETSHELL`` is not defined and the Idiap-wide shell
    initialisation file exists, that file is sourced through ``bash`` and the
    resulting variables are written into ``os.environ`` (the environment of
    the current process is modified).  If ``BASEDIRSETSHELL`` is still
    undefined afterwards, a plain copy of ``os.environ`` is returned.

    Otherwise ``dosetshell`` is executed for ``context``; its output is taken
    as the path of a temporary file, which is sourced through ``bash`` and
    then deleted.

    Parameters
    ----------
    context
        Name of the SETSHELL context, passed verbatim to ``dosetshell``.

    Returns
    -------
    dict[str, str]
        A copy of ``os.environ`` updated with the variables reported by
        ``env`` after sourcing the context file.

    Raises
    ------
    OSError
        If ``dosetshell`` or ``bash`` cannot be started.
    SystemExit
        If the user interrupts (CTRL-C) while waiting for a child process;
        the child is sent ``SIGTERM`` first.
    """
    import shlex  # local import: only needed to quote the path handed to bash

    if "BASEDIRSETSHELL" not in os.environ:
        # It seems that we are in a hostile environment:
        # try to source the Idiap-wide shell
        idiap_source = "/idiap/resource/software/initfiles/shrc"
        if os.path.exists(idiap_source):
            logger.debug("Sourcing: '%s'", idiap_source)
            command = ["bash", "-c", "source %s && env" % idiap_source]
            try:
                # the context manager closes the pipe and reaps the child
                with subprocess.Popen(command, stdout=subprocess.PIPE) as pi:
                    # overwrite the default environment
                    _merge_env_output(pi.stdout, os.environ)
            except OSError:
                # occurs when the file is not executable or not found;
                # sourcing the site-wide file is best effort only
                pass

    # in case the BASEDIRSETSHELL environment variable is not set, we are not
    # at Idiap, and so we don't have to set any additional variables.
    if "BASEDIRSETSHELL" not in os.environ:
        return dict(os.environ)

    BASEDIRSETSHELL = os.environ["BASEDIRSETSHELL"]
    dosetshell = "%s/setshell/bin/dosetshell" % BASEDIRSETSHELL

    command = [dosetshell, "-s", "sh", context]

    # First things first, we get the path to the temp file created by dosetshell
    try:
        logger.debug("Executing: '%s'", " ".join(command))
        p = subprocess.Popen(command, stdout=subprocess.PIPE)
    except OSError as e:
        # occurs when the file is not executable or not found
        raise OSError(
            "Error executing '%s': %s (%d)" % (" ".join(command), e.strerror, e.errno)
        ) from e

    try:
        source = str_(p.communicate()[0]).strip()
    except KeyboardInterrupt:  # the user CTRL-C'ed
        # terminate() is a no-op when the child has already been reaped
        p.terminate()
        sys.exit(signal.SIGTERM)

    # We have now the name of the source file, source it and erase it.  The
    # path is quoted so that whitespace or shell metacharacters in it cannot
    # alter the command executed by bash.
    command2 = ["bash", "-c", "source %s && env" % shlex.quote(source)]

    try:
        try:
            logger.debug("Executing: '%s'", " ".join(command2))
            p2 = subprocess.Popen(command2, stdout=subprocess.PIPE)
        except OSError as e:
            # occurs when the file is not executable or not found
            raise OSError(
                "Error executing '%s': %s (%d)"
                % (" ".join(command2), e.strerror, e.errno)
            ) from e

        new_environ = dict(os.environ)
        try:
            _merge_env_output(p2.stdout, new_environ)
            p2.communicate()  # reap the child and close the pipe
        except KeyboardInterrupt:  # the user CTRL-C'ed
            p2.terminate()
            sys.exit(signal.SIGTERM)
    finally:
        # always remove the temporary file, even when sourcing it failed
        if os.path.exists(source):
            os.unlink(source)

    logger.debug("Discovered environment for context '%s':", context)
    for k in sorted(new_environ):
        logger.debug("  %s = %s", k, new_environ[k])

    return new_environ

97 

98 

def sexec(context: str, command: list[str], error_on_nonzero: bool = True) -> bytes:
    """Execute a command within a particular Idiap SETSHELL context.

    Parameters
    ----------
    context
        Name of the SETSHELL context, resolved through :func:`environ`.  Any
        value that is not ``str`` or ``bytes`` is used directly as the
        environment passed to the child process.
    command
        Program and arguments to execute.
    error_on_nonzero
        When true, a non-zero exit status raises ``RuntimeError``; otherwise
        the failure is only logged at debug level.

    Returns
    -------
    bytes
        Output of the command (standard error is merged into standard
        output), stripped of leading and trailing whitespace.

    Raises
    ------
    RuntimeError
        If the command exits with a non-zero status and ``error_on_nonzero``
        is true.
    SystemExit
        If the user interrupts (CTRL-C) while the command is running; the
        child is sent ``SIGTERM`` first.
    """
    if isinstance(context, (str, bytes)):
        E = environ(context)
    else:
        E = context

    logger.debug("Executing: '%s'", " ".join(command))
    # The process is started outside the ``try`` below so that the interrupt
    # handler never refers to an unbound ``p``.
    p = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=E
    )

    try:
        (stdout, _) = p.communicate()  # note: stderr will be 'None'
    except KeyboardInterrupt:  # the user CTRL-C'ed
        # terminate() is a no-op when the child has already been reaped
        p.terminate()
        sys.exit(signal.SIGTERM)

    if p.returncode != 0:
        if error_on_nonzero:
            raise RuntimeError(
                "Execution of '%s' exited with status != 0 (%d): %s"
                % (" ".join(command), p.returncode, str_(stdout))
            )
        logger.debug(
            "Execution of '%s' exited with status != 0 (%d): %s",
            " ".join(command),
            p.returncode,
            str_(stdout),
        )

    return stdout.strip()

130 

131 

def replace(context, command):
    """Replace the current process with ``command`` run in a SETSHELL context.

    The environment for ``context`` is obtained from :func:`environ`, then
    ``os.execvpe`` is called with ``command[0]`` as the program, ``command``
    as the argument list and that environment.
    """
    setshell_env = environ(context)
    program, argv = command[0], command
    os.execvpe(program, argv, setshell_env)