Compare commits

0.0.2 ... main (3 commits)

Author          SHA1         Message                                              Date
                207f69fa4b   make ffmpeg use bash for docker compat               2025-06-11 15:43:22 -04:00
                b00bacc071   Add smooth transitions via xfade, hwaccel decoding   2025-05-29 14:34:51 -04:00
Micha Albert    88d1aa0446   Use AAC in output                                    2025-03-19 14:14:01 -04:00

(On 207f69fa4b, some checks failed: the four "Publish Python distribution to PyPI and TestPyPI" workflow jobs (build, publish to PyPI, publish to TestPyPI, and Sigstore sign/upload) were all cancelled.)
2 changed files with 213 additions and 38 deletions

pyproject.toml

@@ -1,8 +1,8 @@
 [project]
 name = "highlight_video_maker"
-version = "0.0.2"
+version = "0.1.0"
 authors = [{ name = "Micha Albert", email = "micha@2231.tech" }]
-description = "A utility to take several video inputs, take the loudest points, and create a compilation of them"
+description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions"
 readme = "README.md"
 requires-python = ">=3.12"
 classifiers = [

View file

@@ -1,11 +1,12 @@
 import concurrent.futures
-from logging import Logger, getLevelNamesMapping
 import math
 import random
+import shutil
 import subprocess
 from collections import Counter
+from logging import Logger, getLevelNamesMapping
 from pathlib import Path
-from typing import Dict, List
+from typing import Any, Dict, Generator, List

 import click
@@ -13,6 +14,22 @@ from .logger import get_logger
 logger: Logger

+XFADE_TRANSITIONS = [
+    "fade",
+    "slideleft",
+    "slidedown",
+    "smoothup",
+    "smoothleft",
+    "circleopen",
+    "diagtl",
+    "horzopen",
+    "fadegrays",
+    "pixelize",
+    "hrwind",
+    "diagbl",
+    "diagtr",
+]
+
 @click.group()
 @click.option(
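Each entry above is a transition name accepted by ffmpeg's xfade filter; the builder functions added later in this diff interpolate one of them per cut. A single crossfade between two labeled streams looks roughly like this (labels, duration, and offset are illustrative):

    # Illustrative xfade expression built from one XFADE_TRANSITIONS entry.
    example_filter = "[v0][v1]xfade=transition=smoothleft:duration=1.0:offset=4.50[vxf1]"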
@@ -36,13 +53,53 @@ def cli(log_level: str):
 IN_DIR: Path
 OUT_DIR: Path
 CACHE_DIR = Path("/tmp/video-maker-cache")
-THREADS = 16
-MIN_SEGMENT_LENGTH = 3.5
-MAX_SEGMENT_LENGTH = 7.5
+THREADS = 12
+MIN_SEGMENT_LENGTH = 5
+MAX_SEGMENT_LENGTH = 9
 MAX_SEGMENT_PADDING = 6

+def run_in_bash(
+    cmd: str,
+    capture_output=False,
+    check=False,
+    text=False,
+    shell=False,
+):
+    return subprocess.run(
+        f"/usr/bin/bash -c '{cmd}'",
+        capture_output=capture_output,
+        check=check,
+        text=text,
+        shell=shell,
+    )
+
+
+def nonrepeating_generator(source, desired_length):
+    """
+    Creates a generator that yields one item from `source`
+    that is not equal to the last item yielded, up to
+    `desired_length` times.
+    """
+    if not source:
+        return
+    if len(source) == 1 and desired_length > 1:
+        raise ValueError("Cannot avoid repetition with only one unique string.")
+    prev = None
+    count = 0
+    while count < desired_length:
+        choices = [s for s in source if s != prev]
+        if not choices:
+            raise ValueError("No valid choices left to avoid repetition.")
+        current = random.choice(choices)
+        yield current
+        prev = current
+        count += 1
+
+
 def seconds_to_timestamp(seconds: float):
     """Converts total seconds to a timestamp (HH:MM:SS.ms)."""
     hours = int(seconds // 3600)
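A quick usage sketch of the two helpers added above (illustrative; it assumes the module's definitions are in scope, and the exact seconds_to_timestamp output string is hedged because this hunk only shows the start of its body):

    # Draw 5 transition names with no two consecutive repeats.
    picks = list(nonrepeating_generator(XFADE_TRANSITIONS, 5))
    assert len(picks) == 5 and all(a != b for a, b in zip(picks, picks[1:]))

    # Per the docstring, 125.25 s should format as roughly "00:02:05.250".
    print(seconds_to_timestamp(125.25))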
@@ -57,7 +114,7 @@ def get_video_duration(file: Path):
     logger.debug(f"Getting file length for {file}")
     try:
         return float(
-            subprocess.run(
+            run_in_bash(
                 f'ffprobe -v error -show_entries format=duration -of csv=p=0 "{file}"',
                 capture_output=True,
                 check=True,
@@ -78,11 +135,7 @@ def generate_segment_lengths(file_length: float) -> List[float]:
         if remaining_length <= MAX_SEGMENT_PADDING:
             segment_lengths.append(remaining_length)
             break
-        segment_lengths.append(
-            random.uniform(
-                MIN_SEGMENT_LENGTH, min(MAX_SEGMENT_LENGTH, remaining_length)
-            )
-        )
+        segment_lengths.append(random.uniform(MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH))
     logger.debug(f"Generated segment lengths: {segment_lengths}")
     return segment_lengths
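For intuition, a small sketch of how the new uncapped draw plays out over a hypothetical 30-second file; the remaining-length bookkeeping is assumed from the rest of generate_segment_lengths, which this hunk shows only in part:

    # Hypothetical 30 s clip, with draws fixed for clarity (all inside the new 5-9 range).
    draws = iter([8.2, 6.4, 7.1, 5.5])
    remaining = 30.0
    segments: list[float] = []
    while True:
        if remaining <= 6:          # MAX_SEGMENT_PADDING
            segments.append(remaining)
            break
        seg = next(draws)           # stands in for random.uniform(5, 9)
        segments.append(seg)
        remaining -= seg
    print(segments)                 # roughly [8.2, 6.4, 7.1, 5.5, 2.8]; the pieces sum back to 30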
@@ -95,7 +148,7 @@ def split_video_segment(
 ):
     """Splits a video into segments using ffmpeg."""
     logger.debug(f"Splitting {file_name} - segment {idx}")
-    subprocess.run(
+    run_in_bash(
         f"ffmpeg -nostats -loglevel 0 -y -ss {seconds_to_timestamp(sum(segment_lengths[:idx]))} "
         f'-to {seconds_to_timestamp(sum(segment_lengths[:idx]) + segment_lengths[idx])} -i "{file_name}" '
         f'-c copy "{Path(out_dir, file_name.stem, str(idx) + file_name.suffix)}"',
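The -ss/-to window for a segment is just the cumulative sum of the preceding segment lengths; a tiny worked example with hypothetical lengths:

    segment_lengths = [6.0, 7.5, 5.5]
    idx = 2
    start = sum(segment_lengths[:idx])                       # 13.5
    end = sum(segment_lengths[:idx]) + segment_lengths[idx]  # 19.0
    # ffmpeg is then asked for roughly -ss 00:00:13.500 -to 00:00:19.000 with
    # -c copy, so the cut snaps to nearby keyframes rather than exact times.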
@@ -107,15 +160,95 @@ def split_video_segment(
 def get_amplitude_of_segment(clip: Path):
     """Extracts the mean audio amplitude of a video segment."""
     logger.debug(f"Analyzing amplitude for clip: {clip}")
-    res = subprocess.run(
+    res = run_in_bash(
         f'ffmpeg -i "{Path(CACHE_DIR, clip)}" -filter:a volumedetect -f null -',
         shell=True,
         check=True,
         capture_output=True,
     ).stderr
+    logger.debug(res)
     return float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
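The return expression pulls the mean level out of volumedetect's stderr report; a minimal sketch of that parse on a made-up log excerpt:

    # Made-up volumedetect stderr excerpt; real output contains more fields.
    res = (
        b"[Parsed_volumedetect_0 @ 0x55] mean_volume: -23.4 dB\n"
        b"[Parsed_volumedetect_0 @ 0x55] max_volume: -4.1 dB\n"
    )
    mean = float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
    print(mean)  # -23.4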
+
+
+def build_input_flags(video_files: List[str]) -> str:
+    return " ".join(f'-i "{video}"' for video in video_files)
+
+
+def build_preprocess_filters(
+    video_files: List[str],
+) -> tuple[list[str], List[str], List[str]]:
+    filters: List[str] = []
+    video_labels: List[str] = []
+    audio_labels: List[str] = []
+    for i in range(len(video_files)):
+        filters.append(
+            f"[{i}:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v{i}];"
+        )
+        filters.append(f"[{i}:a]aresample=async=1[a{i}];")
+        video_labels.append(f"v{i}")
+        audio_labels.append(f"a{i}")
+    return filters, video_labels, audio_labels
+
+
+def build_transition_filters_dynamic(
+    filter_gen: Generator[str, Any, None],
+    video_labels: List[str],
+    audio_labels: List[str],
+    durations: List[float],
+    fade_duration: float = 1.0,
+) -> tuple[List[str], List[str], str, str]:
+    vf_filters: List[str] = []
+    af_filters: List[str] = []
+    offset = 0.0
+    for i in range(len(video_labels) - 1):
+        transition = next(filter_gen)
+        offset += durations[i] - fade_duration
+        out_v = f"vxf{i+1}"
+        out_a = f"acf{i+1}"
+        vf_filters.append(
+            f"[{video_labels[i]}][{video_labels[i+1]}]xfade="
+            f"transition={transition}:duration={fade_duration}:offset={offset:.2f}[{out_v}];"
+        )
+        video_labels[i + 1] = out_v
+        af_filters.append(
+            f"[{audio_labels[i]}][{audio_labels[i+1]}]acrossfade="
+            f"d={fade_duration}:c1=tri:c2=tri[{out_a}];"
+        )
+        audio_labels[i + 1] = out_a
+    return vf_filters, af_filters, video_labels[-1], audio_labels[-1]
+
+
+def assemble_filter_complex(
+    pre_filters: List[str],
+    xfade_filters: List[str],
+    audio_fades: List[str],
+) -> str:
+    return "\n".join(pre_filters + xfade_filters + audio_fades)
+
+
+def run_ffmpeg_command(
+    input_flags: str, filter_complex: str, output_file: Path, final_audio_label: str
+) -> None:
+    cmd: str = f"""
+    ffmpeg -y {input_flags} \
+    -filter_complex "{filter_complex}" \
+    -map "[vxf{filter_complex.split("vxf")[-1].split("];")[0]}]" \
+    -map "[{final_audio_label}]" \
+    -c:v libx264 -preset slow \
+    -c:a aac -b:a 128k "{output_file}"
+    """
+    # the .split()[-1].split() lunacy gets the index of the final VXF
+    # filter so that FFmpeg knows where to map the video output.
+    # TODO: remove that mess and put the same logic in
+    # build_transition_filters_dynamic
+    run_in_bash(cmd, shell=True, check=True, capture_output=True)
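To make the label plumbing concrete, this is roughly the graph assemble_filter_complex() produces for three hypothetical clips of about 6 seconds each with fade_duration=0.5 (transition names and offsets are illustrative):

    # [0:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v0];
    # [0:a]aresample=async=1[a0];
    # [1:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v1];
    # [1:a]aresample=async=1[a1];
    # [2:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v2];
    # [2:a]aresample=async=1[a2];
    # [v0][v1]xfade=transition=circleopen:duration=0.5:offset=5.50[vxf1];
    # [vxf1][v2]xfade=transition=pixelize:duration=0.5:offset=11.00[vxf2];
    # [a0][a1]acrossfade=d=0.5:c1=tri:c2=tri[acf1];
    # [acf1][a2]acrossfade=d=0.5:c1=tri:c2=tri[acf2];
    #
    # run_ffmpeg_command() then maps "[vxf2]" (recovered by string-splitting the
    # filter graph on "vxf") and "[acf2]" as the final video and audio outputs.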
 @cli.command()
 @click.option(
     "--input-dir",
@@ -145,11 +278,29 @@ def get_amplitude_of_segment(clip: Path):
     'or start with "./".',
     type=click.Path(exists=False, resolve_path=True, path_type=Path),
 )
+@click.option(
+    "--decode-options",
+    help="Options to pass to FFmpeg for some decode operations."
+    "While optional, proper use of this option will significantly"
+    "reduce processing time. Note that inclusion of any encoding options"
+    "will cause this program to fail.",
+    type=str,
+    default="",
+)
+@click.option(
+    "--num-segs",
+    help="Total number of segments to concatenate in the output."
+    "Controls the length of the final video.",
+    type=int,
+    default=10,
+)
 def run(
     input_dir: Path,
     watermark_image: Path,
     horiz_output_file: Path,
     vert_output_file: Path,
+    decode_options: str,
+    num_segs: int,
 ):
     """Main function that orchestrates the video processing pipeline."""
     logger.info("Starting video processing pipeline.")
@@ -205,10 +356,8 @@ def run(
         representative_video_audio_levels[seg] = representative_video_audio_futures[
             seg
         ].result()
-    highest = dict(Counter(representative_video_audio_levels).most_common(10))
+    highest = dict(Counter(representative_video_audio_levels).most_common(num_segs))
     loudest_seg_indexes: List[int] = [int(str(Path(k).stem)) for k in highest.keys()]
     for video in raw_videos[2]:
         out_folder = Path(CACHE_DIR, "loudest", Path(video).stem)
         out_folder.mkdir(parents=True, exist_ok=True)
@@ -219,46 +368,72 @@ def run(
                 seg,
                 out_folder.parent,
             )
+
+    video_files: List[str] = []
     with open(str(Path(CACHE_DIR, "list.txt")), "w") as f:
         for seg in loudest_seg_indexes:
             random_seg = Path(random.choice(raw_videos[2]))
-            f.write(
-                f"file '{Path(CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix)}'\n"
-            )
+            vid_path = Path(
+                CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix
+            )
+            f.write(f"file '{vid_path}'\n")
+            video_files.append(str(vid_path.resolve()))
+
+    filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs)
+    input_flags: str = f"{decode_options} {build_input_flags(video_files)}"
+    pre_filters, vlabels, alabels = build_preprocess_filters(video_files)
+    durations = [get_video_duration(Path(vf)) for vf in video_files]
+    vfades, afades, final_v, final_a = build_transition_filters_dynamic(
+        filter_gen, vlabels, alabels, durations, 0.5
+    )
+    full_filter: str = assemble_filter_complex(pre_filters, vfades, afades)
+
+    logger.info("Creating unmarked video...")
+    run_ffmpeg_command(
+        output_file=CACHE_DIR
+        / "out-unmarked.mp4",  # This file will have all the transitions without the overlayed logo
+        input_flags=input_flags,
+        filter_complex=full_filter,
+        final_audio_label=final_a,
+    )
logger.info("Creating horizontal video...") logger.info("Creating horizontal video...")
# Horizontal Pipeline: Concatenate clips and overlay a semitransparent watermark.
subprocess.run( # Horizontal Pipeline: Take unmarked file and add a semitransparent watermark.
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \ run_in_bash(
-filter_complex " f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
[1]format=rgba,colorchannelmixer=aa=0.5[logo]; -filter_complex " \
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p [1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
" -c:a copy "{horiz_output_file}"''', [0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
" -c:a aac -b:a 128k "{horiz_output_file}"''',
shell=True, shell=True,
check=True, check=True,
capture_output=True, capture_output=True,
) )
logger.info("Creating vertical video...") logger.info("Creating vertical video...")
# Vertical Pipeline: Concatenate, crop (zoom), split & blur for a vertical aspect ratio,
# Vertical Pipeline: Crop (zoom), split & blur unmarked file for a vertical aspect ratio,
# then overlay a centered, opaque watermark at the bottom. # then overlay a centered, opaque watermark at the bottom.
subprocess.run( run_in_bash(
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \ f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " -filter_complex " \
[0]crop=3/4*in_w:in_h[zoomed]; [0]crop=3/4*in_w:in_h[zoomed]; \
[zoomed]split[original][copy]; [zoomed]split[original][copy]; \
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; [copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; [blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p [vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
" -c:a copy "{vert_output_file}"''', " -c:a aac -b:a 128k "{vert_output_file}"''',
shell=True, shell=True,
check=True, check=True,
capture_output=True, capture_output=True,
) )
logger.info("Video processing pipeline completed.") logger.info("Video processing pipeline completed.")
logger.info("Cleaning up temporary files...")
shutil.rmtree(CACHE_DIR)
if __name__ == "__main__": if __name__ == "__main__":
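As a sanity check on the vertical graph's geometry, a small worked example assuming the unmarked intermediate is 1280x720 (that resolution comes from build_preprocess_filters; everything else follows from the filter expressions above):

    in_w, in_h = 1280, 720       # assumed size of out-unmarked.mp4
    zoom_w = in_w * 3 // 4       # crop=3/4*in_w:in_h -> 960x720 "zoomed" stream
    blur_h = in_h * 16 // 9      # scale=-1:ih*(4/3)*(4/3), i.e. 16/9 -> 1280 px tall blurred copy
    blur_w = blur_h * 9 // 16    # crop=w=ih*9/16 -> 720 px wide, a 720x1280 (9:16) canvas
    print(zoom_w, blur_h, blur_w)  # 960 1280 720

The zoomed 960x720 stream is then overlaid centered on that blurred 9:16 canvas, with the watermark pinned near the bottom, matching the overlay expressions in the command above.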