Handling Missing Metadata and Substituting MAX_RESOLUTION in Python

I’m working on a Python script that processes video files, and I’m facing an issue with handling missing metadata and properly substituting the value of the MAX_RESOLUTION variable. Can you help me with this?

In my current code, I’m trying to get the height and width of the video from the video_metadata dictionary, and then use the min() function to determine the shorter side. If the shorter side is greater than MAX_RESOLUTION, I want to scale the video accordingly using the ffmpeg_cmd list.

Here’s the initial code:

    height = video_metadata["height"]
    width = video_metadata["width"]
    shorter_side = min(width, height)
    if shorter_side > MAX_RESOLUTION:
        ffmpeg_cmd.extend(
            [
                "-vf",
                f"scale={'MAX_RESOLUTION:-2' if width > height else '-2:MAX_RESOLUTION'}",
            ]
        )

This works fine when the video_metadata dictionary contains the "height" and "width" keys, but what if they’re not present? I want to have proper error handling in case the metadata isn’t found.

I’ve tried the following approach:

    try:
        height = video_metadata["height"]
        width = video_metadata["width"]
        shorter_side = min(width, height)
        if shorter_side > MAX_RESOLUTION:
            ffmpeg_cmd.extend(
                [
                    "-vf",
                    f"scale={'MAX_RESOLUTION:-2' if width > height else '-2:MAX_RESOLUTION'}".replace('MAX_RESOLUTION', str(MAX_RESOLUTION)),
                ]
            )
    except KeyError as e:
        raise Exception("Error: 'height' or 'width' key not found in video_metadata.")

Is this a better way to handle the missing metadata? And is there a more concise way to substitute the actual value of MAX_RESOLUTION in the ffmpeg_cmd list?

I’d appreciate any suggestions or insights you can provide to help me improve my code and handle these issues effectively.

import os
import sys
import subprocess
import re
import json
import pprint
import logging

from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Iterator


# Encoder settings used when building the FFmpeg command line.
VIDEO_CODEC = "hevc"
AUDIO_CODEC = "aac"
CRF_VALUE = 30
PRESET = "slow"

# Upper limits that source files are compared against.
MAX_RESOLUTION = 540
MAX_FRAME_RATE = 24
MAX_VIDEO_BITRATE = 500 * 1024  # 512000
MAX_AUDIO_BITRATE = 64 * 1024  # 65536

# File-name suffixes (matched case-insensitively) that are treated as video.
ALLOWED_VIDEO_EXTENSIONS = (
    ".mp4",
    ".mkv",
    ".avi",
    ".mov",
    ".m4v",
    ".wmv",
    ".vob",
)


# Route records at INFO and above to two destinations: a log file that is
# truncated on every run (mode="w") and a StreamHandler with its default stream.
logging.basicConfig(
    handlers=[
        logging.FileHandler("reduce_video_size.log", mode="w"),
        logging.StreamHandler(),
    ],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)


class VideoMetadataError(Exception):
    """Signals that a video's metadata is missing or cannot be used."""


class ConversionError(Exception):
    """Signals that the conversion step itself failed."""


# Patterns that recognise pixel-format names ending in "10le" / "12le".
pixel_format_10bit_regex = re.compile("10le$")
pixel_format_12bit_regex = re.compile("12le$")


class PixelFormat:
    """Pixel-format name together with bit-depth flags derived from its suffix.

    The flags are computed once at construction time: a name ending in
    "10le" is flagged 10-bit, one ending in "12le" is flagged 12-bit, and
    anything else is reported as 8-bit.
    """

    __slots__ = ("_pix_fmt", "_is_10bit", "_is_12bit")

    def __init__(self, pix_fmt):
        ten_bit_match = pixel_format_10bit_regex.search(pix_fmt)
        twelve_bit_match = pixel_format_12bit_regex.search(pix_fmt)
        self._pix_fmt = pix_fmt
        self._is_10bit = bool(ten_bit_match)
        self._is_12bit = bool(twelve_bit_match)

    @property
    def pixel_format(self):
        """The pixel-format name exactly as passed to the constructor."""
        return self._pix_fmt

    @property
    def is_10bit(self):
        """True when the name ends in "10le"."""
        return self._is_10bit

    @property
    def is_12bit(self):
        """True when the name ends in "12le"."""
        return self._is_12bit

    @property
    def is_8bit(self):
        """True when neither the 10-bit nor the 12-bit suffix was found."""
        return not self._is_10bit and not self._is_12bit

    def __str__(self):
        return self._pix_fmt


