import base64
import json
import os
import subprocess
import tempfile
from pathlib import Path
from openai import OpenAI
# JSON-schema parameter spec for the watch_video_clip tool, kept separate
# so the tool list below stays readable.
_WATCH_VIDEO_PARAMS = {
    "type": "object",
    "properties": {
        "path": {
            "type": "string",
            "description": "The path to the video file to watch",
        },
        "start_time": {
            "type": "number",
            "description": "The start time of the clip in seconds (optional, defaults to 0)",
        },
        "end_time": {
            "type": "number",
            "description": "The end time of the clip in seconds (optional, defaults to end of video)",
        },
    },
    "required": ["path"],
}

# OpenAI-style function-calling schema advertised to the model.
tools = [
    {
        "type": "function",
        "function": {
            "name": "watch_video_clip",
            "description": "Watch a video file or a sub-clip of it. If start_time and end_time are not provided, the entire video will be returned.",
            "parameters": _WATCH_VIDEO_PARAMS,
        },
    },
]
def watch_video_clip(path: str, start_time: float | None = None, end_time: float | None = None) -> list[dict]:
"""
Watch a video file or a sub-clip of it.
Args:
path: The path to the video file to watch
start_time: The start time in seconds (optional, defaults to 0)
end_time: The end time in seconds (optional, defaults to end of video)
Returns:
A list of content blocks in MultiModal Tool API format
"""
video_path = Path(path)
if not video_path.exists():
raise FileNotFoundError(f"Video file not found: {path}")
# Get video duration if needed
if start_time is None and end_time is None:
# Return entire video
with open(path, "rb") as f:
video_base64 = base64.b64encode(f.read()).decode("utf-8")
return [
{"type": "video_url", "video_url": {"url": f"data:video/mp4;base64,{video_base64}"}},
{"type": "text", "text": f"Full video: {video_path.name}"}
]
# Get video duration for defaults
probe = subprocess.run(
["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", path],
capture_output=True, text=True
)
duration = float(json.loads(probe.stdout)["format"]["duration"])
start_time = start_time or 0
end_time = end_time or duration
clip_duration = end_time - start_time
# Extract clip
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp:
tmp_path = tmp.name
try:
subprocess.run([
"ffmpeg", "-y", "-ss", str(start_time), "-i", path,
"-t", str(clip_duration), "-c:v", "libx264", "-c:a", "aac",
"-preset", "fast", "-crf", "23", "-movflags", "+faststart",
"-loglevel", "error", tmp_path
], check=True)
with open(tmp_path, "rb") as f:
video_base64 = base64.b64encode(f.read()).decode("utf-8")
return [
{"type": "video_url", "video_url": {"url": f"data:video/mp4;base64,{video_base64}"}},
{"type": "text", "text": f"Clip from {video_path.name}: {start_time}s - {end_time}s"}
]
finally:
if os.path.exists(tmp_path):
os.unlink(tmp_path)
# Moonshot's endpoint speaks the OpenAI-compatible chat API, so the stock
# OpenAI client works with a swapped base_url and key.
client = OpenAI(
    base_url="https://api.moonshot.cn/v1",
    api_key=os.environ.get("MOONSHOT_API_KEY"),
)
def agent_loop(user_message: str):
    """
    Run a simple agent loop with multimodal tool support.

    Sends the conversation to the model, executes any requested
    watch_video_clip tool calls, feeds the results back, and repeats
    until the model replies without tool calls.

    Args:
        user_message: The user's request (e.g. a video path plus a question).

    Returns:
        The model's final text answer.
    """
    messages = [
        {"role": "system", "content": "You are a video analysis assistant. Use watch_video_clip to examine specific portions of videos."},
        {"role": "user", "content": user_message}
    ]
    while True:
        response = client.chat.completions.create(
            model="kimi-k2.6",
            messages=messages,
            tools=tools,
            tool_choice="auto"
        )
        message = response.choices[0].message
        messages.append(message.model_dump())
        # No tool calls means the model has produced its final answer.
        if not message.tool_calls:
            return message.content
        # Every tool_call id must be answered with a "tool" message, or the
        # next API call is rejected — so unknown tools and failures get an
        # error result instead of being skipped or crashing the loop.
        for tool_call in message.tool_calls:
            if tool_call.function.name == "watch_video_clip":
                args = json.loads(tool_call.function.arguments)
                try:
                    result = watch_video_clip(
                        path=args["path"],
                        start_time=args.get("start_time"),
                        end_time=args.get("end_time")
                    )
                except (FileNotFoundError, ValueError, subprocess.CalledProcessError) as e:
                    # Report the failure to the model so it can react.
                    result = [{"type": "text", "text": f"Error: {e}"}]
            else:
                result = [{"type": "text", "text": f"Error: unknown tool {tool_call.function.name}"}]
            # Multimodal tool result
            messages.append({
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result
            })
# Guard the example invocation so importing this module does not fire off a
# network-calling agent loop as a side effect.
if __name__ == "__main__":
    answer = agent_loop("分析 /path/to/test_video.mp4 这个视频的 8-13 秒发生了什么")
    print(answer)