Module EDAction
[hide private]
[frames] | no frames]

Source Code for Module EDAction

  1  # coding: utf8 
  2  # 
  3  #    Project: The EDNA Kernel 
  4  #             http://www.edna-site.org 
  5  # 
  6  #    File: "$Id$" 
  7  # 
  8  #    Copyright (C) 2008-2009 European Synchrotron Radiation Facility 
  9  #                            Grenoble, France 
 10  # 
 11  #    Principal authors: Olof Svensson (svensson@esrf.fr)  
 12  #                       Jérôme Kieffer (jerome.kieffer@esrf.fr)  
 13  # 
 14  #    This program is free software: you can redistribute it and/or modify 
 15  #    it under the terms of the GNU Lesser General Public License as published 
 16  #    by the Free Software Foundation, either version 3 of the License, or 
 17  #    (at your option) any later version. 
 18  # 
 19  #    This program is distributed in the hope that it will be useful, 
 20  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 21  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 22  #    GNU Lesser General Public License for more details. 
 23  # 
 24  #    You should have received a copy of the GNU General Public License 
 25  #    and the GNU Lesser General Public License  along with this program.   
 26  #    If not, see <http://www.gnu.org/licenses/>. 
 27  # 
 28   
 29  # 
 30  # This class has been inspired by the corresponding AALib class  
 31  # (20090518-PyAALib-JyAALib-111) and modified according to the needs  
 32  # for the EDNA project. 
 33  # 
 34   
 35  __authors__ = [ "Olof Svensson", "Jérôme Kieffer" ] 
 36  __contact__ = "svensson@esrf.fr" 
 37  __license__ = "LGPLv3+" 
 38  __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" 
 39   
 40  """ 
 41  This class is taking care of the workflow preProcess - process - postProcess. 
 42  """ 
 43   
 44   
 45  import time, os 
 46  from threading   import Thread 
 47  from EDSlot      import EDSlot 
 48  from EDVerbose   import EDVerbose 
 49  from EDLogging   import EDLogging 
 50   
 51   