def main():
    """Command-line entry point: process every video found under argv[1].

    Prints a usage line and exits with status 1 when no directory argument
    was supplied.
    """
    logger.info("Starting main function")
    argv = sys.argv
    if len(argv) < 2:
        print("Usage: python script.py <directory>")
        sys.exit(1)

    target_dir = argv[1]
    logger.info(f"Target directory: {target_dir}")
    discovered = get_video_files_with_metadata(target_dir)
    for path, video_meta, audio_meta in discovered:
        process_file(path, video_meta, audio_meta)
    logger.info("Main function completed")


def get_video_files_with_metadata(directory: str) -> Iterator[Tuple[str, Dict, Dict]]:
    """Yield (filepath, video_metadata, audio_metadata) for each usable video.

    Every candidate file found under ``directory`` is probed; files whose
    video metadata does not pass ``is_valid_video`` are logged and skipped.

    Args:
        directory: Root directory to search.

    Yields:
        A 3-tuple of the file path, the video stream metadata dict and the
        audio stream metadata dict.
    """
    logger.info(f"get_video_files_with_metadata({directory})")
    for filepath in get_video_filepaths(directory):
        logger.info(f"Processing file: {filepath}")
        video_metadata, audio_metadata = get_ffmpeg_metadata(filepath)
        if not is_valid_video(video_metadata):
            logger.info(f"Skipping file: {filepath}")
            continue
        logger.info(f"Video file found: {filepath}")
        yield filepath, video_metadata, audio_metadata


def get_video_filepaths(directory: str) -> Iterator[str]:
    """Walk ``directory`` recursively and yield the path of each video file.

    A file counts as a video when its lower-cased name ends with one of
    ``ALLOWED_VIDEO_EXTENSIONS``.
    """
    logger.info(f"get_video_filepaths({directory})")
    for dirpath, _subdirs, filenames in os.walk(directory):
        for name in filenames:
            if not name.lower().endswith(ALLOWED_VIDEO_EXTENSIONS):
                continue
            found = os.path.join(dirpath, name)
            logger.info(f"Found video file: {found}")
            yield found


def get_ffmpeg_metadata(filepath: str) -> Tuple[Dict, Dict]:
    """Probe ``filepath`` with ffprobe and return its first video/audio streams.

    Args:
        filepath: Path of the media file to inspect.

    Returns:
        A ``(video_metadata, audio_metadata)`` tuple. Each element is the
        first stream dict of that type reported by ffprobe, or ``{}`` when
        there is no such stream. ``({}, {})`` is returned when probing or
        parsing fails; the failure is logged rather than raised.
    """
    logger.info(f"get_ffmpeg_metadata({filepath})")
    try:
        output = subprocess.check_output(
            [
                "ffprobe",
                "-v",
                "error",
                "-show_streams",
                "-print_format",
                "json",
                filepath,
            ]
        )
        metadata = json.loads(output.decode("utf-8"))
        # Use .get() so that a missing "streams" list or a single stream
        # without "codec_type" does not discard the metadata of every other
        # stream in the file.
        streams = metadata.get("streams", [])

        def first_stream_of(kind: str) -> Dict:
            return next(
                (stream for stream in streams if stream.get("codec_type") == kind),
                {},
            )

        video_metadata = first_stream_of("video")
        audio_metadata = first_stream_of("audio")
        logger.info(
            f"FFmpeg metadata:\nVideo:\n{pprint.pformat(video_metadata)}\nAudio:\n{pprint.pformat(audio_metadata)}"
        )
        return video_metadata, audio_metadata
    except Exception as e:
        # Deliberately best-effort: one unreadable file must not stop the scan.
        logger.error(f"Error processing file {filepath}.\n{e}")
        return {}, {}


def is_valid_video(metadata: Dict) -> bool:
    """Check whether ``metadata`` describes a video stream that has frames.

    Args:
        metadata: Video stream metadata dict; may be empty.

    Returns:
        ``False`` for empty metadata (no video stream was found or probing
        failed) and for an explicit frame count of zero or less. ``True``
        when the frame count is positive, absent, or not parseable as an
        integer, since an unknown count is not evidence of an empty stream.
    """
    if not metadata:
        # An empty dict means there is no video stream to work with at all.
        res = False
    elif "nb_frames" not in metadata:
        res = True
    else:
        try:
            res = int(metadata["nb_frames"]) > 0
        except (TypeError, ValueError):
            # Treat an unparseable count the same as a missing one.
            res = True
    logger.info(f"is_valid_video(metadata: Dict) -> {res}")
    return res


