Files
Video-Cutter/backend/app/mkv.py
T
PickleRick 4a7d1e6663 1. fix bulk edit and export Now I can only edit one video at the time overwise the timestamp get overwritten. Save timestamps for each file uploaded
2. in export bulk make clear wich files will get elaborated and whick is being elaborated also add multple progress bar one for bulk one for the actual video and one for actual process (use Material You expresssive and make them clear)
3. remove save marker button; autosave then save edit
4. add paste timestamp button on hover other timestamp input
5. make unlink button not global but add a link icon colored by couple to let understand which timestamps are linked and make the single link removed (link should work that if I edit a timestamp the next/previuos based on which are linked get automatically setted to the next/previous frame timestamp)
6. I'm unable to pit whichever timestamp I want in the timestamp table for example 00:07:23.109 (the one in the frame visualizer) get automatically changed to 00:07:23.110 making the colored border not working correctly sometimes fix both
2026-06-03 00:54:41 +02:00

500 lines
15 KiB
Python

import os
import re
import shlex
import subprocess
import time
import logging
from pathlib import Path
from typing import Callable
from .settings import FFMPEG_BIN, MKVMERGE_BIN
ProgressCallback = Callable[[float, float, str | None, dict | None], None]
LogCallback = Callable[[str], None]
logger = logging.getLogger(__name__)
def _logical_cpus() -> int:
return max(1, os.cpu_count() or 1)
def _format_eta(seconds: float | None) -> str | None:
if seconds is None or not isinstance(seconds, (int, float)) or seconds < 0:
return None
seconds = int(round(seconds))
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
if hours:
return f"{hours:d}:{minutes:02d}:{secs:02d}"
return f"{minutes:02d}:{secs:02d}"
def _run_mkvmerge(
cmd: list[str],
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> None:
run_cmd = [cmd[0], "--gui-mode", *cmd[1:]]
logger.info("Starting mkvmerge process: %s", label)
if log_cb:
log_cb(f"Starting mkvmerge: {label}")
process = subprocess.Popen(
run_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
output: list[str] = []
pattern = re.compile(r"#GUI#progress\s+(\d+)")
if process.stdout:
for line in process.stdout:
output.append(line)
match = pattern.search(line)
if match and progress_cb:
stage_progress = max(0.0, min(1.0, int(match.group(1)) / 100.0))
progress_cb(
progress_start + progress_weight * stage_progress,
1.0,
f"{label}: {int(round(stage_progress * 100))}%",
{"stage": label, "stage_progress": stage_progress},
)
if process.wait() != 0:
logger.error("mkvmerge failed: %s", label)
raise RuntimeError("".join(output).strip() or "mkvmerge failed")
def _run_ffmpeg_template(
template: str,
input_path: str,
output_path: Path,
start: float,
duration: float,
stats_path: Path,
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> None:
threads = _logical_cpus()
placeholders = {
"ffmpeg": FFMPEG_BIN,
"input": str(Path(input_path)).replace("\\", "/"),
"output": str(output_path).replace("\\", "/"),
"start": f"{max(0.0, start):.6f}",
"duration": f"{max(0.0, duration):.6f}",
"end": f"{max(0.0, start + duration):.6f}",
"stats": str(stats_path).replace("\\", "/"),
"null": "NUL" if os.name == "nt" else "/dev/null",
"threads": str(threads),
"x265_threads": str(threads),
}
command = template.format(**placeholders).strip()
if not command:
return
args = shlex.split(command, posix=True)
if args and args[0].lower() == "ffmpeg":
args[0] = FFMPEG_BIN
if "-threads" not in args:
args = [args[0], "-threads", str(threads), *args[1:]]
if "-x265-params" in args:
param_index = args.index("-x265-params") + 1
if param_index < len(args) and "pools=" not in args[param_index]:
args[param_index] = f"{args[param_index]}:pools=+"
if "-progress" not in args:
args = [args[0], "-hide_banner", "-nostats", "-progress", "pipe:1", *args[1:]]
if log_cb:
log_cb(f"Starting ffmpeg: {label} ({format_timestamp(start)} + {duration:.3f}s)")
logger.info("Starting ffmpeg process: %s (%s + %.3fs)", label, format_timestamp(start), duration)
env = os.environ.copy()
env.setdefault("OMP_NUM_THREADS", str(threads))
env.setdefault("X265_NUM_THREADS", str(threads))
process = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
env=env,
)
output: list[str] = []
last_stage_progress = 0.0
if process.stdout:
for line in process.stdout:
output.append(line)
if "=" not in line:
continue
key, value = line.strip().split("=", 1)
elapsed = None
if key == "out_time_ms":
try:
elapsed = float(value) / 1_000_000.0
except ValueError:
elapsed = None
elif key == "out_time_us":
try:
elapsed = float(value) / 1_000_000.0
except ValueError:
elapsed = None
elif key == "out_time":
parts = value.split(":")
if len(parts) == 3:
try:
elapsed = int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])
except ValueError:
elapsed = None
if elapsed is not None and duration > 0 and progress_cb:
stage_progress = max(last_stage_progress, min(1.0, elapsed / duration))
last_stage_progress = stage_progress
progress_cb(
progress_start + progress_weight * stage_progress,
1.0,
f"{label}: {int(round(stage_progress * 100))}%",
{"stage": label, "stage_progress": stage_progress},
)
if process.wait() != 0:
logger.error("ffmpeg failed: %s", label)
raise RuntimeError("".join(output[-80:]).strip() or "ffmpeg reencode failed")
def format_timestamp(seconds: float) -> str:
seconds = max(0.0, float(seconds))
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60.0
return f"{hours:02d}:{minutes:02d}:{secs:06.3f}"
def split_range(
input_path: str,
start: float,
end: float,
output_path: Path,
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> Path:
output_path = output_path.with_suffix(".mkv")
output_path.parent.mkdir(parents=True, exist_ok=True)
split_arg = f"parts:{format_timestamp(start)}-{format_timestamp(end)}"
cmd = [MKVMERGE_BIN, "-o", str(output_path), "--split", split_arg, str(input_path)]
_run_mkvmerge(cmd, label, progress_cb, progress_start, progress_weight, log_cb)
if output_path.exists():
return output_path
candidates = sorted(output_path.parent.glob(f"{output_path.stem}-*.mkv"))
if not candidates:
raise RuntimeError("mkvmerge did not produce output files")
final_path = output_path
if final_path.exists():
final_path.unlink()
candidates[0].replace(final_path)
for extra in candidates[1:]:
extra.unlink()
return final_path
def encode_range(
input_path: str,
start: float,
end: float,
output_path: Path,
project_dir: Path,
pass1_template: str | None,
pass2_template: str,
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> Path:
output_path = output_path.with_suffix(".mkv")
output_path.parent.mkdir(parents=True, exist_ok=True)
duration = max(0.0, end - start)
if duration <= 0:
raise RuntimeError("Cannot encode an empty segment")
stats_dir = project_dir / "ffmpeg-stats"
stats_dir.mkdir(parents=True, exist_ok=True)
stats_path = stats_dir / output_path.stem
pass1_weight = progress_weight * 0.45 if pass1_template else 0.0
pass2_weight = progress_weight - pass1_weight
if pass1_template:
_run_ffmpeg_template(
pass1_template,
input_path,
output_path,
start,
duration,
stats_path,
f"{label} pass 1",
progress_cb,
progress_start,
pass1_weight,
log_cb,
)
_run_ffmpeg_template(
pass2_template,
input_path,
output_path,
start,
duration,
stats_path,
f"{label} pass 2 + audio",
progress_cb,
progress_start + pass1_weight,
pass2_weight,
log_cb,
)
if not output_path.exists():
raise RuntimeError("ffmpeg did not produce output file")
return output_path
def cut_range(
input_path: str,
start: float,
end: float,
output_path: Path,
project_dir: Path,
reencode: bool,
pass1_template: str | None,
pass2_template: str | None,
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> Path:
if not reencode:
return split_range(
input_path,
start,
end,
output_path,
label,
progress_cb,
progress_start,
progress_weight,
log_cb,
)
if not pass2_template:
raise RuntimeError("Reencode is enabled but ffmpeg pass 2 template is empty")
return encode_range(
input_path,
start,
end,
output_path,
project_dir,
pass1_template,
pass2_template,
label,
progress_cb,
progress_start,
progress_weight,
log_cb,
)
def append_segments(
intro_path: Path | None,
segment_path: Path,
outro_path: Path | None,
output_path: Path,
label: str,
progress_cb: ProgressCallback | None = None,
progress_start: float = 0.0,
progress_weight: float = 1.0,
log_cb: LogCallback | None = None,
) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
args: list[str] = []
if intro_path:
args.append(str(intro_path))
args.append("+")
args.append(str(segment_path))
if outro_path:
args.append("+")
args.append(str(outro_path))
cmd = [MKVMERGE_BIN, "-o", str(output_path)] + args
_run_mkvmerge(cmd, label, progress_cb, progress_start, progress_weight, log_cb)
def build_episodes(
video_path: str,
total_duration: float,
intro_seconds: float,
outro_seconds: float,
cut_points: list[float],
output_dir: Path,
temp_dir: Path,
project_dir: Path,
output_prefix: str = "episode",
reencode: bool = False,
ffmpeg_pass1_template: str | None = None,
ffmpeg_pass2_template: str | None = None,
progress_cb: ProgressCallback | None = None,
log_cb: LogCallback | None = None,
custom_segments: list[tuple[float, float]] | None = None,
) -> list[str]:
output_dir.mkdir(parents=True, exist_ok=True)
temp_dir.mkdir(parents=True, exist_ok=True)
min_segment = 0.001
outputs: list[str] = []
core_ranges: list[tuple[int, float, float]] = []
min_segment = 0.001
if custom_segments and len(custom_segments) > 0:
total_segments = len(custom_segments)
for index, (start, end) in enumerate(custom_segments, start=1):
core_ranges.append((index, start, end))
else:
core_end = max(intro_seconds, total_duration - outro_seconds)
boundaries = [p for p in sorted(cut_points) if intro_seconds < p < core_end]
boundaries.append(core_end)
prev = intro_seconds
safe_boundaries: list[float] = []
for end in boundaries:
if end - prev <= min_segment:
continue
safe_boundaries.append(end)
prev = end
if not safe_boundaries:
raise RuntimeError("No valid segments after filtering short ranges")
prev = intro_seconds
total_segments = len(safe_boundaries)
for index, end in enumerate(safe_boundaries, start=1):
core_ranges.append((index, prev, end))
prev = end
intro_work = intro_seconds if intro_seconds > min_segment else 0.0
outro_start = max(0.0, total_duration - outro_seconds)
outro_work = total_duration - outro_start if outro_seconds > min_segment else 0.0
core_work = sum(max(0.0, end - start) for _, start, end in core_ranges)
mux_work = max(1.0, total_segments * 2.0)
total_work = max(1.0, intro_work + outro_work + core_work + mux_work)
completed_work = 0.0
def stage_progress(
base: float,
weight: float,
message_prefix: str,
) -> ProgressCallback:
def callback(done: float, total: float, message: str | None, details: dict | None) -> None:
stage_fraction = done / total if total else 0.0
payload = dict(details or {})
payload.setdefault("stage", message_prefix)
payload["stage_progress"] = max(0.0, min(1.0, stage_fraction))
if progress_cb:
progress_cb(
base + weight * payload["stage_progress"],
total_work,
message,
payload,
)
return callback
intro_path = None
if intro_work > min_segment:
label = "Extracting intro"
intro_path = cut_range(
video_path,
0.0,
intro_seconds,
temp_dir / "intro.mkv",
project_dir,
reencode,
ffmpeg_pass1_template,
ffmpeg_pass2_template,
label,
stage_progress(completed_work, intro_work, label),
0.0,
1.0,
log_cb,
)
completed_work += intro_work
outro_path = None
if outro_work > min_segment:
label = "Extracting outro"
outro_path = cut_range(
video_path,
outro_start,
total_duration,
temp_dir / "outro.mkv",
project_dir,
reencode,
ffmpeg_pass1_template,
ffmpeg_pass2_template,
label,
stage_progress(completed_work, outro_work, label),
0.0,
1.0,
log_cb,
)
completed_work += outro_work
for index, start, end in core_ranges:
segment_duration = max(min_segment, end - start)
label = f"Segment {index}/{total_segments}"
segment_path = cut_range(
video_path,
start,
end,
temp_dir / f"segment_{index:02d}.mkv",
project_dir,
reencode,
ffmpeg_pass1_template,
ffmpeg_pass2_template,
label,
stage_progress(completed_work, segment_duration, label),
0.0,
1.0,
log_cb,
)
completed_work += segment_duration
episode_path = output_dir / f"{output_prefix}_{index:02d}.mkv"
mux_label = f"Muxing episode {index}/{total_segments}"
append_segments(
intro_path,
segment_path,
outro_path,
episode_path,
mux_label,
stage_progress(completed_work, 2.0, mux_label),
0.0,
1.0,
log_cb,
)
outputs.append(str(episode_path))
completed_work += 2.0
if progress_cb:
progress_cb(
completed_work,
total_work,
f"Exported segment {index}/{total_segments}",
{"stage": f"Segment {index}/{total_segments} complete", "stage_progress": 1.0},
)
return outputs