Module EDActionCluster
[hide private]
[frames] | no frames]

Source Code for Module EDActionCluster

  1  # 
  2  #    Project: The EDNA Kernel 
  3  #             http://www.edna-site.org 
  4  # 
  5  #    File: "$Id$" 
  6  # 
  7  #    Copyright (C) 2008-2010 European Synchrotron Radiation Facility 
  8  #                            Grenoble, France 
  9  # 
 10  #    Principal authors: Olof Svensson (svensson@esrf.fr)  
 11  # 
 12  #    This program is free software: you can redistribute it and/or modify 
 13  #    it under the terms of the GNU Lesser General Public License as published 
 14  #    by the Free Software Foundation, either version 3 of the License, or 
 15  #    (at your option) any later version. 
 16  # 
 17  #    This program is distributed in the hope that it will be useful, 
 18  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 19  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 20  #    GNU Lesser General Public License for more details. 
 21  # 
 22  #    You should have received a copy of the GNU General Public License 
 23  #    and the GNU Lesser General Public License  along with this program.   
 24  #    If not, see <http://www.gnu.org/licenses/>. 
 25  # 
 26   
 27   
 28  __authors__ = [ "Olof Svensson" ] 
 29  __contact__ = "svensson@esrf.fr" 
 30  __license__ = "LGPLv3+" 
 31  __copyright__ = "European Synchrotron Radiation Facility, Grenoble, France" 
 32   
 33  """ 
 34  This class is used for creating a "cluster" of plugins that are launched 
 35  in parallel and the synchronized. 
 36  """ 
 37   
 38   
 39  import threading 
 40   
 41  from EDAction import EDAction 
 42   
 43  from EDUtilsParallel import EDUtilsParallel 
 44   
45 -class EDActionCluster(EDAction):
46 """ 47 This class is used for creating a "cluster" of actions (e.g. plugins) that are launched 48 in parallel and then synchronized. 49 """ 50 51
52 - def __init__(self, _iNoThreads=None):
53 """ 54 Initalises the action cluster. The max number of threads to be used can be forced, 55 if omitted the number of processors number of threads 56 @param _iNoThreads: max number of threads to be used 57 @type _iNoThreads: integer 58 """ 59 EDAction.__init__(self) 60 self.__iNoThreads = _iNoThreads 61 self.__semaphoreActionCluster = None 62 self.__lActions = []
63 64
65 - def process(self, _edPlugin=None):
66 """ 67 Executes the action cluster. This method will return only when 68 all actions have finished. 69 """ 70 self.DEBUG("EDActionCluster.process") 71 if (not self.__iNoThreads): 72 self.__iNoThreads = EDUtilsParallel.detectNumberOfCPUs() 73 self.__semaphoreActionCluster = threading.Semaphore(self.__iNoThreads) 74 for edAction in self.__lActions: 75 edAction.connectSUCCESS(self.__semaphoreRelease) 76 edAction.connectFAILURE(self.__setActionClusterFailure) 77 edAction.connectFAILURE(self.__semaphoreRelease) 78 self.DEBUG("EDActionCluster.process : Starting action %s" % edAction.getClassName()) 79 self.__semaphoreActionCluster.acquire() 80 edAction.execute() 81 # Wait for all threads to finish 82 for edAction in self.__lActions: 83 edAction.join()
84 85 86
87 - def __semaphoreRelease(self, _edAction=None):
88 """ 89 This private method is called if the edAction executed as part of the action 90 cluster is a success. It will release one thread from the semaphore of the action 91 so that a new thread can be started. 92 """ 93 self.DEBUG("EDActionCluster.__semaphoreRelease : action %s ended." % _edAction.getClassName()) 94 self.__semaphoreActionCluster.release()
95 96
97 - def __setActionClusterFailure(self, _edAction=None):
98 """ 99 This is a private method that sets the EDActioCluster instance to failure if one 100 of the actions ends in failure. 101 """ 102 self.DEBUG("EDActionCluster.__setActionClusterFailure called from action %s" % _edAction.getClassName()) 103 self.setFailure()
104 105
106 - def addAction(self, _edAction):
107 """ 108 This method adds an action (e.g. plugin) to the list of actions to be executed in parallel. 109 @param _edAction: an actiond, e.g. a plugin 110 @type _edAction: EDAction 111 """ 112 self.__lActions.append(_edAction)
113 114
115 - def setClusterSize(self, _iNoThreads):
116 """ 117 This method forces the number of threads to be used of the action cluster. 118 @param _iNoThreads: max number of threads to be used 119 @type _iNoThreads: integer 120 """ 121 self.__iNoThreads = _iNoThreads
122