Module EDPluginExecProcess
|
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 __authors__ = [ "Marie-Francoise Incardona", "Olof Svensson", "Jérôme Kieffer" ]
31 __contact__ = "svensson@esrf.fr"
32 __license__ = "LGPLv3+"
33 __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
34
35 import threading, shlex, sys, time, subprocess
36
37 from EDVerbose import EDVerbose
38 from EDPlugin import EDPlugin
39 from EDPluginExec import EDPluginExec
40 from EDConfiguration import EDConfiguration
41 from EDMessage import EDMessage
42 from EDUtilsPlatform import EDUtilsPlatform
43
44 from XSDataCommon import XSPluginItem
45
46 OAR_POLL_COMMAND = 'oarstat -s -j {0:d}'
47 DEFAULT_OAR_POLL_INTERVAL = 10
48
50 """
51 The super class for all plugins that execute a process
52 This class manages the process to be launched:
53 - Process time out management (configurable by EDPlugin )
54 - Process executable to be invoked (configurable, Default 'cat')
55 - Process command line to be launched (Default: empty)
56 The ExecProcess plugin is required to have a configuration in order
57 to be executed in a plugin execution test case.
58 """
59
60 CONF_EXEC_PROCESS_TIME_OUT = "execProcessTimeOut"
61 CONF_EXEC_PROCESS_EXECUTABLE = "execProcessExecutable"
62
def __init__(self):
    """
    Prepare the process-related state of the plugin.

    Runs the EDPluginExec initialisation, marks the plugin as requiring a
    configuration and installs the defaults: executable "cat", empty command
    line, no subprocess / PID / working directory and an empty execution
    status.
    """
    # NOTE(review): the signature line is absent from this listing; it was
    # reconstructed from the EDPluginExec.__init__(self) call -- confirm.
    EDPluginExec.__init__(self)
    self.setRequiredToHaveConfiguration()
    # What to run and with which arguments.
    self.__strConfigExecutable, self.__strConfigCommandline = "cat", ""
    # Nothing has been launched yet.
    self.__subprocess = self.__iPID = self.__strCWD = None
    self.__strExecutionStatus = ""
75
76
95
96
def process_on_oar(self, _edObject=None):
    """
    Submit the command through OAR and record the id of the created job.

    oarsub returns immediately, so Popen.wait() cannot be used to detect the
    end of the job. Instead the cluster is polled with the oarstat utility:
    a timer bound to poll_oar is prepared here and re-armed at regular
    intervals until the job is done or the total time exceeds the timeout
    config parameter.

    Raises RuntimeError when no job id can be read from the oarsub output.
    """
    # NOTE(review): the signature line is absent from this listing. The name
    # comes from the log messages below and the optional _edObject mirrors
    # process() -- confirm against the real file.
    if self._oar_options is not None:
        command = '{0} {1} {2}'.format(self.getExecutable(),
                                       self._oar_options,
                                       self.getCommandline())
    else:
        command = '{0} {1}'.format(self.getExecutable(), self.getCommandline())
    EDVerbose.DEBUG('EDPluginExecProcess.process_on_oar executing: "{0}"'.format(command))
    self._start_time = time.time()
    # NOTE(review): the timer is created but not started in this method;
    # presumably the caller starts it -- verify.
    self._timer = threading.Timer(float(self._oar_poll_interval), self.poll_oar)
    # universal_newlines makes the pipe yield text lines, so the str prefix
    # test below also works where pipes deliver bytes by default.
    oarsub = subprocess.Popen(shlex.split(EDUtilsPlatform.escape(command)),
                              cwd=self.getWorkingDirectory(),
                              stdout=subprocess.PIPE,
                              universal_newlines=True)
    oar_job_id = None
    try:
        for line in oarsub.stdout:
            # Accept both spellings of the key: the one originally tested and
            # the one named in the error message below.
            if not line.startswith(('OARJOB_ID=', 'OAR_JOB_ID=')):
                continue
            # Take the first numeric token after '='. This covers "KEY=123"
            # (where the former line.split()[1] raised IndexError) as well
            # as "KEY= 123". No break: the last matching line wins.
            numeric = [tok for tok in line.partition('=')[2].split() if tok.isdigit()]
            if numeric:
                oar_job_id = int(numeric[0])
    finally:
        # Release the pipe and reap the short-lived oarsub process.
        oarsub.stdout.close()
        oarsub.wait()
    if oar_job_id is not None:
        EDVerbose.DEBUG('EDPluginExecProcess.process_on_oar: job id is "{0}"'.format(oar_job_id))
        self._oar_job_id = oar_job_id
    else:
        msg = 'EDPluginExecProcess.process_on_oar: could not get OAR_JOB_ID!'
        EDVerbose.ERROR(msg)
        self.addErrorMessage(msg)
        raise RuntimeError(msg)
