Package duplicity :: Module diffdir
[hide private]
[frames | no frames]

Source Code for Module duplicity.diffdir

  1  # -*- Mode:Python; indent-tabs-mode:nil; tab-width:4 -*- 
  2  # 
  3  # Copyright 2002 Ben Escoto <ben@emerose.org> 
  4  # Copyright 2007 Kenneth Loafman <kenneth@loafman.com> 
  5  # 
  6  # This file is part of duplicity. 
  7  # 
  8  # Duplicity is free software; you can redistribute it and/or modify it 
  9  # under the terms of the GNU General Public License as published by the 
 10  # Free Software Foundation; either version 2 of the License, or (at your 
 11  # option) any later version. 
 12  # 
 13  # Duplicity is distributed in the hope that it will be useful, but 
 14  # WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU 
 16  # General Public License for more details. 
 17  # 
 18  # You should have received a copy of the GNU General Public License 
 19  # along with duplicity; if not, write to the Free Software Foundation, 
 20  # Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA 
 21   
 22  """ 
 23  Functions for producing signatures and deltas of directories 
 24   
 25  Note that the main processes of this module have two parts.  In the 
 26  first, the signature or delta is constructed of a ROPath iterator.  In 
 27  the second, the ROPath iterator is put into tar block form. 
 28  """ 
 29   
 30  import cStringIO, types 
 31  from duplicity import statistics 
 32  from duplicity.path import * #@UnusedWildImport 
 33  from duplicity.lazy import * #@UnusedWildImport 
 34   
 35  # A statistics.StatsDeltaProcess is stored here by DirDelta and DirDelta_WriteSig.
 36  stats = None 
 37   
 38   
class DiffDirException(Exception):
    """
    Error raised by this module

    Used for signature tar entries whose name has no known prefix and
    for file objects that report a failure when closed.
    """
41 42
def DirSig(path_iter):
    """
    Return a signature tar block iterator over path_iter

    Convenience alias: simply wraps path_iter in a SigTarBlockIter.
    """
    sig_blocks = SigTarBlockIter(path_iter)
    return sig_blocks
48 49
def DirFull(path_iter):
    """
    Return a tarblock full backup of the items in path_iter

    A full backup is produced as a delta against an empty signature
    file object, i.e. a diff starting from nothing.  This may be less
    elegant than a standard tar file, but it keeps the output easy to
    split into volumes of the same size.
    """
    empty_sig = cStringIO.StringIO("")
    return DirDelta(path_iter, empty_sig)
60 61
def DirFull_WriteSig(path_iter, sig_outfp):
    """
    Return a full backup like DirFull, also writing signatures

    The delta is computed against an empty signature file object and
    the new signatures are written to sig_outfp.
    """
    empty_sig = cStringIO.StringIO("")
    return DirDelta_WriteSig(path_iter, empty_sig, sig_outfp)
67 68
def DirDelta(path_iter, dirsig_fileobj_list):
    """
    Produce tarblock diff given dirsig_fileobj_list and path_iter

    dirsig_fileobj_list should either be a tar fileobj or a list of
    those, sorted so the most recent is last.

    Side effect: the module-level stats is replaced with a fresh
    statistics.StatsDeltaProcess.

    Returns a DummyBlockIter when globals.dry_run is set, otherwise a
    DeltaTarBlockIter, in both cases wrapping the delta iterator.
    """
    global stats
    stats = statistics.StatsDeltaProcess()
    # isinstance (rather than an exact type test) also accepts list
    # subclasses; anything else is treated as a single tar fileobj.
    if isinstance(dirsig_fileobj_list, list):
        # Same helper DirDelta_WriteSig uses to merge several sigtars
        sig_iter = get_combined_path_iter(dirsig_fileobj_list)
    else:
        sig_iter = sigtar2path_iter(dirsig_fileobj_list)
    delta_iter = get_delta_iter(path_iter, sig_iter)
    if globals.dry_run:
        return DummyBlockIter(delta_iter)
    return DeltaTarBlockIter(delta_iter)
