"""SQLite persistence helpers for projects, videos, markers and segment edits.

Every public function opens its own short-lived connection while holding the
module-level ``_DB_LOCK`` and closes that connection before returning, even
when a statement raises.
"""

import sqlite3
import threading
import uuid
from contextlib import closing
from datetime import datetime, timezone

from .settings import DB_PATH

# Serialises all database access performed through this module.
_DB_LOCK = threading.Lock()

# Default ffmpeg command templates stored on a project when none is supplied.
# The ``{start}``, ``{duration}``, ``{input}``, ``{stats}``, ``{null}`` and
# ``{output}`` placeholders are left untouched here.
DEFAULT_FFMPEG_PASS1_TEMPLATE = (
    'ffmpeg -hwaccel auto -y -ss {start} -t {duration} -i {input} '
    '-map_metadata -1 -map_chapters -1 -an -sn -b:v 2M '
    '-x265-params no-slow-firstpass=1:pass=1:open-gop=0:keyint=60:min-keyint=60:'
    'scenecut=0:vbv-maxrate=4000:vbv-bufsize=8000:stats={stats}:pools=+ '
    '-tune animation -c:v libx265 -f null {null}'
)
DEFAULT_FFMPEG_PASS2_TEMPLATE = (
    'ffmpeg -hwaccel auto -y -ss {start} -t {duration} -i {input} '
    '-map_metadata -1 -map_chapters -1 -map 0:v:0 -map 0:a? '
    '-disposition:v:0 default -b:v 2M '
    '-x265-params pass=2:open-gop=0:keyint=60:min-keyint=60:scenecut=0:'
    'vbv-maxrate=4000:vbv-bufsize=8000:stats={stats}:pools=+ '
    '-tune animation -b:a 128k -ac:a 2 -c:v libx265 -c:a libopus -c:s subrip {output}'
)


def _now() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    # An aware "now" replaces the deprecated ``datetime.utcnow()``; tzinfo is
    # dropped again so isoformat() does not append "+00:00" before the "Z".
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    return utc_now.isoformat(timespec="seconds") + "Z"


def _row_to_dict(row: sqlite3.Row | None) -> dict | None:
    """Convert a row exposing ``keys()`` into a plain dict; pass ``None`` through."""
    if row is None:
        return None
    return {key: row[key] for key in row.keys()}


def _table_columns(cur: sqlite3.Cursor, table: str) -> set[str]:
    """Return the column names of ``table`` as reported by ``PRAGMA table_info``.

    ``table`` is interpolated into the statement, so it must be a trusted,
    hard-coded identifier.
    """
    cur.execute(f"PRAGMA table_info({table})")
    # Index 1 of each table_info row is the column name.
    return {row[1] for row in cur.fetchall()}


