1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 __authors__ = [ "Olof Svensson" ]
29 __contact__ = "svensson@esrf.fr"
30 __license__ = "LGPLv3+"
31 __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France"
32
33 """
34 This class is used for creating a "cluster" of plugins that are launched
35 in parallel and the synchronized.
36 """
37
38
39 import threading
40
41 from EDAction import EDAction
42
43 from EDUtilsParallel import EDUtilsParallel
44
46 """
47 This class is used for creating a "cluster" of actions (e.g. plugins) that are launched
48 in parallel and then synchronized.
49 """
50
51
53 """
54 Initalises the action cluster. The max number of threads to be used can be forced,
55 if omitted the number of processors number of threads
56 @param _iNoThreads: max number of threads to be used
57 @type _iNoThreads: integer
58 """
59 EDAction.__init__(self)
60 self.__iNoThreads = _iNoThreads
61 self.__semaphoreActionCluster = None
62 self.__lActions = []
63
64
66 """
67 Executes the action cluster. This method will return only when
68 all actions have finished.
69 """
70 self.DEBUG("EDActionCluster.process")
71 if (not self.__iNoThreads):
72 self.__iNoThreads = EDUtilsParallel.detectNumberOfCPUs()
73 self.__semaphoreActionCluster = threading.Semaphore(self.__iNoThreads)
74 for edAction in self.__lActions:
75 edAction.connectSUCCESS(self.__semaphoreRelease)
76 edAction.connectFAILURE(self.__setActionClusterFailure)
77 edAction.connectFAILURE(self.__semaphoreRelease)
78 self.DEBUG("EDActionCluster.process : Starting action %s" % edAction.getClassName())
79 self.__semaphoreActionCluster.acquire()
80 edAction.execute()
81
82 for edAction in self.__lActions:
83 edAction.join()
84
85
86
88 """
89 This private method is called if the edAction executed as part of the action
90 cluster is a success. It will release one thread from the semaphore of the action
91 so that a new thread can be started.
92 """
93 self.DEBUG("EDActionCluster.__semaphoreRelease : action %s ended." % _edAction.getClassName())
94 self.__semaphoreActionCluster.release()
95
96
98 """
99 This is a private method that sets the EDActioCluster instance to failure if one
100 of the actions ends in failure.
101 """
102 self.DEBUG("EDActionCluster.__setActionClusterFailure called from action %s" % _edAction.getClassName())
103 self.setFailure()
104
105
107 """
108 This method adds an action (e.g. plugin) to the list of actions to be executed in parallel.
109 @param _edAction: an actiond, e.g. a plugin
110 @type _edAction: EDAction
111 """
112 self.__lActions.append(_edAction)
113
114
116 """
117 This method forces the number of threads to be used of the action cluster.
118 @param _iNoThreads: max number of threads to be used
119 @type _iNoThreads: integer
120 """
121 self.__iNoThreads = _iNoThreads
122