52 -class EDAction(EDLogging, Thread):
53 """ 54 This class is taking care of executing the EDNA plugin and 55 application workflow preProcess - process - postProcess. 56 The detailed workflow looks like this: 57 58 preProcess 59 if not failure: slotPreProcess 60 if not failure: process 61 if not failure: slotProcess 62 if not failure: postProcess 63 if not failure: slotPostProcess 64 if not failure: slotSUCCESS 65 if failure: slotFAILURE 66 Always: finally Process 67 """ 68
69 - def __init__(self):
70 EDLogging.__init__(self) 71 Thread.__init__(self) 72 self.__edSlotPreProcess = EDSlot() 73 self.__edSlotProcess = EDSlot() 74 self.__edSlotPostProcess = EDSlot() 75 self.__edSlotSUCCESS = EDSlot() 76 self.__edSlotFAILURE = EDSlot() 77 self.__edSlotFinallyProcess = EDSlot() 78 self.__bIsFailure = False 79 self.__bIsTimeOut = False 80 self.__fTimeOutInSeconds = None 81 self.__fDefaultTimeOutInSeconds = 600.0 82 self.__bIsAbort = False 83 # Reference to the object which calls execute or executeSynchronous 84 self.__edObject = None 85 self.__lExtraTime = [] # list of extra allowed time for execution (in second) 86 self.__bLogTiming = False
87
88 - def executeKernel(self):
89 dictTimeStamps = { "init": time.time() } 90 self.setTimeInit() 91 try: 92 93 if (not self.isFailure()): 94 self.DEBUG("EDAction.executeKernel preProcess " + self.getClassName()) 95 self.preProcess() 96 if self.__bLogTiming: 97 dictTimeStamps["preProcess"] = time.time() 98 99 if (not self.isFailure()): 100 self.DEBUG("EDAction.executeKernel slotPreProcess " + self.getClassName()) 101 self.__edSlotPreProcess.call(self) 102 if self.__bLogTiming: 103 dictTimeStamps["slotPreProcess"] = time.time() 104 105 if (not self.isFailure()): 106 self.DEBUG("EDAction.executeKernel process " + self.getClassName()) 107 self.process() 108 if self.__bLogTiming: 109 dictTimeStamps["process"] = time.time() 110 111 if (not self.isFailure()): 112 self.DEBUG("EDAction.executeKernel slotProcess " + self.getClassName()) 113 self.__edSlotProcess.call(self) 114 if self.__bLogTiming: 115 dictTimeStamps["slotProcess"] = time.time() 116 117 if (not self.isFailure()): 118 self.DEBUG("EDAction.executeKernel postProcess " + self.getClassName()) 119 self.postProcess() 120 if self.__bLogTiming: 121 dictTimeStamps["postProcess"] = time.time() 122 123 if (not self.isFailure()): 124 self.DEBUG("EDAction.executeKernel slotPostProcess " + self.getClassName()) 125 self.__edSlotPostProcess.call(self) 126 if self.__bLogTiming: 127 dictTimeStamps["slotPostProcess"] = time.time() 128 129 except Exception: 130 self.writeErrorTrace() 131 self.setFailure() 132 133 # Execute finally process even in case of failure 134 self.DEBUG("EDAction.executeKernel finallyProcess" + self.getClassName()) 135 try: 136 self.finallyProcess() 137 if self.__bLogTiming: 138 dictTimeStamps["finallyProcess"] = time.time() 139 except Exception: 140 self.DEBUG("EDAction.executeKernel: ERROR in finallyProcess!") 141 self.writeErrorTrace() 142 self.setFailure() 143 try: 144 self.__edSlotFinallyProcess.call(self) 145 except Exception: 146 self.DEBUG("EDAction.executeKernel: ERROR in slotFinallyProcess!") 147 self.writeErrorTrace() 148 self.setFailure() 149 150 if (not self.isFailure()): 151 self.DEBUG("EDAction.executeKernel slotSUCCESS") 152 # Check that something doesn't go wrong in the success method! 153 try: 154 self.__edSlotSUCCESS.call(self) 155 if self.__bLogTiming: 156 dictTimeStamps["slotSUCCESS"] = time.time() 157 158 except Exception: 159 self.DEBUG("EDAction.executeKernel: ERROR in slotSUCCESS!") 160 self.writeErrorTrace() 161 self.setFailure() 162 163 if (self.isFailure()): 164 self.DEBUG("EDAction.executeKernel slotFAILURE") 165 # Check that something doesn't go wrong in the success method! 166 try: 167 self.__edSlotFAILURE.call(self) 168 if self.__bLogTiming: 169 dictTimeStamps["slotFAILURE"] = time.time() 170 except Exception: 171 self.DEBUG("EDAction.executeKernel: ERROR in slotFAILURE!") 172 self.writeErrorTrace() 173 174 self.setTimeEnd() 175 if self.__bLogTiming: 176 lstTimings = [] 177 178 dictTimeStamps["end"] = time.time() 179 lstTimings.append("EDAction.executeKernel: profiling of %s %i \t total time duration = %.3f s" % (self.getClassName(), self.getId(), dictTimeStamps["end"] - dictTimeStamps["init"])) 180 fTimeForFailureCalculation = dictTimeStamps["init"] 181 if "preProcess" in dictTimeStamps: 182 lstTimings.append("\t preProcess \t\t time duration = %.3f s" % (dictTimeStamps["preProcess"] - dictTimeStamps["init"])) 183 fTimeForFailureCalculation = dictTimeStamps["preProcess"] 184 185 if "process" in dictTimeStamps: 186 lstTimings.append("\t process \t\t time duration = %.3f s" % (dictTimeStamps["process"] - dictTimeStamps["slotPreProcess"])) 187 fTimeForFailureCalculation = dictTimeStamps["process"] 188 if "postProcess" in dictTimeStamps: 189 lstTimings.append("\t postProcess \t\t time duration = %.3f s" % (dictTimeStamps["postProcess"] - dictTimeStamps["slotProcess"])) 190 fTimeForFailureCalculation = dictTimeStamps["postProcess"] 191 fTimeForFinallyCalculation = fTimeForFailureCalculation 192 if "slotSUCCESS" in dictTimeStamps: 193 lstTimings.append("\t slotSUCCESS \t\t time duration = %.3f s" % (dictTimeStamps["slotSUCCESS"] - dictTimeStamps["slotPostProcess"])) 194 fTimeForFailureCalculation = dictTimeStamps["slotSUCCESS"] 195 if "slotFAILURE" in dictTimeStamps: 196 lstTimings.append("\t slotFAILURE \t\t time duration = %.3f s" % (dictTimeStamps["slotFAILURE"] - fTimeForFailureCalculation)) 197 if dictTimeStamps.has_key("finallyProcess"): 198 lstTimings.append("\t finallyProcess \t time duration = %.3f s" % (dictTimeStamps["finallyProcess"] - fTimeForFinallyCalculation)) 199 self.log(os.linesep.join(lstTimings))
200
201 - def synchronize(self):
202 """ 203 Wait for the thread to finish. Since the time out is used by 204 e.g. EDPluginExecProcessScript we add an extra second in order 205 to allow the subclasses to handle the time out - without this 206 extra second it's the EDAction class who time-outs first. 207 208 Note that this does not in any case add time to the execution, 209 because the extra second is only used for time-outs. The method 210 returns immediately once the thread has finished. 211 """ 212 EDVerbose.DEBUG("EDAction.synchronize() for %s" % self.getName()) 213 # fTimeOut = self.__fTimeOutInSeconds 214 if self.__fTimeOutInSeconds is None: 215 self.__fTimeOutInSeconds = self.__fDefaultTimeOutInSeconds 216 217 #Wait for the thread to be started up to timeout 218 if self.isStarted() == False: 219 tStartWait = time.time() 220 while self.isStarted() == False: 221 time.sleep(1) 222 if time.time() - tStartWait > self.__fTimeOutInSeconds: 223 self.__bIsTimeOut = True 224 strErrorMessage = "Timeout when waiting for %s to start!" % self.getClassName() 225 EDVerbose.DEBUG("EDAction.synchronize: " + strErrorMessage) 226 EDVerbose.ERROR(strErrorMessage) 227 self.setFailure() 228 return 229 # We add an extra second in order to allow execution plugin to finish 230 # which have the same timeout 231 self.join(float(self.__fTimeOutInSeconds + 1)) 232 for fExtraTime in self.__lExtraTime: 233 self.join(float(fExtraTime)) 234 if self.isAlive(): 235 # Timeout! 236 self.__bIsTimeOut = True 237 EDVerbose.DEBUG("EDAction.synchronize: Timeout!") 238 strErrorMessage = "Timeout when waiting for %s to terminate." % self.getClassName() 239 EDVerbose.ERROR(strErrorMessage) 240 self.setFailure()
241 242
243 - def isTimeOut(self):
244 return self.__bIsTimeOut
245 246
247 - def hasTimedOut(self, _bTimeout):
248 """ 249 Enforce the timeout state 250 251 @param _bTimeout: if you think you can force this ! 252 @type _bTimeout: boolean 253 """ 254 self.__bIsTimeOut = bool(_bTimeout)
255 256
257 - def isFailure(self):
258 return self.__bIsFailure
259 260
261 - def setFailure(self):
262 self.__bIsFailure = True
263 264
265 - def run(self):
266 self.executeKernel()
267 268
269 - def executeAction(self, _edObject=None):
270 self.execute(_edObject)
271 272
273 - def executeActionSynchronous(self, _edObject=None):
274 self.executeSynchronous()
275 276 277
278 - def connectPreProcess(self, _oMethod):
279 self.synchronizeOn() 280 if (_oMethod != None): 281 self.__edSlotPreProcess.connect(_oMethod) 282 self.synchronizeOff()
283 284
285 - def connectProcess(self, _oMethod):
286 self.synchronizeOn() 287 if (_oMethod != None): 288 self.__edSlotProcess.connect(_oMethod) 289 self.synchronizeOff()
290 291
292 - def connectPostProcess(self, _oMethod):
293 self.synchronizeOn() 294 if (_oMethod != None): 295 self.__edSlotPostProcess.connect(_oMethod) 296 self.synchronizeOff()
297 298
299 - def connectSUCCESS(self, _oMethod):
300 self.synchronizeOn() 301 if (_oMethod != None): 302 self.__edSlotSUCCESS.connect(_oMethod) 303 self.synchronizeOff()
304 305
306 - def connectFAILURE(self, _oMethod):
307 self.synchronizeOn() 308 if (_oMethod != None): 309 self.__edSlotFAILURE.connect(_oMethod) 310 self.synchronizeOff()
311
312 - def connectFinallyProcess(self, _oMethod):
313 self.synchronizeOn() 314 if (_oMethod != None): 315 self.__edSlotFinallyProcess.connect(_oMethod) 316 self.synchronizeOff()
317
318 - def isRunning(self):
319 return self.isAlive()
320
321 - def isEnded(self):
322 EDVerbose.DEBUG("%s.isEnded return %s" % (self.getName(), (self.getTimeEnd() is not None))) 323 return (self.getTimeEnd() is not None)
324
325 - def isStarted(self):
326 EDVerbose.DEBUG("%s.isStarted return %s, %s " % (self.getName(), (self.getTimeInit() is not None), self.getTimeInit())) 327 return (self.getTimeInit() is not None)
328 329
330 - def setTimeOut(self, _fTimeOut):
331 """ 332 Sets the time out 333 """ 334 EDVerbose.DEBUG("EDAction.setTimeOut called with value %s" % _fTimeOut) 335 self.__fTimeOutInSeconds = float(_fTimeOut)
336 337
338 - def getTimeOut(self):
339 """ 340 Returns the time out 341 """ 342 if self.__fTimeOutInSeconds is None: 343 self.__fTimeOutInSeconds = self.__fDefaultTimeOutInSeconds 344 return self.__fTimeOutInSeconds
345
346 - def addExtraTime(self, _fExtraTime):
347 """ 348 Allows to incread the timeout of a plugin while it is running 349 @param _fExtraTime: extra time to be added to timeout 350 @type _fExtraTime: float 351 """ 352 self.__lExtraTime.append(_fExtraTime) 353 self.setTimeOut(self.getTimeOut() + _fExtraTime)
354
355 - def getDefaultTimeOut(self):
356 """ 357 Returns the time out 358 """ 359 return self.__fDefaultTimeOutInSeconds
360 361
362 - def getSlotSUCCESS(self):
363 return self.__edSlotSUCCESS
364 365
366 - def getSlotFAILURE(self):
367 return self.__edSlotFAILURE
368 369
370 - def preProcess(self, _edObject=None):
371 pass
372 373
374 - def process(self, _edObject=None):
375 pass
376 377
378 - def postProcess(self, _edObject=None):
379 pass
380 381
382 - def abort(self, _edObject=None):
383 pass
384 385
386 - def finallyProcess(self, _edObject=None):
387 pass
388 389
390 - def execute(self, _edObject=None):
391 self.__bIsStarted = True 392 self.__edObject = _edObject 393 self.start()
394 395
396 - def executeSynchronous(self, _edObject=None):
397 self.execute(_edObject) 398 self.synchronize()
399 400
401 - def setLogTiming(self, _bValue):
402 """ 403 Force this action to log it's timing to file 404 """ 405 self.__bLogTiming = bool(_bValue)
406
407 - def getLogTiming(self):
408 return self.__bLogTiming
409 logTiming = property(getLogTiming, setLogTiming)
410