Module EDPluginControl
[hide private]
[frames] | no frames]

Source Code for Module EDPluginControl

  1  # 
  2  #    Project: The EDNA Kernel 
  3  #             http://www.edna-site.org 
  4  # 
  5  #    File: "$Id$" 
  6  # 
  7  #    Copyright (C) 2008-2009 European Synchrotron Radiation Facility 
  8  #                            Grenoble, France 
  9  # 
 10  #    Principal authors: Marie-Francoise Incardona (incardon@esrf.fr) 
 11  #                       Olof Svensson (svensson@esrf.fr)  
 12  # 
 13  #    This program is free software: you can redistribute it and/or modify 
 14  #    it under the terms of the GNU Lesser General Public License as published 
 15  #    by the Free Software Foundation, either version 3 of the License, or 
 16  #    (at your option) any later version. 
 17  # 
 18  #    This program is distributed in the hope that it will be useful, 
 19  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 20  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 21  #    GNU Lesser General Public License for more details. 
 22  # 
 23  #    You should have received a copy of the GNU General Public License 
 24  #    and the GNU Lesser General Public License  along with this program.   
 25  #    If not, see <http://www.gnu.org/licenses/>. 
 26  # 
 27   
 28  from __future__ import with_statement 
 29   
 30  __authors__ = ["Marie-Francoise Incardona", "Olof Svensson"] 
 31  __contact__ = "svensson@esrf.fr" 
 32  __license__ = "LGPLv3+" 
 33  __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" 
 34   
 35   
 36  import os, time, gc 
 37   
 38  from EDVerbose import EDVerbose 
 39  from EDPlugin import EDPlugin 
 40  from EDSlot import EDSlot 
 41  from EDConfiguration import EDConfiguration 
 42  from EDActionCluster import EDActionCluster 
 43  from EDFactoryPluginStatic import EDFactoryPluginStatic 
 44   
 45   
