227 lines
6.2 KiB
Python
227 lines
6.2 KiB
Python
"""Minimal wharf/pwr inspector CLI."""
|
|
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
|
|
from . import (
|
|
MANIFEST_MAGIC,
|
|
PATCH_MAGIC,
|
|
SIGNATURE_MAGIC,
|
|
WOUNDS_MAGIC,
|
|
ManifestReader,
|
|
PatchReader,
|
|
SignatureReader,
|
|
SyncOpType,
|
|
WoundsReader,
|
|
apply_patch_to_folders,
|
|
)
|
|
from .wire import read_magic
|
|
|
|
app = typer.Typer(
|
|
name="pwr",
|
|
help="Minimal wharf/pwr inspector and patcher.",
|
|
add_completion=False,
|
|
rich_markup_mode="rich",
|
|
)
|
|
console = Console()
|
|
|
|
|
|
@app.callback(invoke_without_command=True)
|
|
def main(ctx: typer.Context) -> None:
|
|
"""Minimal wharf/pwr inspector and patcher."""
|
|
if ctx.invoked_subcommand is None:
|
|
console.print(ctx.get_help())
|
|
|
|
|
|
def _inspect_patch(path: Path) -> None:
|
|
"""Display detailed patch file information."""
|
|
files = 0
|
|
rsync_files = 0
|
|
bsdiff_files = 0
|
|
ops = 0
|
|
data_bytes = 0
|
|
controls = 0
|
|
|
|
with PatchReader.open(str(path)) as reader:
|
|
compression = (
|
|
reader.header.compression.algorithm
|
|
if reader.header and reader.header.compression
|
|
else None
|
|
)
|
|
for entry in reader.iter_file_entries():
|
|
files += 1
|
|
if entry.is_rsync():
|
|
rsync_files += 1
|
|
for op in entry.sync_ops or []:
|
|
ops += 1
|
|
if op.type == SyncOpType.DATA:
|
|
data_bytes += len(op.data or b"")
|
|
elif entry.is_bsdiff():
|
|
bsdiff_files += 1
|
|
for ctrl in entry.bsdiff_controls or []:
|
|
controls += 1
|
|
data_bytes += len(ctrl.add or b"") + len(ctrl.copy or b"")
|
|
|
|
table = Table(title="[bold cyan]Patch File[/bold cyan]", show_header=False)
|
|
table.add_column("Property", style="bold")
|
|
table.add_column("Value", style="green")
|
|
|
|
table.add_row("Files", f"{files} [dim](rsync={rsync_files}, bsdiff={bsdiff_files})[/dim]")
|
|
table.add_row("Operations", str(ops))
|
|
table.add_row("Controls", str(controls))
|
|
table.add_row("Data bytes", f"{data_bytes:,}")
|
|
table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]")
|
|
|
|
console.print(table)
|
|
|
|
|
|
def _inspect_signature(path: Path) -> None:
|
|
"""Display signature file information."""
|
|
blocks = 0
|
|
with SignatureReader.open(str(path)) as reader:
|
|
compression = (
|
|
reader.header.compression.algorithm
|
|
if reader.header and reader.header.compression
|
|
else None
|
|
)
|
|
for _ in reader.iter_block_hashes():
|
|
blocks += 1
|
|
|
|
table = Table(title="[bold cyan]Signature File[/bold cyan]", show_header=False)
|
|
table.add_column("Property", style="bold")
|
|
table.add_column("Value", style="green")
|
|
|
|
table.add_row("Block hashes", f"{blocks:,}")
|
|
table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]")
|
|
|
|
console.print(table)
|
|
|
|
|
|
def _inspect_manifest(path: Path) -> None:
|
|
"""Display manifest file information."""
|
|
hashes = 0
|
|
with ManifestReader.open(str(path)) as reader:
|
|
compression = (
|
|
reader.header.compression.algorithm
|
|
if reader.header and reader.header.compression
|
|
else None
|
|
)
|
|
for _ in reader.iter_block_hashes():
|
|
hashes += 1
|
|
|
|
table = Table(title="[bold cyan]Manifest File[/bold cyan]", show_header=False)
|
|
table.add_column("Property", style="bold")
|
|
table.add_column("Value", style="green")
|
|
|
|
table.add_row("Block hashes", f"{hashes:,}")
|
|
table.add_row("Compression", str(compression) if compression else "[dim]none[/dim]")
|
|
|
|
console.print(table)
|
|
|
|
|
|
def _inspect_wounds(path: Path) -> None:
|
|
"""Display wounds file information."""
|
|
wounds = 0
|
|
with WoundsReader.open(str(path)) as reader:
|
|
for _ in reader.iter_wounds():
|
|
wounds += 1
|
|
|
|
table = Table(title="[bold cyan]Wounds File[/bold cyan]", show_header=False)
|
|
table.add_column("Property", style="bold")
|
|
table.add_column("Value", style="green")
|
|
|
|
table.add_row("Wounds", f"{wounds:,}")
|
|
|
|
console.print(table)
|
|
|
|
|
|
def _do_inspect(path: Path) -> None:
|
|
"""Detect file type and inspect accordingly."""
|
|
with open(path, "rb") as handle:
|
|
magic = read_magic(handle)
|
|
|
|
if magic == PATCH_MAGIC:
|
|
_inspect_patch(path)
|
|
elif magic == SIGNATURE_MAGIC:
|
|
_inspect_signature(path)
|
|
elif magic == MANIFEST_MAGIC:
|
|
_inspect_manifest(path)
|
|
elif magic == WOUNDS_MAGIC:
|
|
_inspect_wounds(path)
|
|
else:
|
|
console.print(f"[bold red]Error:[/bold red] Unknown file magic: {magic}")
|
|
raise typer.Exit(1)
|
|
|
|
|
|
@app.command()
|
|
def inspect(
|
|
path: Annotated[
|
|
Path,
|
|
typer.Argument(
|
|
help="Path to a wharf file (.pwr, .pws, .pwm, .pww)",
|
|
exists=True,
|
|
readable=True,
|
|
),
|
|
],
|
|
) -> None:
|
|
"""
|
|
[bold]Inspect[/bold] a wharf file and display its contents.
|
|
|
|
Supports patch (.pwr), signature (.pws), manifest (.pwm), and wounds (.pww) files.
|
|
"""
|
|
_do_inspect(path)
|
|
|
|
|
|
@app.command()
|
|
def apply(
|
|
patch: Annotated[
|
|
Path,
|
|
typer.Argument(
|
|
help="Path to the .pwr patch file",
|
|
exists=True,
|
|
readable=True,
|
|
),
|
|
],
|
|
target: Annotated[
|
|
Path,
|
|
typer.Argument(
|
|
help="Folder containing the old version",
|
|
exists=True,
|
|
file_okay=False,
|
|
dir_okay=True,
|
|
),
|
|
],
|
|
output: Annotated[
|
|
Path,
|
|
typer.Argument(
|
|
help="Folder to write the new version",
|
|
),
|
|
],
|
|
) -> None:
|
|
"""
|
|
[bold]Apply[/bold] a patch file to update a folder.
|
|
|
|
Takes the old version from TARGET and writes the patched result to OUTPUT.
|
|
"""
|
|
with console.status("[bold green]Applying patch...[/bold green]"):
|
|
apply_patch_to_folders(str(patch), str(target), str(output))
|
|
|
|
console.print(
|
|
Panel(
|
|
f"[green]✓[/green] Patch applied successfully!\n\n"
|
|
f"[dim]Source:[/dim] {target}\n"
|
|
f"[dim]Output:[/dim] {output}",
|
|
title="[bold green]Complete[/bold green]",
|
|
border_style="green",
|
|
)
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app()
|