"""Command-line inspector for wharf/pwr files.

Usage::

    prog inspect PATH                  print a summary of a wharf file
    prog PATH                          shorthand for ``inspect PATH``
    prog apply PATCH TARGET OUTPUT     apply a patch to a folder
"""

import argparse
import sys
from typing import Optional, Sequence

from pwr import (
    MANIFEST_MAGIC,
    PATCH_MAGIC,
    SIGNATURE_MAGIC,
    WOUNDS_MAGIC,
    ManifestReader,
    PatchReader,
    SignatureReader,
    SyncOpType,
    WoundsReader,
    apply_patch_to_folders,
)
from pwr.wire import read_magic

# Names of the registered sub-commands. Any other leading positional token
# on the command line is treated as a path to inspect.
_COMMANDS = ("inspect", "apply")


def _compression_algorithm(reader):
    """Return ``reader.header.compression.algorithm`` if available.

    Returns ``None`` when the reader has no (truthy) header or the header
    has no (truthy) compression settings.
    """
    header = reader.header
    if header and header.compression:
        return header.compression.algorithm
    return None


def _inspect_patch(path: str) -> None:
    """Print a summary of the patch file at *path*.

    Counts the file entries (split into rsync and bsdiff entries), the sync
    ops of rsync entries, the controls of bsdiff entries, and the payload
    bytes carried by DATA ops and by the ``add``/``copy`` fields of bsdiff
    controls.
    """
    files = rsync_files = bsdiff_files = 0
    ops = controls = data_bytes = 0

    with PatchReader.open(path) as reader:
        # Read the header info before walking the entries, as the original
        # flow did.
        compression = _compression_algorithm(reader)
        for entry in reader.iter_file_entries():
            files += 1
            if entry.is_rsync():
                rsync_files += 1
                for op in entry.sync_ops or []:
                    ops += 1
                    if op.type == SyncOpType.DATA:
                        data_bytes += len(op.data or b"")
            elif entry.is_bsdiff():
                bsdiff_files += 1
                for ctrl in entry.bsdiff_controls or []:
                    controls += 1
                    data_bytes += len(ctrl.add or b"") + len(ctrl.copy or b"")

    print("patch")
    print(f" files: {files} (rsync={rsync_files}, bsdiff={bsdiff_files})")
    print(f" ops: {ops}, controls: {controls}")
    print(f" data bytes: {data_bytes}")
    print(f" compression: {compression}")


def _inspect_signature(path: str) -> None:
    """Print the block-hash count and compression of the signature at *path*."""
    with SignatureReader.open(path) as reader:
        compression = _compression_algorithm(reader)
        blocks = sum(1 for _ in reader.iter_block_hashes())

    print("signature")
    print(f" block hashes: {blocks}")
    print(f" compression: {compression}")


def _inspect_manifest(path: str) -> None:
    """Print the block-hash count and compression of the manifest at *path*."""
    with ManifestReader.open(path) as reader:
        compression = _compression_algorithm(reader)
        hashes = sum(1 for _ in reader.iter_block_hashes())

    print("manifest")
    print(f" block hashes: {hashes}")
    print(f" compression: {compression}")


def _inspect_wounds(path: str) -> None:
    """Print the number of wounds recorded in the wounds file at *path*."""
    with WoundsReader.open(path) as reader:
        wounds = sum(1 for _ in reader.iter_wounds())

    print("wounds")
    print(f" wounds: {wounds}")


def _expand_shorthand(argv: Sequence[str]) -> list:
    """Rewrite the ``PATH`` shorthand into an explicit ``inspect PATH``.

    argparse cannot express "optional sub-command, otherwise a positional":
    the sub-command slot always claims the first positional token and rejects
    it as an invalid choice. The shorthand is therefore resolved here, before
    parsing. Option-like tokens (leading ``-``) and real command names are
    left untouched.
    """
    args = list(argv)
    if args and args[0] not in _COMMANDS and not args[0].startswith("-"):
        return ["inspect", *args]
    return args


def _build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with the ``inspect`` and ``apply`` commands."""
    parser = argparse.ArgumentParser(
        description="Minimal wharf/pwr inspector",
        epilog="A bare PATH with no command is shorthand for 'inspect PATH'.",
    )
    subparsers = parser.add_subparsers(dest="command")

    inspect_parser = subparsers.add_parser("inspect", help="inspect a wharf file")
    inspect_parser.add_argument("path", help="path to .pwr/.pws/.pwm/.pww file")

    apply_parser = subparsers.add_parser(
        "apply", help="apply a .pwr patch to folders"
    )
    apply_parser.add_argument("patch", help="path to .pwr file")
    apply_parser.add_argument("target", help="folder with the old version")
    apply_parser.add_argument("output", help="folder to write the new version")

    # NOTE: no top-level "path" positional here. One sharing its dest with the
    # "inspect" sub-parser would be processed after the sub-command and store
    # its default (None) over the value the sub-parser had just parsed.
    return parser


def main(argv: Optional[Sequence[str]] = None) -> None:
    """Entry point: inspect a wharf file or apply a patch.

    Args:
        argv: Command-line arguments without the program name. Defaults to
            ``sys.argv[1:]`` when ``None``.

    With no command and no path, the help text is written to stderr and the
    function returns. A file whose magic matches none of the known constants
    is reported as ``unknown magic: ...`` on stdout.
    """
    parser = _build_parser()
    raw_args = sys.argv[1:] if argv is None else list(argv)
    args = parser.parse_args(_expand_shorthand(raw_args))

    if args.command == "apply":
        apply_patch_to_folders(args.patch, args.target, args.output)
        return

    path = args.path if args.command == "inspect" else None
    if not path:
        parser.print_help(sys.stderr)
        return

    # Only the leading magic is needed to pick an inspector; each inspector
    # reopens the file through its own reader.
    with open(path, "rb") as handle:
        magic = read_magic(handle)

    inspectors = (
        (PATCH_MAGIC, _inspect_patch),
        (SIGNATURE_MAGIC, _inspect_signature),
        (MANIFEST_MAGIC, _inspect_manifest),
        (WOUNDS_MAGIC, _inspect_wounds),
    )
    for known_magic, inspector in inspectors:
        if magic == known_magic:
            inspector(path)
            return

    print(f"unknown magic: {magic}")


if __name__ == "__main__":
    main()