I’m working on a Python script that processes video files, and I’m facing an issue with handling missing metadata and properly substituting the value of the MAX_RESOLUTION
variable. Can you help me with this?
In my current code, I’m trying to get the height and width of the video from the video_metadata
dictionary, and then use the min()
function to determine the shorter side. If the shorter side is greater than MAX_RESOLUTION
, I want to scale the video accordingly using the ffmpeg_cmd
list.
Here’s the initial code:
height = video_metadata["height"]
width = video_metadata["width"]
shorter_side = min(width, height)
if shorter_side > MAX_RESOLUTION:
ffmpeg_cmd.extend(
[
"-vf",
f"scale={'MAX_RESOLUTION:-2' if width > height else '-2:MAX_RESOLUTION'}",
]
)
This works fine when the video_metadata
dictionary contains the "height"
and "width"
keys, but what if they’re not present? I want to have proper error handling in case the metadata isn’t found.
I’ve tried the following approach:
try:
height = video_metadata["height"]
width = video_metadata["width"]
shorter_side = min(width, height)
if shorter_side > MAX_RESOLUTION:
ffmpeg_cmd.extend(
[
"-vf",
f"scale={'MAX_RESOLUTION:-2' if width > height else '-2:MAX_RESOLUTION'}".replace('MAX_RESOLUTION', str(MAX_RESOLUTION)),
]
)
except KeyError as e:
raise Exception("Error: 'height' or 'width' key not found in video_metadata.")
Is this a better way to handle the missing metadata? And is there a more concise way to substitute the actual value of MAX_RESOLUTION
in the ffmpeg_cmd
list?
I’d appreciate any suggestions or insights you can provide to help me improve my code and handle these issues effectively.
import os
import sys
import subprocess
import re
import json
import pprint
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional, Iterator
VIDEO_CODEC = "hevc"
AUDIO_CODEC = "aac"
CRF_VALUE = 30
PRESET = "slow"
MAX_RESOLUTION = 540
MAX_FRAME_RATE = 24
MAX_VIDEO_BITRATE = 512000 # 500k = 500 * 1024
MAX_AUDIO_BITRATE = 65536 # 64k = 64 * 1024
ALLOWED_VIDEO_EXTENSIONS = (".mp4", ".mkv", ".avi", ".mov", ".m4v", ".wmv", ".vob")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("reduce_video_size.log", mode="w"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
class VideoMetadataError(Exception):
"""Raised when there's an issue with video metadata."""
pass
class ConversionError(Exception):
"""Raised when there's an issue during the conversion process."""
pass
pixel_format_10bit_regex = re.compile("10le$")
pixel_format_12bit_regex = re.compile("12le$")
class PixelFormat:
__slots__ = ("_pix_fmt", "_is_10bit", "_is_12bit")
def __init__(self, pix_fmt):
self._pix_fmt = pix_fmt
self._is_10bit = pixel_format_10bit_regex.search(pix_fmt) is not None
self._is_12bit = pixel_format_12bit_regex.search(pix_fmt) is not None
@property
def pixel_format(self):
return self._pix_fmt
@property
def is_10bit(self):
return self._is_10bit
@property
def is_12bit(self):
return self._is_12bit
@property
def is_8bit(self):
return not (self._is_10bit or self._is_12bit)
def __str__(self):
return self._pix_fmt
def main():
logger.info("Starting main function")
if len(sys.argv) < 2:
print("Usage: python script.py <directory>")
sys.exit(1)
target_dir = sys.argv[1]
logger.info(f"Target directory: {target_dir}")
for file_path, video_metadata, audio_metadata in get_video_files_with_metadata(
target_dir
):
process_file(file_path, video_metadata, audio_metadata)
logger.info("Main function completed")
def get_video_files_with_metadata(directory: str) -> Iterator[Tuple[str, Dict]]:
"""Yields tuples containing the filepath and FFmpeg metadata for video files in the given directory."""
logger.info(f"get_video_files_with_metadata({directory})")
for filepath in get_video_filepaths(directory):
logger.info(f"Processing file: {filepath}")
video_metadata, audio_metadata = get_ffmpeg_metadata(filepath)
if is_valid_video(video_metadata):
logger.info(f"Video file found: {filepath}")
yield filepath, video_metadata, audio_metadata
else:
logger.info(f"Skipping file: {filepath}")
def get_video_filepaths(directory: str) -> Iterator[str]:
"""Yields filepaths of video files in the given directory."""
logger.info(f"get_video_filepaths({directory})")
for root, _, files in os.walk(directory):
for filename in files:
if filename.lower().endswith(ALLOWED_VIDEO_EXTENSIONS):
res = os.path.join(root, filename)
logger.info(f"Found video file: {res}")
yield res
def get_ffmpeg_metadata(filepath: str) -> Tuple[Dict, Dict]:
"""Returns FFmpeg metadata for the given filepath as a tuple of (video_metadata, audio_metadata)."""
logger.info(f"get_ffmpeg_metadata({filepath})")
try:
output = subprocess.check_output(
[
"ffprobe",
"-v",
"error",
"-show_streams",
"-print_format",
"json",
filepath,
]
)
metadata = json.loads(output.decode("utf-8"))
video_metadata = next(
(
stream
for stream in metadata["streams"]
if stream["codec_type"] == "video"
),
{},
)
audio_metadata = next(
(
stream
for stream in metadata["streams"]
if stream["codec_type"] == "audio"
),
{},
)
logger.info(
f"FFmpeg metadata:\nVideo:\n{pprint.pformat(video_metadata)}\nAudio:\n{pprint.pformat(audio_metadata)}"
)
return video_metadata, audio_metadata
except Exception as e:
logger.error(f"Error processing file {filepath}.\n{e}")
return {}, {}
def is_valid_video(metadata: Dict) -> bool:
"""Checks if the metadata indicates a valid video with frames."""
res = int(metadata.get("nb_frames", 0)) > 0 or "nb_frames" not in metadata
logger.info(f"is_valid_video(metadata: Dict) -> {res}")
return res
def process_file(file_path: str, video_metadata: str, audio_metadata: str) -> None:
logger.info(f"process_file({file_path}, metadata: str")
filename = os.path.basename(file_path)
logger.info(f"Processing file: {filename}")
try:
if should_convert(file_path, video_metadata, audio_metadata):
logger.info(f"Conversion needed for {filename}")
output_path = get_unique_path(file_path, ".h265.mkv")
ffmpeg_cmd = prepare_ffmpeg_command(
file_path, output_path, video_metadata, audio_metadata
)
convert_file(file_path, output_path, ffmpeg_cmd)
else:
logger.info(f"No conversion needed for {filename}")
except VideoMetadataError as e:
logger.error(str(e))
except ConversionError as e:
logger.error(str(e))
except Exception as e:
logger.error(
f"Unexpected error occurred while processing file {filename}: {str(e)}"
)
raise
def prepare_ffmpeg_command(
file_path: str, output_path: str, video_metadata: Dict, audio_metadata: Dict
) -> List[str]:
try:
ffmpeg_cmd = ["ffmpeg", "-i", file_path]
ffmpeg_cmd = push_encode_video_args_to_command(ffmpeg_cmd, video_metadata)
ffmpeg_cmd = push_encode_audio_args_to_command(ffmpeg_cmd, audio_metadata)
ffmpeg_cmd = push_change_frame_rate_args_to_command(ffmpeg_cmd, video_metadata)
ffmpeg_cmd.append(output_path)
logger.info(
f"prepare_ffmpeg_command(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
except Exception as e:
raise VideoMetadataError(f"Error preparing FFmpeg command\n{str(e)}")
def push_encode_video_args_to_command(
ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
ffmpeg_cmd.append("-c:v")
crf = str(CRF_VALUE)
if get_bitdepth(video_metadata).is_10bit:
ffmpeg_cmd.extend(["libx265", "-x265-params", f"crf={crf}:profile=main10"])
else:
ffmpeg_cmd.extend(["libx265", "-crf", crf])
ffmpeg_cmd.extend(
[
"-maxrate",
bitrate_to_string(MAX_VIDEO_BITRATE),
"-preset",
PRESET,
]
)
if "bit_rate" in video_metadata:
target_bitrate = min(int(video_metadata["bit_rate"]), MAX_VIDEO_BITRATE)
ffmpeg_cmd.extend(["-b:v", bitrate_to_string(target_bitrate)])
try:
height = video_metadata["height"]
width = video_metadata["width"]
shorter_side = min(width, height)
if shorter_side > MAX_RESOLUTION:
ffmpeg_cmd.extend(
[
"-vf",
f"scale={'MAX_RESOLUTION:-2' if width > height else '-2:MAX_RESOLUTION'}".replace(
"MAX_RESOLUTION", str(MAX_RESOLUTION)
),
]
)
except KeyError as e:
raise VideoMetadataError(
"Error: 'height' or 'width' key not found in video_metadata."
)
logger.info(
f"push_encode_video_args(ffmpeg_cmd: List[str], video_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
def push_encode_audio_args_to_command(
ffmpeg_cmd: List[str], audio_metadata: Dict
) -> List[str]:
ffmpeg_cmd.extend(["-c:a", AUDIO_CODEC])
if "sample_rate" in audio_metadata:
sample_rate = min(int(audio_metadata["sample_rate"]), MAX_AUDIO_BITRATE)
ffmpeg_cmd.extend(["-b:a", bitrate_to_string(sample_rate)])
logger.info(
f"push_encode_audio_args_to_command(ffmpeg_cmd: List[str], audio_metadata: Dict) -> {ffmpeg_cmd}"
)
return ffmpeg_cmd
def push_change_frame_rate_args_to_command(
ffmpeg_cmd: List[str], video_metadata: Dict
) -> List[str]:
if (
"r_frame_rate" in video_metadata
and calculate_fps(video_metadata["r_frame_rate"]) > MAX_FRAME_RATE
):
ffmpeg_cmd.extend(["-r", str(MAX_FRAME_RATE)])
return ffmpeg_cmd
def get_bitdepth(video_metadata: Dict) -> PixelFormat:
res = PixelFormat(video_metadata["pix_fmt"])
logger.info(f"Video.get_bitdepth() -> {res}")
return res
def get_codec(metadata: Dict) -> str:
codec = metadata["codec_name"]
logger.info(f"get_codec() -> {codec}")
return codec
def bitrate_to_string(bitrate: int) -> str:
return f"{bitrate // 1024}k"
def calculate_fps(frame_rate: str) -> int:
logger.info(f"calculate_fps(frame_rate: {frame_rate})")
numerator, denominator = map(float, frame_rate.split("/"))
fps = numerator // denominator if denominator != 0 else 0
logger.info(f"Calculated FPS: {fps}")
return fps
def should_convert(file_path: str, video_metadata: Dict, audio_metadata: Dict) -> bool:
try:
video_width = video_metadata.get("width", 0)
video_height = video_metadata.get("height", 0)
fps = calculate_fps(video_metadata.get("r_frame_rate", "0/1"))
video_codec = get_codec(video_metadata)
video_bitrate = int(video_metadata.get("bit_rate", 0))
audio_bitrate = int(audio_metadata.get("bit_rate", 0))
except Exception as e:
logger.error(f"Error getting metadata in should_convert\n{e}")
return any(
[
video_codec not in ["hevc", "h265"],
fps > MAX_FRAME_RATE,
min(video_width, video_height) > MAX_RESOLUTION,
video_bitrate > MAX_VIDEO_BITRATE,
audio_bitrate > MAX_AUDIO_BITRATE,
]
)
def convert_file(file_path: str, output_filename: str, ffmpeg_cmd: List[str]) -> None:
try:
logger.info(f"Executing FFmpeg command: {' '.join(ffmpeg_cmd)}")
subprocess.run(ffmpeg_cmd, check=True)
logger.info("Conversion completed successfully.")
except Exception as e:
raise ConversionError(f"An error occurred during conversion.\n{str(e)}")
def get_unique_path(file_path: str | Path, suffix: str) -> str:
file_path = Path(file_path)
base = remove_number_suffix(file_path.stem)
return generate_unique_path(file_path.parent, base, suffix)
def remove_number_suffix(base: str) -> str:
re_marker = re.compile(r"(?P<prefix>.*)$\d+$$")
if mo := re_marker.match(base):
return mo.group("prefix")
return base
def generate_unique_path(parent: Path, base: str, suffix: str) -> str:
count = 2
while True:
new_path = parent / f"{base}({count}){suffix}"
if not new_path.exists():
return str(new_path)
count += 1
if __name__ == "__main__":
main()