def get_conn() -> sqlite3.Connection:
    """Open a new connection to ``DB_PATH`` with ``sqlite3.Row`` as row factory.

    The parent directory of ``DB_PATH`` is created if it does not exist. The
    caller is responsible for closing the returned connection.
    """
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    """Create the schema if needed and apply in-place column migrations.

    Missing columns on ``projects``, ``segment_edits`` and ``videos`` are added
    with ``ALTER TABLE``; projects whose ffmpeg templates are NULL are
    back-filled with the module defaults.
    """
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.executescript(
            """
            CREATE TABLE IF NOT EXISTS projects (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                segments_count INTEGER NOT NULL,
                intro_seconds REAL NOT NULL,
                outro_seconds REAL NOT NULL,
                reencode_enabled INTEGER NOT NULL DEFAULT 0,
                encoding_passes INTEGER NOT NULL DEFAULT 1,
                target_os TEXT NOT NULL DEFAULT 'windows',
                ffmpeg_pass1_template TEXT,
                ffmpeg_pass2_template TEXT,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS settings (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS videos (
                id TEXT PRIMARY KEY,
                project_id TEXT NOT NULL,
                filename TEXT NOT NULL,
                file_path TEXT NOT NULL,
                duration_seconds REAL NOT NULL,
                is_exported INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS markers (
                id TEXT PRIMARY KEY,
                video_id TEXT NOT NULL,
                position_seconds REAL NOT NULL,
                source TEXT NOT NULL,
                created_at TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS segment_edits (
                id TEXT PRIMARY KEY,
                video_id TEXT NOT NULL,
                segment_key TEXT NOT NULL,
                start_seconds REAL NOT NULL,
                end_seconds REAL NOT NULL,
                color TEXT,
                modified_at TEXT NOT NULL
            );
            CREATE INDEX IF NOT EXISTS idx_markers_video_id ON markers(video_id);
            CREATE INDEX IF NOT EXISTS idx_segment_edits_video_id ON segment_edits(video_id);
            """
        )

        # Databases created by an older schema may lack these columns.
        project_columns = _table_columns(cur, "projects")
        if "reencode_enabled" not in project_columns:
            cur.execute("ALTER TABLE projects ADD COLUMN reencode_enabled INTEGER NOT NULL DEFAULT 0")
        if "encoding_passes" not in project_columns:
            cur.execute("ALTER TABLE projects ADD COLUMN encoding_passes INTEGER NOT NULL DEFAULT 1")
        if "target_os" not in project_columns:
            cur.execute("ALTER TABLE projects ADD COLUMN target_os TEXT NOT NULL DEFAULT 'windows'")
        if "ffmpeg_pass1_template" not in project_columns:
            cur.execute("ALTER TABLE projects ADD COLUMN ffmpeg_pass1_template TEXT")
        if "ffmpeg_pass2_template" not in project_columns:
            cur.execute("ALTER TABLE projects ADD COLUMN ffmpeg_pass2_template TEXT")

        segment_edit_columns = _table_columns(cur, "segment_edits")
        if "modified_at" not in segment_edit_columns:
            # The migrated column is added without NOT NULL, so existing rows
            # are stamped with the current time right away.
            cur.execute("ALTER TABLE segment_edits ADD COLUMN modified_at TEXT")
            cur.execute("UPDATE segment_edits SET modified_at = ?", (_now(),))
        if "color" not in segment_edit_columns:
            cur.execute("ALTER TABLE segment_edits ADD COLUMN color TEXT")

        video_columns = _table_columns(cur, "videos")
        if "is_exported" not in video_columns:
            cur.execute("ALTER TABLE videos ADD COLUMN is_exported INTEGER NOT NULL DEFAULT 0")

        # Back-fill templates for projects that have none stored.
        cur.execute(
            "UPDATE projects SET ffmpeg_pass1_template = ? WHERE ffmpeg_pass1_template IS NULL",
            (DEFAULT_FFMPEG_PASS1_TEMPLATE,),
        )
        cur.execute(
            "UPDATE projects SET ffmpeg_pass2_template = ? WHERE ffmpeg_pass2_template IS NULL",
            (DEFAULT_FFMPEG_PASS2_TEMPLATE,),
        )
        conn.commit()


def mark_video_exported(video_id: str) -> None:
    """Set ``is_exported = 1`` on the video with the given id."""
    # closing() guarantees the connection is released; previously this
    # function never closed the connection it opened.
    with _DB_LOCK, closing(get_conn()) as conn:
        conn.execute("UPDATE videos SET is_exported = 1 WHERE id = ?", (video_id,))
        conn.commit()


def _normalize_project(project: dict | None) -> dict | None:
    """Coerce stored project values to their Python types, in place.

    ``reencode_enabled`` becomes a bool, ``encoding_passes`` an int,
    ``target_os`` falls back to ``"windows"`` when the key is absent, and
    missing/empty ffmpeg templates fall back to the module defaults.

    Returns:
        The same (mutated) dict, or ``None`` when ``project`` is falsy.
    """
    if not project:
        return None
    project["reencode_enabled"] = bool(project.get("reencode_enabled", 0))
    project["encoding_passes"] = int(project.get("encoding_passes", 1))
    project["target_os"] = project.get("target_os", "windows")
    project["ffmpeg_pass1_template"] = (
        project.get("ffmpeg_pass1_template") or DEFAULT_FFMPEG_PASS1_TEMPLATE
    )
    project["ffmpeg_pass2_template"] = (
        project.get("ffmpeg_pass2_template") or DEFAULT_FFMPEG_PASS2_TEMPLATE
    )
    return project


