Files
2026-06-02 18:59:31 +02:00

214 lines
5.5 KiB
Python

import re
import subprocess
from functools import lru_cache
from pathlib import Path
from typing import Callable
from .settings import (
BLACKDETECT_DURATION,
BLACKDETECT_PIX_TH,
BLACKDETECT_WINDOW,
FFMPEG_BIN,
FFPROBE_BIN,
)
_BLACK_RE = re.compile(r"black_start:(?P<start>[0-9\.]+)\s+black_end:(?P<end>[0-9\.]+)")
def probe_duration(path: str) -> float:
cmd = [
FFPROBE_BIN,
"-v",
"error",
"-show_entries",
"format=duration",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(path),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or "ffprobe failed")
return float(result.stdout.strip())
def _parse_rate(value: str) -> float | None:
if not value:
return None
if "/" in value:
num, den = value.split("/", 1)
try:
denominator = float(den)
if denominator == 0:
return None
return float(num) / denominator
except ValueError:
return None
try:
return float(value)
except ValueError:
return None
def probe_framerate(path: str) -> float | None:
cmd = [
FFPROBE_BIN,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=avg_frame_rate,r_frame_rate",
"-of",
"default=noprint_wrappers=1:nokey=1",
str(path),
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
return None
for line in result.stdout.splitlines():
parsed = _parse_rate(line.strip())
if parsed and parsed > 0:
return parsed
return None
def run_blackdetect(path: str, start: float, duration: float) -> list[tuple[float, float]]:
cmd = [
FFMPEG_BIN,
"-hide_banner",
"-ss",
str(start),
"-t",
str(duration),
"-i",
str(path),
"-vf",
f"blackdetect=d={BLACKDETECT_DURATION}:pix_th={BLACKDETECT_PIX_TH}",
"-an",
"-f",
"null",
"-",
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or "ffmpeg blackdetect failed")
segments: list[tuple[float, float]] = []
for line in result.stderr.splitlines():
match = _BLACK_RE.search(line)
if match:
seg_start = float(match.group("start")) + start
seg_end = float(match.group("end")) + start
segments.append((seg_start, seg_end))
return segments
def autodetect_cuts(
path: str,
total_duration: float,
intro_seconds: float,
outro_seconds: float,
segments_count: int,
window_seconds: float | None = None,
progress_cb: Callable[[int, int], None] | None = None,
) -> list[float]:
if segments_count < 2:
return []
window = window_seconds or BLACKDETECT_WINDOW
core_duration = max(0.0, total_duration - intro_seconds - outro_seconds)
if core_duration <= 0:
return []
segment_len = core_duration / float(segments_count)
cut_points: list[float] = []
total_cuts = segments_count - 1
for index in range(1, segments_count):
theoretical = intro_seconds + segment_len * index
scan_start = max(0.0, theoretical - window)
scan_duration = min(window * 2.0, total_duration - scan_start)
black_segments = run_blackdetect(path, scan_start, scan_duration)
if black_segments:
best = max(black_segments, key=lambda item: item[1] - item[0])
cut_point = (best[0] + best[1]) / 2.0
else:
cut_point = theoretical
cut_points.append(round(cut_point, 3))
if progress_cb:
progress_cb(index, total_cuts)
return cut_points
def extract_frame_bytes(
path: str,
timestamp: float,
fast: bool = False,
scale_width: int | None = None,
) -> bytes:
file_path = Path(path)
stat = file_path.stat()
timestamp_ms = max(0, round(timestamp * 1000))
return _extract_frame_bytes_cached(
str(file_path),
stat.st_mtime_ns,
timestamp_ms,
fast,
scale_width,
)
@lru_cache(maxsize=256)
def _extract_frame_bytes_cached(
path: str,
mtime_ns: int,
timestamp_ms: int,
fast: bool,
scale_width: int | None,
) -> bytes:
del mtime_ns
timestamp = timestamp_ms / 1000.0
cmd = [
FFMPEG_BIN,
"-hide_banner",
"-loglevel",
"error",
]
if fast:
cmd += [
"-ss",
str(timestamp),
"-i",
str(path),
]
else:
seek_start = max(timestamp - 2.0, 0.0)
cmd += [
"-ss",
str(seek_start),
"-i",
str(path),
"-ss",
str(timestamp - seek_start),
]
cmd += [
"-frames:v",
"1",
]
if scale_width:
cmd += ["-vf", f"scale={scale_width}:-2"]
cmd += [
"-an",
"-f",
"image2pipe",
"-vcodec",
"mjpeg",
"-",
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if result.returncode != 0:
error = result.stderr.decode("utf-8", errors="ignore").strip()
raise RuntimeError(error or "ffmpeg frame extraction failed")
return result.stdout