Coverage for src/gridtk/setshell.py: 22%

79 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Wrappers for Idiap's SETSHELL functionality.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import os 

10import signal 

11import subprocess 

12import sys 

13 

14from .tools import str_ 

15 

16logger = logging.getLogger(__name__) 

17 

18 

def _parse_env_output(lines) -> dict[str, str]:
    """Parse the lines printed by ``env`` into a ``{name: value}`` mapping.

    Each line is first passed through ``str_`` (imported from ``.tools``;
    presumably it converts bytes to text - confirm against that module).
    Lines that do not look like ``NAME=value`` - for instance the continuation
    lines of a multi-line value, or blank lines - are skipped instead of being
    recorded as variables with an empty value.
    """
    parsed = {}
    for line in lines:
        key, separator, value = str_(line).partition("=")
        key = key.strip()
        if not separator or not key:
            continue
        parsed[key] = value.strip()
    return parsed


def environ(context: str) -> dict[str, str]:
    """Retrieve the environment for a particular SETSHELL context.

    If ``BASEDIRSETSHELL`` is not defined, the Idiap-wide shell initialisation
    file is sourced first (when it exists) and the variables it defines are
    written into ``os.environ``.  If ``BASEDIRSETSHELL`` is still undefined
    afterwards, a copy of the current environment is returned as is.

    Otherwise ``dosetshell`` is executed for ``context``.  It prints the path
    of a temporary script, which is sourced in ``bash`` to capture the
    resulting environment and is then deleted.

    Args:
        context: Name of the SETSHELL context, passed to ``dosetshell``.

    Returns:
        A new dictionary with the current environment, updated with the
        variables defined by the context (if any).

    Raises:
        OSError: If ``dosetshell`` or ``bash`` cannot be started.

    Note:
        On ``KeyboardInterrupt`` the running sub-process is sent ``SIGTERM``
        and the interpreter exits through ``sys.exit``.
    """
    if "BASEDIRSETSHELL" not in os.environ:
        # It seems that we are in a hostile environment
        # try to source the Idiap-wide shell
        idiap_source = "/idiap/resource/software/initfiles/shrc"
        if os.path.exists(idiap_source):
            logger.debug("Sourcing: '%s'", idiap_source)
            try:
                command = ["bash", "-c", "source %s && env" % idiap_source]
                # the context manager closes the pipe and reaps the child
                with subprocess.Popen(command, stdout=subprocess.PIPE) as pi:
                    # overwrite the default environment
                    os.environ.update(_parse_env_output(pi.stdout))
            except OSError as e:
                # occurs when the file is not executable or not found; this
                # step is best-effort, so only leave a trace in the log
                logger.debug("Could not source '%s': %s", idiap_source, e)

    # in case the BASEDIRSETSHELL environment variable is not set, we are not
    # at Idiap, and so we don't have to set any additional variables.
    if "BASEDIRSETSHELL" not in os.environ:
        return dict(os.environ)

    BASEDIRSETSHELL = os.environ["BASEDIRSETSHELL"]
    dosetshell = "%s/setshell/bin/dosetshell" % BASEDIRSETSHELL

    command = [dosetshell, "-s", "sh", context]

    # First things first, we get the path to the temp file created by dosetshell
    try:
        logger.debug("Executing: '%s'", " ".join(command))
        p = subprocess.Popen(command, stdout=subprocess.PIPE)
    except OSError as e:
        # occurs when the file is not executable or not found
        raise OSError(
            "Error executing '%s': %s (%d)"
            % (" ".join(command), e.strerror, e.errno)
        ) from e

    try:
        source = str_(p.communicate()[0]).strip()
    except KeyboardInterrupt:  # the user CTRL-C'ed
        os.kill(p.pid, signal.SIGTERM)
        sys.exit(signal.SIGTERM)

    # We have now the name of the source file, source it and erase it
    command2 = ["bash", "-c", "source %s && env" % source]

    try:
        try:
            logger.debug("Executing: '%s'", " ".join(command2))
            p2 = subprocess.Popen(command2, stdout=subprocess.PIPE)
        except OSError as e:
            # occurs when the file is not executable or not found
            raise OSError(
                "Error executing '%s': %s (%d)"
                % (" ".join(command2), e.strerror, e.errno)
            ) from e

        new_environ = dict(os.environ)
        # leaving the ``with`` block closes the pipe and waits for bash
        with p2:
            try:
                new_environ.update(_parse_env_output(p2.stdout))
            except KeyboardInterrupt:  # the user CTRL-C'ed
                os.kill(p2.pid, signal.SIGTERM)
                sys.exit(signal.SIGTERM)
    finally:
        # erase the temporary file even when sourcing it failed
        if os.path.exists(source):
            os.unlink(source)

    logger.debug("Discovered environment for context '%s':", context)
    for k in sorted(new_environ):
        logger.debug("  %s = %s", k, new_environ[k])

    return new_environ

99 

100 

def sexec(
    context: str | dict[str, str],
    command: list[str],
    error_on_nonzero: bool = True,
) -> bytes:
    """Execute a command within a particular Idiap SETSHELL context.

    Args:
        context: Either the name of a SETSHELL context (``str`` or ``bytes``),
            which is resolved through :func:`environ`, or an already resolved
            environment mapping that is handed to the sub-process unchanged.
        command: Program and arguments to execute.
        error_on_nonzero: When ``True``, a non-zero exit status raises
            ``RuntimeError``; when ``False``, it is only logged at debug
            level.

    Returns:
        The output of the command with surrounding whitespace stripped.
        Standard error is redirected into standard output.

    Raises:
        RuntimeError: If the command exits with a non-zero status and
            ``error_on_nonzero`` is ``True``.

    Note:
        On ``KeyboardInterrupt`` while waiting for the command, the
        sub-process is sent ``SIGTERM`` and the interpreter exits through
        ``sys.exit``.
    """
    if isinstance(context, (str, bytes)):
        E = environ(context)
    else:
        E = context

    logger.debug("Executing: '%s'", " ".join(command))
    p = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=E
    )

    # Only the wait is guarded: ``p`` is guaranteed to exist here, so the
    # handler can never fail on an unbound name.
    try:
        (stdout, _) = p.communicate()  # note: stderr will be 'None'
    except KeyboardInterrupt:  # the user CTRL-C'ed
        os.kill(p.pid, signal.SIGTERM)
        sys.exit(signal.SIGTERM)

    if p.returncode != 0:
        if error_on_nonzero:
            raise RuntimeError(
                "Execution of '%s' exited with status != 0 (%d): %s"
                % (" ".join(command), p.returncode, str_(stdout))
            )
        logger.debug(
            "Execution of '%s' exited with status != 0 (%d): %s",
            " ".join(command),
            p.returncode,
            str_(stdout),
        )

    return stdout.strip()

134 

135 

def replace(context, command):
    """Replace the current process by ``command`` run in a SETSHELL context.

    The environment for ``context`` is resolved with :func:`environ` and then
    passed, together with ``command``, to ``os.execvpe``; ``command[0]`` is
    the program to execute and ``command`` its full argument list.
    """
    # resolve the environment first, exactly as before, then pick the program
    target_environment = environ(context)
    program = command[0]
    os.execvpe(program, command, target_environment)

138 os.execvpe(command[0], command, E)