import re import subprocess from functools import lru_cache from pathlib import Path from typing import Callable from .settings import ( BLACKDETECT_DURATION, BLACKDETECT_PIX_TH, BLACKDETECT_WINDOW, FFMPEG_BIN, FFPROBE_BIN, ) _BLACK_RE = re.compile(r"black_start:(?P[0-9\.]+)\s+black_end:(?P[0-9\.]+)") def probe_duration(path: str) -> float: cmd = [ FFPROBE_BIN, "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", str(path), ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: raise RuntimeError(result.stderr.strip() or "ffprobe failed") return float(result.stdout.strip()) def _parse_rate(value: str) -> float | None: if not value: return None if "/" in value: num, den = value.split("/", 1) try: denominator = float(den) if denominator == 0: return None return float(num) / denominator except ValueError: return None try: return float(value) except ValueError: return None def probe_framerate(path: str) -> float | None: cmd = [ FFPROBE_BIN, "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=avg_frame_rate,r_frame_rate", "-of", "default=noprint_wrappers=1:nokey=1", str(path), ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: return None for line in result.stdout.splitlines(): parsed = _parse_rate(line.strip()) if parsed and parsed > 0: return parsed return None def run_blackdetect(path: str, start: float, duration: float) -> list[tuple[float, float]]: cmd = [ FFMPEG_BIN, "-hide_banner", "-ss", str(start), "-t", str(duration), "-i", str(path), "-vf", f"blackdetect=d={BLACKDETECT_DURATION}:pix_th={BLACKDETECT_PIX_TH}", "-an", "-f", "null", "-", ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode != 0: raise RuntimeError(result.stderr.strip() or "ffmpeg blackdetect failed") segments: list[tuple[float, float]] = [] for line in result.stderr.splitlines(): match = _BLACK_RE.search(line) if match: seg_start = float(match.group("start")) + start seg_end = float(match.group("end")) + start segments.append((seg_start, seg_end)) return segments def autodetect_cuts( path: str, total_duration: float, intro_seconds: float, outro_seconds: float, segments_count: int, window_seconds: float | None = None, progress_cb: Callable[[int, int], None] | None = None, ) -> list[float]: if segments_count < 2: return [] window = window_seconds or BLACKDETECT_WINDOW core_duration = max(0.0, total_duration - intro_seconds - outro_seconds) if core_duration <= 0: return [] segment_len = core_duration / float(segments_count) cut_points: list[float] = [] total_cuts = segments_count - 1 for index in range(1, segments_count): theoretical = intro_seconds + segment_len * index scan_start = max(0.0, theoretical - window) scan_duration = min(window * 2.0, total_duration - scan_start) black_segments = run_blackdetect(path, scan_start, scan_duration) if black_segments: best = max(black_segments, key=lambda item: item[1] - item[0]) cut_point = (best[0] + best[1]) / 2.0 else: cut_point = theoretical cut_points.append(round(cut_point, 3)) if progress_cb: progress_cb(index, total_cuts) return cut_points def extract_frame_bytes( path: str, timestamp: float, fast: bool = False, scale_width: int | None = None, ) -> bytes: file_path = Path(path) stat = file_path.stat() timestamp_ms = max(0, round(timestamp * 1000)) return _extract_frame_bytes_cached( str(file_path), stat.st_mtime_ns, timestamp_ms, fast, scale_width, ) @lru_cache(maxsize=256) def _extract_frame_bytes_cached( path: str, mtime_ns: int, timestamp_ms: int, fast: bool, scale_width: int | None, ) -> bytes: del mtime_ns timestamp = timestamp_ms / 1000.0 cmd = [ FFMPEG_BIN, "-hide_banner", "-loglevel", "error", ] if fast: cmd += [ "-ss", str(timestamp), "-i", str(path), ] else: seek_start = max(timestamp - 2.0, 0.0) cmd += [ "-ss", str(seek_start), "-i", str(path), "-ss", str(timestamp - seek_start), ] cmd += [ "-frames:v", "1", ] if scale_width: cmd += ["-vf", f"scale={scale_width}:-2"] cmd += [ "-an", "-f", "image2pipe", "-vcodec", "mjpeg", "-", ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: error = result.stderr.decode("utf-8", errors="ignore").strip() raise RuntimeError(error or "ffmpeg frame extraction failed") return result.stdout