Module EDPluginExecProcess
[hide private]
[frames] | no frames]

Source Code for Module EDPluginExecProcess

  1  # coding: utf8 
  2  # 
  3  #    Project: The EDNA Kernel 
  4  #             http://www.edna-site.org 
  5  # 
  6  #    File: "$Id$" 
  7  # 
  8  #    Copyright (C) 2008-2009 European Synchrotron Radiation Facility 
  9  #                            Grenoble, France 
 10  # 
 11  #    Principal authors: Marie-Francoise Incardona (incardon@esrf.fr) 
 12  #                       Olof Svensson (svensson@esrf.fr) 
 13  #                       Jérôme Kieffer (jerome.kieffer@esrf.eu) 
 14  # 
 15  #    This program is free software: you can redistribute it and/or modify 
 16  #    it under the terms of the GNU Lesser General Public License as published 
 17  #    by the Free Software Foundation, either version 3 of the License, or 
 18  #    (at your option) any later version. 
 19  # 
 20  #    This program is distributed in the hope that it will be useful, 
 21  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 22  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 23  #    GNU Lesser General Public License for more details. 
 24  # 
 25  #    You should have received a copy of the GNU General Public License 
 26  #    and the GNU Lesser General Public License  along with this program. 
 27  #    If not, see <http://www.gnu.org/licenses/>. 
 28  # 
 29   
 # Module metadata: authors, contact e-mail address, license identifier and copyright holder.
 30  __authors__ = [ "Marie-Francoise Incardona", "Olof Svensson", "Jérôme Kieffer" ] 
 31  __contact__ = "svensson@esrf.fr" 
 32  __license__ = "LGPLv3+" 
 33  __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" 
 34   
 35  import threading, shlex, sys, time, subprocess 
 36   
 37  from EDVerbose          import EDVerbose 
 38  from EDPlugin           import EDPlugin 
 39  from EDPluginExec       import EDPluginExec 
 40  from EDConfiguration    import EDConfiguration 
 41  from EDMessage          import EDMessage 
 42  from EDUtilsPlatform    import EDUtilsPlatform 
 43   
 44  from XSDataCommon       import XSPluginItem 
 45   
 # Command template used by EDPluginExecProcess.poll_oar to query a job;
 # the integer OAR job id is substituted with str.format().
 46  OAR_POLL_COMMAND = 'oarstat -s -j {0:d}' 
 # Polling interval stored by EDPluginExecProcess.configure when the plugin
 # configuration has no "oarPollInterval" entry. The value is handed to
 # threading.Timer, whose interval is expressed in seconds.
 47  DEFAULT_OAR_POLL_INTERVAL = 10 
 48   
