1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 from __future__ import with_statement
29
30 __authors__ = ["Marie-Francoise Incardona", "Olof Svensson"]
31 __contact__ = "svensson@esrf.fr"
32 __license__ = "LGPLv3+"
33 __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
34
35
36 import os, time, gc
37
38 from EDVerbose import EDVerbose
39 from EDPlugin import EDPlugin
40 from EDSlot import EDSlot
41 from EDConfiguration import EDConfiguration
42 from EDActionCluster import EDActionCluster
43 from EDFactoryPluginStatic import EDFactoryPluginStatic
44
45
47 """
48 An EDPluginControl is a plugin that is responsible for a EDPluginExec or EDPluginControl plugin execution:
49 It is responsible for:
50 - The EDPluginExec or EDPluginControl Workflow
51 - The data propagation between the EDPluginExec
52 - The translation between generic and specific data models via EDHandler classes
53 - The error/warning propagation
54 - The executive summaries propagation
55 - Execution of an "action cluster": a set of plugins can be added to a so called "action cluster"
56 with the method "addPluginToActionCluster". All the plugins in the cluster can then be executed
57 simultaneously with the method "executeActionCluster" and synchronized with the method
58 "synchronizeActionCluster". The number of threads used by the action cluster is by default the
59 number of processors available on the computer, but this value can be changed either by
60 calling the method "setClusterSize" or by using the configuration parameter "clusterSize".
61 """
62
64 """
65 """
66 EDPlugin.__init__(self)
67 self.__strPluginToBeControlledName = None
68 self.__dictControlledPlugins = {}
69 self.__edActionCluster = None
70 self.__iClusterSize = None
71 self.__listOfLoadedPlugins = []
72
73
92
93
95 """
96 Reset all plugins kept in memory
97 """
98 self.__listOfLoadedPlugins = []
99 gc.collect()
100
102 """
103 """
104 return self.__listOfLoadedPlugins
105
106
108 """
109 Remove a plugin from the list of loaded plugins to free some memory
110 @param _plugin: plugin to remove
111 @type _plugin: instance of the class EDPlugin
112 """
113 if _plugin in self.__listOfLoadedPlugins:
114 with self.locked():
115 self.__listOfLoadedPlugins.remove(_plugin)
116 self.DEBUG("EDPluginControl.removeLoadedPlugin: Caught, removed %s unreferenced objects. currently there are %i plugins" % (gc.get_count(), len(self.__listOfLoadedPlugins)))
117 gc.collect()
118 else:
119 self.DEBUG("EDPluginControl.removeLoadedPlugin: Missed. currently there are %i plugins" % len(self.__listOfLoadedPlugins))
120
121
123 EDVerbose.DEBUG("EDPluginControl.synchronizePlugins")
124 bSynchronized = False
125 while not bSynchronized:
126 listPluginOrig = self.__listOfLoadedPlugins[:]
127 for edPlugin in listPluginOrig:
128 if edPlugin.isStarted() and (not edPlugin.isEnded()):
129 edPlugin.synchronize()
130 time.sleep(0.01)
131 with self.locked():
132 bSynchronized = (self.__listOfLoadedPlugins == listPluginOrig)
133
134
136 """
137 This method loads and returns a list of references to the plugins to be controlled.
138
139 The name of the plugin to be controlled is set set before calling this method using the
140 "setControlledPluginName" method.
141
142 The base name of the plugin to be controlled is used as the working
143 directory name of the plugin in question. The name of the plugin is used as
144 base name.
145 """
146 EDVerbose.DEBUG("EDPluginControl.loadPlugins")
147 listKeys = self.__dictControlledPlugins.keys()
148 listLoadedPlugins = []
149 for strKey in listKeys:
150 strPluginName = self.__dictControlledPlugins[strKey]
151 edPlugin = EDFactoryPluginStatic.loadPlugin(strPluginName)
152 edPlugin.setBaseDirectory(self.getWorkingDirectory())
153 edPlugin.setBaseName(strPluginName)
154 listLoadedPlugins.append(edPlugin)
155 return listLoadedPlugins
156
157
158 - def loadPlugin(self, _strPluginToBeControlledName=None, _strBaseName=None):
159 """
160 This method loads and returns a reference to the plugin to be controlled.
161
162 The name of the plugin to be controlled can either be passed as an
163 argument, or bet set before calling this method using the
164 "setPluginToBeControlledName".
165
166 The base name of the plugin to be controlled is used as the working
167 directory name of the plugin in question. If no argument is supplied
168 the name of the plugin is used as base name. In the case of creation of
169 several plugins to be launched simultaneously, the base name should be
170 different for each plugin and hence must be provided explicitly.
171 """
172 EDVerbose.DEBUG("EDPluginControl.loadPlugin")
173 if (_strPluginToBeControlledName is None):
174 strPluginName = self.__strPluginToBeControlledName
175 else:
176 strPluginName = _strPluginToBeControlledName
177 edPlugin = EDFactoryPluginStatic.loadPlugin(strPluginName)
178 if (edPlugin is None):
179 strErrorMessage = "EDPluginControl.loadPlugin : Cannot load plugin %s" % strPluginName
180 EDVerbose.error(strErrorMessage)
181 self.addErrorMessage(strErrorMessage)
182 raise RuntimeError, strErrorMessage
183 else:
184 self.__listOfLoadedPlugins.append(edPlugin)
185 edPlugin.setBaseDirectory(self.getWorkingDirectory())
186 if (_strBaseName is None):
187
188
189 strRenamedPlugin = self.compactPluginName(strPluginName)
190 strNewWorkingDirectory = os.path.join(self.getWorkingDirectory(), strRenamedPlugin)
191 if (os.path.exists(strNewWorkingDirectory)):
192 edPlugin.setBaseName(edPlugin.createBaseName())
193 else:
194 edPlugin.setBaseName(strRenamedPlugin)
195 else:
196 edPlugin.setBaseName(_strBaseName)
197 return edPlugin
198
199
201 """
202 Adds a name-value pair to the dictionary to map the general to the specific name of a plugin to be controlled
203 """
204 self.__dictControlledPlugins[_strControlledPluginName] = _strControlledPluginValue
205
206
208 """
209 Returns the name of the plugin to be controlled.
210 """
211 strPluginname = None
212 if self.__dictControlledPlugins.has_key(_strControlledPluginName):
213 strPluginname = self.__dictControlledPlugins[_strControlledPluginName]
214 return strPluginname
215
216
218 """
219 Adds a list of warning messages in the existing list of warning messages
220 """
221 EDVerbose.DEBUG("EDPluginControl.addWarningMessages")
222 for strWarningMessage in _listWarningMessages:
223 self.addWarningMessage(strWarningMessage)
224
225
227 """
228 Adds a list of error messages in the existing list of error messages
229 """
230 EDVerbose.DEBUG("EDPluginControl.addErrorMessages")
231 for strErrorMessage in _listErrorMessages:
232 self.addErrorMessage(strErrorMessage)
233
234
236 """
237 Propagates failure messages from a plugin including unexpected errors
238 Should be called in the plugin control method invoked when a plugin exec fails (doActionFailure<>)
239 """
240 EDVerbose.DEBUG("EDPluginControl.retrieveFailureMessages")
241 self.retrieveWarningMessages(_edPlugin)
242 self.retrieveErrorMessages(_edPlugin, _strMethodCaller, True)
243
244
246 """
247 Propagates success messages from a plugin
248 Error messages are retrieved because a plugin could end successfully with errors (depending on the use case)
249 In this case, there is no check for unexpected errors
250 """
251 EDVerbose.DEBUG("EDPluginControl.retrieveSuccessMessages")
252 self.retrieveWarningMessages(_edPlugin)
253 self.retrieveErrorMessages(_edPlugin, _strMethodCaller, False)
254
255
257 """
258 Propagates error messages from a plugin
259 if _bFailure is true, this method has been called from retrieveFailureMessages
260 in this case, checks for potential unexpected errors coming from the EDPluginExec
261 """
262 EDVerbose.DEBUG("EDPluginControl.retrieveErrorMessages")
263 listErrorMessages = _edPlugin.getListOfErrorMessages()
264 if (len(listErrorMessages) == 0) and (_bFailure is True):
265 strErrorMessage = "%s : Adding Unexpected error" % _strMethodCaller
266 EDVerbose.DEBUG(strErrorMessage)
267 listErrorMessages.append(strErrorMessage)
268 self.addErrorMessages(listErrorMessages)
269
270
277
278
290
291
308
309
311 """
312 This method adds a plugin instance to an action cluster.
313 """
314 if self.__edActionCluster == None:
315 self.__edActionCluster = EDActionCluster()
316 self.__edActionCluster.addAction(_edPlugin)
317 self.__listOfLoadedPlugins.append(_edPlugin)
318
319
321 """
322 This method executes the action cluster. The action cluster is executed
323 asynchronoulsy.
324 """
325 if self.__iClusterSize != None:
326 self.__edActionCluster.setClusterSize(self.__iClusterSize)
327 self.__edActionCluster.execute()
328
329
331 """
332 This method synchronises the action cluster with the control plugin thread.
333 """
334 self.__edActionCluster.synchronize()
335
336
338 """
339 This method sets the size of the action cluster, i.e. the number of threads
340 that will be executed simultaneously.
341 """
342 self.__iClusterSize = _iClusterSize
343
344
346 """
347 This method is used to start executable plugins in pipeline asynchronously.
348 """
349 if _bSynchronous:
350 self.executePluginSynchronous(_edPlugin)
351 else:
352 _edPlugin.execute()
353
354
390