88 89
def delta_iter_error_handler(exc, new_path, sig_path, sig_tar = None):
    """
    Error callback used by get_delta_iter: log a warning, return None

    The path named in the warning is new_path if set, else sig_path.
    sig_tar is accepted but not used.
    """
    if new_path:
        rel_path = new_path.get_relative_path()
    elif sig_path:
        rel_path = sig_path.get_relative_path()
    else:
        assert 0, "Both new and sig are None for some reason"
    message = _("Error %s getting delta for %s") % (str(exc), rel_path)
    log.Warn(message)
    return None
102 103
def get_delta_path(new_path, sig_path, sigTarFile = None):
    """
    Return the delta path describing new_path relative to sig_path

    The result gets difftype "diff" when new_path is a regular file
    and sig_path has difftype "signature"; otherwise it gets difftype
    "snapshot".

    If sigTarFile is not None, an entry for new_path is also added to
    it: immediately for non-regular files, and through the
    FileWithSignature callback for regular files.
    """
    assert new_path
    if sigTarFile:
        ti = new_path.get_tarinfo()
        index = new_path.index
    delta_path = new_path.get_ropath()
    log.Debug(_("Getting delta of %s and %s") % (new_path, sig_path))

    def callback(sig_string):
        """
        Store the finished signature in sigTarFile
        """
        ti.size = len(sig_string)
        ti.name = "signature/" + "/".join(index)
        sigTarFile.addfile(ti, cStringIO.StringIO(sig_string))

    def open_new_file():
        """
        Open new_path for reading, wrapped so reads are counted and,
        when a signature tar is wanted, signed as well
        """
        fileobj = FileWithReadCounter(new_path.open("rb"))
        if sigTarFile:
            fileobj = FileWithSignature(fileobj, callback,
                                        new_path.getsize())
        return fileobj

    if new_path.isreg() and sig_path and sig_path.difftype == "signature":
        delta_path.difftype = "diff"
        # The old signature is opened before the new file
        old_sigfp = sig_path.open("rb")
        delta_path.setfileobj(librsync.DeltaFile(old_sigfp, open_new_file()))
    else:
        delta_path.difftype = "snapshot"
        if sigTarFile:
            ti.name = "snapshot/" + "/".join(index)
        if new_path.isreg():
            delta_path.setfileobj(open_new_file())
        else:
            # Nothing to read, so record the entry and size right away
            if sigTarFile:
                sigTarFile.addfile(ti)
            if stats:
                stats.SourceFileSize += delta_path.getsize()
    new_path.copy_attribs(delta_path)
    delta_path.stat.st_size = new_path.stat.st_size
    return delta_path
def log_delta_path(delta_path, new_path = None, stats = None):
    """
    Log delta_path as added ("A") or modified ("M")

    A delta with difftype "snapshot" is logged as new, anything else
    as changed.  The matching counter on stats is updated only when
    both new_path and stats are set.
    """
    is_new = delta_path.difftype == "snapshot"
    if new_path and stats:
        if is_new:
            stats.add_new_file(new_path)
        else:
            stats.add_changed_file(new_path)
    if is_new:
        template = _("A %s")
        code = log.InfoCode.diff_file_new
    else:
        template = _("M %s")
        code = log.InfoCode.diff_file_changed
    log.Info(template % (delta_path.get_relative_path(),),
             code,
             util.escape(delta_path.get_relative_path()))
