Compare commits

...

3 commits

Author SHA1 Message Date
Micha Albert
07f98fa269 bump version again (sorry)
Some checks are pending
Publish Python distribution to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python distribution to PyPI and TestPyPI / Publish Python distribution to PyPI (push) Blocked by required conditions
Publish Python distribution to PyPI and TestPyPI / Sign the Python distribution with Sigstore and upload them to GitHub Release (push) Blocked by required conditions
Publish Python distribution to PyPI and TestPyPI / Publish Python distribution to TestPyPI (push) Blocked by required conditions
2026-01-16 15:33:02 -05:00
Micha Albert
a3578d77e4 bump version 2026-01-16 15:32:00 -05:00
Micha Albert
f7a14942ac feat: refactor video processing to analyze amplitude directly from streams, extract segments on demand, and add dedicated horizontal/vertical rendering functions. 2026-01-16 15:26:25 -05:00
4 changed files with 162 additions and 128 deletions

5
.gitignore vendored
View file

@ -174,3 +174,8 @@ cython_debug/
# PyPI configuration file # PyPI configuration file
.pypirc .pypirc
vid-ins/
video-maker-cache/
*.mp4
*.webm

View file

@ -1,6 +1,6 @@
[project] [project]
name = "highlight_video_maker" name = "highlight_video_maker"
version = "0.1.0" version = "0.2.3"
authors = [{ name = "Micha Albert", email = "micha@2231.tech" }] authors = [{ name = "Micha Albert", email = "micha@2231.tech" }]
description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions" description = "A utility to take several video inputs, take the loudest points, and create a compilation of them with smooth transitions"
readme = "README.md" readme = "README.md"

View file

@ -31,11 +31,17 @@ def get_logger(level: int = logging.INFO):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(level) logger.setLevel(level)
if not logger.hasHandlers():
# create console handler with a higher log level # create console handler with a higher log level
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setLevel(level) console_handler.setLevel(level)
console_handler.setFormatter(CustomFormatter()) console_handler.setFormatter(CustomFormatter())
logger.addHandler(console_handler) logger.addHandler(console_handler)
else:
# If handlers exist, just update the level if needed
logger.setLevel(level)
for handler in logger.handlers:
handler.setLevel(level)
return logger return logger

View file

