Files
hevc-encoder/server/worker.py
T
2026-06-16 22:25:33 +01:00

1238 lines
50 KiB
Python

#!/usr/bin/env python3
"""
HEVC Re-Encode Worker (SQLite version)
======================================
Architecture:
- SQLite database (media.db) tracks files and status.
- Phase 1: SCAN — Walk local/mounted directories and probe codecs.
- Phase 2: PROCESS — Encode/remux locally to HEVC MKV.
"""
import json
import os
import errno
import re
import shutil
import sqlite3
import subprocess
import threading
import time
import urllib.request
from collections import deque
from concurrent.futures import Future, ThreadPoolExecutor
from enum import Enum
from typing import Optional
# Path of the SQLite database file; it sits in the same directory as this module.
DB_FILE = os.path.join(os.path.dirname(__file__), "media.db")
class WorkerState(Enum):
    """Lifecycle states of the worker; each value is the string form of the state."""

    # No scan or processing run has been started, or the last one was stopped.
    IDLE = "idle"
    # The scan phase is walking the media directory.
    SCANNING = "scanning"
    # The processing phase is working through pending files.
    RUNNING = "running"
    # A pause was requested.
    PAUSED = "paused"
    # The last scan or processing run completed without being stopped.
    DONE = "done"
class HEVCWorker:
def __init__(self):
    """Initialise in-memory worker state, then make sure the database schema exists."""
    # Lifecycle bookkeeping.
    self.state: WorkerState = WorkerState.IDLE
    self._worker_thread: Optional[threading.Thread] = None
    self._last_config: dict = {}

    # Log fan-out: a bounded history plus queues registered via subscribe_logs().
    self._log_lock = threading.Lock()
    self.log_buffer: deque = deque(maxlen=500)
    self._log_subscribers: list = []

    # Per-file progress entries, guarded by self._lock.
    self._lock = threading.Lock()
    self.active_encodes: dict = {}

    # Cooperative control flags. The pause event is *set* while the worker may
    # proceed and *cleared* while paused, so it starts out set.
    self._stop_event = threading.Event()
    self._pause_event = threading.Event()
    self._pause_event.set()

    # Resolved lazily by _ensure_required_tools().
    self._ffprobe_cmd: Optional[str] = None
    self._ffmpeg_cmd: Optional[str] = None

    self._init_db()
def _init_db(self):
    """Create the ``media`` table if needed and repair rows left by an interrupted run.

    Rows stuck in a transient status ('encoding', 'remuxing', 'scanning') are
    reset to 'pending'. Pending rows whose codec and container are both
    'unknown' are then marked 'error'.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # ``with conn`` only commits or rolls back; it does not close the
        # connection, hence the explicit close() in ``finally``.
        with conn:
            conn.execute(
                """
CREATE TABLE IF NOT EXISTS media (
filepath TEXT PRIMARY KEY,
basename TEXT,
original_size INTEGER,
codec TEXT,
container TEXT,
status TEXT,
saved_bytes INTEGER DEFAULT 0
)
"""
            )
            conn.execute("UPDATE media SET status='pending' WHERE status IN ('encoding', 'remuxing', 'scanning')")
            conn.execute(
                "UPDATE media SET status='error' WHERE status='pending' AND codec='unknown' AND container='unknown'"
            )
    finally:
        conn.close()
def _db_execute(self, query: str, params: tuple = ()):
    """Run one statement on a fresh connection and return its cursor.

    The connection is switched to WAL journaling before the statement runs;
    leaving the ``with`` block commits the transaction.
    """
    with sqlite3.connect(DB_FILE) as connection:
        connection.execute("PRAGMA journal_mode=WAL")
        cursor = connection.execute(query, params)
        # NOTE(review): the connection is deliberately not closed here because
        # the cursor is handed back to the caller — confirm nobody needs it
        # before tightening this.
        return cursor
def _db_fetchone(self, query: str, params: tuple = ()):
    """Run a query on a fresh connection and return its first row (or ``None``).

    The row is fetched before the connection is closed, so closing is safe.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # ``with conn`` handles commit/rollback only; close() releases the handle.
        with conn:
            return conn.execute(query, params).fetchone()
    finally:
        conn.close()