170 171
def get_delta_iter(new_iter, sig_iter, sig_fileobj=None):
    """
    Generate delta iter from new Path iter and sig Path iter.

    For each delta path of regular file type, path.difftype will be
    set to "snapshot" or "diff".  sig_iter will probably iterate
    ROPaths instead of Paths.

    Paths that exist only in the signature are yielded as a bare
    ROPath built from their index.  Unchanged paths are counted in the
    module-level stats and not yielded.

    If sig_fileobj is not None, will also write signatures to
    sig_fileobj.
    """
    collated = collate2iters(new_iter, sig_iter)
    sigTarFile = None
    if sig_fileobj:
        sigTarFile = tarfile.TarFile("arbitrary", "w", sig_fileobj)

    for new_path, sig_path in collated:
        log.Debug(_("Comparing %s and %s") % (new_path and new_path.index,
                                              sig_path and sig_path.index))

        if not new_path or not new_path.type:
            # file doesn't exist
            if sig_path and sig_path.exists():
                # but signature says it did
                log.Info(_("D %s") %
                         (sig_path.get_relative_path(),),
                         log.InfoCode.diff_file_deleted,
                         util.escape(sig_path.get_relative_path()))
                if sigTarFile:
                    ti = ROPath(sig_path.index).get_tarinfo()
                    ti.name = "deleted/" + "/".join(sig_path.index)
                    sigTarFile.addfile(ti)
                stats.add_deleted_file()
                yield ROPath(sig_path.index)
            continue

        needs_delta = not sig_path or new_path != sig_path
        if not needs_delta:
            stats.add_unchanged_file(new_path)
            continue

        # Must calculate new signature and create delta
        delta_path = robust.check_common_error(delta_iter_error_handler,
                                               get_delta_path,
                                               (new_path, sig_path, sigTarFile))
        if not delta_path:
            # no delta came back, so an error must have occurred
            stats.Errors += 1
            continue
        # log and collect stats
        log_delta_path(delta_path, new_path, stats)
        yield delta_path

    stats.close()
    if sigTarFile:
        sigTarFile.close()
221 222
def sigtar2path_iter(sigtarobj):
    """
    Iterate ROPaths described by a signature tar open for reading

    Every entry name must start with "signature/", "snapshot/" or
    "deleted/"; the prefix (without the slash) becomes the ROPath's
    difftype and the remainder becomes its index.  Any other name
    raises DiffDirException.  sigtarobj is closed once all entries
    have been yielded.
    """
    known_prefixes = ("signature/", "snapshot/", "deleted/")
    tf = tarfile.TarFile("Arbitrary Name", "r", sigtarobj)
    tf.debug = 2
    for tarinfo in tf:
        matching = [p for p in known_prefixes if tarinfo.name.startswith(p)]
        if not matching:
            raise DiffDirException("Bad tarinfo name %s" % (tarinfo.name,))
        # strip prefix from name and use it (minus the "/") as difftype
        prefix = matching[0]
        name = tarinfo.name[len(prefix):]
        difftype = prefix[:-1]

        index = tuple(name.split("/"))
        if not index[-1]:
            index = index[:-1] # deal with trailing /, ""

        ropath = ROPath(index)
        ropath.difftype = difftype
        if difftype in ("signature", "snapshot"):
            ropath.init_from_tarinfo(tarinfo)
            if ropath.isreg():
                ropath.setfileobj(tf.extractfile(tarinfo))
        yield ropath
    sigtarobj.close()
250 251
def collate2iters(riter1, riter2):
    """
    Collate two iterators.

    The elements yielded by each iterator must have an index
    attribute, and both iterators are expected to yield elements in
    increasing index order.  This generator yields pairs
    (elem1, elem2), (elem1, None), or (None, elem2).  The two elements
    in a pair always have the same index, and pairs with earlier
    indices are yielded before pairs with later indices.

    "No pending element" is tracked with None explicitly, so elements
    that evaluate as false are collated like any other element.
    """
    relem1, relem2 = None, None
    while True:
        if relem1 is None:
            try:
                relem1 = next(riter1)
            except StopIteration:
                # riter1 is exhausted: flush whatever remains of riter2
                if relem2 is not None:
                    yield (None, relem2)
                for relem2 in riter2:
                    yield (None, relem2)
                break
            index1 = relem1.index
        if relem2 is None:
            try:
                relem2 = next(riter2)
            except StopIteration:
                # riter2 is exhausted: flush whatever remains of riter1
                if relem1 is not None:
                    yield (relem1, None)
                for relem1 in riter1:
                    yield (relem1, None)
                break
            index2 = relem2.index

        if index1 < index2:
            yield (relem1, None)
            relem1 = None
        elif index1 == index2:
            yield (relem1, relem2)
            relem1, relem2 = None, None
        else:
            # index2 is less
            yield (None, relem2)
            relem2 = None