def create_project(
    name: str,
    segments_count: int,
    intro_seconds: float,
    outro_seconds: float,
    reencode_enabled: bool = False,
    encoding_passes: int = 1,
    target_os: str = "windows",
    ffmpeg_pass1_template: str | None = None,
    ffmpeg_pass2_template: str | None = None,
) -> dict:
    """Insert a new project and record it as the current project.

    Falsy ffmpeg templates are replaced by the module defaults. The
    ``current_project_id`` settings row is inserted or replaced with the new
    project's id.

    Returns:
        The normalized project row as a dict.
    """
    project_id = uuid.uuid4().hex
    now = _now()
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO projects (
                id, name, segments_count, intro_seconds, outro_seconds,
                reencode_enabled, encoding_passes, target_os,
                ffmpeg_pass1_template, ffmpeg_pass2_template,
                created_at, updated_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                project_id,
                name,
                segments_count,
                intro_seconds,
                outro_seconds,
                1 if reencode_enabled else 0,
                encoding_passes,
                target_os,
                ffmpeg_pass1_template or DEFAULT_FFMPEG_PASS1_TEMPLATE,
                ffmpeg_pass2_template or DEFAULT_FFMPEG_PASS2_TEMPLATE,
                now,
                now,
            ),
        )
        cur.execute(
            "INSERT OR REPLACE INTO settings (key, value) VALUES ('current_project_id', ?)",
            (project_id,),
        )
        conn.commit()
        cur.execute("SELECT * FROM projects WHERE id = ?", (project_id,))
        return _normalize_project(_row_to_dict(cur.fetchone()))


def get_project(project_id: str) -> dict | None:
    """Return the normalized project with the given id, or ``None`` if absent."""
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.execute("SELECT * FROM projects WHERE id = ?", (project_id,))
        return _normalize_project(_row_to_dict(cur.fetchone()))


def get_current_project() -> dict | None:
    """Return the project referenced by the ``current_project_id`` setting.

    Returns ``None`` when the setting is missing or no project row matches it.
    """
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute("SELECT value FROM settings WHERE key = 'current_project_id'")
        setting = cur.fetchone()
        if not setting:
            return None
        cur.execute("SELECT * FROM projects WHERE id = ?", (setting[0],))
        return _normalize_project(_row_to_dict(cur.fetchone()))


def update_current_project(
    name: str | None = None,
    segments_count: int | None = None,
    intro_seconds: float | None = None,
    outro_seconds: float | None = None,
    reencode_enabled: bool | None = None,
    encoding_passes: int | None = None,
    target_os: str | None = None,
    ffmpeg_pass1_template: str | None = None,
    ffmpeg_pass2_template: str | None = None,
) -> dict | None:
    """Update the current project with every argument that is not ``None``.

    An empty-string ffmpeg template resets that template to the module
    default. ``updated_at`` is refreshed whenever at least one field changes.

    Returns:
        ``None`` when there is no current project; the unchanged project when
        no argument was supplied; otherwise the re-read, normalized project.
    """
    # Fetched before taking the lock below: _DB_LOCK is not re-entrant and
    # get_current_project() acquires it itself.
    project = get_current_project()
    if not project:
        return None

    # Column -> value to store; None means "argument not supplied".
    candidates = {
        "name": name,
        "segments_count": segments_count,
        "intro_seconds": intro_seconds,
        "outro_seconds": outro_seconds,
        "reencode_enabled": (
            None if reencode_enabled is None else (1 if reencode_enabled else 0)
        ),
        "encoding_passes": encoding_passes,
        "target_os": target_os,
        "ffmpeg_pass1_template": (
            None
            if ffmpeg_pass1_template is None
            else (ffmpeg_pass1_template or DEFAULT_FFMPEG_PASS1_TEMPLATE)
        ),
        "ffmpeg_pass2_template": (
            None
            if ffmpeg_pass2_template is None
            else (ffmpeg_pass2_template or DEFAULT_FFMPEG_PASS2_TEMPLATE)
        ),
    }
    changes = {column: value for column, value in candidates.items() if value is not None}
    if not changes:
        return project

    changes["updated_at"] = _now()
    # Column names come from the hard-coded mapping above, never from callers,
    # so interpolating them into the statement is safe.
    assignments = ", ".join(f"{column} = ?" for column in changes)
    params = [*changes.values(), project["id"]]

    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute(f"UPDATE projects SET {assignments} WHERE id = ?", params)
        conn.commit()
        cur.execute("SELECT * FROM projects WHERE id = ?", (project["id"],))
        return _normalize_project(_row_to_dict(cur.fetchone()))


