Package duplicity :: Module asyncscheduler
[hide private]
[frames] | [no frames]

Source Code for Module duplicity.asyncscheduler

  1  # -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- 
  2  # 
  3  # Copyright 2002 Ben Escoto <ben@emerose.org> 
  4  # Copyright 2007 Kenneth Loafman <kenneth@loafman.com> 
  5  # Copyright 2008 Peter Schuller <peter.schuller@infidyne.com> 
  6  # 
  7  # This file is part of duplicity. 
  8  # 
  9  # Duplicity is free software; you can redistribute it and/or modify it 
 10  # under the terms of the GNU General Public License as published by the 
 11  # Free Software Foundation; either version 2 of the License, or (at your 
 12  # option) any later version. 
 13  # 
 14  # Duplicity is distributed in the hope that it will be useful, but 
 15  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 16  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 17  # General Public License for more details. 
 18  # 
 19  # You should have received a copy of the GNU General Public License 
 20  # along with duplicity; if not, write to the Free Software Foundation, 
 21  # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 22   
 23  """ 
 24  Asynchronous job scheduler, for concurrent execution with minimalistic 
 25  dependency guarantees. 
 26  """ 
 27   
 28  import duplicity 
 29  from duplicity import log 
 30  from duplicity.dup_threading import require_threading 
 31  from duplicity.dup_threading import interruptably_wait 
 32  from duplicity.dup_threading import async_split 
 33  from duplicity.dup_threading import with_lock 
 34   
 35  thread    = duplicity.dup_threading.thread_module() 
 36  threading = duplicity.dup_threading.threading_module() 
 37   