def _db_fetchall(self, query: str, params: tuple = ()):
    """Run a query on a fresh connection and return all rows as a list.

    The rows are materialised before the connection is closed, so closing is safe.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # ``with conn`` handles commit/rollback only; close() releases the handle.
        with conn:
            return conn.execute(query, params).fetchall()
    finally:
        conn.close()
def _log(self, level: str, message: str):
entry = {
"time": time.strftime("%Y-%m-%d %H:%M:%S"),
"level": level,
"message": message,
}
with self._log_lock:
self.log_buffer.append(entry)
for q in self._log_subscribers:
q.append(entry)
print(f"[{entry['time']}] [{level}] {message}", flush=True)
def subscribe_logs(self, q: deque):
with self._log_lock:
self._log_subscribers.append(q)
def unsubscribe_logs(self, q: deque):
with self._log_lock:
if q in self._log_subscribers:
self._log_subscribers.remove(q)
def _find_binary(self, base_name: str) -> Optional[str]:
candidates = [base_name]
if os.name == "nt":
candidates = [f"{base_name}.exe", base_name]
for candidate in candidates:
resolved = shutil.which(candidate)
if resolved:
return resolved
if os.name == "nt":
common_dirs = [
r"C:\ffmpeg\bin",
r"C:\Program Files\ffmpeg\bin",
r"C:\Program Files (x86)\ffmpeg\bin",
]
for folder in common_dirs:
for candidate in candidates:
full = os.path.join(folder, candidate)
if os.path.isfile(full):
return full
return None
def _safe_move(self, src: str, dst: str) -> None:
"""Move a file across filesystems safely without copying metadata.
Tries an atomic rename/replace first; on EXDEV (cross-device) falls back
to a copy using shutil.copyfile (not copy2) and removing the source.
"""
if os.path.normpath(src) == os.path.normpath(dst):
return
last_error = None
for attempt in range(1, 4):
try:
# Prefer atomic replace when possible
os.replace(src, dst)
return
except OSError as e:
last_error = e
# If the destination is already there, try removing it first.
if os.path.exists(dst):
try:
os.remove(dst)
except Exception:
pass
# If it's not cross-device, try a simple rename as a fallback.
if getattr(e, "errno", None) != errno.EXDEV:
try:
os.rename(src, dst)
return
except Exception as rename_error:
last_error = rename_error
# For transient permission/lock issues, retry a few times.
if attempt < 3:
time.sleep(0.25 * attempt)
# Fallback: copy file contents only (no metadata) then remove source.
try:
shutil.copyfile(src, dst)
os.remove(src)
except Exception:
if last_error is not None:
raise last_error
raise
def _encoder_available(self, encoder_name: str) -> bool:
"""Check whether ffmpeg supports the requested encoder (simple string match)."""
try:
ffmpeg_cmd = self._ffmpeg_cmd or "ffmpeg"
res = subprocess.run([ffmpeg_cmd, "-hide_banner", "-encoders"], capture_output=True, text=True, timeout=10)
if res.returncode != 0:
return False
return encoder_name in res.stdout
except Exception:
return False
def _nvenc_preset(self, preset: str) -> str:
"""Map generic preset names to NVENC preset names."""
preset = (preset or "medium").strip().lower()
mapping = {
"ultrafast": "p1",
"superfast": "p2",
"veryfast": "p3",
"faster": "p4",
"fast": "p5",
"medium": "p5",
"slow": "p6",
"slower": "p7",
"veryslow": "p7",
}
if preset.startswith("p") and preset[1:].isdigit():
return preset
return mapping.get(preset, "p5")
def _normalize_name_piece(self, text: str) -> str:
text = re.sub(r"[._]+", " ", text or "")
text = re.sub(r"\s+", " ", text)
return text.strip(" -._")
def _normalize_title_piece(self, text: str) -> str:
text = re.sub(r"\b(\d+(?:\.\d+){1,})\b", lambda match: match.group(1).replace(".", "-"), text or "")
text = re.sub(r"[._]+", " ", text or "")
text = re.sub(r"\s+", " ", text)
return text.strip(" -._")
def _canonical_audio_label(self, codec_name: str) -> str:
codec = (codec_name or "").strip().lower()
mapping = {
"aac": "AAC",
"ac3": "AC3",
"eac3": "EAC3",
"dts": "DTS",
"truehd": "TrueHD",
"mp3": "MP3",
"flac": "FLAC",
"opus": "Opus",
"vorbis": "Vorbis",
"pcm_s16le": "PCM",
"pcm_s24le": "PCM",
}
if codec in mapping:
return mapping[codec]
return codec.upper() if codec else ""
def _sanitize_release_tokens(self, text: str) -> list[str]:
raw = self._normalize_name_piece(text)
tokens: list[str] = []
patterns = [
(r"\b(2160p|1080p|720p|576p|480p|4k)\b", lambda m: m.group(1).lower()),
(r"\b(BluRay|Blu-Ray|WEB-DL|WEBRip|WEB Rip|BRRip|HDRip|HDTV|AMZN|NF|DSNP|ATVP|iTunes|REMUX|TELESYNC|CAM|DVD|DVDRip|UHD)\b", lambda m: m.group(1).replace(" ", "-")),
]
for pattern, formatter in patterns:
for match in re.finditer(pattern, raw, flags=re.IGNORECASE):
token = formatter(match)
if token and token not in tokens:
tokens.append(token)
return tokens
def _build_output_stem(
self,
source_path: str,
source_video_codec: str,
source_audio_codec: str,
output_audio_codec: str,
rename_output: bool,
) -> str:
base = os.path.splitext(os.path.basename(source_path))[0]
if not rename_output:
return base
raw = self._normalize_name_piece(base)
# TV shows: keep title + SxxEyy, and only add codec tags when the file is already final HEVC/AAC MKV.
tv_match = re.search(r"(?i)\bS(?P<season>\d{1,2})E(?P<episode>\d{1,2})\b", raw)
alt_tv_match = re.search(r"(?i)\b(?P<season>\d{1,2})x(?P<episode>\d{2})\b", raw)
if tv_match or alt_tv_match:
match = tv_match or alt_tv_match
season = int(match.group("season"))
episode = int(match.group("episode"))
title_part = re.sub(r"[\[\]\(\)]", " ", base[: match.start()])
title_part = self._normalize_title_piece(title_part)
out_parts = [title_part, f"S{season:02d}E{episode:02d}"]
source_video = (source_video_codec or "").strip().lower()
source_audio = (source_audio_codec or "").strip().lower()
if source_video == "hevc" and source_audio == "aac":
out_parts.append("HEVC")
out_parts.append("AAC")
return self._normalize_name_piece(" ".join([p for p in out_parts if p]))
# Movies: keep title + year, and only add codec tags when the file is already final HEVC/AAC MKV.
year_match = re.search(r"\b((?:19|20)\d{2})\b", raw)
title_part = raw
year_part = ""
if year_match:
year_part = year_match.group(1)
title_part = re.sub(r"[\[\]\(\)]", " ", base[: year_match.start()])
title_part = self._normalize_title_piece(title_part)
else:
# Strip bracket noise if there is no explicit year.
title_part = re.sub(r"\s*\[.*?\]\s*", " ", title_part)
title_part = re.sub(r"\s*\(.*?\)\s*", " ", title_part)
title_part = self._normalize_title_piece(title_part)
out_parts = [title_part]
if year_part:
out_parts.append(year_part)
source_video = (source_video_codec or "").strip().lower()
source_audio = (source_audio_codec or "").strip().lower()
if source_video == "hevc" and source_audio == "aac":
out_parts.append("HEVC")
out_parts.append("AAC")
return self._normalize_name_piece(" ".join([p for p in out_parts if p]))
def _build_output_path(
self,
source_path: str,
source_video_codec: str,
source_audio_codec: str,
output_audio_codec: str,
rename_output: bool,
) -> str:
directory = os.path.dirname(source_path)
stem = self._build_output_stem(source_path, source_video_codec, source_audio_codec, output_audio_codec, rename_output)
candidate = os.path.join(directory, f"{stem}.mkv")
if os.path.normpath(candidate) == os.path.normpath(source_path):
return candidate
counter = 2
while os.path.exists(candidate):
candidate = os.path.join(directory, f"{stem} ({counter}).mkv")
counter += 1
if counter > 99:
break
return candidate
def _source_has_audio(self, audio_codec: str) -> bool:
return (audio_codec or "").strip().lower() not in {"", "unknown", "none"}
def _resolve_audio_codec(self, requested_codec: str, source_audio_codec: str) -> str:
requested = (requested_codec or "auto").strip().lower()
source = (source_audio_codec or "").strip().lower()
if not self._source_has_audio(source):
return "an"
if requested == "auto":
return "aac" if source == "aac" else "copy"
if requested in {"copy", "aac", "ac3"}:
return requested
return "copy"
def _get_subtitle_codecs(self, filepath: str) -> list:
"""Return a list of subtitle codec names found in the file (lowercased)."""
try:
ffprobe_cmd = self._ffprobe_cmd or "ffprobe"
result = subprocess.run(
[
ffprobe_cmd,
"-v",
"quiet",
"-show_entries",
"stream=codec_type,codec_name",
"-of",
"json",
filepath,
],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
return []
data = json.loads(result.stdout or "{}")
codecs = []
for s in data.get("streams", []):
if s.get("codec_type") == "subtitle":
codecs.append((s.get("codec_name") or "").lower())
return codecs
except Exception:
return []
def _ensure_required_tools(self, config: dict = None, require_ffprobe: bool = False, require_ffmpeg: bool = False) -> bool:
config = config or {}
custom_ffprobe = config.get("custom_ffprobe", "").strip()
custom_ffmpeg = config.get("custom_ffmpeg", "").strip()
if require_ffprobe:
if custom_ffprobe:
self._ffprobe_cmd = custom_ffprobe
elif not self._ffprobe_cmd:
self._ffprobe_cmd = self._find_binary("ffprobe")
if not self._ffprobe_cmd or (custom_ffprobe and not os.path.isfile(self._ffprobe_cmd)):
ext = ".exe" if os.name == "nt" else ""
self._log(
"ERROR",
f"ffprobe not found or invalid path. Install FFmpeg and ensure ffprobe{ext} is available.",
)
return False
if require_ffmpeg:
if custom_ffmpeg:
self._ffmpeg_cmd = custom_ffmpeg
elif not self._ffmpeg_cmd:
self._ffmpeg_cmd = self._find_binary("ffmpeg")
if not self._ffmpeg_cmd or (custom_ffmpeg and not os.path.isfile(self._ffmpeg_cmd)):
ext = ".exe" if os.name == "nt" else ""
self._log(
"ERROR",
f"ffmpeg not found or invalid path. Install FFmpeg and ensure ffmpeg{ext} is available.",
)
return False
return True
def _parse_ffprobe_output(self, raw_json: str) -> tuple[str, str, str]:
data = json.loads(raw_json or "{}")
fmt = data.get("format", {}).get("format_name", "unknown").lower()
container = "mkv" if ("matroska" in fmt or "webm" in fmt) else (fmt.split(",")[0] if fmt else "unknown")
streams = data.get("streams", [])
v_codec = "unknown"
a_codec = "unknown"
for s in streams:
if s.get("codec_type") == "video" and v_codec == "unknown":
v_codec = s.get("codec_name", "unknown").lower()
elif s.get("codec_type") == "audio" and a_codec == "unknown":
a_codec = s.get("codec_name", "unknown").lower()
return v_codec, container, a_codec
def _get_video_info(self, filepath: str) -> tuple[str, str, str]:
try:
ffprobe_cmd = self._ffprobe_cmd or "ffprobe"
result = subprocess.run(
[
ffprobe_cmd,
"-v",
"quiet",
"-show_entries",
"stream=codec_type,codec_name:format=format_name",
"-of",
"json",
filepath,
],
capture_output=True,
text=True,
timeout=90,
)
if result.returncode != 0:
raise RuntimeError(result.stderr.strip() or "ffprobe failed")
return self._parse_ffprobe_output(result.stdout)
except Exception as exc:
self._log("ERROR", f"ffprobe check failed for {os.path.basename(filepath)}: {exc}")
return "unknown", "unknown", "unknown"
def _get_duration(self, filepath: str) -> Optional[float]:
try:
ffprobe_cmd = self._ffprobe_cmd or "ffprobe"
result = subprocess.run(
[
ffprobe_cmd,
"-v",
"quiet",
"-show_entries",
"format=duration",
"-of",
"json",
filepath,
],
capture_output=True,
text=True,
timeout=60,
)
if result.returncode != 0:
return None
data = json.loads(result.stdout)
return float(data["format"]["duration"])
except Exception:
return None
def _recommended_workers(self) -> int:
cpu_count = max(1, os.cpu_count() or 4)
if cpu_count >= 12:
rec = cpu_count // 2
elif cpu_count >= 6:
rec = cpu_count - 2
else:
rec = max(1, cpu_count - 1)
return max(1, min(12, rec))
def _run_scan(self, config: dict):
    """Phase 1: walk the media directory, probe every video file and sync the database.

    Steps:
      1. Validate ``media_dir`` and make sure ffprobe is available.
      2. Collect video files, skipping excluded folders and this worker's own
         ``.hevc_tmp.`` / ``.remux_tmp.`` temporary files.
      3. Delete database rows whose file is no longer on disk.
      4. For every file: probe it (re-probing rows that were stored as
         unknown), optionally rename it in place, and store 'done',
         'pending' or 'error'.

    In-place renaming during the scan is limited to files that already are in
    an MKV container, because the target name always ends in ``.mkv``. Other
    files stay untouched and pending; they get their final name when the
    processing phase writes the new MKV.

    The worker state ends as DONE, or IDLE when the scan was stopped or could
    not start.
    """
    media_dir = str(config.get("media_dir") or "").strip()
    if not media_dir:
        self.state = WorkerState.IDLE
        self._log("ERROR", "No media directory configured. Set one in the dashboard first.")
        return
    excluded = {
        str(name).strip().lower()
        for name in (config.get("excluded_folders") or [])
        if name and str(name).strip()
    }
    configured_audio_codec = str(config.get("audio_codec") or "auto").lower()
    rename_output = bool(config.get("rename_output", False))

    self.state = WorkerState.SCANNING
    self._stop_event.clear()
    self._pause_event.set()
    self._log("INFO", f"Scan starting: {media_dir}")
    if not os.path.isdir(media_dir):
        self.state = WorkerState.IDLE
        self._log("ERROR", f"Media directory not found: {media_dir}")
        return
    if not self._ensure_required_tools(config=config, require_ffprobe=True):
        self.state = WorkerState.IDLE
        return

    video_extensions = {".mkv", ".mp4", ".avi", ".ts", ".m4v", ".mov", ".wmv"}
    all_files: list[tuple[str, int]] = []
    for root, dirs, files in os.walk(media_dir):
        # Prune excluded folders in place so os.walk never descends into them.
        dirs[:] = [d for d in dirs if d.lower() not in excluded]
        for fname in files:
            name_lower = fname.lower()
            if ".hevc_tmp." in name_lower or ".remux_tmp." in name_lower:
                continue
            if os.path.splitext(fname)[1].lower() not in video_extensions:
                continue
            full = os.path.join(root, fname)
            try:
                all_files.append((full, os.path.getsize(full)))
            except OSError:
                # The file vanished or is unreadable; ignore it for this scan.
                continue
    all_files.sort(key=lambda item: item[0].lower())
    total = len(all_files)
    self._log("INFO", f"Found {total} potential media files. Checking database...")

    # Sync database with files on disk (remove obsolete entries).
    seen_files = {path for path, _size in all_files}
    try:
        for db_row in self._db_fetchall("SELECT filepath FROM media"):
            if db_row and db_row[0] not in seen_files:
                self._db_execute("DELETE FROM media WHERE filepath = ?", (db_row[0],))
    except Exception as e:
        self._log("WARN", f"Could not sync database entries with disk: {e}")

    new_count = 0
    ignored_count = 0
    probe_error_count = 0
    for i, (filepath, file_size) in enumerate(all_files, 1):
        if self._stop_event.is_set():
            break
        basename = os.path.basename(filepath)
        row = self._db_fetchone("SELECT status, codec, container FROM media WHERE filepath = ?", (filepath,))

        if row:
            status, codec, container = row
            a_codec = None
            # Re-probe if the previous attempt failed / was unknown.
            if codec == "unknown" or container == "unknown":
                self._log("INFO", f"[{i}/{total}] Re-ffprobe: {basename}")
                codec, container, a_codec = self._get_video_info(filepath)
                if codec == "unknown" and container == "unknown":
                    probe_error_count += 1
                    self._db_execute(
                        "UPDATE media SET status = 'error', codec = 'unknown', container = 'unknown' WHERE filepath = ?",
                        (filepath,),
                    )
                    continue
                self._db_execute(
                    "UPDATE media SET codec = ?, container = ? WHERE filepath = ?",
                    (codec, container, filepath),
                )
            if a_codec is None:
                # The audio codec is not stored in the database, so it has to
                # be probed — unless the re-probe above already returned it.
                _, _, a_codec = self._get_video_info(filepath)

            is_hevc_mkv, audio_ok, needs_rename, resolved_audio_codec = self._assess_scanned_file(
                filepath, codec, container, a_codec, configured_audio_codec, rename_output
            )
            if needs_rename and container == "mkv":
                final_path = self._build_output_path(filepath, codec, a_codec, resolved_audio_codec, rename_output)
                try:
                    self._safe_move(filepath, final_path)
                    self._db_execute(
                        "UPDATE media SET filepath = ?, basename = ? WHERE filepath = ?",
                        (final_path, os.path.basename(final_path), filepath),
                    )
                    self._log("SUCCESS", f"Renamed {basename} -> {os.path.basename(final_path)}")
                    filepath = final_path
                    basename = os.path.basename(final_path)
                    needs_rename = False
                except Exception as e:
                    # needs_rename stays True, so the row is kept pending below.
                    self._log("ERROR", f"Failed to rename {filepath} during scan: {e}")
                    self._db_execute("UPDATE media SET status = 'pending' WHERE filepath = ?", (filepath,))

            # Re-evaluate status based on config for pending/ignored/done files.
            if status in ("pending", "ignored", "done"):
                new_status = "done" if (is_hevc_mkv and audio_ok and not needs_rename) else "pending"
                if status != new_status:
                    self._db_execute("UPDATE media SET status = ? WHERE filepath = ?", (new_status, filepath))
                    status = new_status

            # Count based on final status.
            if status == "pending":
                new_count += 1
            elif status == "ignored":
                ignored_count += 1
            elif status == "error":
                probe_error_count += 1
            continue

        # File not in the database yet.
        self._log("INFO", f"[{i}/{total}] ffprobe: {basename}")
        codec, container, a_codec = self._get_video_info(filepath)
        if codec == "unknown" and container == "unknown":
            probe_error_count += 1
            self._db_execute(
                "INSERT INTO media (filepath, basename, original_size, codec, container, status) VALUES (?, ?, ?, ?, ?, ?)",
                (filepath, basename, int(file_size), codec, container, "error"),
            )
            continue

        is_hevc_mkv, audio_ok, needs_rename, resolved_audio_codec = self._assess_scanned_file(
            filepath, codec, container, a_codec, configured_audio_codec, rename_output
        )
        if needs_rename and container == "mkv":
            final_path = self._build_output_path(filepath, codec, a_codec, resolved_audio_codec, rename_output)
            try:
                self._safe_move(filepath, final_path)
                self._log("SUCCESS", f"Renamed {basename} -> {os.path.basename(final_path)}")
                filepath = final_path
                basename = os.path.basename(final_path)
                needs_rename = False
            except Exception as e:
                self._log("ERROR", f"Failed to rename {filepath} during scan: {e}")
                # A failed rename of a new file does not hold its status back
                # (unlike the already-known-file branch above).
                needs_rename = False

        if is_hevc_mkv and audio_ok and not needs_rename:
            status = "done"
        else:
            status = "pending"
            new_count += 1
        self._db_execute(
            "INSERT INTO media (filepath, basename, original_size, codec, container, status) VALUES (?, ?, ?, ?, ?, ?)",
            (filepath, basename, int(file_size), codec, container, status),
        )

    self.state = WorkerState.IDLE if self._stop_event.is_set() else WorkerState.DONE
    self._log(
        "SUCCESS",
        f"Scan finished. Added {new_count} files to process, ignored {ignored_count} small files, probe errors {probe_error_count}.",
    )

def _assess_scanned_file(
    self,
    filepath: str,
    codec: str,
    container: str,
    a_codec: str,
    configured_audio_codec: str,
    rename_output: bool,
) -> tuple:
    """Compare one probed file with the configured targets.

    Returns ``(is_hevc_mkv, audio_ok, needs_rename, resolved_audio_codec)``.
    """
    resolved_audio_codec = self._resolve_audio_codec(configured_audio_codec, a_codec)
    if resolved_audio_codec == "an":
        audio_ok = not self._source_has_audio(a_codec)
    elif resolved_audio_codec == "copy":
        audio_ok = self._source_has_audio(a_codec)
    else:
        audio_ok = a_codec == resolved_audio_codec
    is_hevc_mkv = codec == "hevc" and container == "mkv"
    expected_stem = self._build_output_stem(filepath, codec, a_codec, resolved_audio_codec, rename_output)
    current_stem = os.path.splitext(os.path.basename(filepath))[0]
    # Compared case-insensitively: a pure case difference is not worth a rename.
    needs_rename = rename_output and expected_stem.lower() != current_stem.lower()
    return is_hevc_mkv, audio_ok, needs_rename, resolved_audio_codec
def _run_process(self, config: dict):
    """Phase 2: encode/remux every 'pending' file using a thread pool.

    All pending rows are submitted to the pool up front. Each queued task
    checks the pause and stop flags itself right before it starts, so pausing
    really holds back files that have not begun yet and stopping discards
    them. (Checking only while submitting had no effect, because submitting
    never blocks.)

    Files whose task reports failure or raises are marked 'error'. After a
    stop, rows still marked 'encoding' are reset to 'pending'. The worker
    state ends as DONE, or IDLE when stopped or when nothing could be started.
    """
    if not self._ensure_required_tools(config=config, require_ffmpeg=True):
        self.state = WorkerState.IDLE
        return
    requested_threads = config.get("threads")
    max_workers = int(requested_threads) if requested_threads else self._recommended_workers()
    max_workers = max(1, min(max_workers, max(1, os.cpu_count() or max_workers)))
    preset = config.get("preset", "medium")
    crf = int(config.get("crf", 22))
    discord_webhook = str(config.get("discord_webhook") or "").strip()
    temp_dir = str(config.get("temp_dir") or "").strip()
    encoder = config.get("encoder", "libx265")
    # NOTE(review): the scan phase defaults this setting to "auto" while this
    # phase defaults to "aac" — confirm which default is intended.
    audio_codec = config.get("audio_codec", "aac")
    rename_output = bool(config.get("rename_output", False))
    force_reencode = bool(config.get("force_reencode", False))
    if not temp_dir:
        self.state = WorkerState.IDLE
        self._log("ERROR", "No temporary directory configured. Set one in the dashboard first.")
        return

    self.state = WorkerState.RUNNING
    self._stop_event.clear()
    self._pause_event.set()
    self._last_config = dict(config)
    self._discord_webhook = discord_webhook  # expose to worker threads

    queue = self._db_fetchall("SELECT filepath, original_size, codec, container FROM media WHERE status = 'pending'")
    if not queue:
        self.state = WorkerState.IDLE
        self._log("INFO", "Nothing to process — no pending files in database.")
        return
    self._log("INFO", f"Processing {len(queue)} pending files | threads={max_workers} | preset={preset} | crf={crf}")
    min_size_bytes = int(config.get("min_size_bytes", 0))

    def _gated_process(*task_args) -> bool:
        # Hold the task while paused; stop() also sets the pause event, so a
        # stop wakes paused tasks up and they bail out here.
        while not self._pause_event.wait(timeout=0.5):
            if self._stop_event.is_set():
                return False
        if self._stop_event.is_set():
            return False
        return self._process_file(*task_args)

    futures: dict[Future, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="encode") as executor:
        for row in queue:
            if not row or len(row) < 4:
                self._log("WARN", f"Skipping malformed DB row in queue: {row}")
                continue
            if self._stop_event.is_set():
                break
            filepath, size, codec, container = row
            future = executor.submit(
                _gated_process,
                filepath,
                size,
                codec,
                container,
                preset,
                crf,
                min_size_bytes,
                temp_dir,
                encoder,
                audio_codec,
                rename_output,
                force_reencode,
            )
            futures[future] = filepath

        for future, filepath in futures.items():
            try:
                success = future.result()
                # A False result during a stop means "not done", not "failed".
                if not success and not self._stop_event.is_set():
                    self._db_execute("UPDATE media SET status = 'error' WHERE filepath = ?", (filepath,))
            except Exception as exc:
                self._log("ERROR", f"Thread raised exception for {os.path.basename(filepath)}: {exc}")
                self._db_execute("UPDATE media SET status = 'error' WHERE filepath = ?", (filepath,))

    if self._stop_event.is_set():
        self.state = WorkerState.IDLE
        self._log("INFO", "Worker stopped.")
        self._db_execute("UPDATE media SET status = 'pending' WHERE status = 'encoding'")
    else:
        self.state = WorkerState.DONE
        self._log("SUCCESS", "Processing queue finished!")
def _process_file(
    self,
    filepath: str,
    original_size: int,
    codec: str,
    container: str,
    preset: str,
    crf: int,
    min_size_bytes: int,
    temp_dir: str,
    encoder: str = "libx265",
    audio_codec: str = "auto",
    rename_output: bool = False,
    force_reencode: bool = False,
) -> bool:
    """Rename, remux or re-encode one file and update its database row.

    Depending on the file's codec/container (from the database) and its
    freshly probed audio codec, exactly one of these happens:

    * rename only  – already HEVC/MKV with acceptable audio, only the name differs
    * remux only   – HEVC with acceptable audio in a non-MKV container
    * nothing      – already final and ``force_reencode`` is off
    * full encode  – everything else

    Output is written to a temporary file (in ``temp_dir`` when writable,
    otherwise next to the source), verified with ffprobe and only then moved
    to the final path. ``min_size_bytes`` is accepted for interface
    compatibility and is not used.

    Returns:
        True when the file ended up 'done'; False on failure or when a stop
        was requested.
    """
    import tempfile
    import uuid

    # A stop may have been requested while this task was still queued.
    if self._stop_event.is_set():
        return False

    self._db_execute("UPDATE media SET status = 'encoding' WHERE filepath = ?", (filepath,))
    basename = os.path.basename(filepath)
    ffmpeg_cmd = self._ffmpeg_cmd or "ffmpeg"
    # Unique file ID to prevent collisions between parallel tasks.
    unique_id = uuid.uuid4().hex

    # Determine temp directory and check if writeable.
    use_custom_temp = False
    if temp_dir:
        try:
            os.makedirs(temp_dir, exist_ok=True)
            # The probe file name is unique per task; a name shared by all
            # threads lets one thread delete another thread's probe file.
            test_file = os.path.join(temp_dir, f".write_test_{os.getpid()}_{unique_id}")
            with open(test_file, "w") as f:
                f.write("test")
            os.remove(test_file)
            use_custom_temp = True
        except Exception as e:
            self._log("WARN", f"Custom temporary directory '{temp_dir}' is not writeable, using source directory. Error: {e}")

    _v_codec, _c_container, a_codec = self._get_video_info(filepath)
    # Resolve audio mode based on the source stream and user selection.
    resolved_audio_codec = self._resolve_audio_codec(audio_codec, a_codec)
    if resolved_audio_codec == "an":
        audio_ok = not self._source_has_audio(a_codec)
    elif resolved_audio_codec == "copy":
        audio_ok = self._source_has_audio(a_codec)
    else:
        audio_ok = a_codec == resolved_audio_codec
    is_hevc_video = codec == "hevc"
    final_path = self._build_output_path(filepath, codec, a_codec, resolved_audio_codec, rename_output)
    needs_rename_only = (
        rename_output
        and os.path.normpath(final_path) != os.path.normpath(filepath)
        and is_hevc_video
        and container == "mkv"
        and audio_ok
    )
    needs_remux_only = container != "mkv" and is_hevc_video and audio_ok

    if needs_rename_only:
        self._log("INFO", f"Renaming file: {basename}")
        try:
            self._safe_move(filepath, final_path)
        except Exception as e:
            self._log("ERROR", f"Failed to rename {filepath} to {final_path}: {e}")
            return False
        self._db_execute(
            "UPDATE media SET status = 'done', filepath = ?, basename = ? WHERE filepath = ?",
            (final_path, os.path.basename(final_path), filepath),
        )
        self._log("SUCCESS", f"Renamed {basename} -> {os.path.basename(final_path)}")
        return True

    if needs_remux_only:
        self._log("INFO", f"Remuxing file to MKV: {basename}")
        if use_custom_temp:
            temp_path = os.path.join(temp_dir, f"{unique_id}.mkv")
        else:
            temp_path = os.path.splitext(filepath)[0] + ".remux_tmp.mkv"
        with self._lock:
            self.active_encodes[basename] = {
                "file": basename,
                "fps": 0,
                "speed": "N/A",
                "bitrate": "N/A",
                "progress": 0.0,
                "eta": "remuxing...",
                "original_size": original_size,
                "original_size_human": _human_size(original_size),
                "duration": 0,
            }
        try:
            ret = subprocess.run(
                [ffmpeg_cmd, "-y", "-i", filepath, "-c", "copy", temp_path],
                capture_output=True,
                text=True,
            )
        finally:
            # Always drop the progress entry, even if ffmpeg could not be started.
            with self._lock:
                self.active_encodes.pop(basename, None)

        if ret.returncode == 0 and os.path.exists(temp_path) and os.path.getsize(temp_path) > 0:
            # Probe to confirm it's mkv.
            _probe_codec, probe_container, _probe_acodec = self._get_video_info(temp_path)
            if probe_container == "mkv":
                try:
                    if os.path.exists(filepath) and filepath != final_path:
                        os.remove(filepath)
                except Exception as e:
                    self._log("ERROR", f"Failed to remove original file {filepath}: {e}")
                try:
                    # Use safe move to handle cross-filesystem moves without metadata copy issues.
                    self._safe_move(temp_path, final_path)
                except Exception as e:
                    self._log("ERROR", f"CRITICAL: Failed to move temp file {temp_path} to final path {final_path}: {e}")
                    self._log("ERROR", f"Media file remains saved at: {temp_path}")
                    return False
                new_size = os.path.getsize(final_path)
                saved = original_size - new_size
                self._db_execute(
                    "UPDATE media SET status = 'done', container = 'mkv', saved_bytes = ?, filepath = ?, basename = ? WHERE filepath = ?",
                    (saved, final_path, os.path.basename(final_path), filepath),
                )
                self._log("SUCCESS", f"Remuxed {basename} to MKV")
                return True
        self._log("ERROR", f"Remux failed for {basename}")
        remux_detail = (ret.stderr or "").strip()[-500:]
        if remux_detail:
            self._log("ERROR", f"ffmpeg output for {basename}: {remux_detail}")
        if os.path.exists(temp_path):
            try:
                os.remove(temp_path)
            except OSError:
                pass
        return False

    # If already HEVC MKV with acceptable audio, skip unless force_reencode requested.
    if is_hevc_video and container == "mkv" and audio_ok and not force_reencode:
        self._db_execute("UPDATE media SET status = 'done' WHERE filepath = ?", (filepath,))
        return True

    if use_custom_temp:
        temp_path = os.path.join(temp_dir, f"{unique_id}.mkv")
    else:
        temp_path = os.path.splitext(filepath)[0] + ".hevc_tmp.mkv"
    duration = self._get_duration(filepath)
    with self._lock:
        self.active_encodes[basename] = {
            "file": basename,
            "fps": 0.0,
            "speed": "0x",
            "bitrate": "0kbits/s",
            "progress": 0.0,
            "eta": "calculating...",
            "original_size": original_size,
            "original_size_human": _human_size(original_size),
            "duration": duration or 0,
        }

    stderr_text = ""
    try:
        # If requested hardware encoder is not available, fall back to libx265.
        if encoder and encoder != "libx265":
            if not self._encoder_available(encoder):
                self._log("WARN", f"Requested encoder '{encoder}' not available on this system. Falling back to libx265.")
                encoder = "libx265"

        v_opts = ["-c:v", encoder]
        if encoder == "hevc_nvenc":
            nvenc_preset = self._nvenc_preset(preset)
            v_opts.extend([
                "-preset", nvenc_preset,
                "-rc", "vbr_hq",
                "-cq", str(crf),
                "-b:v", "0",
                "-rc-lookahead", "32",
                "-multipass", "fullres",
            ])
        elif encoder == "hevc_amf":
            v_opts.extend(["-rc", "0", "-qv", str(crf), "-quality", preset])
        elif encoder == "hevc_qsv":
            v_opts.extend(["-global_quality", str(crf), "-preset", preset])
        else:
            v_opts.extend(["-crf", str(crf), "-preset", preset])

        cmd = [ffmpeg_cmd, "-y", "-i", filepath] + v_opts
        if resolved_audio_codec == "an":
            cmd += ["-an"]
        else:
            cmd += ["-c:a", resolved_audio_codec]

        # Detect problematic subtitle codecs (e.g., mov_text) and drop subtitles if present.
        subtitle_codecs = self._get_subtitle_codecs(filepath)
        problematic_subs = {"mov_text"}
        if any(sc in problematic_subs for sc in subtitle_codecs):
            self._log("WARN", f"Dropping unsupported subtitle codecs ({', '.join(subtitle_codecs)}) for {basename}")
            cmd += ["-sn"]
        else:
            cmd += ["-c:s", "copy"]
        cmd += [
            "-progress",
            "pipe:1",
            "-nostats",
            "-loglevel",
            "error",
            temp_path,
        ]

        # stderr goes to a temporary file rather than a pipe: nothing reads
        # stderr while the progress loop consumes stdout, so a pipe could fill
        # up and block ffmpeg forever. The file also lets us log the error text.
        with tempfile.TemporaryFile() as stderr_file:
            process = subprocess.Popen(
                cmd, stdout=subprocess.PIPE, stderr=stderr_file, stdin=subprocess.DEVNULL, text=True, bufsize=1
            )
            try:
                progress_buf: dict = {}
                for raw_line in process.stdout:
                    if self._stop_event.is_set():
                        process.kill()
                        break
                    line = raw_line.strip()
                    if "=" not in line:
                        continue
                    key, _, val = line.partition("=")
                    key = key.strip()
                    progress_buf[key] = val.strip()
                    # ffmpeg ends every progress report with a "progress=" line.
                    if key != "progress":
                        continue
                    fps = _safe_float(progress_buf.get("fps", "0"))
                    speed_str = progress_buf.get("speed", "0x") or "0x"
                    bitrate = progress_buf.get("bitrate", "0kbits/s") or "0kbits/s"
                    try:
                        out_us = int(progress_buf.get("out_time_us", "0"))
                    except (ValueError, TypeError):
                        out_us = 0
                    time_done = out_us / 1_000_000
                    pct = 0.0
                    eta = "calculating..."
                    if duration and duration > 0:
                        pct = min(100.0, (time_done / duration) * 100.0)
                        speed_val = _safe_float(speed_str.replace("x", ""))
                        if speed_val > 0:
                            # Clamp: the reported time can overshoot the probed duration.
                            remaining = max(0.0, (duration - time_done) / speed_val)
                            eta = f"{int(remaining // 60)}m {int(remaining % 60)}s"
                    with self._lock:
                        if basename in self.active_encodes:
                            self.active_encodes[basename].update(
                                {
                                    "fps": round(fps, 1),
                                    "speed": speed_str,
                                    "bitrate": bitrate,
                                    "progress": round(pct, 1),
                                    "eta": eta,
                                }
                            )
                    progress_buf = {}
            except BaseException:
                # Do not leave an orphaned ffmpeg behind if the loop itself fails.
                process.kill()
                raise
            finally:
                process.stdout.close()
                process.wait()
            stderr_file.seek(0)
            stderr_text = stderr_file.read().decode("utf-8", errors="replace")
    finally:
        with self._lock:
            self.active_encodes.pop(basename, None)

    if self._stop_event.is_set():
        if os.path.exists(temp_path):
            os.remove(temp_path)
        return False
    if process.returncode != 0 or not os.path.exists(temp_path):
        self._log("ERROR", f"Encode failed (exit {process.returncode}): {basename}")
        encode_detail = stderr_text.strip()[-500:]
        if encode_detail:
            self._log("ERROR", f"ffmpeg output for {basename}: {encode_detail}")
        if os.path.exists(temp_path):
            os.remove(temp_path)
        return False

    new_size = os.path.getsize(temp_path)
    if new_size >= original_size:
        self._log("WARN", f"New file is NOT smaller — keeping anyway: {basename}")
    # Confirm it's hevc and mkv.
    probe_codec, probe_container, _probe_audio_codec = self._get_video_info(temp_path)
    if probe_codec == "hevc" and probe_container == "mkv" and new_size > 0:
        try:
            if os.path.exists(filepath) and filepath != final_path:
                os.remove(filepath)
        except Exception as e:
            self._log("ERROR", f"Failed to delete original file {filepath}: {e}")
        try:
            # Use safe move to handle cross-filesystem moves without metadata copy issues.
            self._safe_move(temp_path, final_path)
        except Exception as e:
            self._log("ERROR", f"CRITICAL: Failed to move temp file {temp_path} to final path {final_path}: {e}")
            self._log("ERROR", f"Media file remains saved at: {temp_path}")
            return False
        saved_bytes = max(0, original_size - new_size)
        if new_size >= original_size:
            self._log("WARN", f"Encoded output is larger than source for {basename}; recording 0 B saved.")
        self._log("SUCCESS", f"Encoded {basename} -> Saved {_human_size(saved_bytes)}")
        self._db_execute(
            "UPDATE media SET status = 'done', codec = 'hevc', container = 'mkv', saved_bytes = ?, filepath = ?, basename = ? WHERE filepath = ?",
            (saved_bytes, final_path, os.path.basename(final_path), filepath),
        )
        # Fire Discord webhook if configured.
        discord_webhook = getattr(self, "_discord_webhook", "")
        if discord_webhook:
            _send_discord_webhook(discord_webhook, os.path.basename(final_path), saved_bytes)
        return True

    self._log("ERROR", f"Verification failed for encoded file {basename}. Codec: {probe_codec}, Container: {probe_container}, Size: {new_size} B. Discarding.")
    if os.path.exists(temp_path):
        os.remove(temp_path)
    return False
def start_scan(self, config: dict):
    """Launch the scan phase on a background daemon thread.

    Args:
        config: Passed unchanged as the single argument to ``self._run_scan``.

    Returns:
        ``{"error": "Worker already running"}`` when a previously started
        worker thread is still alive (nothing is started in that case),
        otherwise ``{"ok": True}`` once the new thread has been started.
    """
    current = self._worker_thread
    if current and current.is_alive():
        return {"error": "Worker already running"}

    scan_thread = threading.Thread(
        name="hevc-scan",
        target=self._run_scan,
        args=(config,),
        daemon=True,
    )
    # Record the thread on the instance before starting it, so the attribute
    # is already set by the time the scan code begins executing.
    self._worker_thread = scan_thread
    scan_thread.start()
    return {"ok": True}
def start_process(self, config: dict):
    """Launch the processing phase on a background daemon thread.

    Args:
        config: Passed unchanged as the single argument to
            ``self._run_process``.

    Returns:
        ``{"error": "Worker already running"}`` when a previously started
        worker thread is still alive (nothing is started in that case),
        otherwise ``{"ok": True}`` once the new thread has been started.
    """
    current = self._worker_thread
    if current and current.is_alive():
        return {"error": "Worker already running"}

    process_thread = threading.Thread(
        name="hevc-process",
        target=self._run_process,
        args=(config,),
        daemon=True,
    )
    # Record the thread on the instance before starting it, so the attribute
    # is already set by the time the processing code begins executing.
    self._worker_thread = process_thread
    process_thread.start()
    return {"ok": True}
def pause(self):
    """Request a pause: clear the pause event and report the PAUSED state.

    The state is switched to ``WorkerState.PAUSED`` unconditionally, whether
    or not a worker thread is currently alive. How in-flight work reacts to
    the cleared event is decided by the code that waits on it, not here.
    """
    gate = self._pause_event
    gate.clear()
    self.state = WorkerState.PAUSED
    self._log(
        "INFO",
        "Pausing — current encodes finish before queue halts.",
    )
def resume(self):
    """Release the pause event and, if a worker is alive, report RUNNING.

    The state is only switched to ``WorkerState.RUNNING`` when a worker
    thread exists and is still alive; otherwise the current state is left
    untouched. The "Resumed." log line is emitted in both cases.
    """
    self._pause_event.set()

    active_thread = self._worker_thread
    worker_is_live = bool(active_thread) and active_thread.is_alive()
    if worker_is_live:
        self.state = WorkerState.RUNNING

    self._log("INFO", "Resumed.")
def stop(self):
    """Signal the worker to stop and log that the signal was sent.

    Sets the stop event first and the pause event second.
    """
    # NOTE(review): the pause event is presumably set so that a worker blocked
    # on a pause wakes up and can observe the stop flag — confirm against the
    # run loop, which is outside this block.
    for event in (self._stop_event, self._pause_event):
        event.set()
    self._log("INFO", "Stop signal sent.")
def clear_database(self):
    """Delete every row from the ``media`` table and drop active-encode entries.

    The delete is committed when the transaction block exits, after which
    ``self.active_encodes`` is emptied under ``self._lock`` and an INFO line
    is logged. This method does not check whether a worker thread is
    currently running.

    Raises:
        sqlite3.Error: Propagated from connecting or executing the delete;
            the connection is still closed in that case.
    """
    conn = sqlite3.connect(DB_FILE)
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, so close it explicitly to avoid leaking the
        # handle until garbage collection.
        with conn:
            conn.execute("DELETE FROM media")
    finally:
        conn.close()

    with self._lock:
        self.active_encodes.clear()
    self._log("INFO", "Database cleared.")
def get_status(self) -> dict:
    """Return a snapshot of worker state, queue counts and size totals.

    The worker state and the list of active encodes are each read under
    ``self._lock``; every database figure comes from its own query, so the
    numbers are not guaranteed to describe a single consistent instant.

    Returns:
        A dict with the state value, the recommended thread count, row
        counts (total / pending / done / ignored), byte totals with
        human-readable twins, the percentage saved relative to the original
        size of finished files, the active encodes, and up to 20 finished
        files ordered by descending rowid.
    """

    def scalar(sql: str):
        # First column of the single row produced by *sql*.
        return self._db_fetchone(sql)[0]

    with self._lock:
        state_val = self.state.value

    total = scalar("SELECT COUNT(*) FROM media")
    pending = scalar("SELECT COUNT(*) FROM media WHERE status='pending'")
    done = scalar("SELECT COUNT(*) FROM media WHERE status='done'")
    ignored = scalar("SELECT COUNT(*) FROM media WHERE status='ignored'")
    # SUM() yields NULL on an empty table, hence the ``or 0`` fallbacks.
    saved = scalar("SELECT SUM(CASE WHEN saved_bytes > 0 THEN saved_bytes ELSE 0 END) FROM media") or 0

    # Additional stats for redesign
    total_original_bytes = scalar("SELECT SUM(original_size) FROM media") or 0
    done_original_bytes = scalar("SELECT SUM(original_size) FROM media WHERE status='done'") or 0

    if done_original_bytes > 0:
        compression_ratio = round((saved / done_original_bytes) * 100, 1)
    else:
        compression_ratio = 0.0

    recent_done = []
    for name, row_saved in self._db_fetchall(
        "SELECT basename, saved_bytes FROM media WHERE status='done' ORDER BY rowid DESC LIMIT 20"
    ):
        # Negative or NULL savings are displayed as zero.
        shown = _human_size(max(0, row_saved or 0))
        recent_done.append({"file": name, "saved_human": shown})

    with self._lock:
        active = list(self.active_encodes.values())

    status = {
        "state": state_val,
        "recommended_threads": self._recommended_workers(),
        "total_files": total,
        "queued_count": pending,
        "done_count": done,
        "skipped_count": ignored,
        "total_saved_bytes": saved,
        "total_saved_human": _human_size(saved),
        "total_original_bytes": total_original_bytes,
        "total_original_human": _human_size(total_original_bytes),
        "done_original_bytes": done_original_bytes,
        "done_original_human": _human_size(done_original_bytes),
        "compression_ratio": compression_ratio,
        "active_encodes": active,
        "recent_done": recent_done,
    }
    return status
def get_logs(self, since_index: int = 0) -> list:
    """Return buffered log entries starting at position ``since_index``.

    Args:
        since_index: Offset into the current contents of ``self.log_buffer``;
            ordinary list-slice semantics apply (an offset past the end
            yields an empty list, a negative offset counts from the end).

    Returns:
        A new list holding the selected entries.
    """
    # Copy under the lock, slice afterwards: the copy is private to this call.
    with self._log_lock:
        snapshot = list(self.log_buffer)
    return snapshot[since_index:]
def _human_size(n: int) -> str:
    """Format a byte count with one decimal place and a 1024-based unit.

    Units run from ``B`` through ``TB``; anything larger is reported in
    ``PB``. Negative values are scaled by magnitude, so ``-2048`` renders as
    ``-2.0 KB`` instead of staying in bytes.

    Args:
        n: Size in bytes (an int or float).

    Returns:
        A string such as ``"0.0 B"``, ``"1.5 KB"`` or ``"1.0 PB"``.
    """
    size = n
    for unit in ("B", "KB", "MB", "GB", "TB"):
        # Compare the magnitude so negative sizes pick the right unit too.
        if abs(size) < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} PB"
def _safe_float(s, default=0.0) -> float:
    """Convert ``s`` to ``float``, falling back to ``default`` on failure.

    Args:
        s: Any value accepted by ``float()``.
        default: Returned as-is when ``float(s)`` raises ``TypeError`` or
            ``ValueError``.

    Returns:
        The converted float, or ``default``.
    """
    try:
        parsed = float(s)
    except (TypeError, ValueError):
        return default
    return parsed
def _send_discord_webhook(webhook_url: str, filename: str, saved_bytes: int):
    """POST a "process completed" embed to a Discord webhook; never raises.

    Any failure (building the request, network error, HTTP error) is caught
    and reported as a WARN line through the module-level ``worker``; if that
    logging itself fails, the message is printed instead.

    Args:
        webhook_url: Target URL for the POST request.
        filename: File name shown in the embed description and field.
        saved_bytes: Bytes saved, rendered with ``_human_size``.
    """
    try:
        import datetime

        # datetime.utcnow() is deprecated since Python 3.12. Use an aware UTC
        # time and render its offset as "Z" to keep the same timestamp format.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        timestamp = now_utc.isoformat().replace("+00:00", "Z")
        payload = {
            "embeds": [
                {
                    "title": "🎥 HEVC Process Completed",
                    "color": 3066993,  # Emerald green (#2ECC71)
                    "description": f"Successfully processed and compressed **{filename}**.",
                    "fields": [
                        {
                            "name": "File Name",
                            "value": f"`{filename}`",
                            "inline": False,
                        },
                        {
                            "name": "Space Saved",
                            "value": f"**{_human_size(saved_bytes)}**",
                            "inline": True,
                        },
                    ],
                    "timestamp": timestamp,
                }
            ]
        }
        data = json.dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            webhook_url,
            data=data,
            headers={
                "Content-Type": "application/json",
                "User-Agent": "HEVC-Dashboard-Worker",
            },
            method="POST",
        )
        # Only delivery matters; the response body is not read.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        # Best-effort notification: a webhook failure must not fail the caller.
        try:
            worker._log("WARN", f"Failed to send Discord webhook: {e}")
        except Exception:
            print(f"[Discord Webhook Error] Failed to send webhook: {e}", flush=True)
# Module-level worker instance, created at import time (HEVCWorker.__init__
# runs its database initialisation). _send_discord_webhook logs through this name.
worker = HEVCWorker()