294 295
def combine_path_iters(path_iter_list):
    """
    Merge the iterators in path_iter_list into one path iterator

    The result yields paths in order of increasing index.  When
    several iterators yield a path with the same index, only the path
    from the iterator latest in path_iter_list is yielded; the others
    are discarded.

    This is used to combine signature iters, as the output will be a
    full up-to-date signature iter.
    """
    # Work on a reversed copy so the latest iterator gets position 0
    # and therefore sorts first among entries with equal index.
    ordered_iters = path_iter_list[:]
    ordered_iters.reverse()

    def fetch(position):
        """
        Return (index, position, path) for the next path of the given
        iterator, or None once that iterator is exhausted
        """
        try:
            path = next(ordered_iters[position])
        except StopIteration:
            return None
        return (path.index, position, path)

    pending = []
    for position in range(len(ordered_iters)):
        entry = fetch(position)
        if entry:
            pending.append(entry)

    while pending:
        pending.sort()
        lowest_index = pending[0][0]
        yield pending[0][2]

        # Advance every iterator currently sitting on lowest_index.
        # pending is sorted, so those entries form a prefix of the list.
        refreshed = []
        for offset, entry in enumerate(pending):
            if entry[0] != lowest_index:
                refreshed.extend(pending[offset:])
                break
            replacement = fetch(entry[1])
            if replacement:
                refreshed.append(replacement)
        pending = refreshed
def DirDelta_WriteSig(path_iter, sig_infp_list, newsig_outfp):
    """
    Like DirDelta but also write signatures into newsig_outfp

    Like DirDelta, sig_infp_list can be a tar fileobj or a list of
    those, sorted so the most recent is last.  Signature entries are
    written to newsig_outfp only for paths that differ from (the
    combined) sig_infp_list.

    Side effect: the module-level stats is replaced with a fresh
    statistics.StatsDeltaProcess.

    Returns a DummyBlockIter when globals.dry_run is set, otherwise a
    DeltaTarBlockIter, in both cases wrapping the delta iterator.
    """
    global stats
    stats = statistics.StatsDeltaProcess()
    # isinstance (rather than an exact type test) also accepts list
    # subclasses; anything else is treated as a single tar fileobj.
    if isinstance(sig_infp_list, list):
        sig_path_iter = get_combined_path_iter(sig_infp_list)
    else:
        sig_path_iter = sigtar2path_iter(sig_infp_list)
    delta_iter = get_delta_iter(path_iter, sig_path_iter, newsig_outfp)
    if globals.dry_run:
        return DummyBlockIter(delta_iter)
    return DeltaTarBlockIter(delta_iter)
366 367
def get_combined_path_iter(sig_infp_list):
    """
    Return one path iter merging the signatures in sig_infp_list

    Each open signature file is turned into a path iter with
    sigtar2path_iter, and the results are merged by
    combine_path_iters.
    """
    path_iters = [sigtar2path_iter(sig_fp) for sig_fp in sig_infp_list]
    return combine_path_iters(path_iters)
373 374
class FileWithReadCounter:
    """
    File-like wrapper that counts the bytes read through it

    Each read adds the length of the returned data to
    stats.SourceFileSize when the module-level stats is set.
    """
    def __init__(self, infile):
        """Remember the wrapped file object"""
        self.infile = infile

    def read(self, length = -1):
        """Read from the wrapped file and account for the bytes read"""
        data = self.infile.read(length)
        if stats:
            stats.SourceFileSize += len(data)
        return data

    def close(self):
        """Close the wrapped file, returning whatever its close() returns"""
        close_result = self.infile.close()
        return close_result