class AsyncScheduler:
    """
    Easy-to-use scheduler of function calls to be executed
    concurrently. A very simple dependency mechanism exists in the
    form of barriers (see insert_barrier()).

    Each instance has a concurrency level associated with it. A
    concurrency of 0 implies that all tasks will be executed
    synchronously when scheduled. A concurrency of 1 indicates that a
    task will be executed asynchronously, but never concurrently with
    other tasks. Both 0 and 1 guarantee strict ordering among all
    tasks (i.e., they will be executed in the order scheduled).

    At concurrency levels above 1, the tasks will end up being
    executed in an order undetermined except insofar as is enforced by
    calls to insert_barrier().

    An AsyncScheduler should be created for any independent process;
    the scheduler will assume that if any background job fails (raises
    an exception), it makes further work moot.
    """

    def __init__(self, concurrency):
        """
        Create an asynchronous scheduler that executes jobs with the
        given level of concurrency.

        concurrency is the maximum number of tasks allowed to run at
        once; 0 means every task runs synchronously in the calling
        thread (and no threading support is required).
        """
        log.Info("%s: %s" % (self.__class__.__name__,
                             _("instantiating at concurrency %d") %
                             (concurrency)))
        assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)

        self.__failed = False          # has at least one task failed so far?
        self.__failed_waiter = None    # when __failed, the waiter of the first task that failed
        self.__concurrency = concurrency
        self.__worker_count = 0        # number of active workers
        self.__waiter_count = 0        # number of threads waiting to submit work
        self.__barrier = False         # barrier currently in effect?
        # For simplicity, a single condition variable (and its lock) guards
        # all of the state above, even if the resulting notifyAll() calls
        # are not technically efficient.
        self.__cv = threading.Condition()

        if concurrency > 0:
            require_threading("concurrency > 0 (%d)" % (concurrency,))

    def insert_barrier(self):
        """
        Proclaim that any tasks scheduled prior to the call to this
        method MUST be executed prior to any tasks scheduled after the
        call to this method.

        The intended use case is that if task B depends on A, a
        barrier must be inserted in between to guarantee that A
        happens before B.
        """
        log.Debug("%s: %s" % (self.__class__.__name__, _("inserting barrier")))
        # With concurrency 0 it's a NOOP, and due to the special case in
        # task scheduling we do not want to append to the queue (will never
        # be popped).
        if self.__concurrency > 0:
            def _insert_barrier():
                self.__barrier = True

            with_lock(self.__cv, _insert_barrier)

    def schedule_task(self, fn, params):
        """
        Schedule the given task (callable, typically function) for
        execution. Pass the given parameters to the function when
        calling it. Returns a callable which can optionally be used
        to wait for the task to complete, either by returning its
        return value or by propagating any exception raised by said
        task.

        This method may block or return immediately, depending on the
        configuration and state of the scheduler.

        This method may also raise an exception in order to trigger
        failures early, if the task (if run synchronously) or a previous
        task has already failed.

        NOTE: Pay particular attention to the scope in which this is
        called. In particular, since it will execute concurrently in
        the background, assuming fn is a closure, any variables used
        must be properly bound in the closure. This is the reason for
        the convenience feature of being able to give parameters to
        the call, to avoid having to wrap the call itself in a
        function in order to "fixate" variables in, for example, an
        enclosing loop.
        """
        assert fn is not None

        # Note: It is on purpose that we keep track of concurrency in
        # the front end and launch threads for each task, rather than
        # keep a pool of workers. The overhead is not relevant in the
        # situation this will be used, and it removes complexity in
        # terms of ensuring the scheduler is garbage collected/shut
        # down properly when no longer referenced/needed by calling
        # code.

        if self.__concurrency == 0:
            # special case this to not require any platform support for
            # threading at all
            log.Info("%s: %s" % (self.__class__.__name__,
                                 _("running task synchronously (asynchronicity disabled)")),
                     log.InfoCode.synchronous_upload_begin)

            return self.__run_synchronously(fn, params)
        else:
            log.Info("%s: %s" % (self.__class__.__name__,
                                 _("scheduling task for asynchronous execution")),
                     log.InfoCode.asynchronous_upload_begin)

            return self.__run_asynchronously(fn, params)

    def wait(self):
        """
        Wait for the scheduler to become entirely empty (i.e., all
        tasks having run to completion).

        This does not itself raise if a task failed; use the waiters
        returned by schedule_task() to observe task results.

        IMPORTANT: This is only useful with a single caller scheduling
        tasks, such that no call to schedule_task() is currently in
        progress or may happen subsequently to the call to wait().
        """
        def _wait():
            interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)

        with_lock(self.__cv, _wait)

    def __run_synchronously(self, fn, params):
        """
        Run fn(*params) in the calling thread and return a waiter that
        simply returns the result.
        """
        # When running synchronously, we immediately leak any exception raised
        # for immediate failure reporting to calling code.
        ret = fn(*params)

        def _waiter():
            return ret

        log.Info("%s: %s" % (self.__class__.__name__,
                             _("task completed successfully")),
                 log.InfoCode.synchronous_upload_done)

        return _waiter

    def __run_asynchronously(self, fn, params):
        """
        Block until a worker slot is available (and any barrier has been
        honored), reserve the slot, start a worker thread for the task and
        return the waiter half of the task.

        Raises (via the failed task's waiter) if a previously scheduled
        task has already failed.
        """
        (waiter, caller) = async_split(lambda: fn(*params))

        def check_pending_failure():
            # Re-raise the first failure seen so far by invoking its waiter.
            if self.__failed:
                log.Info("%s: %s" % (self.__class__.__name__,
                                     _("a previously scheduled task has failed; "
                                       "propagating the result immediately")),
                         log.InfoCode.asynchronous_upload_done)
                self.__failed_waiter()
                raise AssertionError("%s: waiter should have raised an exception; "
                                     "this is a bug" % (self.__class__.__name__,))

        def wait_for_and_register_launch():
            # Runs with self.__cv held.
            check_pending_failure()  # raise on fail
            while self.__worker_count >= self.__concurrency or self.__barrier:
                if self.__worker_count == 0:
                    # All tasks scheduled before the barrier have finished,
                    # so the barrier has been honored and can be lifted.
                    assert self.__barrier, "barrier should be in effect"
                    self.__barrier = False
                    self.__cv.notifyAll()
                else:
                    self.__waiter_count += 1
                    try:
                        self.__cv.wait()
                    finally:
                        # Keep the count accurate even if the wait is
                        # interrupted; a leaked count would make wait()
                        # block forever.
                        self.__waiter_count -= 1

                check_pending_failure()  # raise on fail

            self.__worker_count += 1
            log.Debug("%s: %s" % (self.__class__.__name__,
                                  _("active workers = %d") % (self.__worker_count,)))

        # simply wait for an OK condition to start, then launch our worker. the worker
        # never waits on us, we just wait for them.
        with_lock(self.__cv, wait_for_and_register_launch)

        self.__start_worker(caller)

        return waiter

    def __start_worker(self, caller):
        """
        Start a new worker thread running caller. The worker slot was
        already reserved by __run_asynchronously(); it is released when
        the worker finishes, or immediately if the thread cannot be
        started.
        """
        def complete_worker():
            # Runs with self.__cv held: release one worker slot and wake
            # anyone waiting for a slot or for the scheduler to drain.
            self.__worker_count -= 1
            log.Debug("%s: %s" % (self.__class__.__name__,
                                  _("active workers = %d") % (self.__worker_count,)))
            self.__cv.notifyAll()

        def trampoline():
            try:
                self.__execute_caller(caller)
            finally:
                with_lock(self.__cv, complete_worker)

        try:
            thread.start_new_thread(trampoline, ())
        except Exception:
            # No thread exists to release the slot reserved for it, so do it
            # here; otherwise wait() and later schedule_task() calls could
            # block forever.
            with_lock(self.__cv, complete_worker)
            raise

    def __execute_caller(self, caller):
        """
        Run the caller half of an async split in the current (worker)
        thread and record the first failure so that later scheduling
        attempts can propagate it.
        """
        # The caller half that we get here will not propagate
        # errors back to us, but rather propagate it back to the
        # "other half" of the async split.
        succeeded, waiter = caller()
        if not succeeded:
            def _signal_failed():
                # Only the first failure is remembered.
                if not self.__failed:
                    self.__failed = True
                    self.__failed_waiter = waiter
                    self.__cv.notifyAll()
            with_lock(self.__cv, _signal_failed)

        log.Info("%s: %s" % (self.__class__.__name__,
                             _("task execution done (success: %s)") % succeeded),
                 log.InfoCode.asynchronous_upload_done)