def process_file(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> None:
    """Convert a single file when its metadata exceeds the configured limits.

    Args:
        file_path: Path of the source video.
        video_metadata: Metadata dict of the source's video stream.
        audio_metadata: Metadata dict of the source's audio stream.

    Raises:
        Exception: Any error other than ``VideoMetadataError`` or
            ``ConversionError`` is logged and then re-raised. Those two
            are logged and swallowed so the caller can continue with the
            next file.
    """
    logger.info(f"process_file({file_path}, metadata: str")
    filename = os.path.basename(file_path)
    logger.info(f"Processing file: {filename}")

    try:
        if should_convert(file_path, video_metadata, audio_metadata):
            logger.info(f"Conversion needed for {filename}")
            output_path = get_unique_path(file_path, ".h265.mkv")
            ffmpeg_cmd = prepare_ffmpeg_command(
                file_path, output_path, video_metadata, audio_metadata
            )
            convert_file(file_path, output_path, ffmpeg_cmd)
        else:
            logger.info(f"No conversion needed for {filename}")
    except (VideoMetadataError, ConversionError) as e:
        # Expected, per-file failures: report and move on.
        logger.error(str(e))
    except Exception as e:
        logger.error(
            f"Unexpected error occurred while processing file {filename}: {str(e)}"
        )
        raise


def prepare_ffmpeg_command(
    file_path: str, output_path: str, video_metadata: Dict, audio_metadata: Dict
) -> List[str]:
    """Build the complete FFmpeg argument list for one conversion.

    Args:
        file_path: Input file passed to ``-i``.
        output_path: Output file appended as the last argument.
        video_metadata: Metadata dict of the source's video stream.
        audio_metadata: Metadata dict of the source's audio stream.

    Returns:
        The argument list, starting with ``"ffmpeg"``.

    Raises:
        VideoMetadataError: If any step of building the command fails; the
            original exception is attached as ``__cause__``.
    """
    try:
        ffmpeg_cmd = ["ffmpeg", "-i", file_path]
        ffmpeg_cmd = push_encode_video_args_to_command(ffmpeg_cmd, video_metadata)
        ffmpeg_cmd = push_encode_audio_args_to_command(ffmpeg_cmd, audio_metadata)
        ffmpeg_cmd = push_change_frame_rate_args_to_command(ffmpeg_cmd, video_metadata)
        ffmpeg_cmd.append(output_path)
        logger.info(
            f"prepare_ffmpeg_command(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> {ffmpeg_cmd}"
        )
        return ffmpeg_cmd
    except Exception as e:
        # Chain the cause so the original traceback is not lost.
        raise VideoMetadataError(f"Error preparing FFmpeg command\n{str(e)}") from e


def push_encode_video_args_to_command(
    ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
    """Append the video encoding arguments to ``ffmpeg_cmd``.

    Adds the libx265 encoder with the configured CRF (using the main10
    profile for 10-bit sources), the maximum rate and preset, a target
    bitrate capped at ``MAX_VIDEO_BITRATE`` when the source bitrate is
    known, and a scale filter when the shorter side of the frame exceeds
    ``MAX_RESOLUTION``.

    Args:
        ffmpeg_cmd: Argument list to extend; it is modified in place.
        video_metadata: Metadata dict of the source's video stream. Must
            contain ``"pix_fmt"``, ``"height"`` and ``"width"``.

    Returns:
        The same ``ffmpeg_cmd`` list.

    Raises:
        VideoMetadataError: If ``"height"`` or ``"width"`` is missing.
    """
    ffmpeg_cmd.append("-c:v")
    crf = str(CRF_VALUE)
    if get_bitdepth(video_metadata).is_10bit:
        ffmpeg_cmd.extend(["libx265", "-x265-params", f"crf={crf}:profile=main10"])
    else:
        ffmpeg_cmd.extend(["libx265", "-crf", crf])

    ffmpeg_cmd.extend(
        [
            "-maxrate",
            bitrate_to_string(MAX_VIDEO_BITRATE),
            "-preset",
            PRESET,
        ]
    )

    if "bit_rate" in video_metadata:
        target_bitrate = min(int(video_metadata["bit_rate"]), MAX_VIDEO_BITRATE)
        ffmpeg_cmd.extend(["-b:v", bitrate_to_string(target_bitrate)])

    # Keep the try body down to the two lookups that can raise KeyError.
    try:
        height = video_metadata["height"]
        width = video_metadata["width"]
    except KeyError as e:
        raise VideoMetadataError(
            "Error: 'height' or 'width' key not found in video_metadata."
        ) from e

    if min(width, height) > MAX_RESOLUTION:
        # The scale filter takes "width:height". Pin the SHORTER side to
        # MAX_RESOLUTION and let -2 derive the other side, preserving the
        # aspect ratio and rounding to an even value.
        if width > height:
            scale_filter = f"scale=-2:{MAX_RESOLUTION}"
        else:
            scale_filter = f"scale={MAX_RESOLUTION}:-2"
        ffmpeg_cmd.extend(["-vf", scale_filter])

    logger.info(
        f"push_encode_video_args(ffmpeg_cmd: List[str], video_metadata: Dict) -> {ffmpeg_cmd}"
    )
    return ffmpeg_cmd


def push_encode_audio_args_to_command(
    ffmpeg_cmd: List[str], audio_metadata: Dict
) -> List[str]:
    """Append the audio encoding arguments to ``ffmpeg_cmd``.

    Selects ``AUDIO_CODEC`` and, when the source audio bitrate is known,
    sets a target bitrate capped at ``MAX_AUDIO_BITRATE``.

    Args:
        ffmpeg_cmd: Argument list to extend; it is modified in place.
        audio_metadata: Metadata dict of the source's audio stream; may be
            empty.

    Returns:
        The same ``ffmpeg_cmd`` list.
    """
    ffmpeg_cmd.extend(["-c:a", AUDIO_CODEC])

    # The cap applies to the stream's bit rate. The sample rate (in Hz) is a
    # different quantity and must not be passed to -b:a.
    if "bit_rate" in audio_metadata:
        target_bitrate = min(int(audio_metadata["bit_rate"]), MAX_AUDIO_BITRATE)
        ffmpeg_cmd.extend(["-b:a", bitrate_to_string(target_bitrate)])

    logger.info(
        f"push_encode_audio_args_to_command(ffmpeg_cmd: List[str], audio_metadata: Dict) -> {ffmpeg_cmd}"
    )
    return ffmpeg_cmd


def push_change_frame_rate_args_to_command(
    ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
    """Append ``-r MAX_FRAME_RATE`` when the source frame rate is above the cap.

    Nothing is added when the metadata has no ``"r_frame_rate"`` entry.
    The list is modified in place and also returned.
    """
    if "r_frame_rate" not in video_metadata:
        return ffmpeg_cmd

    source_fps = calculate_fps(video_metadata["r_frame_rate"])
    if source_fps > MAX_FRAME_RATE:
        ffmpeg_cmd.extend(["-r", str(MAX_FRAME_RATE)])
    return ffmpeg_cmd


def get_bitdepth(video_metadata: Dict) -> PixelFormat:
    """Wrap the stream's ``"pix_fmt"`` value in a ``PixelFormat`` and log it.

    Raises ``KeyError`` when the metadata has no ``"pix_fmt"`` entry.
    """
    pixel_format = PixelFormat(video_metadata["pix_fmt"])
    logger.info(f"Video.get_bitdepth() -> {pixel_format}")
    return pixel_format


def get_codec(metadata: Dict) -> str:
    """Return the stream's ``"codec_name"`` value after logging it.

    Raises ``KeyError`` when the metadata has no ``"codec_name"`` entry.
    """
    codec_name = metadata["codec_name"]
    logger.info(f"get_codec() -> {codec_name}")
    return codec_name


def bitrate_to_string(bitrate: int) -> str:
    """Format ``bitrate`` as whole multiples of 1024 followed by ``"k"``.

    The division is floored, so any remainder below 1024 is dropped.
    """
    whole_units = bitrate // 1024
    return str(whole_units) + "k"


def calculate_fps(frame_rate: str) -> int:
    """Convert a frame-rate string to whole frames per second.

    Args:
        frame_rate: Either a ``"numerator/denominator"`` fraction such as
            ``"30000/1001"`` or a plain number such as ``"25"``.

    Returns:
        The floored quotient as an ``int``, or ``0`` when the denominator
        is zero.

    Raises:
        ValueError: If a part of the string is not a valid number.
    """
    logger.info(f"calculate_fps(frame_rate: {frame_rate})")
    numerator_text, separator, denominator_text = frame_rate.partition("/")
    numerator = float(numerator_text)
    # A value without "/" is taken as an already-computed rate.
    denominator = float(denominator_text) if separator else 1.0
    # Convert to int so the result matches the declared return type.
    fps = int(numerator // denominator) if denominator != 0 else 0
    logger.info(f"Calculated FPS: {fps}")
    return fps


def should_convert(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> bool:
    """Decide whether a file needs re-encoding.

    Args:
        file_path: Path of the source file; used only in error messages.
        video_metadata: Metadata dict of the source's video stream.
        audio_metadata: Metadata dict of the source's audio stream.

    Returns:
        ``True`` when the video codec is not HEVC, or when the frame rate,
        shorter side, video bitrate or audio bitrate exceeds its limit.

    Raises:
        VideoMetadataError: If the required metadata cannot be read. The
            original code fell through to unbound local variables here.
    """
    try:
        video_width = video_metadata.get("width", 0)
        video_height = video_metadata.get("height", 0)
        fps = calculate_fps(video_metadata.get("r_frame_rate", "0/1"))
        video_codec = get_codec(video_metadata)
        video_bitrate = int(video_metadata.get("bit_rate", 0))
        audio_bitrate = int(audio_metadata.get("bit_rate", 0))
    except Exception as e:
        logger.error(f"Error getting metadata in should_convert\n{e}")
        # Without these values no decision can be made; raise the error type
        # that marks a per-file metadata problem instead of continuing.
        raise VideoMetadataError(
            f"Cannot decide whether to convert {file_path}: metadata is missing or malformed."
        ) from e

    return any(
        [
            video_codec not in ["hevc", "h265"],
            fps > MAX_FRAME_RATE,
            min(video_width, video_height) > MAX_RESOLUTION,
            video_bitrate > MAX_VIDEO_BITRATE,
            audio_bitrate > MAX_AUDIO_BITRATE,
        ]
    )


def convert_file(file_path: str, output_filename: str, ffmpeg_cmd: List[str]) -> None:
    """Run the prepared FFmpeg command and wait for it to finish.

    Args:
        file_path: Source file path (not used by this function).
        output_filename: Output file path (not used by this function).
        ffmpeg_cmd: Complete argument list to execute.

    Raises:
        ConversionError: If the command cannot be started or exits with a
            non-zero status; the original exception is attached as
            ``__cause__``.
    """
    try:
        logger.info(f"Executing FFmpeg command: {' '.join(ffmpeg_cmd)}")
        subprocess.run(ffmpeg_cmd, check=True)
        logger.info("Conversion completed successfully.")
    except Exception as e:
        # Chain the cause so the underlying failure stays visible.
        raise ConversionError(f"An error occurred during conversion.\n{str(e)}") from e


def get_unique_path(file_path: str | Path, suffix: str) -> str:
    """Return a not-yet-existing sibling path of ``file_path`` ending in ``suffix``.

    The file's stem is first passed through ``remove_number_suffix``; the
    result is then numbered by ``generate_unique_path`` in the same directory.
    """
    source = Path(file_path)
    stem_without_marker = remove_number_suffix(source.stem)
    return generate_unique_path(source.parent, stem_without_marker, suffix)


def remove_number_suffix(base: str) -> str:
    """Strip a trailing ``(N)`` counter marker from ``base``.

    Args:
        base: A file stem, e.g. ``"movie(2)"``.

    Returns:
        ``base`` without the trailing parenthesised number, or ``base``
        unchanged when it does not end in such a marker.
    """
    # The parentheses must be escaped so they match literally; the pattern
    # anchors the marker at the very end of the name.
    re_marker = re.compile(r"(?P<prefix>.*)\(\d+\)$")
    if mo := re_marker.match(base):
        return mo.group("prefix")
    return base


def generate_unique_path(parent: Path, base: str, suffix: str) -> str:
    """Return the first ``parent/base(N)suffix`` path that does not exist.

    Numbering starts at 2 and increases by one until a free name is found.
    """
    count = 2
    candidate = parent / f"{base}({count}){suffix}"
    while candidate.exists():
        count += 1
        candidate = parent / f"{base}({count}){suffix}"
    return str(candidate)


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()

The video must have a width and a height; otherwise it cannot be decoded.

Is the metadata not directly extracted from the video stream and therefore always present?