391 392
class FileWithSignature:
    """
    File-like wrapper that builds a signature of the data read
    """
    blocksize = 32 * 1024

    def __init__(self, infile, callback, filelen, *extra_args):
        """
        FileWithSignature initializer

        The object acts like infile, but everything read through it
        is also fed to a librsync.SigGenerator.  On the first close(),
        callback is called with the calculated signature followed by
        any extra_args.

        filelen is used to calculate the block size of the signature.
        """
        self.infile = infile
        self.callback = callback
        self.sig_gen = librsync.SigGenerator(get_block_size(filelen))
        self.activated_callback = None
        self.extra_args = extra_args

    def read(self, length = -1):
        """Read from infile, adding the data to the signature"""
        data = self.infile.read(length)
        self.sig_gen.update(data)
        return data

    def close(self):
        """
        Finish the signature, fire the callback once, close infile
        """
        if not self.activated_callback:
            # Make sure all of infile is read so the signature is complete
            chunk = self.read(self.blocksize)
            while chunk:
                chunk = self.read(self.blocksize)
            self.activated_callback = 1
            self.callback(self.sig_gen.getsig(), *self.extra_args)
        return self.infile.close()
427 428
class TarBlock:
    """
    Holds what is needed to add the next file to a tar

    index identifies the path the block belongs to; data is the raw
    block content.
    """
    def __init__(self, index, data):
        """
        TarBlock initializer - only stores its two arguments
        """
        self.index, self.data = index, data
439 440
class TarBlockIter:
    """
    A bit like an iterator, yield tar blocks given input iterator

    Unlike an iterator, however, control over the maximum size of a
    tarblock is available by passing an argument to next().

    Subclasses must override process(), and process_continued() if a
    single input value can produce more than one block.

    NOTE(review): the original docstring also advertises get_footer(),
    and write_block_iter() calls it, but its definition is not visible
    in this listing (see the note inside the class body).
    """
    def __init__(self, input_iter):
        """
        TarBlockIter initializer
        """
        self.input_iter = input_iter
        self.offset = 0                     # total length of data read
        self.process_waiting = False        # process_continued has more blocks
        self.process_next_vol_number = None # next volume number to write in multivol
        self.previous_index = None          # holds index of last block returned
        self.previous_block = None          # holds block of last block returned
        self.remember_next = False          # see remember_next_index()
        self.remember_value = None          # holds index of next block
        self.remember_block = None          # holds block of next block

        # We need to instantiate a dummy TarFile just to get access to
        # some of the functions like _get_full_headers.
        self.tf = tarfile.TarFromIterator(None)

    def tarinfo2tarblock(self, index, tarinfo, file_data = ""):
        """
        Make tarblock out of tarinfo and file data

        tarinfo.size is set to len(file_data).  The block data is the
        tar headers, the file data, and NUL padding up to the next
        multiple of tarfile.BLOCKSIZE.
        """
        tarinfo.size = len(file_data)
        headers = self.tf._get_full_headers(tarinfo)
        remainder = tarinfo.size % tarfile.BLOCKSIZE
        if remainder > 0:
            filler_data = "\0" * (tarfile.BLOCKSIZE - remainder)
        else:
            filler_data = ""
        return TarBlock(index, "%s%s%s" % (headers, file_data, filler_data))

    def process(self, val, size):
        """
        Turn next value of input_iter into a TarBlock

        Abstract hook: raises NotImplementedError unless overridden.
        """
        assert not self.process_waiting
        raise NotImplementedError("process() must be overridden in a subclass")

    def process_continued(self, size):
        """
        Get more tarblocks

        If processing val in process() would produce more than one
        TarBlock, the rest of them are obtained by calling this method.

        Abstract hook: raises NotImplementedError unless overridden.
        """
        assert self.process_waiting
        raise NotImplementedError(
            "process_continued() must be overridden in a subclass")

    def next(self, size = 1024 * 1024):
        """
        Return next block, no bigger than size, and update offset
        """
        if self.process_waiting:
            result = self.process_continued(size)
        else:
            # Below a StopIteration exception will just be passed upwards
            result = self.process(next(self.input_iter), size)
        # Read only after processing, since process() may have set it
        block_number = self.process_next_vol_number
        self.offset += len(result.data)
        self.previous_index = result.index
        self.previous_block = block_number
        if self.remember_next:
            self.remember_value = result.index
            self.remember_block = block_number
            self.remember_next = False
        return result

    def __next__(self):
        """
        Iterator-protocol alias for next() using the default size
        """
        return self.next()

    def get_previous_index(self):
        """
        Return (index, block number) of the last tarblock returned;
        both are None if nothing has been returned yet
        """
        return self.previous_index, self.previous_block

    def remember_next_index(self):
        """
        When called, remember the index of the next block iterated
        """
        self.remember_next = True
        self.remember_value = None
        self.remember_block = None

    def recall_index(self):
        """
        Retrieve (index, block number) remembered with
        remember_next_index
        """
        return self.remember_value, self.remember_block

    # NOTE(review): source lines 536-542 are elided from this listing at
    # this point (presumably get_footer()); they are intentionally not
    # reconstructed here and must be restored from the real file.

    def __iter__(self):
        return self