129
130
def poll_oar(self):
    """
    Timer callback: ask OAR once for the state of the submitted job.

    Runs the oarstat command built from OAR_POLL_COMMAND for
    self._oar_job_id and reads the job state from its output.
    - 'Terminated': the job is done and polling stops.
    - 'Running': a new timer is armed for the next poll, unless the time
      elapsed since self._start_time has reached getTimeOut().

    Raises RuntimeError when the state cannot be read, when the job is in
    any other state, or when the timeout has been reached.
    """
    # NOTE(review): the signature line is absent from this listing; it was
    # reconstructed from threading.Timer(..., self.poll_oar) -- confirm.
    command = OAR_POLL_COMMAND.format(self._oar_job_id)
    EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: polling oar with "{0}"'.format(command))

    # universal_newlines makes the pipe yield text lines, so the str prefix
    # test below also works where pipes deliver bytes by default.
    oarstat = subprocess.Popen(shlex.split(command),
                               cwd=self.getWorkingDirectory(),
                               stdout=subprocess.PIPE,
                               universal_newlines=True)
    status = None
    job_prefix = str(self._oar_job_id)
    try:
        for line in oarstat.stdout:
            if line.startswith(job_prefix):
                # NOTE(review): this takes the second-to-last token. If
                # oarstat prints "<id>: <state>" the state is the last token
                # -- verify against real oarstat output.
                status = line.split()[-2]
                break
    finally:
        # Release the pipe and reap the short-lived oarstat process.
        oarstat.stdout.close()
        oarstat.wait()
    if status is None:
        msg = 'EDPluginExecProcess.poll_oar: could not get job status for job {0:d}'.format(self._oar_job_id)
        EDVerbose.WARNING(msg)
        self.addErrorMessage(msg)
        raise RuntimeError(msg)
    elif status == 'Terminated':
        # 'd' replaces the invalid 'i' format code, which raised ValueError.
        EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: job {0:d} finished'.format(self._oar_job_id))
        # The job is done: do not re-arm the timer.
        return
    elif status == 'Running':
        EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: job {0:d} still running'.format(self._oar_job_id))
    else:
        msg = 'EDPluginExecProcess.poll_oar: job {0:d} in a non handled state'.format(self._oar_job_id)
        EDVerbose.DEBUG(msg)
        self.addErrorMessage(msg)
        raise RuntimeError(msg)

    # Still running: give up once the configured timeout has elapsed,
    # otherwise schedule the next poll.
    if time.time() - self._start_time >= self.getTimeOut():
        msg = 'EDPluginExecProcess.poll_oar: timeout!'
        EDVerbose.ERROR(msg)
        self.addErrorMessage(msg)
        raise RuntimeError(msg)
    else:
        self._timer = threading.Timer(self._oar_poll_interval, self.poll_oar)
        self._timer.start()
167
168
169 - def process(self, _edObject=None):
176
177
188
189
223
224
226 """
227 Sets the executable
228 """
229 self.__strConfigExecutable = _strExecutable
230
231
def getExecutable(self):
    """
    Returns the executable to launch.

    The configured name "python" is replaced by the path of the running
    interpreter (sys.executable); any other value is returned unchanged.
    """
    # NOTE(review): the signature line is absent from this listing; it was
    # reconstructed from the self.getExecutable() call sites -- confirm.
    strExecutable = self.__strConfigExecutable
    return sys.executable if strExecutable == "python" else strExecutable
239
240
242 """
243 Sets the command line
244 """
245 self.__strConfigCommandline = _strCommandline
246
247
def getCommandline(self):
    """
    Returns the command line currently configured for the process.
    """
    # NOTE(review): the signature line is absent from this listing; it was
    # reconstructed from the self.getCommandline() call sites -- confirm.
    strCommandline = self.__strConfigCommandline
    return strCommandline
253
256
257
259 """
260 Returns the string containing the execution status.
261 """
262 return self.__strExecutionStatus
263