1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """Define some lazy data structures and functions acting on them"""
23
24 import os
25
26 from duplicity.static import *
27
28
30 """Hold static methods for the manipulation of lazy iterators"""
31
def filter(predicate, iterator):
    """Lazily yield the elements of iterator accepted by predicate

    Elements are pulled from iterator one at a time, only as the
    returned generator is advanced.  An element is passed through
    when predicate(element) is true and dropped otherwise.

    """
    for element in iterator:
        if not predicate(element):
            continue
        yield element
37
def map(function, iterator):
    """Lazily yield function(element) for each element of iterator

    Nothing is computed until the returned generator is advanced;
    each step consumes exactly one element of iterator.

    """
    for element in iterator:
        transformed = function(element)
        yield transformed
42
44 """Run function on each element in iterator"""
45 for i in iterator:
46 function(i)
47
49 """Lazily concatenate iterators"""
50 for iter in iters:
51 for i in iter:
52 yield i
53
def cat2(iter_of_iters):
    """Lazily chain the iterators produced by iter_of_iters

    iter_of_iters is itself consumed lazily: the next inner iterator
    is only requested once the current one has been exhausted.

    """
    for inner in iter_of_iters:
        for element in inner:
            yield element
59
61 """True if iterator has length 0"""
62 for i in iter:
63 return None
64 return 1
65
def equal(iter1, iter2, verbose = None, operator = lambda x, y: x == y):
    """True if iterator 1 has same elements as iterator 2

    Elements are compared pairwise with operator(i1, i2); == is used
    when no operator is given.  Both iterators are consumed as far as
    the comparison proceeds.  If verbose is true, the reason for a
    mismatch is printed.

    Returns 1 when both iterators yield the same number of elements
    and every pair compares true, None otherwise.  iter2 must be an
    iterator (it is advanced with next()); iter1 may be any iterable.

    """
    for i1 in iter1:
        try:
            # The next() builtin works on Python 2.6+ and Python 3,
            # unlike the Python-2-only .next() method.
            i2 = next(iter2)
        except StopIteration:
            # iter2 ran out first
            if verbose:
                print("End when i1 = %s" % (i1,))
            return None
        if not operator(i1, i2):
            if verbose:
                print("%s not equal to %s" % (i1, i2))
            return None

    # iter1 is exhausted; equal only if iter2 is exhausted as well
    try:
        i2 = next(iter2)
    except StopIteration:
        return 1
    if verbose:
        print("End when i2 = %s" % (i2,))
    return None
90
92 """True if any element in iterator is true. Short circuiting"""
93 i = None
94 for i in iter:
95 if i:
96 return i
97 return i
98
100 """True if all elements in iterator are true. Short circuiting"""
101 i = 1
102 for i in iter:
103 if not i:
104 return i
105 return i
106
108 """Return length of iterator"""
109 i = 0
110 while 1:
111 try:
112 iter.next()
113 except StopIteration:
114 return i
115 i = i+1
116
def foldr(f, default, iter):
    """Right fold: return f(x0, f(x1, ... f(xn, default) ...))

    iter is consumed completely before f is called for the first
    time, then f is applied from the last element back to the first,
    exactly as the recursive definition would.  Returns default when
    iter is empty.

    The fold is done with a loop rather than recursion so that long
    iterators do not exceed the interpreter's recursion limit.

    """
    elements = list(iter)
    elements.reverse()
    result = default
    for element in elements:
        result = f(element, result)
    return result
124
def foldl(f, default, iter):
    """Left fold: return f(... f(f(default, x0), x1) ..., xn)

    The accumulator starts as default and is replaced by
    f(accumulator, element) for each element of iter in order.
    Returns default unchanged when iter is empty.

    """
    # A plain for loop replaces the Python-2-only iter.next() calls
    # and avoids a local that shadows the next() builtin.
    accumulated = default
    for element in iter:
        accumulated = f(accumulated, element)
    return accumulated
