1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 """
24 Asynchronous job scheduler, for concurrent execution with minimalistic
25 dependency guarantees.
26 """
27
28 import duplicity
29 from duplicity import log
30 from duplicity.dup_threading import require_threading
31 from duplicity.dup_threading import interruptably_wait
32 from duplicity.dup_threading import async_split
33 from duplicity.dup_threading import with_lock
34
# The thread/threading modules are obtained through dup_threading instead of
# being imported directly.
# NOTE(review): presumably dup_threading can hand back fallback modules when
# real threading is unavailable -- confirm against dup_threading.
thread = duplicity.dup_threading.thread_module()
threading = duplicity.dup_threading.threading_module()
37
39 """
40 Easy-to-use scheduler of function calls to be executed
41 concurrently. A very simple dependency mechanism exists in the
42 form of barriers (see insert_barrier()).
43
44 Each instance has a concurrency level associated with it. A
45 concurrency of 0 implies that all tasks will be executed
46 synchronously when scheduled. A concurrency of 1 indicates that a
47 task will be executed asynchronously, but never concurrently with
48 other tasks. Both 0 and 1 guarantee strict ordering among all
49 tasks (i.e., they will be executed in the order scheduled).
50
51 At concurrency levels above 1, the tasks will end up being
52 executed in an order undetermined except insofar as is enforced by
53 calls to insert_barrier().
54
55 An AsynchScheduler should be created for any independent process;
56 the scheduler will assume that if any background job fails (raises
57 an exception), it makes further work moot.
58 """
59
def __init__(self, concurrency):
    """
    Create an asynchronous scheduler that executes jobs with the
    given level of concurrency.

    concurrency -- non-negative integer; 0 makes schedule_task() run
                   every task synchronously, values above 0 require
                   threading support (checked via require_threading()).

    Raises AssertionError if concurrency is negative.
    """
    # NOTE(review): the def line was missing from the reviewed text; this
    # signature is inferred from the body -- confirm against callers.
    log.Info("%s: %s" % (self.__class__.__name__,
                         _("instantiating at concurrency %d") %
                         (concurrency)))
    assert concurrency >= 0, "%s concurrency level must be >= 0" % (self.__class__.__name__,)

    self.__failed = False  # becomes True once a background task has failed
    self.__failed_waiter = None  # waiter of the first task that failed
    self.__concurrency = concurrency
    self.__worker_count = 0  # workers launched and not yet completed
    self.__waiter_count = 0  # schedulers currently blocked waiting for a slot
    self.__barrier = False  # True while a barrier is pending
    # One condition variable (and its lock) guards all of the state above.
    self.__cv = threading.Condition()

    if concurrency > 0:
        require_threading("concurrency > 0 (%d)" % (concurrency,))
82
def insert_barrier(self):
    """
    Proclaim that any tasks scheduled prior to the call to this
    method MUST be executed prior to any tasks scheduled after the
    call to this method.

    The intended use case is that if task B depends on A, a
    barrier must be inserted in between to guarantee that A
    happens before B.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is taken from the insert_barrier() references in the docstrings.
    log.Debug("%s: %s" % (self.__class__.__name__, _("inserting barrier")))

    # At concurrency 0 every task runs synchronously when scheduled, so
    # ordering is already strict and no barrier flag needs to be recorded.
    if self.__concurrency > 0:
        def _insert_barrier():
            self.__barrier = True

        # Flip the flag via with_lock() so the update is made through the
        # scheduler's condition variable like every other state change.
        with_lock(self.__cv, _insert_barrier)
102
def schedule_task(self, fn, params):
    """
    Schedule the given task (callable, typically function) for
    execution. Pass the given parameters to the function when
    calling it. Returns a callable which can optionally be used
    to wait for the task to complete, either by returning its
    return value or by propagating any exception raised by said
    task.

    This method may block or return immediately, depending on the
    configuration and state of the scheduler.

    This method may also raise an exception in order to trigger
    failures early, if the task (if run synchronously) or a previous
    task has already failed.

    NOTE: Pay particular attention to the scope in which this is
    called. In particular, since it will execute concurrently in
    the background, assuming fn is a closure, any variables used
    must be properly bound in the closure. This is the reason for
    the convenience feature of being able to give parameters to
    the call, to avoid having to wrap the call itself in a
    function in order to "fixate" variables in, for example, an
    enclosing loop.

    fn     -- the callable to run; must not be None.
    params -- sequence of positional arguments, expanded as fn(*params).
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # signature is inferred from the body and from the schedule_task()
    # reference in the wait() docstring -- confirm against callers.
    assert fn is not None

    if self.__concurrency == 0:
        # Asynchronicity is disabled: run the task right here, in the
        # calling thread, and hand back a trivial waiter.
        log.Info("%s: %s" % (self.__class__.__name__,
                             _("running task synchronously (asynchronicity disabled)")),
                 log.InfoCode.synchronous_upload_begin)

        return self.__run_synchronously(fn, params)
    else:
        log.Info("%s: %s" % (self.__class__.__name__,
                             _("scheduling task for asynchronous execution")),
                 log.InfoCode.asynchronous_upload_begin)

        return self.__run_asynchronously(fn, params)