546 547
class DummyBlockIter(TarBlockIter):
    """
    TarBlockIter that never reads file contents
    """
    def process(self, delta_ropath, size):
        """
        Return a data-less tarblock for delta_ropath

        Deleted files and fileless snapshots are passed through as is.
        For anything else the source file is only counted, using its
        reported size, when the module-level stats is set.  size is
        ignored.
        """
        ti = delta_ropath.get_tarinfo()
        index = delta_ropath.index

        if delta_ropath.type and delta_ropath.fileobj and stats:
            # Since we don't read the source files, we can't analyze them.
            # Best we can do is count them raw.
            stats.SourceFiles += 1
            stats.SourceFileSize += delta_ropath.getsize()
            log.Progress(None, stats.SourceFileSize)
        return self.tarinfo2tarblock(index, ti)
570 571
class SigTarBlockIter(TarBlockIter):
    """
    TarBlockIter yielding the blocks of a signature tar for path_iter
    """
    def process(self, path, size):
        """
        Return the signature TarBlock belonging to path

        Regular files produce a "signature/..." entry holding their
        librsync signature; everything else produces a data-less
        "snapshot/..." entry.

        size is ignored here: the whole signature of a file is put
        into a single block.
        """
        ti = path.get_tarinfo()
        joined_index = "/".join(path.index)
        if not path.isreg():
            ti.name = "snapshot/" + joined_index
            return self.tarinfo2tarblock(path.index, ti)

        source_fp = path.open("rb")
        sfp = librsync.SigFile(source_fp, get_block_size(path.getsize()))
        sigbuf = sfp.read()
        sfp.close()
        ti.name = "signature/" + joined_index
        return self.tarinfo2tarblock(path.index, ti, sigbuf)
