Initial commit of akmon project

This commit is contained in:
2026-01-20 08:04:15 +08:00
commit 77a2bab985
1309 changed files with 343305 additions and 0 deletions

57
video_splitter/README.md Normal file
View File

@@ -0,0 +1,57 @@
# 智能新闻拆条系统 (Smart News Splitter)
本项目演示了如何利用 **Qwen3-VL** 多模态大模型实现新闻节目的自动拆条、编目和入库。
## 项目结构
- `main.py`: 主程序入口协调预处理、AI分析和视频切割流程。
- `ai_agent.py`: 模拟 Qwen3-VL 的交互代理,负责视频内容理解和分段。
- `processor.py`: 封装 FFmpeg 操作,处理视频切割和音频提取。
- `requirements.txt`: 项目依赖。
## 核心流程
1. **预处理**: 提取视频音频,进行 ASR 转写(本项目使用模拟文本)。
2. **AI 分析**: 将视频帧和 ASR 文本发送给 Qwen3-VL模型返回结构化的新闻片段信息JSON
3. **物理切割**: 根据模型返回的时间戳,调用 FFmpeg 进行无损切割。
4. **自动编目**: 生成标题、摘要、标签和封面图,输出 `catalog_report.json`
## 快速开始
### 1. 安装依赖
确保系统已安装 [FFmpeg](https://ffmpeg.org/download.html) 并将其加入环境变量。
```bash
pip install -r requirements.txt
```
### 2. 运行拆条
准备一个新闻视频文件(例如 `news.mp4`),然后运行:
```bash
python main.py -i news.mp4 -o output_folder
```
> **注意**: 由于没有真实的 Qwen3-VL API Key`ai_agent.py` 目前返回的是模拟数据。在实际生产环境中,请替换 `analyze_video` 方法中的逻辑,接入 DashScope API。
## 实际接入指南
要接入真实的 Qwen3-VL请修改 `ai_agent.py`:
1. 安装 DashScope SDK: `pip install dashscope`
2. 配置 API Key: `dashscope.api_key = "YOUR_API_KEY"`
3. 构造多模态消息:
```python
messages = [
{
"role": "user",
"content": [
{"video": video_path},
{"text": "请分析这段视频,拆分出每条新闻..."}
]
}
]
```
4. 调用 `dashscope.MultiModalConversation.call()` 获取结果。

View File

@@ -0,0 +1,71 @@
import json
import time
from typing import List, Dict, Any
class QwenVLSmartSplitter:
"""
模拟 Qwen3-VL 多模态大模型的智能拆条代理
"""
def __init__(self, api_key: str = "mock_key"):
self.api_key = api_key
# 在实际生产中,这里会初始化 DashScope 或 OpenAI 客户端
# from dashscope import MultiModalConversation
def analyze_video(self, video_path: str, asr_text: str = "") -> List[Dict[str, Any]]:
"""
模拟调用 Qwen3-VL 分析视频内容并返回拆条信息
Args:
video_path: 视频文件路径
asr_text: (可选) 视频的语音转写文本,辅助模型理解
Returns:
List[Dict]: 拆条片段列表
"""
print(f"[*] 正在调用 Qwen3-VL 分析视频: {video_path} ...")
print(f"[*] 上下文 ASR 文本长度: {len(asr_text)} 字符")
# 模拟网络延迟
time.sleep(2)
# 模拟 Qwen3-VL 的输出 (JSON 格式)
# 实际场景中,这里会将视频帧和 Prompt 发送给模型
# Prompt: "请分析这段新闻视频,识别出其中包含的每一条独立新闻报道..."
mock_response = [
{
"start": "00:00:05",
"end": "00:02:10",
"title": "全县春耕生产工作会议召开",
"category": "时政",
"summary": "县委书记主持召开春耕生产工作会议,强调要抢抓农时,确保粮食安全。",
"tags": ["春耕", "农业", "时政"]
},
{
"start": "00:02:12",
"end": "00:05:45",
"title": "我县新增三家高新技术企业",
"category": "经济",
"summary": "科技局发布消息,我县三家企业通过国家高新技术企业认定,实现了零的突破。",
"tags": ["高新技术", "企业", "经济"]
},
{
"start": "00:05:50",
"end": "00:08:20",
"title": "交警部门开展电动车专项整治行动",
"category": "民生",
"summary": "为规范交通秩序,交警大队在主要路口开展电动车违规行为专项整治。",
"tags": ["交通", "整治", "民生"]
}
]
print("[+] Qwen3-VL 分析完成,识别出 3 个新闻片段。")
return mock_response
def generate_cover(self, video_path: str, timestamp: str) -> str:
"""
模拟智能封面生成
"""
# 实际逻辑:提取指定时间戳的帧,或者让模型选一帧
return f"{video_path}_cover_{timestamp.replace(':', '')}.jpg"

85
video_splitter/main.py Normal file
View File

@@ -0,0 +1,85 @@
import os
import json
import argparse
from ai_agent import QwenVLSmartSplitter
from processor import VideoProcessor
def main():
parser = argparse.ArgumentParser(description="基于 Qwen3-VL 的智能新闻拆条系统")
parser.add_argument("--input", "-i", required=True, help="输入视频文件路径")
parser.add_argument("--output_dir", "-o", default="output", help="输出目录")
args = parser.parse_args()
input_video = args.input
output_dir = args.output_dir
if not os.path.exists(input_video):
# 为了演示方便,如果文件不存在,我们创建一个假的空文件或者报错
# 这里我们报错
print(f"错误: 输入文件 {input_video} 不存在")
# 在演示模式下,我们可以生成一个模拟的空文件来跑通流程,但最好还是让用户提供
return
if not os.path.exists(output_dir):
os.makedirs(output_dir)
print("="*50)
print(" 智能新闻拆条系统 (Powered by Qwen3-VL)")
print("="*50)
processor = VideoProcessor()
ai_agent = QwenVLSmartSplitter()
# 1. 预处理:提取音频 (模拟)
print("\n[1/4] 视频预处理...")
audio_path = os.path.join(output_dir, "temp_audio.wav")
# processor.extract_audio(input_video, audio_path) # 实际运行时取消注释
print(f"[*] 音频已提取 (模拟): {audio_path}")
# 模拟 ASR 结果
mock_asr_text = "各位观众晚上好,欢迎收看今天的新闻联播...下面播报第一条新闻..."
# 2. AI 分析
print("\n[2/4] 多模态 AI 分析 (Qwen3-VL)...")
segments = ai_agent.analyze_video(input_video, mock_asr_text)
# 3. 执行拆条
print("\n[3/4] 执行智能拆条...")
results = []
for i, seg in enumerate(segments):
safe_title = seg['title'].replace(" ", "_")
filename = f"{i+1}_{safe_title}.mp4"
output_path = os.path.join(output_dir, filename)
# 调用 FFmpeg 切割
# 注意:如果没有真实的视频文件,这一步会失败。
# 为了演示代码逻辑,我们这里加个 try-catch 或者检查文件大小
if os.path.getsize(input_video) > 0:
success = processor.split_video(input_video, seg['start'], seg['end'], output_path)
else:
print(f"[*] (演示模式) 模拟切割: {seg['start']} -> {seg['end']} => {output_path}")
success = True
if success:
# 4. 自动编目信息整理
catalog_info = {
"file_path": output_path,
"title": seg['title'],
"category": seg['category'],
"summary": seg['summary'],
"tags": seg['tags'],
"cover_image": ai_agent.generate_cover(output_path, "00:00:01")
}
results.append(catalog_info)
# 输出最终报告
print("\n[4/4] 生成编目报告...")
report_path = os.path.join(output_dir, "catalog_report.json")
with open(report_path, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
print(f"\n[+] 拆条完成!共生成 {len(results)} 个片段。")
print(f"[+] 编目报告已保存至: {report_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,58 @@
import ffmpeg
import os
import sys
class VideoProcessor:
def __init__(self):
pass
def get_video_info(self, input_path: str):
try:
probe = ffmpeg.probe(input_path)
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
return video_stream
except ffmpeg.Error as e:
print(f"Error probing video: {e.stderr}", file=sys.stderr)
return None
def split_video(self, input_path: str, start_time: str, end_time: str, output_path: str):
"""
使用 FFmpeg 切割视频
Args:
input_path: 输入视频路径
start_time: 开始时间 (HH:MM:SS)
end_time: 结束时间 (HH:MM:SS)
output_path: 输出文件路径
"""
print(f"[*] 正在切割视频: {start_time} - {end_time} -> {output_path}")
try:
(
ffmpeg
.input(input_path, ss=start_time, to=end_time)
.output(output_path, c='copy') # Stream copy for speed and quality
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
print(f"[+] 切割成功: {output_path}")
return True
except ffmpeg.Error as e:
print(f"[-] 切割失败: {e.stderr.decode('utf8')}", file=sys.stderr)
return False
def extract_audio(self, input_path: str, output_path: str):
"""
提取音频用于 ASR
"""
try:
(
ffmpeg
.input(input_path)
.output(output_path, ac=1, ar=16000) # Mono, 16k for ASR
.overwrite_output()
.run(quiet=True)
)
return True
except ffmpeg.Error as e:
print(f"[-] 音频提取失败: {e.stderr.decode('utf8')}")
return False

View File

@@ -0,0 +1,3 @@
ffmpeg-python==0.2.0
requests==2.31.0
pydantic==2.5.2