Compare commits

...

6 commits
0.0.1 ... main

Author        SHA1        Message                                             Date
              207f69fa4b  make ffmpeg use bash for docker compat              2025-06-11 15:43:22 -04:00
                          (checks failed: all four "Publish Python distribution to PyPI and TestPyPI" jobs were cancelled)
              b00bacc071  Add smooth transitions via xfade, hwaccel decoding  2025-05-29 14:34:51 -04:00
Micha Albert  88d1aa0446  Use AAC in output                                   2025-03-19 14:14:01 -04:00
Micha Albert  68041ce3cc  fix help formatting and bump version                2025-03-10 14:57:36 -04:00
Micha Albert  e18a23cd1a  fix dockerignore                                    2025-03-10 14:54:52 -04:00
Micha Albert  021101474e  dockerize                                           2025-03-10 14:40:23 -04:00
4 changed files with 241 additions and 43 deletions

.dockerignore (new file, 7 additions)
View file

@@ -0,0 +1,7 @@
+.mypy_cache
+.vscode
+.github
+.git
+__pycache__
+.venv
+dist

Dockerfile (new file, 16 additions)
View file

@@ -0,0 +1,16 @@
+FROM python:3.13-slim
+
+WORKDIR /usr/src/app
+
+RUN apt-get update && apt-get install -y ffmpeg bash
+RUN pip install --upgrade pip && \
+    pip install --upgrade build
+RUN python -m venv .venv
+
+COPY . .
+
+RUN . ./.venv/bin/activate && pip install .
+
+ENTRYPOINT [ "./.venv/bin/highlight-video-maker" ]

pyproject.toml
View file

@@ -1,8 +1,8 @@
 [project]
 name = "highlight_video_maker"
-version = "0.0.1"
+version = "0.1.0"
 authors = [{ name = "Micha Albert", email = "micha@2231.tech" }]
-description = "A utility to take several video inputs, take the loudest points, and create a compilation of them"
+description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions"
 readme = "README.md"
 requires-python = ">=3.12"
 classifiers = [

View file

@@ -1,11 +1,12 @@
 import concurrent.futures
-from logging import Logger, getLevelNamesMapping
 import math
 import random
+import shutil
 import subprocess
 from collections import Counter
+from logging import Logger, getLevelNamesMapping
 from pathlib import Path
-from typing import Dict, List
+from typing import Any, Dict, Generator, List
 
 import click
@@ -13,6 +14,22 @@ from .logger import get_logger
 logger: Logger
 
+XFADE_TRANSITIONS = [
+    "fade",
+    "slideleft",
+    "slidedown",
+    "smoothup",
+    "smoothleft",
+    "circleopen",
+    "diagtl",
+    "horzopen",
+    "fadegrays",
+    "pixelize",
+    "hrwind",
+    "diagbl",
+    "diagtr",
+]
+
 
 @click.group()
 @click.option(
@@ -20,7 +37,7 @@ logger: Logger
     default="INFO",
     type=str,
     required=False,
-    metavar="Sets the logging verbosity. Choose between"
+    help="Sets the logging verbosity. Choose between"
     "DEBUG, INFO (default), WARNING, ERROR, or CRITICAL."
     "Can be uppercase or lowercase.",
 )
@@ -36,13 +53,53 @@ def cli(log_level: str):
 IN_DIR: Path
 OUT_DIR: Path
 CACHE_DIR = Path("/tmp/video-maker-cache")
-THREADS = 16
-MIN_SEGMENT_LENGTH = 3.5
-MAX_SEGMENT_LENGTH = 7.5
+THREADS = 12
+MIN_SEGMENT_LENGTH = 5
+MAX_SEGMENT_LENGTH = 9
 MAX_SEGMENT_PADDING = 6
 
 
+def run_in_bash(
+    cmd: str,
+    capture_output=False,
+    check=False,
+    text=False,
+    shell=False,
+):
+    return subprocess.run(
+        f"/usr/bin/bash -c '{cmd}'",
+        capture_output=capture_output,
+        check=check,
+        text=text,
+        shell=shell,
+    )
+
+
+def nonrepeating_generator(source, desired_length):
+    """
+    Creates a generator that yields one item from `source`
+    that is not equal to the last item yielded, up to
+    `desired_length` times.
+    """
+    if not source:
+        return
+    if len(source) == 1 and desired_length > 1:
+        raise ValueError("Cannot avoid repetition with only one unique string.")
+    prev = None
+    count = 0
+    while count < desired_length:
+        choices = [s for s in source if s != prev]
+        if not choices:
+            raise ValueError("No valid choices left to avoid repetition.")
+        current = random.choice(choices)
+        yield current
+        prev = current
+        count += 1
+
+
 def seconds_to_timestamp(seconds: float):
     """Converts total seconds to a timestamp (HH:MM:SS.ms)."""
     hours = int(seconds // 3600)
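A quick sketch (not taken from the diff) of what the new run_in_bash helper actually hands to subprocess.run, and of the no-immediate-repeat draw that nonrepeating_generator performs. The clip name and the three-entry transition pool are made up, and the generator logic is restated inline rather than imported:

    import random

    # run_in_bash wraps the command in /usr/bin/bash -c '...'; call sites keep to
    # double quotes inside cmd, since a single quote would end the outer quoting early.
    cmd = 'ffprobe -v error -show_entries format=duration -of csv=p=0 "clip.mp4"'
    print(f"/usr/bin/bash -c '{cmd}'")

    # The same no-immediate-repeat selection that nonrepeating_generator yields,
    # restated for a small pool.
    random.seed(0)  # deterministic demo
    pool = ["fade", "slideleft", "circleopen"]
    prev, picks = None, []
    for _ in range(6):
        prev = random.choice([t for t in pool if t != prev])
        picks.append(prev)
    print(picks)  # no two consecutive entries are equal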
@@ -57,7 +114,7 @@ def get_video_duration(file: Path):
     logger.debug(f"Getting file length for {file}")
     try:
         return float(
-            subprocess.run(
+            run_in_bash(
                 f'ffprobe -v error -show_entries format=duration -of csv=p=0 "{file}"',
                 capture_output=True,
                 check=True,
@@ -78,11 +135,7 @@ def generate_segment_lengths(file_length: float) -> List[float]:
         if remaining_length <= MAX_SEGMENT_PADDING:
             segment_lengths.append(remaining_length)
             break
-        segment_lengths.append(
-            random.uniform(
-                MIN_SEGMENT_LENGTH, min(MAX_SEGMENT_LENGTH, remaining_length)
-            )
-        )
+        segment_lengths.append(random.uniform(MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH))
     logger.debug(f"Generated segment lengths: {segment_lengths}")
     return segment_lengths
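Under the new constants the drawn lengths are uniform between 5 and 9 seconds and are no longer clamped to the time remaining (the min(...) wrapper above was removed); once 6 seconds or less are left, the remainder becomes the final segment. A small sketch of that arithmetic, assuming the surrounding loop subtracts each drawn length from the remaining time (that part of the function is outside this hunk):

    import random

    MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH, MAX_SEGMENT_PADDING = 5, 9, 6

    def sketch_segment_lengths(file_length: float) -> list[float]:
        # Draw 5-9 s pieces until 6 s or less remains, then keep the remainder.
        lengths, remaining = [], file_length
        while True:
            if remaining <= MAX_SEGMENT_PADDING:
                lengths.append(remaining)
                break
            seg = random.uniform(MIN_SEGMENT_LENGTH, MAX_SEGMENT_LENGTH)
            lengths.append(seg)
            remaining -= seg
        return lengths

    print(sketch_segment_lengths(30.0))  # the pieces sum to 30.0, up to float rounding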
@@ -95,7 +148,7 @@ def split_video_segment(
 ):
     """Splits a video into segments using ffmpeg."""
     logger.debug(f"Splitting {file_name} - segment {idx}")
-    subprocess.run(
+    run_in_bash(
         f"ffmpeg -nostats -loglevel 0 -y -ss {seconds_to_timestamp(sum(segment_lengths[:idx]))} "
         f'-to {seconds_to_timestamp(sum(segment_lengths[:idx]) + segment_lengths[idx])} -i "{file_name}" '
         f'-c copy "{Path(out_dir, file_name.stem, str(idx) + file_name.suffix)}"',
@@ -107,24 +160,104 @@
 def get_amplitude_of_segment(clip: Path):
     """Extracts the mean audio amplitude of a video segment."""
     logger.debug(f"Analyzing amplitude for clip: {clip}")
-    res = subprocess.run(
+    res = run_in_bash(
         f'ffmpeg -i "{Path(CACHE_DIR, clip)}" -filter:a volumedetect -f null -',
         shell=True,
         check=True,
         capture_output=True,
     ).stderr
+    logger.debug(res)
     return float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
+
+
+def build_input_flags(video_files: List[str]) -> str:
+    return " ".join(f'-i "{video}"' for video in video_files)
+
+
+def build_preprocess_filters(
+    video_files: List[str],
+) -> tuple[list[str], List[str], List[str]]:
+    filters: List[str] = []
+    video_labels: List[str] = []
+    audio_labels: List[str] = []
+    for i in range(len(video_files)):
+        filters.append(
+            f"[{i}:v]format=yuv420p,scale=1280:720,setpts=PTS-STARTPTS,fps=30[v{i}];"
+        )
+        filters.append(f"[{i}:a]aresample=async=1[a{i}];")
+        video_labels.append(f"v{i}")
+        audio_labels.append(f"a{i}")
+    return filters, video_labels, audio_labels
+
+
+def build_transition_filters_dynamic(
+    filter_gen: Generator[str, Any, None],
+    video_labels: List[str],
+    audio_labels: List[str],
+    durations: List[float],
+    fade_duration: float = 1.0,
+) -> tuple[List[str], List[str], str, str]:
+    vf_filters: List[str] = []
+    af_filters: List[str] = []
+    offset = 0.0
+    for i in range(len(video_labels) - 1):
+        transition = next(filter_gen)
+        offset += durations[i] - fade_duration
+        out_v = f"vxf{i+1}"
+        out_a = f"acf{i+1}"
+        vf_filters.append(
+            f"[{video_labels[i]}][{video_labels[i+1]}]xfade="
+            f"transition={transition}:duration={fade_duration}:offset={offset:.2f}[{out_v}];"
+        )
+        video_labels[i + 1] = out_v
+        af_filters.append(
+            f"[{audio_labels[i]}][{audio_labels[i+1]}]acrossfade="
+            f"d={fade_duration}:c1=tri:c2=tri[{out_a}];"
+        )
+        audio_labels[i + 1] = out_a
+    return vf_filters, af_filters, video_labels[-1], audio_labels[-1]
+
+
+def assemble_filter_complex(
+    pre_filters: List[str],
+    xfade_filters: List[str],
+    audio_fades: List[str],
+) -> str:
+    return "\n".join(pre_filters + xfade_filters + audio_fades)
+
+
+def run_ffmpeg_command(
+    input_flags: str, filter_complex: str, output_file: Path, final_audio_label: str
+) -> None:
+    cmd: str = f"""
+    ffmpeg -y {input_flags} \
+    -filter_complex "{filter_complex}" \
+    -map "[vxf{filter_complex.split("vxf")[-1].split("];")[0]}]" \
+    -map "[{final_audio_label}]" \
+    -c:v libx264 -preset slow \
+    -c:a aac -b:a 128k "{output_file}"
+    """
+    # the .split()[-1].split() lunacy gets the index of the final VXF
+    # filter so that FFmpeg knows where to map the video output.
+    # TODO: remove that mess and put the same logic in
+    # build_transition_filters_dynamic
+    run_in_bash(cmd, shell=True, check=True, capture_output=True)
+
+
 @cli.command()
 @click.option(
     "--input-dir",
-    metavar="The input directory to get the source videos from.",
+    help="The input directory to get the source videos from.",
     type=click.Path(exists=True, resolve_path=True, path_type=Path),
 )
 @click.option(
     "--watermark-image",
-    metavar="The path of the watermark image "
+    help="The path of the watermark image "
     "to overlay over the final output. "
     "It must exist. "
     "It will not be scaled, so it should be "
@@ -133,23 +266,41 @@ def get_amplitude_of_segment(clip: Path):
 )
 @click.option(
     "--horiz-output-file",
-    metavar="The path to output the final video to. "
+    help="The path to output the final video to. "
     "It should not exist and must either be an absolute path "
     'or start with "./".',
     type=click.Path(exists=False, resolve_path=True, path_type=Path),
 )
 @click.option(
     "--vert-output-file",
-    metavar="The path to output the final video to. "
+    help="The path to output the final video to. "
     "It should not exist and must either be an absolute path "
     'or start with "./".',
     type=click.Path(exists=False, resolve_path=True, path_type=Path),
 )
+@click.option(
+    "--decode-options",
+    help="Options to pass to FFmpeg for some decode operations."
+    "While optional, proper use of this option will significantly"
+    "reduce processing time. Note that inclusion of any encoding options"
+    "will cause this program to fail.",
+    type=str,
+    default="",
+)
+@click.option(
+    "--num-segs",
+    help="Total number of segments to concatenate in the output."
+    "Controls the length of the final video.",
+    type=int,
+    default=10,
+)
 def run(
     input_dir: Path,
     watermark_image: Path,
     horiz_output_file: Path,
     vert_output_file: Path,
+    decode_options: str,
+    num_segs: int,
 ):
     """Main function that orchestrates the video processing pipeline."""
     logger.info("Starting video processing pipeline.")
@@ -205,10 +356,8 @@ def run(
         representative_video_audio_levels[seg] = representative_video_audio_futures[
             seg
         ].result()
-    highest = dict(Counter(representative_video_audio_levels).most_common(10))
+    highest = dict(Counter(representative_video_audio_levels).most_common(num_segs))
     loudest_seg_indexes: List[int] = [int(str(Path(k).stem)) for k in highest.keys()]
     for video in raw_videos[2]:
         out_folder = Path(CACHE_DIR, "loudest", Path(video).stem)
         out_folder.mkdir(parents=True, exist_ok=True)
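Counter.most_common(num_segs) keeps the num_segs entries with the highest mean volume; volumedetect reports negative dB values, so the highest (closest to zero) are the loudest. A tiny sketch with invented readings:

    from collections import Counter

    levels = {"0": -31.2, "1": -18.4, "2": -24.9, "3": -20.1}  # hypothetical mean_volume per segment
    print(dict(Counter(levels).most_common(2)))  # {'1': -18.4, '3': -20.1}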
@@ -219,46 +368,72 @@
                 seg,
                 out_folder.parent,
             )
+
+    video_files: List[str] = []
     with open(str(Path(CACHE_DIR, "list.txt")), "w") as f:
         for seg in loudest_seg_indexes:
             random_seg = Path(random.choice(raw_videos[2]))
-            f.write(
-                f"file '{Path(CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix)}'\n"
-            )
+            vid_path = Path(
+                CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix
+            )
+            f.write(f"file '{vid_path}'\n")
+            video_files.append(str(vid_path.resolve()))
+
+    filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs)
+    input_flags: str = f"{decode_options} {build_input_flags(video_files)}"
+    pre_filters, vlabels, alabels = build_preprocess_filters(video_files)
+    durations = [get_video_duration(Path(vf)) for vf in video_files]
+    vfades, afades, final_v, final_a = build_transition_filters_dynamic(
+        filter_gen, vlabels, alabels, durations, 0.5
+    )
+    full_filter: str = assemble_filter_complex(pre_filters, vfades, afades)
+
+    logger.info("Creating unmarked video...")
+    run_ffmpeg_command(
+        output_file=CACHE_DIR
+        / "out-unmarked.mp4",  # This file will have all the transitions without the overlayed logo
+        input_flags=input_flags,
+        filter_complex=full_filter,
+        final_audio_label=final_a,
+    )
+
     logger.info("Creating horizontal video...")
-    # Horizontal Pipeline: Concatenate clips and overlay a semitransparent watermark.
-    subprocess.run(
-        f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-        -filter_complex "
-        [1]format=rgba,colorchannelmixer=aa=0.5[logo];
-        [0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p
-        " -c:a copy "{horiz_output_file}"''',
+    # Horizontal Pipeline: Take unmarked file and add a semitransparent watermark.
+    run_in_bash(
+        f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
+        -filter_complex " \
+        [1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
+        [0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
+        " -c:a aac -b:a 128k "{horiz_output_file}"''',
         shell=True,
         check=True,
         capture_output=True,
     )
+
     logger.info("Creating vertical video...")
-    # Vertical Pipeline: Concatenate, crop (zoom), split & blur for a vertical aspect ratio,
+    # Vertical Pipeline: Crop (zoom), split & blur unmarked file for a vertical aspect ratio,
     # then overlay a centered, opaque watermark at the bottom.
-    subprocess.run(
-        f'''ffmpeg -y -f concat -safe 0 -i "{Path(CACHE_DIR, "list.txt")}" -i "{watermark_image}" \
-        -filter_complex "
-        [0]crop=3/4*in_w:in_h[zoomed];
-        [zoomed]split[original][copy];
-        [copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred];
-        [blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert];
-        [vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p
-        " -c:a copy "{vert_output_file}"''',
+    run_in_bash(
+        f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
+        -filter_complex " \
+        [0]crop=3/4*in_w:in_h[zoomed]; \
+        [zoomed]split[original][copy]; \
+        [copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
+        [blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
+        [vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
+        " -c:a aac -b:a 128k "{vert_output_file}"''',
         shell=True,
         check=True,
         capture_output=True,
     )
+
     logger.info("Video processing pipeline completed.")
+    logger.info("Cleaning up temporary files...")
+    shutil.rmtree(CACHE_DIR)
 
 
 if __name__ == "__main__":
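Worked sizes for the vertical pass, assuming the 1280x720 intermediate that the preprocessing scale produces: the crop/scale/crop chain yields a 720x1280 (9:16) blurred base, and the 960-pixel-wide zoomed clip is centered on top, so roughly 120 pixels on each side fall outside the frame. The arithmetic only, as a sketch:

    w, h = 1280, 720                     # out-unmarked.mp4 after preprocessing
    zoom_w = w * 3 // 4                  # crop=3/4*in_w:in_h      -> 960x720 foreground
    bg_h = round(h * (4 / 3) * (4 / 3))  # scale=-1:ih*(4/3)*(4/3) -> copy scaled to 1280 px tall
    bg_w = bg_h * 9 // 16                # crop=w=ih*9/16          -> 720x1280 blurred base
    print(zoom_w, bg_h, bg_w)            # 960 1280 720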