Compare commits


2 commits
0.0.3 ... main

SHA1         Message                                              Date
207f69fa4b   make ffmpeg use bash for docker compat               2025-06-11 15:43:22 -04:00
             (Some checks failed: all four "Publish Python distribution to PyPI and TestPyPI" workflow jobs were cancelled)
b00bacc071   Add smooth transitions via xfade, hwaccel decoding   2025-05-29 14:34:51 -04:00
2 changed files with 213 additions and 38 deletions

pyproject.toml

@@ -1,8 +1,8 @@
[project]
name = "highlight_video_maker"
version = "0.0.2"
version = "0.1.0"
authors = [{ name = "Micha Albert", email = "micha@2231.tech" }]
description = "A utility to take several video inputs, take the loudest points, and create a compilation of them"
description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions"
readme = "README.md"
requires-python = ">=3.12"
classifiers = [

View file

@@ -1,11 +1,12 @@
import concurrent.futures
from logging import Logger, getLevelNamesMapping
import math
import random
import shutil
import subprocess
from collections import Counter
from logging import Logger, getLevelNamesMapping
from pathlib import Path
from typing import Dict, List
from typing import Any, Dict, Generator, List
import click
@@ -13,6 +14,22 @@ from .logger import get_logger
logger: Logger
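# ffmpeg xfade transition names used between segments; a nonrepeating generator
# (defined below) draws from this list so no two consecutive cuts share a transition.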
XFADE_TRANSITIONS = [
"fade",
"slideleft",
"slidedown",
"smoothup",
"smoothleft",
"circleopen",
"diagtl",
"horzopen",
"fadegrays",
"pixelize",
"hrwind",
"diagbl",
"diagtr",
]
@click.group()
@click.option(
@@ -36,13 +53,53 @@ def cli(log_level: str):
IN_DIR: Path
OUT_DIR: Path
CACHE_DIR = Path("/tmp/video-maker-cache")
THREADS = 16
THREADS = 12
MIN_SEGMENT_LENGTH = 3.5
MAX_SEGMENT_LENGTH = 7.5
MIN_SEGMENT_LENGTH = 5
MAX_SEGMENT_LENGTH = 9
MAX_SEGMENT_PADDING = 6
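# Segment lengths are drawn in seconds from [MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH];
# once MAX_SEGMENT_PADDING seconds or less remain in a file, the remainder becomes
# its final segment.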
def run_in_bash(
cmd: str,
capture_output=False,
check=False,
text=False,
shell=False,
):
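# Run the command under an explicit bash -c so the ffmpeg invocations behave the
# same inside Docker images whose default shell is sh (see the "make ffmpeg use
# bash for docker compat" commit). The simple '{cmd}' wrapping assumes the command
# itself contains no single quotes.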
return subprocess.run(
f"/usr/bin/bash -c '{cmd}'",
capture_output=capture_output,
check=check,
text=text,
shell=shell,
)
def nonrepeating_generator(source, desired_length):
"""
Creates a generator that yields one item from `source`
that is not equal to the last item yielded, up to
`desired_length` times.
"""
if not source:
return
if len(source) == 1 and desired_length > 1:
raise ValueError("Cannot avoid repetition with only one unique string.")
prev = None
count = 0
while count < desired_length:
choices = [s for s in source if s != prev]
if not choices:
raise ValueError("No valid choices left to avoid repetition.")
current = random.choice(choices)
yield current
prev = current
count += 1
def seconds_to_timestamp(seconds: float):
"""Converts total seconds to a timestamp (HH:MM:SS.ms)."""
hours = int(seconds // 3600)
@@ -57,7 +114,7 @@ def get_video_duration(file: Path):
logger.debug(f"Getting file length for {file}")
try:
return float(
subprocess.run(
run_in_bash(
f'ffprobe -v error -show_entries format=duration -of csv=p=0 "{file}"',
capture_output=True,
check=True,
@@ -78,11 +135,7 @@ def generate_segment_lengths(file_length: float) -> List[float]:
if remaining_length <= MAX_SEGMENT_PADDING:
segment_lengths.append(remaining_length)
break
segment_lengths.append(
random.uniform(
MIN_SEGMENT_LENGTH, min(MAX_SEGMENT_LENGTH, remaining_length)
)
)
segment_lengths.append(random.uniform(MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH))
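# Note: the drawn length is no longer capped at remaining_length, so the last
# generated segment may nominally run past the end of the file; ffmpeg simply
# stops at EOF when that segment is cut.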
logger.debug(f"Generated segment lengths: {segment_lengths}")
return segment_lengths
@@ -95,7 +148,7 @@ def split_video_segment(
):
"""Splits a video into segments using ffmpeg."""
logger.debug(f"Splitting {file_name} - segment {idx}")
subprocess.run(
run_in_bash(
f"ffmpeg -nostats -loglevel 0 -y -ss {seconds_to_timestamp(sum(segment_lengths[:idx]))} "
f'-to {seconds_to_timestamp(sum(segment_lengths[:idx]) + segment_lengths[idx])} -i "{file_name}" '
f'-c copy "{Path(out_dir, file_name.stem, str(idx) + file_name.suffix)}"',
@@ -107,15 +160,95 @@
def get_amplitude_of_segment(clip: Path):
"""Extracts the mean audio amplitude of a video segment."""
logger.debug(f"Analyzing amplitude for clip: {clip}")
res = subprocess.run(
res = run_in_bash(
f'ffmpeg -i "{Path(CACHE_DIR, clip)}" -filter:a volumedetect -f null -',
shell=True,
check=True,
capture_output=True,
).stderr
logger.debug(res)
return float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
def build_input_flags(video_files: List[str]) -> str:
return " ".join(f'-i "{video}"' for video in video_files)
def build_preprocess_filters(
video_files: List[str],
) -> tuple[list[str], List[str], List[str]]:
filters: List[str] = []
video_labels: List[str] = []
audio_labels: List[str] = []
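# Normalize every clip (same pixel format, 1280x720, reset timestamps, 30 fps,
# async-resampled audio) so the xfade/acrossfade chain receives uniform streams.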
for i in range(len(video_files)):
filters.append(
f"[{i}:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v{i}];"
)
filters.append(f"[{i}:a]aresample=async=1[a{i}];")
video_labels.append(f"v{i}")
audio_labels.append(f"a{i}")
return filters, video_labels, audio_labels
def build_transition_filters_dynamic(
filter_gen: Generator[str, Any, None],
video_labels: List[str],
audio_labels: List[str],
durations: List[float],
fade_duration: float = 1.0,
) -> tuple[List[str], List[str], str, str]:
vf_filters: List[str] = []
af_filters: List[str] = []
offset = 0.0
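# Chain the clips pairwise: each xfade merges the running result with the next
# clip, and acrossfade does the same for the audio. The offset is cumulative,
# so each crossfade starts fade_duration seconds before the end of the video
# assembled so far.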
for i in range(len(video_labels) - 1):
transition = next(filter_gen)
offset += durations[i] - fade_duration
out_v = f"vxf{i+1}"
out_a = f"acf{i+1}"
vf_filters.append(
f"[{video_labels[i]}][{video_labels[i+1]}]xfade="
f"transition={transition}:duration={fade_duration}:offset={offset:.2f}[{out_v}];"
)
video_labels[i + 1] = out_v
af_filters.append(
f"[{audio_labels[i]}][{audio_labels[i+1]}]acrossfade="
f"d={fade_duration}:c1=tri:c2=tri[{out_a}];"
)
audio_labels[i + 1] = out_a
return vf_filters, af_filters, video_labels[-1], audio_labels[-1]
def assemble_filter_complex(
pre_filters: List[str],
xfade_filters: List[str],
audio_fades: List[str],
) -> str:
return "\n".join(pre_filters + xfade_filters + audio_fades)
def run_ffmpeg_command(
input_flags: str, filter_complex: str, output_file: Path, final_audio_label: str
) -> None:
cmd: str = f"""
ffmpeg -y {input_flags} \
-filter_complex "{filter_complex}" \
-map "[vxf{filter_complex.split("vxf")[-1].split("];")[0]}]" \
-map "[{final_audio_label}]" \
-c:v libx264 -preset slow \
-c:a aac -b:a 128k "{output_file}"
"""
# the .split()[-1].split() lunacy gets the index of the final VXF
# filter so that FFmpeg knows where to map the video output.
# TODO: remove that mess and put the same logic in
# build_transition_filters_dynamic
run_in_bash(cmd, shell=True, check=True, capture_output=True)
@cli.command()
@click.option(
"--input-dir",
@@ -145,11 +278,29 @@ def get_amplitude_of_segment(clip: Path):
'or start with "./".',
type=click.Path(exists=False, resolve_path=True, path_type=Path),
)
@click.option(
"--decode-options",
help="Options to pass to FFmpeg for some decode operations."
"While optional, proper use of this option will significantly"
"reduce processing time. Note that inclusion of any encoding options"
"will cause this program to fail.",
type=str,
default="",
)
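# Example (hypothetical usage): --decode-options "-hwaccel cuda" asks ffmpeg to
# decode on the GPU; encoder flags must not be passed here because the pipeline
# selects its own encoders (libx264 / aac).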
@click.option(
"--num-segs",
help="Total number of segments to concatenate in the output."
"Controls the length of the final video.",
type=int,
default=10,
)
def run(
input_dir: Path,
watermark_image: Path,
horiz_output_file: Path,
vert_output_file: Path,
decode_options: str,
num_segs: int,
):
"""Main function that orchestrates the video processing pipeline."""
logger.info("Starting video processing pipeline.")
@@ -205,10 +356,8 @@ def run(
representative_video_audio_levels[seg] = representative_video_audio_futures[
seg
].result()
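# Counter.most_common() sorts by value, so this keeps the num_segs segments with
# the highest mean volume (the least negative dB readings).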
highest = dict(Counter(representative_video_audio_levels).most_common(10))
highest = dict(Counter(representative_video_audio_levels).most_common(num_segs))
loudest_seg_indexes: List[int] = [int(str(Path(k).stem)) for k in highest.keys()]
for video in raw_videos[2]:
out_folder = Path(CACHE_DIR, "loudest", Path(video).stem)
out_folder.mkdir(parents=True, exist_ok=True)
@@ -219,46 +368,72 @@ def run(
seg,
out_folder.parent,
)
video_files: List[str] = []
with open(str(Path(CACHE_DIR, "list.txt")), "w") as f:
for seg in loudest_seg_indexes:
random_seg = Path(random.choice(raw_videos[2]))
f.write(
f"file '{Path(CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix)}'\n"
vid_path = Path(
CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix
)
f.write(f"file '{vid_path}'\n")
video_files.append(str(vid_path.resolve()))
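# Assemble a single ffmpeg run: the chosen clips become inputs (prefixed with any
# user-supplied decode options), each is normalized, and consecutive clips are
# joined with randomized xfade transitions plus matching audio crossfades into an
# unwatermarked intermediate file.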
filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs)
input_flags: str = f"{decode_options} {build_input_flags(video_files)}"
pre_filters, vlabels, alabels = build_preprocess_filters(video_files)
durations = [get_video_duration(Path(vf)) for vf in video_files]
vfades, afades, final_v, final_a = build_transition_filters_dynamic(
filter_gen, vlabels, alabels, durations, 0.5
)
full_filter: str = assemble_filter_complex(pre_filters, vfades, afades)
logger.info("Creating unmarked video...")
run_ffmpeg_command(
output_file=CACHE_DIR
/ "out-unmarked.mp4", # This file will have all the transitions without the overlayed logo
input_flags=input_flags,
filter_complex=full_filter,
final_audio_label=final_a,
)
logger.info("Creating horizontal video...")
# Horizontal Pipeline: Concatenate clips and overlay a semitransparent watermark.
subprocess.run(
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-filter_complex "
[1]format=rgba,colorchannelmixer=aa=0.5[logo];
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p
" -c:a aac -b:a 128k "{horiz_output_file}"''',
# Horizontal Pipeline: Take unmarked file and add a semitransparent watermark.
run_in_bash(
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
" -c:a aac -b:a 128k "{horiz_output_file}"''',
shell=True,
check=True,
capture_output=True,
)
logger.info("Creating vertical video...")
# Vertical Pipeline: Concatenate, crop (zoom), split & blur for a vertical aspect ratio,
# Vertical Pipeline: Crop (zoom), split & blur unmarked file for a vertical aspect ratio,
# then overlay a centered, opaque watermark at the bottom.
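# The "copy" branch is upscaled (height x 16/9) and center-cropped to a 9:16
# canvas, then heavily blurred; the un-blurred crop is overlaid in the middle
# before the watermark is placed at the bottom.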
subprocess.run(
f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-filter_complex "
[0]crop=3/4*in_w:in_h[zoomed];
[zoomed]split[original][copy];
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred];
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert];
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p
" -c:a aac -b:a 128k "{vert_output_file}"''',
run_in_bash(
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[0]crop=3/4*in_w:in_h[zoomed]; \
[zoomed]split[original][copy]; \
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
" -c:a aac -b:a 128k "{vert_output_file}"''',
shell=True,
check=True,
capture_output=True,
)
logger.info("Video processing pipeline completed.")
logger.info("Cleaning up temporary files...")
shutil.rmtree(CACHE_DIR)
if __name__ == "__main__":