如何使用 FFmpeg 进行批量处理和自动化任务?
FFmpeg 可以通过脚本和工具实现批量处理,大大提高工作效率,适合处理大量视频文件。基本批量处理使用 Shell 脚本批量转码#!/bin/bash# 批量转码 MP4 文件for file in *.mp4; do output="output_${file}" ffmpeg -i "$file" -c:v libx264 -crf 23 -c:a aac -b:a 128k "$output"done# 批量转换格式for file in *.avi; do output="${file%.avi}.mp4" ffmpeg -i "$file" -c:v libx264 -crf 23 -c:a aac -b:a 128k "$output"done使用 find 命令批量处理# 递归处理目录中的所有视频find input_dir -name "*.mp4" -exec ffmpeg -i {} -c:v libx264 -crf 23 output_dir/{} \;# 批量提取音频find input_dir -name "*.mp4" -exec ffmpeg -i {} -vn -c:a copy output_dir/{/.}.aac \;并行处理使用 GNU Parallel# 安装 GNU Parallelbrew install parallel # macOSapt-get install parallel # Ubuntu/Debian# 并行处理 4 个文件find input_dir -name "*.mp4" | parallel -j 4 ffmpeg -i {} -c:v libx264 -crf 23 output_dir/{/.}.mp4# 显示进度find input_dir -name "*.mp4" | parallel --bar -j 4 ffmpeg -i {} -c:v libx264 -crf 23 output_dir/{/.}.mp4使用 xargs 并行处理# 并行处理 4 个文件find input_dir -name "*.mp4" | xargs -P 4 -I {} ffmpeg -i {} -c:v libx264 -crf 23 output_dir/{/.}.mp4Python 批量处理使用 subprocess 模块import subprocessimport osimport glob# 批量转码视频input_dir = "input"output_dir = "output"os.makedirs(output_dir, exist_ok=True)for input_file in glob.glob(f"{input_dir}/*.mp4"): filename = os.path.basename(input_file) output_file = os.path.join(output_dir, f"converted_{filename}") cmd = [ "ffmpeg", "-i", input_file, "-c:v", "libx264", "-crf", "23", "-c:a", "aac", "-b:a", "128k", output_file ] subprocess.run(cmd, check=True) print(f"Processed: {filename}")使用 multiprocessing 并行处理import subprocessimport osimport globfrom multiprocessing import Pooldef process_video(input_file): output_dir = "output" filename = os.path.basename(input_file) output_file = os.path.join(output_dir, f"converted_{filename}") cmd = [ "ffmpeg", "-i", input_file, "-c:v", "libx264", "-crf", "23", "-c:a", "aac", "-b:a", "128k", output_file ] subprocess.run(cmd, check=True) return filename# 并行处理input_files = glob.glob("input/*.mp4")with Pool(processes=4) as pool: results = pool.map(process_video, input_files) for result in results: print(f"Processed: {result}")PowerShell 批量处理Windows PowerShell 脚本# 批量转码视频$inputDir = "input"$outputDir = "output"New-Item -ItemType Directory -Force -Path $outputDirGet-ChildItem -Path $inputDir -Filter "*.mp4" | ForEach-Object { $inputFile = $_.FullName $outputFile = Join-Path $outputDir "converted_$($_.Name)" ffmpeg -i $inputFile -c:v libx264 -crf 23 -c:a aac -b:a 128k $outputFile Write-Host "Processed: $($_.Name)"}高级批量处理根据文件大小调整参数#!/bin/bashfor file in *.mp4; do size=$(stat -f%z "$file" 2>/dev/null || stat -c%s "$file") size_mb=$((size / 1024 / 1024)) if [ $size_mb -gt 100 ]; then # 大文件使用更高的压缩率 ffmpeg -i "$file" -c:v libx264 -crf 28 -preset slow "compressed_$file" else # 小文件使用默认参数 ffmpeg -i "$file" -c:v libx264 -crf 23 -preset medium "compressed_$file" fidone根据分辨率调整参数#!/bin/bashfor file in *.mp4; do # 获取视频分辨率 resolution=$(ffprobe -v error -select_streams v:0 -show_entries stream=width,height -of csv=s=x:p=0 "$file") if [[ $resolution == "1920x1080" ]]; then # 1080p 视频降低到 720p ffmpeg -i "$file" -vf scale=1280:720 -c:v libx264 -crf 23 "720p_$file" elif [[ $resolution == "3840x2160" ]]; then # 4K 视频降低到 1080p ffmpeg -i "$file" -vf scale=1920:1080 -c:v libx264 -crf 23 "1080p_$file" else # 其他分辨率保持不变 ffmpeg -i "$file" -c:v libx264 -crf 23 "compressed_$file" fidone批量生成缩略图#!/bin/bashfor file in *.mp4; do filename="${file%.*}" # 在 10% 处截取缩略图 ffmpeg -i "$file" -ss 00:00:05 -vframes 1 -vf scale=320:180 "${filename}_thumb.jpg" echo "Generated thumbnail for: $file"done错误处理与日志带错误处理的批量处理#!/bin/bashlog_file="processing.log"error_count=0for file in *.mp4; do echo "Processing: $file" | tee -a "$log_file" if ffmpeg -i "$file" -c:v libx264 -crf 23 "output_$file" 2>> "$log_file"; then echo "Success: $file" | tee -a "$log_file" else echo "Error: $file" | tee -a "$log_file" ((error_count++)) fidoneecho "Total errors: $error_count" | tee -a "$log_file"Python 带错误处理import subprocessimport osimport globimport logginglogging.basicConfig(filename='processing.log', level=logging.INFO)def process_video(input_file): try: output_file = f"output_{os.path.basename(input_file)}" cmd = ["ffmpeg", "-i", input_file, "-c:v", "libx264", "-crf", "23", output_file] subprocess.run(cmd, check=True, capture_output=True) logging.info(f"Success: {input_file}") return True except subprocess.CalledProcessError as e: logging.error(f"Error: {input_file} - {e.stderr.decode()}") return False# 批量处理input_files = glob.glob("*.mp4")success_count = sum(process_video(f) for f in input_files)error_count = len(input_files) - success_countlogging.info(f"Total: {len(input_files)}, Success: {success_count}, Errors: {error_count}")自动化任务定时任务(Cron)# 添加到 crontab# 每天凌晨 2 点执行批量处理0 2 * * * /path/to/batch_process.sh >> /var/log/ffmpeg_batch.log 2>&1监控目录自动处理import timeimport osfrom watchdog.observers import Observerfrom watchdog.events import FileSystemEventHandlerclass VideoHandler(FileSystemEventHandler): def on_created(self, event): if event.src_path.endswith('.mp4'): print(f"New video detected: {event.src_path}") process_video(event.src_path)def process_video(input_file): output_file = f"output_{os.path.basename(input_file)}" cmd = ["ffmpeg", "-i", input_file, "-c:v", "libx264", "-crf", "23", output_file] subprocess.run(cmd)if __name__ == "__main__": event_handler = VideoHandler() observer = Observer() observer.schedule(event_handler, path='input_dir', recursive=False) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()批量处理最佳实践使用并行处理:充分利用多核 CPU 提高处理速度添加错误处理:记录错误日志,便于排查问题监控资源使用:避免同时处理过多文件导致系统资源耗尽测试小批量:在大规模处理前先测试小批量使用进度显示:了解处理进度,便于估算完成时间备份原始文件:处理前备份重要文件批量处理可以大幅提高工作效率,但需要注意系统资源管理和错误处理。