133
def multiplex(iter, num_of_forks, final_func = None, closing_func = None):
    """Split a single iterator into a number of streams

    The return val will be a tuple with length num_of_forks, each
    of which will be an iterator like iter.  final_func is the
    function that will be called on each element in iter just as
    it is being removed from the buffer.  closing_func is called
    once, when all the streams are finished.

    Elements read by one stream are buffered until every other
    stream has read them too, so memory use grows with the distance
    between the fastest and the slowest stream.

    """
    if num_of_forks == 2 and not final_func and not closing_func:
        # Plain two-way split: use the specialised implementation
        im2 = IterMultiplex2(iter)
        return (im2.yielda(), im2.yieldb())
    if not final_func:
        final_func = lambda i: None
    if not closing_func:
        closing_func = lambda: None

    # Elements that at least one fork still has to read, newest
    # first: a fresh element is inserted at index 0 and the oldest
    # one lives at the end of the list.
    buffer = []

    # forkposition[i] is the buffer index of the element fork i will
    # return next; -1 means fork i has read everything buffered so
    # far and must pull a new element from iter.
    starting_forkposition = [-1] * num_of_forks
    forkposition = starting_forkposition[:]
    # One-element list so the nested function can update the flag
    called_closing_func = [None]

    def get_next(fork_num):
        """Return the next element requested by fork_num"""
        if forkposition[fork_num] == -1:
            try:
                buffer.insert(0, next(iter))
            except StopIteration:
                # iter is exhausted.  Once every fork has caught up,
                # run closing_func, and remember that we did so it
                # is not run again when the other forks hit the end.
                if (forkposition == starting_forkposition and
                    not called_closing_func[0]):
                    closing_func()
                    called_closing_func[0] = 1
                raise
            # The insert at index 0 shifted every buffered element
            for i in range(num_of_forks):
                forkposition[i] += 1

        return_val = buffer[forkposition[fork_num]]
        forkposition[fork_num] -= 1

        blen = len(buffer)
        if not (blen-1) in forkposition:
            # No fork still points at the oldest element, so this
            # fork was the last one to read it: finalize and drop it
            assert forkposition[fork_num] == blen-2
            final_func(buffer[blen-1])
            del buffer[blen-1]
        return return_val

    def make_iterator(fork_num):
        """Return a generator yielding the elements seen by fork_num"""
        while 1:
            try:
                element = get_next(fork_num)
            except StopIteration:
                # End the generator explicitly; a StopIteration that
                # leaks out of a generator body is an error (PEP 479)
                return
            yield element

    return tuple([make_iterator(fork_num)
                  for fork_num in range(num_of_forks)])
193
194 MakeStatic(Iter)
195
196
198 """Multiplex an iterator into 2 parts
199
200 This is a special optimized case of the Iter.multiplex function,
201 used when there is no closing_func or final_func, and we only want
202 to split it into 2. By profiling, this is a time sensitive class.
203
204 """
206 self.a_leading_by = 0
207 self.buffer = []
208 self.iter = iter
209
211 """Return first iterator"""
212 buf, iter = self.buffer, self.iter
213 while(1):
214 if self.a_leading_by >= 0:
215
216 elem = iter.next()
217 buf.append(elem)
218 else:
219
220 elem = buf.pop(0)
221 self.a_leading_by += 1
222 yield elem
223
225 """Return second iterator"""
226 buf, iter = self.buffer, self.iter
227 while(1):
228 if self.a_leading_by <= 0:
229
230 elem = iter.next()
231 buf.append(elem)
232 else:
233
234 elem = buf.pop(0)
235 self.a_leading_by -= 1
236 yield elem
237
238
240 """Tree style reducer object for iterator - stolen from rdiff-backup
241
242 The indices of a RORPIter form a tree type structure. This class
243 can be used on each element of an iter in sequence and the result
244 will be as if the corresponding tree was reduced. This tries to
245 bridge the gap between the tree nature of directories, and the
246 iterator nature of the connection between hosts and the temporal
247 order in which the files are processed.
248
249 This will usually be used by subclassing ITRBranch below and then
250 call the initializer below with the new class.
251
252 """
def __init__(self, branch_class, branch_args):
    """Set up the reducer and create its root branch

    branch_class and branch_args are stored on the instance, and the
    root branch is built right away by calling
    branch_class(*branch_args).  The branch list starts out holding
    only that root branch, and no index has been processed yet.

    """
    self.branch_class, self.branch_args = branch_class, branch_args
    self.index = None
    root = branch_class(*branch_args)
    self.root_branch = root
    self.branches = [root]