def create_video(project_id: str, filename: str, file_path: str, duration_seconds: float) -> dict:
    """Insert a video row (``is_exported = 0``) and return it as a dict."""
    video_id = uuid.uuid4().hex
    now = _now()
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO videos (id, project_id, filename, file_path, duration_seconds, is_exported, created_at)
            VALUES (?, ?, ?, ?, ?, 0, ?)
            """,
            (video_id, project_id, filename, file_path, duration_seconds, now),
        )
        conn.commit()
        cur.execute("SELECT * FROM videos WHERE id = ?", (video_id,))
        return _row_to_dict(cur.fetchone())


def get_video(video_id: str) -> dict | None:
    """Return the video with the given id as a dict, or ``None`` if absent."""
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.execute("SELECT * FROM videos WHERE id = ?", (video_id,))
        return _row_to_dict(cur.fetchone())


def list_videos(project_id: str) -> list[dict]:
    """Return all videos of a project, newest ``created_at`` first."""
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.execute(
            "SELECT * FROM videos WHERE project_id = ? ORDER BY created_at DESC",
            (project_id,),
        )
        return [_row_to_dict(row) for row in cur.fetchall()]


def list_markers(video_id: str) -> list[float]:
    """Return the marker positions of a video in ascending order."""
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.execute(
            "SELECT position_seconds FROM markers WHERE video_id = ? ORDER BY position_seconds",
            (video_id,),
        )
        return [row[0] for row in cur.fetchall()]


def replace_markers(video_id: str, markers: list[float], source: str) -> list[float]:
    """Replace every marker of a video with the given positions.

    All new markers share the same ``source`` and ``created_at`` values.

    Returns:
        The new positions, converted to ``float`` and sorted ascending.
    """
    now = _now()
    positions = [float(marker) for marker in markers]
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM markers WHERE video_id = ?", (video_id,))
        cur.executemany(
            """
            INSERT INTO markers (id, video_id, position_seconds, source, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            [(uuid.uuid4().hex, video_id, position, source, now) for position in positions],
        )
        conn.commit()
    return sorted(positions)


def list_segment_edits(video_id: str) -> list[dict]:
    """Return the segment edits of a video ordered by ``segment_key``."""
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.execute(
            "SELECT * FROM segment_edits WHERE video_id = ? ORDER BY segment_key",
            (video_id,),
        )
        return [_row_to_dict(row) for row in cur.fetchall()]


def replace_segment_edits(video_id: str, segments: list[dict]) -> list[dict]:
    """Replace every segment edit of a video with ``segments``.

    Each segment must provide ``segment_key``, ``start_seconds`` and
    ``end_seconds``; ``color`` is optional. If a segment is malformed the
    exception propagates, nothing is committed and the connection is closed.

    Returns:
        The stored edits (without ``id``/``video_id``) in insertion order.
    """
    now = _now()
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM segment_edits WHERE video_id = ?", (video_id,))
        cur.executemany(
            """
            INSERT INTO segment_edits (
                id, video_id, segment_key, start_seconds, end_seconds, color, modified_at
            )
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (
                    uuid.uuid4().hex,
                    video_id,
                    segment["segment_key"],
                    float(segment["start_seconds"]),
                    float(segment["end_seconds"]),
                    segment.get("color"),
                    now,
                )
                for segment in segments
            ],
        )
        conn.commit()
        cur.execute(
            """
            SELECT segment_key, start_seconds, end_seconds, color, modified_at
            FROM segment_edits
            WHERE video_id = ?
            ORDER BY rowid
            """,
            (video_id,),
        )
        return [_row_to_dict(row) for row in cur.fetchall()]


def delete_video(video_id: str) -> dict | None:
    """Delete a video together with its markers and segment edits.

    Returns:
        The deleted video row as a dict, or ``None`` if no such video exists.
    """
    with _DB_LOCK, closing(get_conn()) as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM videos WHERE id = ?", (video_id,))
        video = _row_to_dict(cur.fetchone())
        if not video:
            return None
        # Dependent rows first, then the video itself, in one transaction.
        cur.execute("DELETE FROM markers WHERE video_id = ?", (video_id,))
        cur.execute("DELETE FROM segment_edits WHERE video_id = ?", (video_id,))
        cur.execute("DELETE FROM videos WHERE id = ?", (video_id,))
        conn.commit()
        return video