class EDPluginExecProcess(EDPluginExec):
    """
    The super class for all plugins that execute a process.

    This class manages the process to be launched:
     - Process time out management (configurable by EDPlugin)
     - Process executable to be invoked (configurable, default 'cat')
     - Process command line to be launched (default: empty)

    When the executable is "oarsub" the command is submitted with oarsub
    and the job is then followed by polling oarstat at regular intervals
    (see process_on_oar and poll_oar); any other executable is run as a
    local child process (see process_locally).

    The ExecProcess plugin is required to have a configuration in order
    to be executed in a plugin execution test case.
    """

    CONF_EXEC_PROCESS_TIME_OUT = "execProcessTimeOut"
    CONF_EXEC_PROCESS_EXECUTABLE = "execProcessExecutable"

    def __init__(self):
        """
        Initializes process related attributes described above
        """
        EDPluginExec.__init__(self)
        self.setRequiredToHaveConfiguration()
        self.__strConfigExecutable = "cat"
        self.__strConfigCommandline = ""
        self.__subprocess = None
        self.__iPID = None
        self.__strCWD = None
        self.__strExecutionStatus = ""
        # OAR related state. Defaults are set here so that process_on_oar()
        # also works when the executable was selected with setExecutable()
        # instead of through configure().
        self._oar_options = None
        self._oar_poll_interval = DEFAULT_OAR_POLL_INTERVAL
        self._oar_job_id = None
        self._start_time = None
        self._timer = None
        # Set by poll_oar() once no further polling round is scheduled.
        self._oar_done = threading.Event()

    def process_locally(self, _edObject=None):
        """
        Sets the process up with the executable, command line and time out.
        Launches the process and waits for it to finish; the string form of
        its return code becomes the execution status.

        A timer calls kill() if the process is still running once the
        plugin time out has elapsed.

        @param _edObject: not used by this method
        """
        strCommand = self.getExecutable() + " " + self.getCommandline()
        EDVerbose.DEBUG("EDPluginExecProcess.process executing: " + self.getExecutable())
        self.synchronizeOn()
        EDVerbose.screen(self.getBaseName() + ": Processing")
        timer = threading.Timer(float(self.getTimeOut()), self.kill)
        timer.start()
        try:
            self.__subprocess = EDUtilsPlatform.Popen(shlex.split(str(EDUtilsPlatform.escape(strCommand))),
                                                      cwd=self.getWorkingDirectory())
            self.__iPID = self.__subprocess.pid
            self.__strExecutionStatus = str(self.__subprocess.wait())
        finally:
            # Also reached when the process could not be started: without
            # this the timer would fire later and the synchronisation taken
            # above would never be released.
            timer.cancel()
            self.synchronizeOff()
        EDVerbose.DEBUG("EDPluginExecProcess.process finished ")

    @staticmethod
    def _parse_oar_job_id(line):
        """
        Returns the job id announced on one line of oarsub output, or None
        if the line does not announce a job id.
        """
        line = line.strip()
        # 'OAR_JOB_ID=' is the key named in this plugin's own error message;
        # the historical 'OARJOB_ID=' spelling is still accepted.
        if not line.startswith(('OAR_JOB_ID=', 'OARJOB_ID=')):
            return None
        try:
            # The id follows the '=' sign, with or without a space before it.
            return int(line.partition('=')[2].split()[0])
        except (IndexError, ValueError):
            return None

    def process_on_oar(self, _edObject=None):
        """
        Special processing method for OAR. Since oarsub returns
        immediately we cannot rely on Popen.wait(). Instead we poll
        the cluster using the oarstat utility: a timer fires poll_oar()
        at regular intervals until the job is done, an error occurs or
        the total time exceeds the time out. This method returns once
        polling has stopped; problems met while polling are recorded
        with addErrorMessage() by poll_oar().

        @param _edObject: not used by this method
        @raise RuntimeError: if no job id can be read from the oarsub output
        """
        if self._oar_options is not None:
            command = '{0} {1} {2}'.format(self.getExecutable(),
                                           self._oar_options,
                                           self.getCommandline())
        else:
            command = '{0} {1}'.format(self.getExecutable(), self.getCommandline())
        EDVerbose.DEBUG('EDPluginExecProcess.process_on_oar executing: "{0}"'.format(command))
        # The time out is counted from the submission of the job.
        self._start_time = time.time()
        self._oar_done.clear()
        # universal_newlines gives text lines on Python 3 as well.
        oarsub = subprocess.Popen(shlex.split(EDUtilsPlatform.escape(command)),
                                  cwd=self.getWorkingDirectory(),
                                  stdout=subprocess.PIPE,
                                  universal_newlines=True)
        oar_job_id = None
        try:
            for line in oarsub.stdout:
                parsed_id = self._parse_oar_job_id(line)
                if parsed_id is not None:
                    oar_job_id = parsed_id
        finally:
            # Reap the short lived oarsub process and release its pipe.
            oarsub.stdout.close()
            oarsub.wait()
        if oar_job_id is None:
            msg = 'EDPluginExecProcess.process_on_oar: could not get OAR_JOB_ID!'
            EDVerbose.ERROR(msg)
            self.addErrorMessage(msg)
            raise RuntimeError(msg)
        EDVerbose.DEBUG('EDPluginExecProcess.process_on_oar: job id is "{0}"'.format(oar_job_id))
        self._oar_job_id = oar_job_id
        # The timer has to be started explicitly, otherwise the job is never polled.
        self._timer = threading.Timer(float(self._oar_poll_interval), self.poll_oar)
        self._timer.start()
        # poll_oar() sets the event on every path that does not schedule
        # another round, so this wait is bounded by the plugin time out.
        self._oar_done.wait()

    def _query_oar_status(self):
        """
        Runs the oarstat poll command for the current job and returns the
        reported state as a string, or None if no line for the job was found.
        """
        command = OAR_POLL_COMMAND.format(self._oar_job_id)
        EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: polling oar with "{0}"'.format(command))
        oarstat = subprocess.Popen(shlex.split(command),
                                   cwd=self.getWorkingDirectory(),
                                   stdout=subprocess.PIPE,
                                   universal_newlines=True)
        status = None
        job_prefix = str(self._oar_job_id)
        try:
            for line in oarstat.stdout:
                if not line.startswith(job_prefix):
                    continue
                # Expected line layout is "<job id>: <state>"; the state is
                # what follows the colon.
                # NOTE(review): layout taken from the OAR documentation - confirm.
                after_colon = line.partition(':')[2].split()
                if after_colon:
                    status = after_colon[0]
                else:
                    tokens = line.split()
                    if len(tokens) > 1:
                        status = tokens[-1]
                break
        finally:
            oarstat.stdout.close()
            oarstat.wait()
        return status

    def poll_oar(self):
        """
        One polling round, normally run from the timer thread started by
        process_on_oar(). Queries the job state, then either schedules the
        next round (job still running and time out not reached) or marks
        the polling as finished.

        @raise RuntimeError: if the state cannot be read, if the job is in
            a state other than 'Running' or 'Terminated', or if the time
            out has been reached. The error is also recorded with
            addErrorMessage().
        """
        rearmed = False
        try:
            status = self._query_oar_status()
            if status is None:
                msg = 'EDPluginExecProcess.poll_oar: could not get job status for job {0:d}'.format(self._oar_job_id)
                EDVerbose.WARNING(msg)
                self.addErrorMessage(msg)
                raise RuntimeError(msg)
            elif status == 'Terminated':
                EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: job {0:d} finished'.format(self._oar_job_id))
                # The job is over: do not schedule another polling round.
                return
            elif status == 'Running':
                EDVerbose.DEBUG('EDPluginExecProcess.poll_oar: job {0:d} still running'.format(self._oar_job_id))
            else:
                # NOTE(review): any other state reported by oarstat (for
                # instance a job that is still queued) is treated as an
                # error, as before - confirm this is intended.
                msg = 'EDPluginExecProcess.poll_oar: job {0:d} in a non handled state'.format(self._oar_job_id)
                EDVerbose.DEBUG(msg)
                self.addErrorMessage(msg)
                raise RuntimeError(msg)

            # we must then decide if we continue for another polling round
            if time.time() - self._start_time >= float(self.getTimeOut()):
                # NOTE(review): the job itself is not cancelled here.
                msg = 'EDPluginExecProcess.poll_oar: timeout!'
                EDVerbose.ERROR(msg)
                self.addErrorMessage(msg)
                raise RuntimeError(msg)
            self._timer = threading.Timer(float(self._oar_poll_interval), self.poll_oar)
            self._timer.start()
            rearmed = True
        finally:
            if not rearmed:
                # Wake up process_on_oar() whatever the reason for stopping.
                self._oar_done.set()

    def process(self, _edObject=None):
        """
        Runs the configured command: through OAR when the executable is
        "oarsub", as a local child process otherwise.
        """
        EDPluginExec.process(self)
        EDVerbose.DEBUG("EDPluginExecProcess.process starting")
        if self.getExecutable() == "oarsub":
            self.process_on_oar(_edObject)
        else:
            self.process_locally(_edObject)

    def kill(self):
        """
        Time out handler of process_locally(), run from its timer thread:
        kills the child process, sets the execution status to "timeout"
        and records an error message.

        @raise RuntimeError: always, after the error has been recorded
        """
        EDVerbose.WARNING("I will kill subprocess %s pid= %s" % (self.__subprocess, self.__iPID))
        # The timer can fire before the child process has been created.
        if self.__iPID is not None:
            EDUtilsPlatform.kill(self.__iPID)
        self.synchronizeOff()
        self.__strExecutionStatus = "timeout"
        EDVerbose.DEBUG("EDPluginExecProcess.process ========================================= ERROR! ================")
        errorMessage = EDMessage.ERROR_EXECUTION_03 % ('EDPluginExecProcess.process', self.getClassName(), "Timeout ")
        EDVerbose.error(errorMessage)
        self.addErrorMessage(errorMessage)
        raise RuntimeError(errorMessage)

    def configure(self):
        """
        Configures the plugin with executable from configuration file.

        Also reads the optional "oarOptions" and "oarPollInterval" entries
        when the configured executable is "oarsub", and the deprecated
        execProcessTimeOut entry.
        """
        EDPluginExec.configure(self)
        EDVerbose.DEBUG("EDPluginExecProcess.configure")
        strExecutable = self.config.get(self.CONF_EXEC_PROCESS_EXECUTABLE, None)
        if strExecutable is None:
            EDVerbose.DEBUG("EDPluginExecProcess.configure: No configuration parameter found for: %s , using default value: %s"\
                            % (self.CONF_EXEC_PROCESS_EXECUTABLE, self.getExecutable()))
        else:
            self.setExecutable(strExecutable)

        # test if we're to use oar and check for additional config
        if strExecutable == "oarsub":
            oar_options = self.config.get("oarOptions", None)
            if oar_options is None:
                EDVerbose.DEBUG('EDPluginExecProcess.configure: no additional options were specified for oarsub')
            self._oar_options = oar_options
            oar_poll = self.config.get("oarPollInterval", None)
            if oar_poll is None:
                EDVerbose.DEBUG('EDPluginExecProcess.configure: oar polling interval not configured')
                EDVerbose.DEBUG('EDPluginExecProcess.configure: using default version of {0}'.format(DEFAULT_OAR_POLL_INTERVAL))
                self._oar_poll_interval = DEFAULT_OAR_POLL_INTERVAL
            else:
                # Stored as configured; converted with float() where it is used.
                self._oar_poll_interval = oar_poll

        # The execProcessTimeOut is deprecated, see bug #563
        timeOut = self.config.get(self.CONF_EXEC_PROCESS_TIME_OUT, None)
        if timeOut is not None:
            EDVerbose.WARNING("Use of %s in plugin configuration is deprecated" % self.CONF_EXEC_PROCESS_TIME_OUT)
            EDVerbose.WARNING("Please use %s instead." % EDPlugin.CONF_TIME_OUT)
            self.setTimeOut(timeOut)

    def setExecutable(self, _strExecutable):
        """
        Sets the executable
        """
        self.__strConfigExecutable = _strExecutable

    def getExecutable(self):
        """
        Returns the executable. The special value "python" is replaced by
        the path of the running interpreter (sys.executable).
        """
        if self.__strConfigExecutable == "python":
            return sys.executable
        return self.__strConfigExecutable

    def setCommandline(self, _strCommandline):
        """
        Sets the command line
        """
        self.__strConfigCommandline = _strCommandline

    def getCommandline(self):
        """
        Returns the command line
        """
        return self.__strConfigCommandline

    def getPid(self):
        """
        Returns the pid of the local child process, or None if no process
        has been started by process_locally() yet.
        """
        return self.__iPID

    def getExecutionStatus(self):
        """
        Returns the string containing the execution status.
        """
        return self.__strExecutionStatus
263