260
262 """Run Finish() on all branches index has passed
263
264 When we pass out of a branch, delete it and process it with
265 the parent. The innermost branches will be the last in the
266 list. Return None if we are out of the entire tree, and 1
267 otherwise.
268
269 """
270 branches = self.branches
271 while 1:
272 to_be_finished = branches[-1]
273 base_index = to_be_finished.base_index
274 if base_index != index[:len(base_index)]:
275
276 to_be_finished.call_end_proc()
277 del branches[-1]
278 if not branches:
279 return None
280 branches[-1].branch_process(to_be_finished)
281 else:
282 return 1
283
285 """Return branch of type self.branch_class, add to branch list"""
286 branch = self.branch_class(*self.branch_args)
287 self.branches.append(branch)
288 return branch
289
297
299 """Call at end of sequence to tie everything up"""
300 while 1:
301 to_be_finished = self.branches.pop()
302 to_be_finished.call_end_proc()
303 if not self.branches:
304 break
305 self.branches[-1].branch_process(to_be_finished)
306
308 """Process args, where args[0] is current position in iterator
309
310 Returns true if args successfully processed, false if index is
311 not in the current tree and thus the final result is
312 available.
313
314 Also note below we set self.index after doing the necessary
315 start processing, in case there is a crash in the middle.
316
317 """
318 index = args[0]
319 if self.index is None:
320 self.process_w_branch(index, self.root_branch, args)
321 self.index = index
322 return 1
323
324 if index <= self.index:
325 log.Warn(_("Warning: oldindex %s >= newindex %s") %
326 (self.index, index))
327 return 1
328
329 if self.finish_branches(index) is None:
330 return None
331 last_branch = self.branches[-1]
332 if last_branch.start_successful:
333 if last_branch.can_fast_process(*args):
334 robust.check_common_error(last_branch.on_error,
335 last_branch.fast_process, args)
336 else:
337 branch = self.add_branch()
338 self.process_w_branch(index, branch, args)
339 else:
340 last_branch.log_prev_error(index)
341
342 self.index = index
343 return 1
344
345
347 """Helper class for IterTreeReducer above
348
349 There are five stub functions below: start_process, end_process,
350 branch_process, fast_process, and can_fast_process. A class that
351 subclasses this one will probably fill in these functions to do
352 more.
353
354 """
355 base_index = index = None
356 finished = None
357 caught_exception = start_successful = None
358
369
371 """Do some initial processing (stub)"""
372 pass
373
375 """Do any final processing before leaving branch (stub)"""
376 pass
377
379 """Process a branch right after it is finished (stub)"""
380 assert branch.finished
381 pass
382
384 """True if object can be processed without new branch (stub)"""
385 return None
386
388 """Process args without new child branch (stub)"""
389 pass
390
392 """This is run on any exception in start/end-process"""
393 self.caught_exception = 1
394 if args and args[0] and isinstance(args[0], tuple):
395 filename = os.path.join(*args[0])
396 elif self.index:
397 filename = os.path.join(*self.index)
398 else:
399 filename = "."
400 log.Warn(_("Error '%s' processing %s") % (exc, filename))
401
403 """Call function if no pending exception"""
404 if not index:
405 index_str = "."
406 else:
407 index_str = os.path.join(*index)
408 log.Warn(_("Skipping %s because of previous error") % index_str)
409
410
411 from duplicity import log
412 from duplicity import robust
413