46 -class EDPluginControl(EDPlugin):
47 """ 48 An EDPluginControl is a plugin that is responsible for a EDPluginExec or EDPluginControl plugin execution: 49 It is responsible for: 50 - The EDPluginExec or EDPluginControl Workflow 51 - The data propagation between the EDPluginExec 52 - The translation between generic and specific data models via EDHandler classes 53 - The error/warning propagation 54 - The executive summaries propagation 55 - Execution of an "action cluster": a set of plugins can be added to a so called "action cluster" 56 with the method "addPluginToActionCluster". All the plugins in the cluster can then be executed 57 simultaneously with the method "executeActionCluster" and synchronized with the method 58 "synchronizeActionCluster". The number of threads used by the action cluster is by default the 59 number of processors available on the computer, but this value can be changed either by 60 calling the method "setClusterSize" or by using the configuration parameter "clusterSize". 61 """ 62
63 - def __init__ (self):
64 """ 65 """ 66 EDPlugin.__init__(self) 67 self.__strPluginToBeControlledName = None 68 self.__dictControlledPlugins = {} 69 self.__edActionCluster = None 70 self.__iClusterSize = None 71 self.__listOfLoadedPlugins = []
72 73
74 - def configure(self):
75 """ 76 Gets the EDPluginControl parameters from the configuration file and stores them in class member attributes. 77 """ 78 EDPlugin.configure(self) 79 EDVerbose.DEBUG("EDPluginControl.configure") 80 strControlledPlugins = self.config.get("controlledPlugins", None) 81 if strControlledPlugins is not None: 82 pyListControlledPlugins = strControlledPlugins.split(",") 83 for strControlledPlugin in pyListControlledPlugins: 84 strControlledPluginName = self.config.get(strControlledPlugin) 85 if strControlledPluginName != None: 86 self.setControlledPluginName(strControlledPlugin, strControlledPluginName) 87 EDVerbose.DEBUG("EDPluginControl.configure: setting controlled plugin %s to specific plugin %s" % (strControlledPlugin, strControlledPluginName)) 88 strClusterSize = self.config.get("clusterSize", None) 89 if strClusterSize is not None: 90 self.__iClusterSize = int(strClusterSize) 91 EDVerbose.DEBUG("EDPluginControl.configure: setting cluster size to %d" % self.__iClusterSize)
92 93
94 - def emptyListOfLoadedPlugin(self):
95 """ 96 Reset all plugins kept in memory 97 """ 98 self.__listOfLoadedPlugins = [] 99 gc.collect()
100
101 - def getListOfLoadedPlugin(self):
102 """ 103 """ 104 return self.__listOfLoadedPlugins
105 106
107 - def removeLoadedPlugin(self, _plugin):
108 """ 109 Remove a plugin from the list of loaded plugins to free some memory 110 @param _plugin: plugin to remove 111 @type _plugin: instance of the class EDPlugin 112 """ 113 if _plugin in self.__listOfLoadedPlugins: 114 with self.locked(): 115 self.__listOfLoadedPlugins.remove(_plugin) 116 self.DEBUG("EDPluginControl.removeLoadedPlugin: Caught, removed %s unreferenced objects. currently there are %i plugins" % (gc.get_count(), len(self.__listOfLoadedPlugins))) 117 gc.collect() 118 else: 119 self.DEBUG("EDPluginControl.removeLoadedPlugin: Missed. currently there are %i plugins" % len(self.__listOfLoadedPlugins))
120 121
122 - def synchronizePlugins(self):
123 EDVerbose.DEBUG("EDPluginControl.synchronizePlugins") 124 bSynchronized = False 125 while not bSynchronized: 126 listPluginOrig = self.__listOfLoadedPlugins[:] 127 for edPlugin in listPluginOrig: 128 if edPlugin.isStarted() and (not edPlugin.isEnded()): 129 edPlugin.synchronize() 130 time.sleep(0.01) 131 with self.locked(): 132 bSynchronized = (self.__listOfLoadedPlugins == listPluginOrig)
133 134
135 - def loadPlugins(self):
136 """ 137 This method loads and returns a list of references to the plugins to be controlled. 138 139 The name of the plugin to be controlled is set set before calling this method using the 140 "setControlledPluginName" method. 141 142 The base name of the plugin to be controlled is used as the working 143 directory name of the plugin in question. The name of the plugin is used as 144 base name. 145 """ 146 EDVerbose.DEBUG("EDPluginControl.loadPlugins") 147 listKeys = self.__dictControlledPlugins.keys() 148 listLoadedPlugins = [] 149 for strKey in listKeys: 150 strPluginName = self.__dictControlledPlugins[strKey] 151 edPlugin = EDFactoryPluginStatic.loadPlugin(strPluginName) 152 edPlugin.setBaseDirectory(self.getWorkingDirectory()) 153 edPlugin.setBaseName(strPluginName) 154 listLoadedPlugins.append(edPlugin) 155 return listLoadedPlugins
156 157
158 - def loadPlugin(self, _strPluginToBeControlledName=None, _strBaseName=None):
159 """ 160 This method loads and returns a reference to the plugin to be controlled. 161 162 The name of the plugin to be controlled can either be passed as an 163 argument, or bet set before calling this method using the 164 "setPluginToBeControlledName". 165 166 The base name of the plugin to be controlled is used as the working 167 directory name of the plugin in question. If no argument is supplied 168 the name of the plugin is used as base name. In the case of creation of 169 several plugins to be launched simultaneously, the base name should be 170 different for each plugin and hence must be provided explicitly. 171 """ 172 EDVerbose.DEBUG("EDPluginControl.loadPlugin") 173 if (_strPluginToBeControlledName is None): 174 strPluginName = self.__strPluginToBeControlledName 175 else: 176 strPluginName = _strPluginToBeControlledName 177 edPlugin = EDFactoryPluginStatic.loadPlugin(strPluginName) 178 if (edPlugin is None): 179 strErrorMessage = "EDPluginControl.loadPlugin : Cannot load plugin %s" % strPluginName 180 EDVerbose.error(strErrorMessage) 181 self.addErrorMessage(strErrorMessage) 182 raise RuntimeError, strErrorMessage 183 else: 184 self.__listOfLoadedPlugins.append(edPlugin) 185 edPlugin.setBaseDirectory(self.getWorkingDirectory()) 186 if (_strBaseName is None): 187 # Check if base name exists. OBS! Not thread safe so please set explicitly 188 # _strBaseName for multi-threaded code 189 strRenamedPlugin = self.compactPluginName(strPluginName) 190 strNewWorkingDirectory = os.path.join(self.getWorkingDirectory(), strRenamedPlugin) 191 if (os.path.exists(strNewWorkingDirectory)): 192 edPlugin.setBaseName(edPlugin.createBaseName()) 193 else: 194 edPlugin.setBaseName(strRenamedPlugin) 195 else: 196 edPlugin.setBaseName(_strBaseName) 197 return edPlugin
198 199
200 - def setControlledPluginName(self, _strControlledPluginName, _strControlledPluginValue):
201 """ 202 Adds a name-value pair to the dictionary to map the general to the specific name of a plugin to be controlled 203 """ 204 self.__dictControlledPlugins[_strControlledPluginName] = _strControlledPluginValue
205 206
207 - def getControlledPluginName(self, _strControlledPluginName):
208 """ 209 Returns the name of the plugin to be controlled. 210 """ 211 strPluginname = None 212 if self.__dictControlledPlugins.has_key(_strControlledPluginName): 213 strPluginname = self.__dictControlledPlugins[_strControlledPluginName] 214 return strPluginname
215 216
217 - def addWarningMessages(self, _listWarningMessages):
218 """ 219 Adds a list of warning messages in the existing list of warning messages 220 """ 221 EDVerbose.DEBUG("EDPluginControl.addWarningMessages") 222 for strWarningMessage in _listWarningMessages: 223 self.addWarningMessage(strWarningMessage)
224 225
226 - def addErrorMessages(self, _listErrorMessages):
227 """ 228 Adds a list of error messages in the existing list of error messages 229 """ 230 EDVerbose.DEBUG("EDPluginControl.addErrorMessages") 231 for strErrorMessage in _listErrorMessages: 232 self.addErrorMessage(strErrorMessage)
233 234
235 - def retrieveFailureMessages(self, _edPlugin, _strMethodCaller):
236 """ 237 Propagates failure messages from a plugin including unexpected errors 238 Should be called in the plugin control method invoked when a plugin exec fails (doActionFailure<>) 239 """ 240 EDVerbose.DEBUG("EDPluginControl.retrieveFailureMessages") 241 self.retrieveWarningMessages(_edPlugin) 242 self.retrieveErrorMessages(_edPlugin, _strMethodCaller, True)
243 244
245 - def retrieveSuccessMessages(self, _edPlugin, _strMethodCaller):
246 """ 247 Propagates success messages from a plugin 248 Error messages are retrieved because a plugin could end successfully with errors (depending on the use case) 249 In this case, there is no check for unexpected errors 250 """ 251 EDVerbose.DEBUG("EDPluginControl.retrieveSuccessMessages") 252 self.retrieveWarningMessages(_edPlugin) 253 self.retrieveErrorMessages(_edPlugin, _strMethodCaller, False)
254 255
256 - def retrieveErrorMessages(self, _edPlugin, _strMethodCaller, _bFailure):
257 """ 258 Propagates error messages from a plugin 259 if _bFailure is true, this method has been called from retrieveFailureMessages 260 in this case, checks for potential unexpected errors coming from the EDPluginExec 261 """ 262 EDVerbose.DEBUG("EDPluginControl.retrieveErrorMessages") 263 listErrorMessages = _edPlugin.getListOfErrorMessages() 264 if (len(listErrorMessages) == 0) and (_bFailure is True): 265 strErrorMessage = "%s : Adding Unexpected error" % _strMethodCaller 266 EDVerbose.DEBUG(strErrorMessage) 267 listErrorMessages.append(strErrorMessage) 268 self.addErrorMessages(listErrorMessages)
269 270
271 - def retrieveWarningMessages(self, _edPlugin):
272 """ 273 Propagates warning messages from a plugin 274 """ 275 EDVerbose.DEBUG("EDPluginControl.retrieveWarningMessages") 276 self.addWarningMessages(_edPlugin.getListOfWarningMessages())
277 278
279 - def appendExecutiveSummary(self, _edPlugin, _strPrefix="", _bAddSeparator=True):
280 """ 281 Appends the executive summary from a plugin. 282 """ 283 EDVerbose.DEBUG("EDPluginControl.appendExecutiveSummary") 284 if (_bAddSeparator): 285 self.addExecutiveSummarySeparator() 286 for strLine in _edPlugin.getListExecutiveSummaryLines(): 287 if strLine == self.getExecutiveSummarySeparator() and _strPrefix != "": 288 strLine = strLine[ :-len(_strPrefix) ] 289 self.addExecutiveSummaryLine(_strPrefix + strLine)
290 291
292 - def addErrorWarningMessagesToExecutiveSummary(self, _strErrorMessage="Error messages:", _strWarningMessage="Warning messages:"):
293 """ 294 Adds error and warning messages (if any) in the executive summary 295 """ 296 if len(self.getListOfErrorMessages()) != 0: 297 self.addExecutiveSummarySeparator() 298 self.addExecutiveSummaryLine(_strErrorMessage) 299 for strErrorMessage in self.getListOfErrorMessages(): 300 self.addExecutiveSummaryLine(strErrorMessage) 301 self.addExecutiveSummarySeparator() 302 if len(self.getListOfWarningMessages()) != 0: 303 self.addExecutiveSummarySeparator() 304 self.addExecutiveSummaryLine(_strWarningMessage) 305 for warningMessage in self.getListOfWarningMessages(): 306 self.addExecutiveSummaryLine(warningMessage) 307 self.addExecutiveSummarySeparator()
308 309
310 - def addPluginToActionCluster(self, _edPlugin):
311 """ 312 This method adds a plugin instance to an action cluster. 313 """ 314 if self.__edActionCluster == None: 315 self.__edActionCluster = EDActionCluster() 316 self.__edActionCluster.addAction(_edPlugin) 317 self.__listOfLoadedPlugins.append(_edPlugin)
318 319
320 - def executeActionCluster(self):
321 """ 322 This method executes the action cluster. The action cluster is executed 323 asynchronoulsy. 324 """ 325 if self.__iClusterSize != None: 326 self.__edActionCluster.setClusterSize(self.__iClusterSize) 327 self.__edActionCluster.execute()
328 329
330 - def synchronizeActionCluster(self):
331 """ 332 This method synchronises the action cluster with the control plugin thread. 333 """ 334 self.__edActionCluster.synchronize()
335 336
337 - def setClusterSize(self, _iClusterSize):
338 """ 339 This method sets the size of the action cluster, i.e. the number of threads 340 that will be executed simultaneously. 341 """ 342 self.__iClusterSize = _iClusterSize
343 344
345 - def executePlugin(self, _edPlugin, _bSynchronous=False):
346 """ 347 This method is used to start executable plugins in pipeline asynchronously. 348 """ 349 if _bSynchronous: 350 self.executePluginSynchronous(_edPlugin) 351 else: 352 _edPlugin.execute()
353 354
355 - def executePluginSynchronous(self, _edPlugin):
356 """ 357 This method is used to start executable plugins in pipeline synchronously. 358 """ 359 _edControlSlotSUCCESS = EDSlot() 360 _edControlSlotFAILURE = EDSlot() 361 362 map(_edControlSlotSUCCESS.connect, _edPlugin.getSlotSUCCESS().getListMethod()) 363 map(_edControlSlotFAILURE.connect, _edPlugin.getSlotFAILURE().getListMethod()) 364 365 _edPlugin.getSlotSUCCESS().emptyListMethod() 366 _edPlugin.getSlotFAILURE().emptyListMethod() 367 368 _edPlugin.executeSynchronous() 369 370 if (not _edPlugin.isFailure()): 371 EDVerbose.DEBUG("EDControlPlugin.executeSynchronous slotSUCCESS") 372 # Check that something doesn't go wrong in the success method! 373 try: 374 _edControlSlotSUCCESS.call(_edPlugin) 375 376 except Exception: 377 EDVerbose.DEBUG("EDControlPlugin.executeSynchronous: ERROR in slotSUCCESS!") 378 EDVerbose.writeErrorTrace() 379 _edPlugin.setFailure() 380 381 if (_edPlugin.isFailure()): 382 EDVerbose.DEBUG("EDControlPlugin.executeSynchronous slotFAILURE") 383 # Check that something doesn't go wrong in the success method! 384 try: 385 _edControlSlotFAILURE.call(_edPlugin) 386 387 except Exception: 388 EDVerbose.DEBUG("EDControlPlugin.executeSynchronous: ERROR in slotFAILURE!") 389 EDVerbose.writeErrorTrace()
390