595 596
class DeltaTarBlockIter(TarBlockIter):
    """
    TarBlockIter that yields parts of a deltatar file

    Unlike SigTarBlockIter, the argument to __init__ is a
    delta_path_iter, so the delta information has already been
    calculated.
    """
    def process(self, delta_ropath, size):
        """
        Return the first (possibly only) tarblock for delta_ropath

        Entries are named "deleted/...", "snapshot/..." or "diff/...".
        If the data does not fit in one block, the entry is named
        "multivol_<difftype>/<name>/1" and the remaining volumes are
        produced by process_continued.
        """
        def add_prefix(tarinfo, prefix):
            """Add prefix to the name of a tarinfo file"""
            if tarinfo.name != ".":
                tarinfo.name = "%s/%s" % (prefix, tarinfo.name)
            else:
                tarinfo.name = prefix + "/"

        ti = delta_ropath.get_tarinfo()
        index = delta_ropath.index

        # Deleted files and fileless snapshots carry no data
        if not delta_ropath.type:
            add_prefix(ti, "deleted")
            return self.tarinfo2tarblock(index, ti)
        if not delta_ropath.fileobj:
            assert delta_ropath.difftype == "snapshot"
            add_prefix(ti, "snapshot")
            return self.tarinfo2tarblock(index, ti)

        fp = delta_ropath.open("rb")
        # Below the 512 is the usual length of a tar header
        data, last_block = self.get_data_block(fp, size - 512)
        if stats:
            stats.RawDeltaSize += len(data)

        if not last_block:
            # More data remains: start a multivol snapshot or diff
            full_name = "multivol_%s/%s" % (delta_ropath.difftype, ti.name)
            ti.name = full_name + "/1"
            self.process_prefix = full_name
            self.process_fp = fp
            self.process_ropath = delta_ropath
            self.process_waiting = 1
            self.process_next_vol_number = 2
            return self.tarinfo2tarblock(index, ti, data)

        # Everything fit into a single volume block
        if delta_ropath.difftype in ("snapshot", "diff"):
            add_prefix(ti, delta_ropath.difftype)
        else:
            assert 0, "Unknown difftype"
        return self.tarinfo2tarblock(index, ti, data)

    def get_data_block(self, fp, max_size):
        """
        Return pair (next data block, boolean last data block)

        At most 64 KiB and at least 512 bytes are requested per call.
        A read shorter than requested is taken as the end of the data,
        in which case fp is closed; a true result from fp.close()
        raises DiffDirException.
        """
        read_size = min(64*1024, max(max_size, 512))
        buf = fp.read(read_size)
        is_last = len(buf) < read_size
        if is_last and fp.close():
            raise DiffDirException("Error closing file")
        return (buf, is_last)

    def process_continued(self, size):
        """
        Return next volume in multivol diff or snapshot
        """
        assert self.process_waiting
        ropath = self.process_ropath
        ti = ropath.get_tarinfo()
        index = ropath.index
        ti.name = "%s/%d" % (self.process_prefix, self.process_next_vol_number)
        data, last_block = self.get_data_block(self.process_fp, size - 512)
        if stats:
            stats.RawDeltaSize += len(data)
        if not last_block:
            self.process_next_vol_number += 1
        else:
            # Multivol entry finished: drop all continuation state
            self.process_prefix = None
            self.process_fp = None
            self.process_ropath = None
            self.process_waiting = None
            self.process_next_vol_number = None
        return self.tarinfo2tarblock(index, ti, data)
686 687
def write_block_iter(block_iter, out_obj):
    """
    Write block_iter to filename, path, or file object

    out_obj may be a Path (its .name is opened for writing), a string
    filename, or an already open file object.  Every block's data is
    written, followed by block_iter.get_footer().  The file object is
    always closed afterwards, including when out_obj was passed in
    open and when writing fails.

    Raises DiffDirException if closing the file reports an error.
    """
    if isinstance(out_obj, Path):
        fp = open(out_obj.name, "wb")
    elif isinstance(out_obj, str):
        fp = open(out_obj, "wb")
    else:
        fp = out_obj
    try:
        for block in block_iter:
            fp.write(block.data)
        fp.write(block_iter.get_footer())
    finally:
        # Close outside of any assert: assert statements are removed
        # under "python -O", which would leave the file open.
        close_result = fp.close()
    if close_result:
        raise DiffDirException("Error closing file")
    if isinstance(out_obj, Path):
        # Called only after the file has been written and closed
        out_obj.setdata()
704 705
def get_block_size(file_len):
    """
    Return a reasonable block size to use on files of length file_len

    If the block size is too big, deltas will be bigger than is
    necessary.  If the block size is too small, making deltas and
    patching can take a really long time.

    The result is always a multiple of 512, at least 512 and at most
    2048.
    """
    if file_len < 1024000:
        return 512 # set minimum of 512 bytes
    # Split file into about 2000 pieces, rounding down to a multiple of
    # 512.  Floor division is spelled out so the result does not depend
    # on the meaning of "/".
    file_blocksize = int(file_len // (2000 * 512)) * 512
    return min(file_blocksize, 2048)
720