152
def wait(self):
    """
    Wait for the scheduler to become entirely empty (i.e., all
    tasks having run to completion).

    IMPORTANT: This is only useful with a single caller scheduling
    tasks, such that no call to schedule_task() is currently in
    progress or may happen subsequently to the call to wait().
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # name is taken from the wait() reference in the docstring above.
    def _wait():
        # Empty means: no worker is running and no scheduler is blocked
        # waiting for a free slot.
        interruptably_wait(self.__cv, lambda: self.__worker_count == 0 and self.__waiter_count == 0)

    with_lock(self.__cv, _wait)
166
def __run_synchronously(self, fn, params):
    """
    Run fn(*params) immediately in the calling thread.

    Any exception raised by fn propagates straight to the caller. On
    success, returns a waiter callable that simply yields the value fn
    returned, mirroring what the asynchronous path hands back.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # signature is taken from the call in schedule_task().
    ret = fn(*params)

    def _waiter():
        return ret

    log.Info("%s: %s" % (self.__class__.__name__,
                         _("task completed successfully")),
             log.InfoCode.synchronous_upload_done)

    return _waiter
181
def __run_asynchronously(self, fn, params):
    """
    Launch fn(*params) on a background worker and return its waiter.

    Blocks until a worker slot is free and no barrier is pending. If a
    previously scheduled task has failed, that task's waiter is invoked
    instead so that its exception propagates to the caller.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # signature is taken from the call in schedule_task().
    (waiter, caller) = async_split(lambda: fn(*params))

    def check_pending_failure():
        if self.__failed:
            log.Info("%s: %s" % (self.__class__.__name__,
                                 _("a previously scheduled task has failed; "
                                   "propagating the result immediately")),
                     log.InfoCode.asynchronous_upload_done)
            # Expected to re-raise the failed task's exception.
            self.__failed_waiter()
            raise AssertionError("%s: waiter should have raised an exception; "
                                 "this is a bug" % (self.__class__.__name__,))

    def wait_for_and_register_launch():
        check_pending_failure()  # raise on fail
        while self.__worker_count >= self.__concurrency or self.__barrier:
            if self.__worker_count == 0:
                # Nothing is running, so only a barrier can be holding us
                # back; everything before it has finished, so lift it.
                assert self.__barrier, "barrier should be in effect"
                self.__barrier = False
                self.__cv.notify_all()
            else:
                self.__waiter_count += 1
                try:
                    self.__cv.wait()
                finally:
                    # Always undo the bookkeeping, even if the wait is
                    # interrupted; otherwise wait() could never see the
                    # scheduler as empty again.
                    self.__waiter_count -= 1

            check_pending_failure()  # raise on fail

        self.__worker_count += 1
        log.Debug("%s: %s" % (self.__class__.__name__,
                              _("active workers = %d") % (self.__worker_count,)))

    # Claim the worker slot via with_lock() on the condition variable; the
    # thread itself is started afterwards by __start_worker().
    with_lock(self.__cv, wait_for_and_register_launch)

    self.__start_worker(caller)

    return waiter
220
def __start_worker(self, caller):
    """
    Start a new worker.

    The worker thread runs the given caller through __execute_caller()
    and, whatever the outcome, releases its worker slot and wakes up
    every thread waiting on the scheduler's condition variable.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # signature is taken from the call in __run_asynchronously().
    def trampoline():
        try:
            self.__execute_caller(caller)
        finally:
            def complete_worker():
                self.__worker_count -= 1
                log.Debug("%s: %s" % (self.__class__.__name__,
                                      _("active workers = %d") % (self.__worker_count,)))
                self.__cv.notify_all()
            with_lock(self.__cv, complete_worker)

    thread.start_new_thread(trampoline, ())
237
def __execute_caller(self, caller):
    """
    Run the caller half of an async_split() pair and record failure.

    caller() returns a (succeeded, waiter) pair. On failure the
    scheduler is marked as failed and the waiter of the first failing
    task is remembered, so that later scheduling attempts can propagate
    the error; all threads waiting on the condition variable are woken.
    """
    # NOTE(review): the def line was missing from the reviewed text; the
    # signature is taken from the call in __start_worker().
    succeeded, waiter = caller()
    if not succeeded:
        def _signal_failed():
            # Only the first failure is kept.
            if not self.__failed:
                self.__failed = True
                self.__failed_waiter = waiter
                self.__cv.notify_all()
        with_lock(self.__cv, _signal_failed)

    log.Info("%s: %s" % (self.__class__.__name__,
                         _("task execution done (success: %s)") % succeeded),
             log.InfoCode.asynchronous_upload_done)
254