Add smooth transitions via xfade, hwaccel decoding

This commit is contained in:
Micha R. Albert 2025-05-29 14:34:51 -04:00
parent 88d1aa0446
commit bb69d828d6
Signed by: mra
SSH key fingerprint: SHA256:2JB0fGfy7m2HQXAzvSXXKm7wPTj9Z60MOjFOQGM2Y/E
2 changed files with 192 additions and 33 deletions

View file

@@ -1,8 +1,8 @@
[project]
name = "highlight_video_maker"
version = "0.0.2"
version = "0.1.0"
authors = [{ name = "Micha Albert", email = "micha@2231.tech" }]
description = "A utility to take several video inputs, take the loudest points, and create a compilation of them"
description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions"
readme = "README.md"
requires-python = ">=3.12"
classifiers = [

View file

@@ -1,11 +1,12 @@
import concurrent.futures
from logging import Logger, getLevelNamesMapping
import math
import random
import shutil
import subprocess
from collections import Counter
from logging import Logger, getLevelNamesMapping
from pathlib import Path
from typing import Dict, List
from typing import Any, Dict, Generator, List
import click
@@ -13,6 +14,22 @@ from .logger import get_logger
logger: Logger
XFADE_TRANSITIONS = [
"fade",
"slideleft",
"slidedown",
"smoothup",
"smoothleft",
"circleopen",
"diagtl",
"horzopen",
"fadegrays",
"pixelize",
"hrwind",
"diagbl",
"diagtr",
]
@click.group()
@click.option(
@@ -36,13 +53,37 @@ def cli(log_level: str):
IN_DIR: Path
OUT_DIR: Path
CACHE_DIR = Path("/tmp/video-maker-cache")
THREADS = 16
THREADS = 12
MIN_SEGMENT_LENGTH = 3.5
MAX_SEGMENT_LENGTH = 7.5
MIN_SEGMENT_LENGTH = 5
MAX_SEGMENT_LENGTH = 9
MAX_SEGMENT_PADDING = 6
def nonrepeating_generator(
    source: List[str], desired_length: int
) -> Generator[str, Any, None]:
    """
    Creates a generator that yields one item from `source`
    that is not equal to the last item yielded, up to
    `desired_length` times.

    Yields nothing for an empty `source`. Raises ValueError when
    repetition cannot be avoided (i.e. `source` holds a single
    unique value but more than one item was requested).
    """
    if not source:
        return
    # Guard on the number of *unique* values, not the raw length:
    # a source like ["x", "x"] is just as impossible to alternate as ["x"].
    if len(set(source)) == 1 and desired_length > 1:
        raise ValueError("Cannot avoid repetition with only one unique string.")
    prev = None
    for _ in range(desired_length):
        choices = [s for s in source if s != prev]
        if not choices:
            raise ValueError("No valid choices left to avoid repetition.")
        prev = random.choice(choices)
        yield prev
def seconds_to_timestamp(seconds: float):
"""Converts total seconds to a timestamp (HH:MM:SS.ms)."""
hours = int(seconds // 3600)
@@ -78,11 +119,7 @@ def generate_segment_lengths(file_length: float) -> List[float]:
if remaining_length <= MAX_SEGMENT_PADDING:
segment_lengths.append(remaining_length)
break
segment_lengths.append(
random.uniform(
MIN_SEGMENT_LENGTH, min(MAX_SEGMENT_LENGTH, remaining_length)
)
)
segment_lengths.append(random.uniform(MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH))
logger.debug(f"Generated segment lengths: {segment_lengths}")
return segment_lengths
@@ -113,9 +150,89 @@ def get_amplitude_of_segment(clip: Path):
check=True,
capture_output=True,
).stderr
logger.debug(res)
return float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
def build_input_flags(video_files: List[str]) -> str:
    """Render one quoted ffmpeg `-i` flag per input file, space-separated."""
    flags = [f'-i "{video}"' for video in video_files]
    return " ".join(flags)
def build_preprocess_filters(
    video_files: List[str],
) -> tuple[list[str], List[str], List[str]]:
    """Build per-input normalization filters plus their stream labels.

    Every input's video is forced to yuv420p, 1280x720, 30 fps with reset
    timestamps, and its audio is resampled with async compensation; the
    resulting streams are labeled ``v{i}`` / ``a{i}``.

    Returns (filter strings, video labels, audio labels), index-aligned
    with `video_files`.
    """
    filters: List[str] = []
    video_labels: List[str] = []
    audio_labels: List[str] = []
    for idx, _ in enumerate(video_files):
        video_labels.append(f"v{idx}")
        audio_labels.append(f"a{idx}")
        filters.append(
            f"[{idx}:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v{idx}];"
        )
        filters.append(f"[{idx}:a]aresample=async=1[a{idx}];")
    return filters, video_labels, audio_labels
def build_transition_filters_dynamic(
    filter_gen: Generator[str, Any, None],
    video_labels: List[str],
    audio_labels: List[str],
    durations: List[float],
    fade_duration: float = 1.0,
) -> tuple[List[str], List[str], str, str]:
    """Chain consecutive clips together with xfade/acrossfade filters.

    For each adjacent pair of streams, emits an ``xfade`` video filter
    (transition name drawn from `filter_gen`) and a matching ``acrossfade``
    audio filter; each pair's output label feeds the next pair.

    Returns (video filters, audio filters, final video label, final audio
    label). The final labels identify the fully-chained output streams.

    Fix: works on copies of the label lists instead of mutating the
    caller's `video_labels`/`audio_labels` in place.
    """
    vf_filters: List[str] = []
    af_filters: List[str] = []
    # Copy so the caller's lists are left untouched.
    vlabels = list(video_labels)
    alabels = list(audio_labels)
    offset = 0.0
    for i in range(len(vlabels) - 1):
        transition = next(filter_gen)
        # Each crossfade begins `fade_duration` seconds before the
        # accumulated stream ends, so offsets telescope across clips.
        offset += durations[i] - fade_duration
        out_v = f"vxf{i + 1}"
        out_a = f"acf{i + 1}"
        vf_filters.append(
            f"[{vlabels[i]}][{vlabels[i + 1]}]xfade="
            f"transition={transition}:duration={fade_duration}:offset={offset:.2f}[{out_v}];"
        )
        vlabels[i + 1] = out_v
        af_filters.append(
            f"[{alabels[i]}][{alabels[i + 1]}]acrossfade="
            f"d={fade_duration}:c1=tri:c2=tri[{out_a}];"
        )
        alabels[i + 1] = out_a
    return vf_filters, af_filters, vlabels[-1], alabels[-1]
def assemble_filter_complex(
    pre_filters: List[str],
    xfade_filters: List[str],
    audio_fades: List[str],
) -> str:
    """Join all filter fragments into one newline-separated filter_complex."""
    sections: List[str] = []
    sections.extend(pre_filters)
    sections.extend(xfade_filters)
    sections.extend(audio_fades)
    return "\n".join(sections)
def run_ffmpeg_command(
    input_flags: str,
    filter_complex: str,
    output_file: Path,
    final_audio_label: str,
    final_video_label: str | None = None,
) -> None:
    """Run ffmpeg to render the transitioned compilation to `output_file`.

    Maps the final video/audio labels out of `filter_complex` and encodes
    with libx264 (slow preset) + 128k AAC. Pass `final_video_label` (as
    returned by build_transition_filters_dynamic) to map the video output
    directly; when omitted, falls back to recovering the last "vxf" label
    by parsing the filter-graph text, as before.

    Raises subprocess.CalledProcessError if ffmpeg exits nonzero.
    """
    if final_video_label is None:
        # Legacy fallback: pull the index of the final vxf filter out of the
        # graph text so ffmpeg knows which stream to map. Prefer passing
        # final_video_label explicitly (this resolves the old TODO).
        final_video_label = f"vxf{filter_complex.split('vxf')[-1].split('];')[0]}"
    cmd: str = f"""
ffmpeg -y {input_flags} \
-filter_complex "{filter_complex}" \
-map "[{final_video_label}]" \
-map "[{final_audio_label}]" \
-c:v libx264 -preset slow \
-c:a aac -b:a 128k "{output_file}"
"""
    # NOTE(review): shell=True with interpolated paths — assumes filenames
    # are trusted local cache paths; confirm before accepting user input here.
    subprocess.run(cmd, shell=True, check=True, capture_output=True)
@cli.command()
@click.option(
"--input-dir",
@@ -145,11 +262,29 @@ def get_amplitude_of_segment(clip: Path):
'or start with "./".',
type=click.Path(exists=False, resolve_path=True, path_type=Path),
)
@click.option(
"--decode-options",
help="Options to pass to FFmpeg for some decode operations."
"While optional, proper use of this option will significantly"
"reduce processing time. Note that inclusion of any encoding options"
"will cause this program to fail.",
type=str,
default="",
)
@click.option(
"--num-segs",
help="Total number of segments to concatenate in the output."
"Controls the length of the final video.",
type=int,
default=10,
)
def run(
input_dir: Path,
watermark_image: Path,
horiz_output_file: Path,
vert_output_file: Path,
decode_options: str,
num_segs: int,
):
"""Main function that orchestrates the video processing pipeline."""
logger.info("Starting video processing pipeline.")
@@ -205,10 +340,8 @@ def run(
representative_video_audio_levels[seg] = representative_video_audio_futures[
seg
].result()
highest = dict(Counter(representative_video_audio_levels).most_common(10))
highest = dict(Counter(representative_video_audio_levels).most_common(num_segs))
loudest_seg_indexes: List[int] = [int(str(Path(k).stem)) for k in highest.keys()]
for video in raw_videos[2]:
out_folder = Path(CACHE_DIR, "loudest", Path(video).stem)
out_folder.mkdir(parents=True, exist_ok=True)
@@ -219,22 +352,45 @@ def run(
seg,
out_folder.parent,
)
video_files: List[str] = []
with open(str(Path(CACHE_DIR, "list.txt")), "w") as f:
for seg in loudest_seg_indexes:
random_seg = Path(random.choice(raw_videos[2]))
f.write(
f"file '{Path(CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix)}'\n"
vid_path = Path(
CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix
)
f.write(f"file '{vid_path}'\n")
video_files.append(str(vid_path.resolve()))
filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs)
input_flags: str = f"{decode_options} {build_input_flags(video_files)}"
pre_filters, vlabels, alabels = build_preprocess_filters(video_files)
durations = [get_video_duration(Path(vf)) for vf in video_files]
vfades, afades, final_v, final_a = build_transition_filters_dynamic(
filter_gen, vlabels, alabels, durations, 0.5
)
full_filter: str = assemble_filter_complex(pre_filters, vfades, afades)
logger.info("Creating unmarked video...")
run_ffmpeg_command(
output_file=CACHE_DIR
/ "out-unmarked.mp4", # This file will have all the transitions without the overlayed logo
input_flags=input_flags,
filter_complex=full_filter,
final_audio_label=final_a,
)
logger.info("Creating horizontal video...")
# Horizontal Pipeline: Concatenate clips and overlay a semitransparent watermark.
# Horizontal Pipeline: Take unmarked file and add a semitransparent watermark.
subprocess.run(
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-filter_complex "
[1]format=rgba,colorchannelmixer=aa=0.5[logo];
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
" -c:a aac -b:a 128k "{horiz_output_file}"''',
shell=True,
check=True,
@@ -242,16 +398,17 @@ def run(
)
logger.info("Creating vertical video...")
# Vertical Pipeline: Concatenate, crop (zoom), split & blur for a vertical aspect ratio,
# Vertical Pipeline: Crop (zoom), split & blur unmarked file for a vertical aspect ratio,
# then overlay a centered, opaque watermark at the bottom.
subprocess.run(
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-filter_complex "
[0]crop=3/4*in_w:in_h[zoomed];
[zoomed]split[original][copy];
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred];
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert];
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[0]crop=3/4*in_w:in_h[zoomed]; \
[zoomed]split[original][copy]; \
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
" -c:a aac -b:a 128k "{vert_output_file}"''',
shell=True,
check=True,
@@ -259,6 +416,8 @@ def run(
)
logger.info("Video processing pipeline completed.")
logger.info("Cleaning up temporary files...")
shutil.rmtree(CACHE_DIR)
if __name__ == "__main__":