"""Minimal wharf/pwr inspector CLI.""" from pathlib import Path from typing import Annotated import typer from rich.console import Console from rich.panel import Panel from rich.table import Table from . import ( MANIFEST_MAGIC, PATCH_MAGIC, SIGNATURE_MAGIC, WOUNDS_MAGIC, ManifestReader, PatchReader, SignatureReader, SyncOpType, WoundsReader, apply_patch_to_folders, ) from .wire import read_magic app = typer.Typer( name="pwr", help="Minimal wharf/pwr inspector and patcher.", add_completion=False, rich_markup_mode="rich", ) console = Console() @app.callback(invoke_without_command=True) def main(ctx: typer.Context) -> None: """Minimal wharf/pwr inspector and patcher.""" if ctx.invoked_subcommand is None: console.print(ctx.get_help()) def _inspect_patch(path: Path) -> None: """Display detailed patch file information.""" files = 0 rsync_files = 0 bsdiff_files = 0 ops = 0 data_bytes = 0 controls = 0 with PatchReader.open(str(path)) as reader: compression = ( reader.header.compression.algorithm if reader.header and reader.header.compression else None ) for entry in reader.iter_file_entries(): files += 1 if entry.is_rsync(): rsync_files += 1 for op in entry.sync_ops or []: ops += 1 if op.type == SyncOpType.DATA: data_bytes += len(op.data or b"") elif entry.is_bsdiff(): bsdiff_files += 1 for ctrl in entry.bsdiff_controls or []: controls += 1 data_bytes += len(ctrl.add or b"") + len(ctrl.copy or b"") table = Table(title="[bold cyan]Patch File[/bold cyan]", show_header=False) table.add_column("Property", style="bold") table.add_column("Value", style="green") table.add_row("Files", f"{files} [dim](rsync={rsync_files}, bsdiff={bsdiff_files})[/dim]") table.add_row("Operations", str(ops)) table.add_row("Controls", str(controls)) table.add_row("Data bytes", f"{data_bytes:,}") table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") console.print(table) def _inspect_signature(path: Path) -> None: """Display signature file information.""" blocks = 0 with SignatureReader.open(str(path)) as reader: compression = ( reader.header.compression.algorithm if reader.header and reader.header.compression else None ) for _ in reader.iter_block_hashes(): blocks += 1 table = Table(title="[bold cyan]Signature File[/bold cyan]", show_header=False) table.add_column("Property", style="bold") table.add_column("Value", style="green") table.add_row("Block hashes", f"{blocks:,}") table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") console.print(table) def _inspect_manifest(path: Path) -> None: """Display manifest file information.""" hashes = 0 with ManifestReader.open(str(path)) as reader: compression = ( reader.header.compression.algorithm if reader.header and reader.header.compression else None ) for _ in reader.iter_block_hashes(): hashes += 1 table = Table(title="[bold cyan]Manifest File[/bold cyan]", show_header=False) table.add_column("Property", style="bold") table.add_column("Value", style="green") table.add_row("Block hashes", f"{hashes:,}") table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]") console.print(table) def _inspect_wounds(path: Path) -> None: """Display wounds file information.""" wounds = 0 with WoundsReader.open(str(path)) as reader: for _ in reader.iter_wounds(): wounds += 1 table = Table(title="[bold cyan]Wounds File[/bold cyan]", show_header=False) table.add_column("Property", style="bold") table.add_column("Value", style="green") table.add_row("Wounds", f"{wounds:,}") console.print(table) def _do_inspect(path: Path) -> None: """Detect file type and inspect accordingly.""" with open(path, "rb") as handle: magic = read_magic(handle) if magic == PATCH_MAGIC: _inspect_patch(path) elif magic == SIGNATURE_MAGIC: _inspect_signature(path) elif magic == MANIFEST_MAGIC: _inspect_manifest(path) elif magic == WOUNDS_MAGIC: _inspect_wounds(path) else: console.print(f"[bold red]Error:[/bold red] Unknown file magic: {magic}") raise typer.Exit(1) @app.command() def inspect( path: Annotated[ Path, typer.Argument( help="Path to a wharf file (.pwr, .pws, .pwm, .pww)", exists=True, readable=True, ), ], ) -> None: """ [bold]Inspect[/bold] a wharf file and display its contents. Supports patch (.pwr), signature (.pws), manifest (.pwm), and wounds (.pww) files. """ _do_inspect(path) @app.command() def apply( patch: Annotated[ Path, typer.Argument( help="Path to the .pwr patch file", exists=True, readable=True, ), ], target: Annotated[ Path, typer.Argument( help="Folder containing the old version", exists=True, file_okay=False, dir_okay=True, ), ], output: Annotated[ Path, typer.Argument( help="Folder to write the new version", ), ], ) -> None: """ [bold]Apply[/bold] a patch file to update a folder. Takes the old version from TARGET and writes the patched result to OUTPUT. """ with console.status("[bold green]Applying patch...[/bold green]"): apply_patch_to_folders(str(patch), str(target), str(output)) console.print( Panel( f"[green]✓[/green] Patch applied successfully!\n\n" f"[dim]Source:[/dim] {target}\n" f"[dim]Output:[/dim] {output}", title="[bold green]Complete[/bold green]", border_style="green", ) ) if __name__ == "__main__": app()