@ -6,13 +6,13 @@ import subprocess
from collections import Counter from collections import Counter
from logging import Logger, getLevelNamesMapping from logging import Logger, getLevelNamesMapping
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Generator, List from typing import Any, Dict, Generator, List, Optional
import click import click
from .logger import get_logger from .logger import get_logger
logger: Logger logger: Logger = get_logger()
XFADE_TRANSITIONS = [ XFADE_TRANSITIONS = [
"fade", "fade",
@ -52,7 +52,7 @@ def cli(log_level: str):
IN_DIR: Path IN_DIR: Path
OUT_DIR: Path OUT_DIR: Path
CACHE_DIR = Path("/tmp/video-maker-cache") CACHE_DIR = Path("./video-maker-cache")
THREADS = 12 THREADS = 12
MIN_SEGMENT_LENGTH = 5 MIN_SEGMENT_LENGTH = 5
@ -68,7 +68,7 @@ def run_in_bash(
shell=False, shell=False,
): ):
return subprocess.run( return subprocess.run(
f"/usr/bin/bash -c '{cmd}'", f"""bash -c '{cmd}'""",
capture_output=capture_output, capture_output=capture_output,
check=check, check=check,
text=text, text=text,
@ -140,34 +140,21 @@ def generate_segment_lengths(file_length: float) -> List[float]:
return segment_lengths return segment_lengths
def split_video_segment( def get_amplitude_from_stream(file: Path, start: float, duration: float):
segment_lengths: List[float], """Extracts the mean audio amplitude of a video segment directly from stream."""
file_name: Path, logger.debug(f"Analyzing amplitude for {file} at {start} for {duration}s")
idx: int,
out_dir: Path = Path(CACHE_DIR),
):
"""Splits a video into segments using ffmpeg."""
logger.debug(f"Splitting {file_name} - segment {idx}")
run_in_bash(
f"ffmpeg -nostats -loglevel 0 -y -ss {seconds_to_timestamp(sum(segment_lengths[:idx]))} "
f'-to {seconds_to_timestamp(sum(segment_lengths[:idx]) + segment_lengths[idx])} -i "{file_name}" '
f'-c copy "{Path(out_dir, file_name.stem, str(idx) + file_name.suffix)}"',
check=True,
shell=True,
)
def get_amplitude_of_segment(clip: Path):
"""Extracts the mean audio amplitude of a video segment."""
logger.debug(f"Analyzing amplitude for clip: {clip}")
res = run_in_bash( res = run_in_bash(
f'ffmpeg -i "{Path(CACHE_DIR, clip)}" -filter:a volumedetect -f null -', f'ffmpeg -ss {start} -t {duration} -i "{file}" -filter:a volumedetect -f null -',
shell=True, shell=True,
check=True, check=True,
capture_output=True, capture_output=True,
).stderr ).stderr
logger.debug(res) logger.debug(res)
try:
return float(res.decode().split("mean_volume: ")[1].split(" dB")[0]) return float(res.decode().split("mean_volume: ")[1].split(" dB")[0])
except IndexError:
logger.warning(f"Could not determine volume for {file} segment. Defaulting to -91dB.")
return -91.0
def build_input_flags(video_files: List[str]) -> str: def build_input_flags(video_files: List[str]) -> str:
@ -242,12 +229,39 @@ def run_ffmpeg_command(
-c:v libx264 -preset slow \ -c:v libx264 -preset slow \
-c:a aac -b:a 128k "{output_file}" -c:a aac -b:a 128k "{output_file}"
""" """
# the .split()[-1].split() lunacy gets the index of the final VXF
# filter so that FFmpeg knows where to map the video output.
# TODO: remove that mess and put the same logic in
# build_transition_filters_dynamic
run_in_bash(cmd, shell=True, check=True, capture_output=True) run_in_bash(cmd, shell=True, check=True, capture_output=True)
def render_horizontal(decode_options: str, watermark_image: Path, output_file: Path):
logger.info("Creating horizontal video...")
run_in_bash(
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
" -c:a aac -b:a 128k "{output_file}"''',
shell=True,
check=True,
capture_output=True,
)
logger.info(f"Horizontal video created at {output_file}")
def render_vertical(decode_options: str, watermark_image: Path, output_file: Path):
logger.info("Creating vertical video...")
run_in_bash(
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[0]crop=3/4*in_w:in_h[zoomed]; \
[zoomed]split[original][copy]; \
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
" -c:a aac -b:a 128k "{output_file}"''',
shell=True,
check=True,
capture_output=True,
)
logger.info(f"Vertical video created at {output_file}")
@cli.command() @cli.command()
@click.option( @click.option(
@ -270,6 +284,7 @@ def run_ffmpeg_command(
"It should not exist and must either be an absolute path " "It should not exist and must either be an absolute path "
'or start with "./".', 'or start with "./".',
type=click.Path(exists=False, resolve_path=True, path_type=Path), type=click.Path(exists=False, resolve_path=True, path_type=Path),
required=False,
) )
@click.option( @click.option(
"--vert-output-file", "--vert-output-file",
@ -277,6 +292,7 @@ def run_ffmpeg_command(
"It should not exist and must either be an absolute path " "It should not exist and must either be an absolute path "
'or start with "./".', 'or start with "./".',
type=click.Path(exists=False, resolve_path=True, path_type=Path), type=click.Path(exists=False, resolve_path=True, path_type=Path),
required=False,
) )
@click.option( @click.option(
"--decode-options", "--decode-options",
@ -297,15 +313,22 @@ def run_ffmpeg_command(
def run( def run(
input_dir: Path, input_dir: Path,
watermark_image: Path, watermark_image: Path,
horiz_output_file: Path, horiz_output_file: Optional[Path],
vert_output_file: Path, vert_output_file: Optional[Path],
decode_options: str, decode_options: str,
num_segs: int, num_segs: int,
): ):
"""Main function that orchestrates the video processing pipeline.""" """Main function that orchestrates the video processing pipeline."""
logger.info("Starting video processing pipeline.") logger.info("Starting video processing pipeline.")
if not horiz_output_file and not vert_output_file:
logger.error("No output file specified! Provide --horiz-output-file or --vert-output-file (or both).")
# We could also use click constraints but checking here is fine for a quick refactor.
return
raw_videos = next(input_dir.walk()) raw_videos = next(input_dir.walk())
# Get the representative video (shortest video)
representative_video = min( representative_video = min(
(Path(raw_videos[0], p) for p in raw_videos[2]), key=get_video_duration (Path(raw_videos[0], p) for p in raw_videos[2]), key=get_video_duration
) )
@ -316,80 +339,93 @@ def run(
get_video_duration(representative_video) get_video_duration(representative_video)
) )
for vid in raw_videos[2]: # Pre-create cache dir
Path(CACHE_DIR, Path(vid).stem).resolve().mkdir(parents=True, exist_ok=True) Path(CACHE_DIR).mkdir(parents=True, exist_ok=True)
# Splitting videos into segments using multiprocessing # Analyze amplitudes of the representative video segments directly (without splitting)
with concurrent.futures.ProcessPoolExecutor(max_workers=THREADS) as split_executor: logger.info("Analyzing audio levels...")
try: segment_amplitudes: Dict[int, float] = {}
Path(CACHE_DIR, representative_video.stem).mkdir(
parents=True, exist_ok=True with concurrent.futures.ProcessPoolExecutor(max_workers=THREADS) as executor:
) futures = {}
except FileExistsError:
pass
for idx in range(len(representative_video_segments)): for idx in range(len(representative_video_segments)):
for vid in raw_videos[2]: start_time = sum(representative_video_segments[:idx])
split_executor.submit( duration = representative_video_segments[idx]
split_video_segment, futures[idx] = executor.submit(
representative_video_segments, get_amplitude_from_stream,
Path(raw_videos[0], vid).resolve(), representative_video,
idx, start_time,
duration
) )
# Computing amplitude for each segment for idx, future in futures.items():
representative_video_audio_futures: Dict[str, concurrent.futures.Future[float]] = {} segment_amplitudes[idx] = future.result()
with concurrent.futures.ProcessPoolExecutor( # Identify loudest segments
max_workers=THREADS highest = dict(Counter(segment_amplitudes).most_common(num_segs))
) as amplitude_executor: loudest_seg_indexes: List[int] = sorted(list(highest.keys()))
for split_vid in next(Path(CACHE_DIR, Path(representative_video).stem).walk())[
2
]:
representative_video_audio_futures[split_vid] = amplitude_executor.submit(
get_amplitude_of_segment,
Path(CACHE_DIR, Path(representative_video).stem, split_vid).resolve(),
)
representative_video_audio_levels: Dict[str, float] = {} logger.info("Extracting selected segments...")
# Collecting results
for seg in representative_video_audio_futures.keys():
representative_video_audio_levels[seg] = representative_video_audio_futures[
seg
].result()
highest = dict(Counter(representative_video_audio_levels).most_common(num_segs))
loudest_seg_indexes: List[int] = [int(str(Path(k).stem)) for k in highest.keys()]
for video in raw_videos[2]:
out_folder = Path(CACHE_DIR, "loudest", Path(video).stem)
out_folder.mkdir(parents=True, exist_ok=True)
for seg in loudest_seg_indexes:
split_video_segment(
representative_video_segments,
Path(raw_videos[0], video),
seg,
out_folder.parent,
)
video_files: List[str] = [] video_files: List[str] = []
with open(str(Path(CACHE_DIR, "list.txt")), "w") as f:
for seg in loudest_seg_indexes: # We need a directory for the chosen segments
random_seg = Path(random.choice(raw_videos[2])) out_loudest_dir = Path(CACHE_DIR, "loudest")
vid_path = Path( out_loudest_dir.mkdir(parents=True, exist_ok=True)
CACHE_DIR, "loudest", random_seg.stem, str(seg) + random_seg.suffix
with open(str(Path(CACHE_DIR, "list.txt")), "w") as f, \
concurrent.futures.ProcessPoolExecutor(max_workers=THREADS) as split_executor:
future_to_vid_path = {}
for seg_idx in loudest_seg_indexes:
# Pick a random video for this segment index
random_vid_name = random.choice(raw_videos[2])
random_vid_path = Path(raw_videos[0], random_vid_name)
# Define output path for this specific clip
final_clip_path = out_loudest_dir / f"seg_{seg_idx}_{random_vid_path.stem}{random_vid_path.suffix}"
start_time = sum(representative_video_segments[:seg_idx])
duration = representative_video_segments[seg_idx]
future = split_executor.submit(
run_in_bash,
f"ffmpeg -nostats -loglevel 0 -y -ss {seconds_to_timestamp(start_time)} "
f'-t {duration} -i "{random_vid_path}" '
f'-c copy "{final_clip_path}"',
check=True,
shell=True
) )
f.write(f"file '{vid_path}'\n") future_to_vid_path[future] = final_clip_path
video_files.append(str(vid_path.resolve()))
f.write(f"file '{final_clip_path}'\n")
video_files.append(str(final_clip_path.resolve()))
# Wait for all splits to finish
for fut in concurrent.futures.as_completed(future_to_vid_path):
try:
fut.result()
except Exception as e:
logger.exception(f"Failed to extract segment: {e}")
filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs) filter_gen = nonrepeating_generator(XFADE_TRANSITIONS, num_segs)
input_flags: str = f"{decode_options} {build_input_flags(video_files)}" input_flags: str = f"{decode_options} {build_input_flags(video_files)}"
try:
pre_filters, vlabels, alabels = build_preprocess_filters(video_files) pre_filters, vlabels, alabels = build_preprocess_filters(video_files)
except IndexError:
logger.error("No video files generated? Aborting.")
return
durations = [get_video_duration(Path(vf)) for vf in video_files] durations = [get_video_duration(Path(vf)) for vf in video_files]
vfades, afades, final_v, final_a = build_transition_filters_dynamic( vfades, afades, final_v, final_a = build_transition_filters_dynamic(
filter_gen, vlabels, alabels, durations, 0.5 filter_gen, vlabels, alabels, durations, 0.5
) )
full_filter: str = assemble_filter_complex(pre_filters, vfades, afades) full_filter: str = assemble_filter_complex(pre_filters, vfades, afades)
logger.info("Creating unmarked video...") logger.info("Creating unmarked video (master)...")
run_ffmpeg_command( run_ffmpeg_command(
output_file=CACHE_DIR output_file=CACHE_DIR
@ -399,38 +435,25 @@ def run(
final_audio_label=final_a, final_audio_label=final_a,
) )
logger.info("Creating horizontal video...") # Parallelize Final Output
render_futures = []
# Horizontal Pipeline: Take unmarked file and add a semitransparent watermark. with concurrent.futures.ProcessPoolExecutor(max_workers=2) as render_executor:
run_in_bash( if horiz_output_file:
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \ render_futures.append(
-filter_complex " \ render_executor.submit(render_horizontal, decode_options, watermark_image, horiz_output_file)
[1]format=rgba,colorchannelmixer=aa=0.5[logo]; \
[0][logo]overlay=W-w-30:H-h-30:format=auto,format=yuv420p \
" -c:a aac -b:a 128k "{horiz_output_file}"''',
shell=True,
check=True,
capture_output=True,
) )
logger.info("Creating vertical video...") if vert_output_file:
render_futures.append(
# Vertical Pipeline: Crop (zoom), split & blur unmarked file for a vertical aspect ratio, render_executor.submit(render_vertical, decode_options, watermark_image, vert_output_file)
# then overlay a centered, opaque watermark at the bottom.
run_in_bash(
f'''ffmpeg -y {decode_options} -i "{CACHE_DIR / "out-unmarked.mp4"}" -i "{watermark_image}" \
-filter_complex " \
[0]crop=3/4*in_w:in_h[zoomed]; \
[zoomed]split[original][copy]; \
[copy]scale=-1:ih*(4/3)*(4/3),crop=w=ih*9/16,gblur=sigma=17:steps=5[blurred]; \
[blurred][original]overlay=(main_w-overlay_w)/2:(main_h-overlay_h)/2[vert]; \
[vert][1]overlay=(W-w)/2:H-h-30,format=yuv420p \
" -c:a aac -b:a 128k "{vert_output_file}"''',
shell=True,
check=True,
capture_output=True,
) )
for f in concurrent.futures.as_completed(render_futures):
try:
f.result()
except Exception as e:
logger.exception(f"Rendering failed: {e}")
logger.info("Video processing pipeline completed.") logger.info("Video processing pipeline completed.")
logger.info("Cleaning up temporary files...") logger.info("Cleaning up temporary files...")
shutil.rmtree(CACHE_DIR) shutil.rmtree(CACHE_DIR)