from __future__ import annotations import subprocess from collections.abc import Iterable from dataclasses import dataclass @dataclass(frozen=True) class Hunk: header: str lines: list[str] def line_count(self) -> int: return 1 + len(self.lines) @dataclass(frozen=True) class FileDiff: path: str header_lines: list[str] hunks: list[Hunk] def line_count(self) -> int: return len(self.header_lines) + sum(h.line_count() for h in self.hunks) @dataclass(frozen=True) class DiffChunk: path: str header_lines: list[str] hunks: list[Hunk] def to_text(self) -> str: lines: list[str] = [] lines.extend(self.header_lines) for hunk in self.hunks: lines.append(hunk.header) lines.extend(hunk.lines) return "\n".join(lines).rstrip() + "\n" def line_count(self) -> int: return len(self.header_lines) + sum(h.line_count() for h in self.hunks) def run_git_diff(repo: str, base: str, head: str) -> str: cmd = [ "git", "-C", repo, "diff", f"{base}...{head}", "--unified=3", "--no-color", ] result = subprocess.run(cmd, check=False, capture_output=True, text=True) if result.returncode not in (0, 1): raise RuntimeError(result.stderr.strip() or "git diff failed") return result.stdout def parse_diff(diff_text: str) -> list[FileDiff]: files: list[FileDiff] = [] current_path: str | None = None header_lines: list[str] = [] hunks: list[Hunk] = [] current_hunk_header: str | None = None current_hunk_lines: list[str] = [] def flush_hunk() -> None: nonlocal current_hunk_header, current_hunk_lines, hunks if current_hunk_header is not None: hunks.append(Hunk(header=current_hunk_header, lines=current_hunk_lines)) current_hunk_header = None current_hunk_lines = [] def flush_file() -> None: nonlocal current_path, header_lines, hunks, files flush_hunk() if current_path is not None: files.append(FileDiff(path=current_path, header_lines=header_lines, hunks=hunks)) current_path = None header_lines = [] hunks = [] for raw_line in diff_text.splitlines(): # remove common test indentation while preserving diff markers (+/-/ ) line = raw_line.lstrip() if line.startswith("diff --git "): flush_file() header_lines = [line] parts = line.split() if len(parts) >= 4 and parts[3].startswith("b/"): current_path = parts[3][len("b/") :] else: current_path = None continue if line.startswith("+++ "): header_lines.append(line) if line.startswith("+++ b/"): current_path = line[len("+++ b/") :] continue if line.startswith("--- "): header_lines.append(line) continue if current_path is None and line.startswith("index "): header_lines.append(line) continue if line.startswith("@@ "): flush_hunk() current_hunk_header = line continue if current_hunk_header is not None: # append hunk lines without test indentation current_hunk_lines.append(line) elif line.strip() != "": header_lines.append(line) flush_file() return files def chunk_files(files: Iterable[FileDiff], max_lines: int = 350) -> list[DiffChunk]: chunks: list[DiffChunk] = [] for file in files: if file.line_count() <= max_lines: chunks.append( DiffChunk(path=file.path, header_lines=file.header_lines, hunks=file.hunks) ) continue current_hunks: list[Hunk] = [] current_lines = len(file.header_lines) for hunk in file.hunks: hunk_lines = hunk.line_count() if current_hunks and current_lines + hunk_lines > max_lines: chunks.append( DiffChunk(path=file.path, header_lines=file.header_lines, hunks=current_hunks) ) current_hunks = [] current_lines = len(file.header_lines) current_hunks.append(hunk) current_lines += hunk_lines if current_hunks: chunks.append( DiffChunk(path=file.path, header_lines=file.header_lines, hunks=current_hunks) ) return chunks