Files
akmon/video_splitter/main.py
2026-01-20 08:04:15 +08:00

86 lines
3.2 KiB
Python

import os
import json
import argparse
from ai_agent import QwenVLSmartSplitter
from processor import VideoProcessor
def main():
parser = argparse.ArgumentParser(description="基于 Qwen3-VL 的智能新闻拆条系统")
parser.add_argument("--input", "-i", required=True, help="输入视频文件路径")
parser.add_argument("--output_dir", "-o", default="output", help="输出目录")
args = parser.parse_args()
input_video = args.input
output_dir = args.output_dir
if not os.path.exists(input_video):
# 为了演示方便,如果文件不存在,我们创建一个假的空文件或者报错
# 这里我们报错
print(f"错误: 输入文件 {input_video} 不存在")
# 在演示模式下,我们可以生成一个模拟的空文件来跑通流程,但最好还是让用户提供
return
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print("="*50)
print(" 智能新闻拆条系统 (Powered by Qwen3-VL)")
print("="*50)
processor = VideoProcessor()
ai_agent = QwenVLSmartSplitter()
# 1. 预处理:提取音频 (模拟)
print("\n[1/4] 视频预处理...")
audio_path = os.path.join(output_dir, "temp_audio.wav")
# processor.extract_audio(input_video, audio_path) # 实际运行时取消注释
print(f"[*] 音频已提取 (模拟): {audio_path}")
# 模拟 ASR 结果
mock_asr_text = "各位观众晚上好,欢迎收看今天的新闻联播...下面播报第一条新闻..."
# 2. AI 分析
print("\n[2/4] 多模态 AI 分析 (Qwen3-VL)...")
segments = ai_agent.analyze_video(input_video, mock_asr_text)
# 3. 执行拆条
print("\n[3/4] 执行智能拆条...")
results = []
for i, seg in enumerate(segments):
safe_title = seg['title'].replace(" ", "_")
filename = f"{i+1}_{safe_title}.mp4"
output_path = os.path.join(output_dir, filename)
# 调用 FFmpeg 切割
# 注意:如果没有真实的视频文件,这一步会失败。
# 为了演示代码逻辑,我们这里加个 try-catch 或者检查文件大小
if os.path.getsize(input_video) > 0:
success = processor.split_video(input_video, seg['start'], seg['end'], output_path)
else:
print(f"[*] (演示模式) 模拟切割: {seg['start']} -> {seg['end']} => {output_path}")
success = True
if success:
# 4. 自动编目信息整理
catalog_info = {
"file_path": output_path,
"title": seg['title'],
"category": seg['category'],
"summary": seg['summary'],
"tags": seg['tags'],
"cover_image": ai_agent.generate_cover(output_path, "00:00:01")
}
results.append(catalog_info)
# 输出最终报告
print("\n[4/4] 生成编目报告...")
report_path = os.path.join(output_dir, "catalog_report.json")
with open(report_path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"\n[+] 拆条完成!共生成 {len(results)} 个片段。")
print(f"[+] 编目报告已保存至: {report_path}")
if __name__ == "__main__":
main()