diff --git a/main.py b/main.py deleted file mode 100644 index 4bf8a9a..0000000 --- a/main.py +++ /dev/null @@ -1,143 +0,0 @@ -import argparse -import sys - -from pwr import ( - MANIFEST_MAGIC, - PATCH_MAGIC, - SIGNATURE_MAGIC, - WOUNDS_MAGIC, - ManifestReader, - PatchReader, - SignatureReader, - SyncOpType, - WoundsReader, - apply_patch_to_folders, -) -from pwr.wire import read_magic - - -def _inspect_patch(path: str) -> None: - files = 0 - rsync_files = 0 - bsdiff_files = 0 - ops = 0 - data_bytes = 0 - controls = 0 - - with PatchReader.open(path) as reader: - compression = ( - reader.header.compression.algorithm - if reader.header and reader.header.compression - else None - ) - for entry in reader.iter_file_entries(): - files += 1 - if entry.is_rsync(): - rsync_files += 1 - for op in entry.sync_ops or []: - ops += 1 - if op.type == SyncOpType.DATA: - data_bytes += len(op.data or b"") - elif entry.is_bsdiff(): - bsdiff_files += 1 - for ctrl in entry.bsdiff_controls or []: - controls += 1 - data_bytes += len(ctrl.add or b"") + len(ctrl.copy or b"") - - print("patch") - print(f" files: {files} (rsync={rsync_files}, bsdiff={bsdiff_files})") - print(f" ops: {ops}, controls: {controls}") - print(f" data bytes: {data_bytes}") - print(f" compression: {compression}") - - -def _inspect_signature(path: str) -> None: - blocks = 0 - with SignatureReader.open(path) as reader: - compression = ( - reader.header.compression.algorithm - if reader.header and reader.header.compression - else None - ) - for _ in reader.iter_block_hashes(): - blocks += 1 - print("signature") - print(f" block hashes: {blocks}") - print(f" compression: {compression}") - - -def _inspect_manifest(path: str) -> None: - hashes = 0 - with ManifestReader.open(path) as reader: - compression = ( - reader.header.compression.algorithm - if reader.header and reader.header.compression - else None - ) - for _ in reader.iter_block_hashes(): - hashes += 1 - print("manifest") - print(f" block hashes: {hashes}") - print(f" compression: {compression}") - - -def _inspect_wounds(path: str) -> None: - wounds = 0 - with WoundsReader.open(path) as reader: - for _ in reader.iter_wounds(): - wounds += 1 - print("wounds") - print(f" wounds: {wounds}") - - -def main() -> None: - parser = argparse.ArgumentParser(description="Minimal wharf/pwr inspector") - subparsers = parser.add_subparsers(dest="command") - - inspect_parser = subparsers.add_parser("inspect", help="inspect a wharf file") - inspect_parser.add_argument("path", help="path to .pwr/.pws/.pwm/.pww file") - - apply_parser = subparsers.add_parser("apply", help="apply a .pwr patch to folders") - apply_parser.add_argument("patch", help="path to .pwr file") - apply_parser.add_argument("target", help="folder with the old version") - apply_parser.add_argument("output", help="folder to write the new version") - - parser.add_argument( - "path", - nargs="?", - help="path to .pwr/.pws/.pwm/.pww file (shorthand for inspect)", - ) - - args = parser.parse_args() - - if args.command == "apply": - apply_patch_to_folders(args.patch, args.target, args.output) - return - - path = None - if args.command == "inspect": - path = args.path - elif args.path: - path = args.path - - if not path: - parser.print_help(sys.stderr) - return - - with open(path, "rb") as handle: - magic = read_magic(handle) - - if magic == PATCH_MAGIC: - _inspect_patch(path) - elif magic == SIGNATURE_MAGIC: - _inspect_signature(path) - elif magic == MANIFEST_MAGIC: - _inspect_manifest(path) - elif magic == WOUNDS_MAGIC: - _inspect_wounds(path) - else: - print(f"unknown magic: {magic}") - - -if __name__ == "__main__": - main() diff --git a/pwr/cli.py b/pwr/cli.py new file mode 100644 index 0000000..a173273 --- /dev/null +++ b/pwr/cli.py @@ -0,0 +1,226 @@ +"""Minimal wharf/pwr inspector CLI.""" + +from pathlib import Path +from typing import Annotated + +import typer +from rich.console import Console +from rich.panel import Panel +from rich.table import Table + +from . import ( + MANIFEST_MAGIC, + PATCH_MAGIC, + SIGNATURE_MAGIC, + WOUNDS_MAGIC, + ManifestReader, + PatchReader, + SignatureReader, + SyncOpType, + WoundsReader, + apply_patch_to_folders, +) +from .wire import read_magic + +app = typer.Typer( + name="pwr", + help="Minimal wharf/pwr inspector and patcher.", + add_completion=False, + rich_markup_mode="rich", +) +console = Console() + + +@app.callback(invoke_without_command=True) +def main(ctx: typer.Context) -> None: + """Minimal wharf/pwr inspector and patcher.""" + if ctx.invoked_subcommand is None: + console.print(ctx.get_help()) + + +def _inspect_patch(path: Path) -> None: + """Display detailed patch file information.""" + files = 0 + rsync_files = 0 + bsdiff_files = 0 + ops = 0 + data_bytes = 0 + controls = 0 + + with PatchReader.open(str(path)) as reader: + compression = ( + reader.header.compression.algorithm + if reader.header and reader.header.compression + else None + ) + for entry in reader.iter_file_entries(): + files += 1 + if entry.is_rsync(): + rsync_files += 1 + for op in entry.sync_ops or []: + ops += 1 + if op.type == SyncOpType.DATA: + data_bytes += len(op.data or b"") + elif entry.is_bsdiff(): + bsdiff_files += 1 + for ctrl in entry.bsdiff_controls or []: + controls += 1 + data_bytes += len(ctrl.add or b"") + len(ctrl.copy or b"") + + table = Table(title="[bold cyan]Patch File[/bold cyan]", show_header=False) + table.add_column("Property", style="bold") + table.add_column("Value", style="green") + + table.add_row("Files", f"{files} [dim](rsync={rsync_files}, bsdiff={bsdiff_files})[/dim]") + table.add_row("Operations", str(ops)) + table.add_row("Controls", str(controls)) + table.add_row("Data bytes", f"{data_bytes:,}") + table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") + + console.print(table) + + +def _inspect_signature(path: Path) -> None: + """Display signature file information.""" + blocks = 0 + with SignatureReader.open(str(path)) as reader: + compression = ( + reader.header.compression.algorithm + if reader.header and reader.header.compression + else None + ) + for _ in reader.iter_block_hashes(): + blocks += 1 + + table = Table(title="[bold cyan]Signature File[/bold cyan]", show_header=False) + table.add_column("Property", style="bold") + table.add_column("Value", style="green") + + table.add_row("Block hashes", f"{blocks:,}") + table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") + + console.print(table) + + +def _inspect_manifest(path: Path) -> None: + """Display manifest file information.""" + hashes = 0 + with ManifestReader.open(str(path)) as reader: + compression = ( + reader.header.compression.algorithm + if reader.header and reader.header.compression + else None + ) + for _ in reader.iter_block_hashes(): + hashes += 1 + + table = Table(title="[bold cyan]Manifest File[/bold cyan]", show_header=False) + table.add_column("Property", style="bold") + table.add_column("Value", style="green") + + table.add_row("Block hashes", f"{hashes:,}") + table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") + + console.print(table) + + +def _inspect_wounds(path: Path) -> None: + """Display wounds file information.""" + wounds = 0 + with WoundsReader.open(str(path)) as reader: + for _ in reader.iter_wounds(): + wounds += 1 + + table = Table(title="[bold cyan]Wounds File[/bold cyan]", show_header=False) + table.add_column("Property", style="bold") + table.add_column("Value", style="green") + + table.add_row("Wounds", f"{wounds:,}") + + console.print(table) + + +def _do_inspect(path: Path) -> None: + """Detect file type and inspect accordingly.""" + with open(path, "rb") as handle: + magic = read_magic(handle) + + if magic == PATCH_MAGIC: + _inspect_patch(path) + elif magic == SIGNATURE_MAGIC: + _inspect_signature(path) + elif magic == MANIFEST_MAGIC: + _inspect_manifest(path) + elif magic == WOUNDS_MAGIC: + _inspect_wounds(path) + else: + console.print(f"[bold red]Error:[/bold red] Unknown file magic: {magic}") + raise typer.Exit(1) + + +@app.command() +def inspect( + path: Annotated[ + Path, + typer.Argument( + help="Path to a wharf file (.pwr, .pws, .pwm, .pww)", + exists=True, + readable=True, + ), + ], +) -> None: + """ + [bold]Inspect[/bold] a wharf file and display its contents. + + Supports patch (.pwr), signature (.pws), manifest (.pwm), and wounds (.pww) files. + """ + _do_inspect(path) + + +@app.command() +def apply( + patch: Annotated[ + Path, + typer.Argument( + help="Path to the .pwr patch file", + exists=True, + readable=True, + ), + ], + target: Annotated[ + Path, + typer.Argument( + help="Folder containing the old version", + exists=True, + file_okay=False, + dir_okay=True, + ), + ], + output: Annotated[ + Path, + typer.Argument( + help="Folder to write the new version", + ), + ], +) -> None: + """ + [bold]Apply[/bold] a patch file to update a folder. + + Takes the old version from TARGET and writes the patched result to OUTPUT. + """ + with console.status("[bold green]Applying patch...[/bold green]"): + apply_patch_to_folders(str(patch), str(target), str(output)) + + console.print( + Panel( + f"[green]✓[/green] Patch applied successfully!\n\n" + f"[dim]Source:[/dim] {target}\n" + f"[dim]Output:[/dim] {output}", + title="[bold green]Complete[/bold green]", + border_style="green", + ) + ) + + +if __name__ == "__main__": + app() diff --git a/pyproject.toml b/pyproject.toml index a23adb5..32cdb55 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,5 +6,10 @@ readme = "README.md" requires-python = ">=3.12" dependencies = [ "brotli>=1.2.0", + "typer>=0.15.0", + "rich>=13.0.0", "zstandard>=0.25.0", ] + +[project.scripts] +pwr = "pwr.cli:app"