Batch API 支持
kimi-k2.6 和 kimi-k2.5 模型。这些模型的 temperature、top_p 等参数不可修改,请勿在请求 body 中设置这些参数。创建批处理任务
上传 JSONL 文件并创建批处理任务
列出批处理任务
获取当前组织的批处理任务列表
获取任务详情
查询指定批处理任务的状态和详细信息
取消批处理任务
取消正在进行的批处理任务
使用流程
本指南通过一个文本分类的实例,展示 Batch API 的完整使用流程:1. 构造输入文件
JSONL 文件中每行是一个独立的 JSON 对象,代表一个推理请求:{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "kimi-k2.6", "messages": [{"role": "system", "content": "你是一个文本分类助手"}, {"role": "user", "content": "请分类这段文本:人工智能正在改变世界"}]}}
| 字段 | 是否必须 | 说明 |
|---|---|---|
custom_id | 必须 | 自定义请求标识,用于追踪结果,需在文件内唯一 |
method | 必须 | 请求方法,固定为 POST |
url | 必须 | 请求地址,固定为 /v1/chat/completions |
body | 必须 | 请求体,与 Chat Completions API 参数一致 |
body 中的 model 必须是 kimi-k2.6 或 kimi-k2.5。这些模型的 temperature、top_p、n、presence_penalty、frequency_penalty 参数均不可修改,请勿在 body 中设置这些参数。输入文件要求:
- 文件必须为
.jsonl格式,大小不能为空且不超过 100MB - 每行必须是合法的 JSON 对象,且包含
custom_id、method、url、body四个字段 custom_id在文件内必须唯一- 所有行的
model必须相同,一个批次只允许一个模型 method固定为POST,url固定为/v1/chat/completions- 指定的模型必须存在且用户有访问权限
2. 上传文件
通过文件上传接口上传 JSONL 文件,purpose 必须设置为 "batch"。
import os
from openai import OpenAI
from openai.types import FileObject
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
file_object: FileObject = client.files.create(
file=open("batch_requests.jsonl", "rb"),
purpose="batch",
)
print(file_object.id) # 保存 file_id,下一步使用
3. 创建任务
调用创建批处理任务接口,传入input_file_id 和 completion_window。completion_window 建议根据数据量合理设置,较长的时间窗口可以提高任务完成率。
import os
from openai import OpenAI
from openai.types import Batch
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
batch: Batch = client.batches.create(
input_file_id="your_file_id",
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(batch.id) # 保存 batch_id,用于轮询状态
4. 等待完成
创建后任务进入validating 状态,系统将异步校验输入文件。校验通过后进入 in_progress 状态开始执行。你可以通过获取任务详情接口轮询状态。
import os
import time
from openai import OpenAI
from openai.types import Batch
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
while True:
batch: Batch = client.batches.retrieve("your_batch_id")
completed: int = batch.request_counts.completed if batch.request_counts else 0
total: int = batch.request_counts.total if batch.request_counts else 0
print(f"状态: {batch.status} ({completed}/{total})")
if batch.status == "completed":
break
elif batch.status in ("failed", "expired", "cancelled"):
print(f"任务异常终止: {batch.status}")
break
time.sleep(10)
5. 处理结果
任务完成后,output_file_id 字段包含结果文件 ID,通过获取文件内容接口下载。如果有请求失败,error_file_id 包含错误文件 ID。
import json
import os
from openai import OpenAI
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
output = client.files.content("your_output_file_id")
for line in output.text.strip().split("\n"):
result: dict = json.loads(line)
custom_id: str = result["custom_id"]
content: str = result["response"]["body"]["choices"][0]["message"]["content"]
print(f"{custom_id}: {content}")
{
"id": "request-1",
"custom_id": "request-1",
"response": {
"status_code": 200,
"request_id": "",
"body": {
"id": "chatcmpl-xxx",
"object": "chat.completion",
"created": 1711475054,
"model": "kimi-k2.6",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "这段文本属于科技类。"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 30,
"completion_tokens": 10,
"total_tokens": 40
}
}
},
"error": null
}
完整代码示例
以下是将上述步骤串联起来的完整脚本,可直接复制运行:import json
import os
import time
from pathlib import Path
from openai import OpenAI
MODEL = "kimi-k2.6"
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
def create_input_jsonl() -> Path:
"""构造 JSONL 输入文件,每行是一个分类请求。"""
texts: list[str] = [
"哈姆雷特是莎士比亚最著名的悲剧作品之一",
"科学家发现新的潜在宜居行星",
"2024年人工智能发展报告",
"如何制作一道美味的红烧肉",
"最新iPhone发布会详细信息",
]
requests: list[dict] = []
for i, text in enumerate(texts):
requests.append({
"custom_id": f"text_{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": MODEL,
"messages": [
{"role": "system", "content": "你是一个文本分类专家,请将文本分类为:文学类/新闻类/学术类/科技类/生活类"},
{"role": "user", "content": f"请对以下文本进行分类:{text}"},
],
},
})
output_path = Path("classification_requests.jsonl")
with output_path.open("w", encoding="utf-8") as f:
for req in requests:
f.write(json.dumps(req, ensure_ascii=False) + "\n")
return output_path
# 1. 构造输入文件
input_file: Path = create_input_jsonl()
# 2. 上传文件
file_object = client.files.create(file=input_file, purpose="batch")
print(f"文件已上传: {file_object.id}")
# 3. 创建批处理任务
batch = client.batches.create(
input_file_id=file_object.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(f"任务已创建: {batch.id}")
# 4. 轮询等待完成
while True:
batch = client.batches.retrieve(batch.id)
print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
if batch.status == "completed":
break
elif batch.status in ("failed", "expired", "cancelled"):
print(f"任务异常终止: {batch.status}")
exit(1)
time.sleep(10)
# 5. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
data: dict = json.loads(line)
print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")
Batch 状态说明
| 状态 | 说明 |
|---|---|
validating | 已创建,正在校验输入数据 |
failed | 数据校验失败,任务终止 |
in_progress | 数据校验通过,正在执行 |
finalizing | 执行完毕,正在准备结果 |
completed | 结果准备完毕,任务完成 |
expired | 未在 completion_window 内完成 |
cancelling | 已发起取消,等待实际取消 |
cancelled | 取消完成,任务终止 |
任务管理
列出批处理任务
通过列出批处理任务接口查看当前组织下的所有批处理任务。import os
from openai import OpenAI
from openai.pagination import SyncCursorPage
from openai.types import Batch
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
batches: SyncCursorPage[Batch] = client.batches.list(limit=10)
for batch in batches.data:
print(f"{batch.id} - {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
取消批处理任务
通过取消批处理任务接口取消正在进行的任务。仅validating、in_progress、finalizing 状态的任务可以取消。取消后任务状态会先变为 cancelling,最终变为 cancelled。
import os
from openai import OpenAI
from openai.types import Batch
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
batch: Batch = client.batches.cancel("your_batch_id")
print(f"状态: {batch.status}") # cancelling
多模态 Batch 任务
Batch API 支持在输入文件中包含图片和视频内容。与文本任务的区别主要在于 构造输入文件 这一步,其余流程(上传、创建任务、轮询、处理结果)完全一致。图片批处理示例
图片批处理示例
图片有两种传入方式:
- base64 内嵌:将图片编码为 base64 直接写入 JSONL,适合小图片。注意 base64 会使体积膨胀约 33%,请关注 100MB 文件大小限制。
- 文件引用:先通过文件接口上传图片(
purpose="image"),然后在 JSONL 中通过ms://<file_id>引用,适合大图片或图片复用场景。
import base64
import json
import os
import time
from pathlib import Path
from openai import OpenAI
from openai.types import Batch, FileObject
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
MODEL = "kimi-k2.6"
PROMPT = "请分类这张图片:风景/人物/美食/建筑/其他"
SYSTEM = "你是一个图片分类助手"
def build_request_base64(custom_id: str, image_path: str) -> dict:
"""方式一:将图片编码为 base64 直接内嵌到 JSONL 中。
适合小图片,无需额外上传步骤。"""
with open(image_path, "rb") as f:
image_data: str = base64.b64encode(f.read()).decode("utf-8")
return {
"custom_id": custom_id,
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM},
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}},
{"type": "text", "text": PROMPT},
],
},
],
},
}
def build_request_upload(custom_id: str, image_path: str) -> dict:
"""方式二:先上传图片获取 file_id,再通过 ms://<file_id> 引用。
适合大图片或同一张图片被多个请求复用的场景。"""
file_object: FileObject = client.files.create(
file=open(image_path, "rb"),
purpose="image",
)
print(f"图片已上传: {image_path} -> {file_object.id}")
return {
"custom_id": custom_id,
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM},
{
"role": "user",
"content": [
{"type": "image_url", "image_url": {"url": f"ms://{file_object.id}"}},
{"type": "text", "text": PROMPT},
],
},
],
},
}
# ====== 在这里选择构建方式 ======
build_request = build_request_base64 # 或 build_request_upload
# ================================
# 1. 构造输入文件
images: list[str] = ["image1.png", "image2.png", "image3.png"]
requests: list[dict] = [build_request(f"img-{i}", path) for i, path in enumerate(images)]
input_path = Path("image_batch_requests.jsonl")
with input_path.open("w", encoding="utf-8") as f:
for req in requests:
f.write(json.dumps(req, ensure_ascii=False) + "\n")
# 2. 上传 JSONL 并创建任务
file_object: FileObject = client.files.create(file=input_path, purpose="batch")
batch: Batch = client.batches.create(
input_file_id=file_object.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(f"任务已创建: {batch.id}")
# 3. 轮询等待完成
while True:
batch = client.batches.retrieve(batch.id)
print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
if batch.status == "completed":
break
elif batch.status in ("failed", "expired", "cancelled"):
print(f"任务异常终止: {batch.status}")
exit(1)
time.sleep(10)
# 4. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
data: dict = json.loads(line)
print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")
视频批处理示例
视频批处理示例
视频有两种传入方式:
- base64 内嵌:将视频编码为 base64 直接写入 JSONL,适合小视频。注意 base64 会使体积膨胀约 33%,请关注 100MB 文件大小限制。
- 文件引用:先通过文件接口上传视频(
purpose="video"),然后在 JSONL 中通过ms://<file_id>引用,适合大视频或视频复用场景。
import base64
import json
import os
import time
from pathlib import Path
from openai import OpenAI
from openai.types import Batch, FileObject
MODEL = "kimi-k2.6"
client = OpenAI(
api_key=os.environ.get("MOONSHOT_API_KEY"),
base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)
PROMPT = "请总结这个视频的主要内容"
SYSTEM = "你是一个视频内容分析助手"
def build_request_base64(custom_id: str, video_path: str) -> dict:
"""方式一:将视频编码为 base64 直接内嵌到 JSONL 中。
适合小视频,无需额外上传步骤。"""
with open(video_path, "rb") as f:
video_data: str = base64.b64encode(f.read()).decode("utf-8")
return {
"custom_id": custom_id,
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM},
{
"role": "user",
"content": [
{"type": "video_url", "video_url": {"url": f"data:video/mp4;base64,{video_data}"}},
{"type": "text", "text": PROMPT},
],
},
],
},
}
def build_request_upload(custom_id: str, video_path: str) -> dict:
"""方式二:先上传视频获取 file_id,再通过 ms://<file_id> 引用。
适合大视频或同一个视频被多个请求复用的场景。"""
file_object: FileObject = client.files.create(
file=open(video_path, "rb"),
purpose="video",
)
print(f"视频已上传: {video_path} -> {file_object.id}")
return {
"custom_id": custom_id,
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": MODEL,
"messages": [
{"role": "system", "content": SYSTEM},
{
"role": "user",
"content": [
{"type": "video_url", "video_url": {"url": f"ms://{file_object.id}"}},
{"type": "text", "text": PROMPT},
],
},
],
},
}
# ====== 在这里选择构建方式 ======
build_request = build_request_base64 # 或 build_request_upload
# ================================
# 1. 构造输入文件
videos: list[str] = ["video1.mp4", "video2.mp4", "video3.mp4"]
requests: list[dict] = [build_request(f"video-{i}", path) for i, path in enumerate(videos)]
input_path = Path("video_batch_requests.jsonl")
with input_path.open("w", encoding="utf-8") as f:
for req in requests:
f.write(json.dumps(req, ensure_ascii=False) + "\n")
# 2. 上传 JSONL 并创建任务
batch_file: FileObject = client.files.create(file=input_path, purpose="batch")
batch: Batch = client.batches.create(
input_file_id=batch_file.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(f"任务已创建: {batch.id}")
# 3. 轮询等待完成
while True:
batch = client.batches.retrieve(batch.id)
print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
if batch.status == "completed":
break
elif batch.status in ("failed", "expired", "cancelled"):
print(f"任务异常终止: {batch.status}")
exit(1)
time.sleep(10)
# 4. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
data: dict = json.loads(line)
print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")
扩展建议
- 根据实际数据量调整
completion_window,较大的数据集建议设置3d或7d - 轮询间隔建议 10-60 秒,避免频繁请求
- 结果处理可以根据需求写入数据库或生成报告
- 建议对大文件做分批处理,每个文件控制在合理大小