跳转到主要内容
当你需要使用大语言模型处理大规模、低实时性要求的任务时,Batch API 是理想选择。它支持通过文件批量提交任务,相比实时 API 调用可以节省 40% 的推理费用。
Batch API 支持 kimi-k2.6kimi-k2.5 模型。这些模型的 temperaturetop_p 等参数不可修改,请勿在请求 body 中设置这些参数。

创建批处理任务

上传 JSONL 文件并创建批处理任务

列出批处理任务

获取当前组织的批处理任务列表

获取任务详情

查询指定批处理任务的状态和详细信息

取消批处理任务

取消正在进行的批处理任务

使用流程

本指南通过一个文本分类的实例,展示 Batch API 的完整使用流程:

1. 构造输入文件

JSONL 文件中每行是一个独立的 JSON 对象,代表一个推理请求:
{"custom_id": "request-1", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "kimi-k2.6", "messages": [{"role": "system", "content": "你是一个文本分类助手"}, {"role": "user", "content": "请分类这段文本:人工智能正在改变世界"}]}}
字段是否必须说明
custom_id必须自定义请求标识,用于追踪结果,需在文件内唯一
method必须请求方法,固定为 POST
url必须请求地址,固定为 /v1/chat/completions
body必须请求体,与 Chat Completions API 参数一致
body 中的 model 必须是 kimi-k2.6kimi-k2.5。这些模型的 temperaturetop_pnpresence_penaltyfrequency_penalty 参数均不可修改,请勿在 body 中设置这些参数。
输入文件要求:
  • 文件必须为 .jsonl 格式,大小不能为空且不超过 100MB
  • 每行必须是合法的 JSON 对象,且包含 custom_idmethodurlbody 四个字段
  • custom_id 在文件内必须唯一
  • 所有行的 model 必须相同,一个批次只允许一个模型
  • method 固定为 POSTurl 固定为 /v1/chat/completions
  • 指定的模型必须存在且用户有访问权限

2. 上传文件

通过文件上传接口上传 JSONL 文件,purpose 必须设置为 "batch"
import os
from openai import OpenAI
from openai.types import FileObject

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

file_object: FileObject = client.files.create(
    file=open("batch_requests.jsonl", "rb"),
    purpose="batch",
)
print(file_object.id)  # 保存 file_id,下一步使用

3. 创建任务

调用创建批处理任务接口,传入 input_file_idcompletion_windowcompletion_window 建议根据数据量合理设置,较长的时间窗口可以提高任务完成率。
import os
from openai import OpenAI
from openai.types import Batch

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

batch: Batch = client.batches.create(
    input_file_id="your_file_id",
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(batch.id)  # 保存 batch_id,用于轮询状态

4. 等待完成

创建后任务进入 validating 状态,系统将异步校验输入文件。校验通过后进入 in_progress 状态开始执行。你可以通过获取任务详情接口轮询状态。
import os
import time
from openai import OpenAI
from openai.types import Batch

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

while True:
    batch: Batch = client.batches.retrieve("your_batch_id")
    completed: int = batch.request_counts.completed if batch.request_counts else 0
    total: int = batch.request_counts.total if batch.request_counts else 0
    print(f"状态: {batch.status} ({completed}/{total})")

    if batch.status == "completed":
        break
    elif batch.status in ("failed", "expired", "cancelled"):
        print(f"任务异常终止: {batch.status}")
        break

    time.sleep(10)

5. 处理结果

任务完成后,output_file_id 字段包含结果文件 ID,通过获取文件内容接口下载。如果有请求失败,error_file_id 包含错误文件 ID。
import json
import os
from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

output = client.files.content("your_output_file_id")
for line in output.text.strip().split("\n"):
    result: dict = json.loads(line)
    custom_id: str = result["custom_id"]
    content: str = result["response"]["body"]["choices"][0]["message"]["content"]
    print(f"{custom_id}: {content}")
输出文件中每行对应一个请求的处理结果:
{
  "id": "request-1",
  "custom_id": "request-1",
  "response": {
    "status_code": 200,
    "request_id": "",
    "body": {
      "id": "chatcmpl-xxx",
      "object": "chat.completion",
      "created": 1711475054,
      "model": "kimi-k2.6",
      "choices": [
        {
          "index": 0,
          "message": {
            "role": "assistant",
            "content": "这段文本属于科技类。"
          },
          "finish_reason": "stop"
        }
      ],
      "usage": {
        "prompt_tokens": 30,
        "completion_tokens": 10,
        "total_tokens": 40
      }
    }
  },
  "error": null
}

完整代码示例

以下是将上述步骤串联起来的完整脚本,可直接复制运行:
import json
import os
import time
from pathlib import Path

from openai import OpenAI

MODEL = "kimi-k2.6"

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)


def create_input_jsonl() -> Path:
    """构造 JSONL 输入文件,每行是一个分类请求。"""
    texts: list[str] = [
        "哈姆雷特是莎士比亚最著名的悲剧作品之一",
        "科学家发现新的潜在宜居行星",
        "2024年人工智能发展报告",
        "如何制作一道美味的红烧肉",
        "最新iPhone发布会详细信息",
    ]

    requests: list[dict] = []
    for i, text in enumerate(texts):
        requests.append({
            "custom_id": f"text_{i}",
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": MODEL,
                "messages": [
                    {"role": "system", "content": "你是一个文本分类专家,请将文本分类为:文学类/新闻类/学术类/科技类/生活类"},
                    {"role": "user", "content": f"请对以下文本进行分类:{text}"},
                ],
            },
        })

    output_path = Path("classification_requests.jsonl")
    with output_path.open("w", encoding="utf-8") as f:
        for req in requests:
            f.write(json.dumps(req, ensure_ascii=False) + "\n")
    return output_path


# 1. 构造输入文件
input_file: Path = create_input_jsonl()

# 2. 上传文件
file_object = client.files.create(file=input_file, purpose="batch")
print(f"文件已上传: {file_object.id}")

# 3. 创建批处理任务
batch = client.batches.create(
    input_file_id=file_object.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(f"任务已创建: {batch.id}")

# 4. 轮询等待完成
while True:
    batch = client.batches.retrieve(batch.id)
    print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
    if batch.status == "completed":
        break
    elif batch.status in ("failed", "expired", "cancelled"):
        print(f"任务异常终止: {batch.status}")
        exit(1)
    time.sleep(10)

# 5. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
    data: dict = json.loads(line)
    print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")

Batch 状态说明

状态说明
validating已创建,正在校验输入数据
failed数据校验失败,任务终止
in_progress数据校验通过,正在执行
finalizing执行完毕,正在准备结果
completed结果准备完毕,任务完成
expired未在 completion_window 内完成
cancelling已发起取消,等待实际取消
cancelled取消完成,任务终止

任务管理

列出批处理任务

通过列出批处理任务接口查看当前组织下的所有批处理任务。
import os
from openai import OpenAI
from openai.pagination import SyncCursorPage
from openai.types import Batch

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

batches: SyncCursorPage[Batch] = client.batches.list(limit=10)
for batch in batches.data:
    print(f"{batch.id} - {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")

取消批处理任务

通过取消批处理任务接口取消正在进行的任务。仅 validatingin_progressfinalizing 状态的任务可以取消。取消后任务状态会先变为 cancelling,最终变为 cancelled
import os
from openai import OpenAI
from openai.types import Batch

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

batch: Batch = client.batches.cancel("your_batch_id")
print(f"状态: {batch.status}")  # cancelling

多模态 Batch 任务

Batch API 支持在输入文件中包含图片和视频内容。与文本任务的区别主要在于 构造输入文件 这一步,其余流程(上传、创建任务、轮询、处理结果)完全一致。
图片有两种传入方式:
  • base64 内嵌:将图片编码为 base64 直接写入 JSONL,适合小图片。注意 base64 会使体积膨胀约 33%,请关注 100MB 文件大小限制。
  • 文件引用:先通过文件接口上传图片(purpose="image"),然后在 JSONL 中通过 ms://<file_id> 引用,适合大图片或图片复用场景。
以下示例同时提供了两种构建方式,按需选择即可:
import base64
import json
import os
import time
from pathlib import Path

from openai import OpenAI
from openai.types import Batch, FileObject

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

MODEL = "kimi-k2.6"
PROMPT = "请分类这张图片:风景/人物/美食/建筑/其他"
SYSTEM = "你是一个图片分类助手"


def build_request_base64(custom_id: str, image_path: str) -> dict:
    """方式一:将图片编码为 base64 直接内嵌到 JSONL 中。
    适合小图片,无需额外上传步骤。"""
    with open(image_path, "rb") as f:
        image_data: str = base64.b64encode(f.read()).decode("utf-8")
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM},
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_data}"}},
                        {"type": "text", "text": PROMPT},
                    ],
                },
            ],
        },
    }


def build_request_upload(custom_id: str, image_path: str) -> dict:
    """方式二:先上传图片获取 file_id,再通过 ms://<file_id> 引用。
    适合大图片或同一张图片被多个请求复用的场景。"""
    file_object: FileObject = client.files.create(
        file=open(image_path, "rb"),
        purpose="image",
    )
    print(f"图片已上传: {image_path} -> {file_object.id}")
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM},
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": f"ms://{file_object.id}"}},
                        {"type": "text", "text": PROMPT},
                    ],
                },
            ],
        },
    }


# ====== 在这里选择构建方式 ======
build_request = build_request_base64  # 或 build_request_upload
# ================================

# 1. 构造输入文件
images: list[str] = ["image1.png", "image2.png", "image3.png"]
requests: list[dict] = [build_request(f"img-{i}", path) for i, path in enumerate(images)]

input_path = Path("image_batch_requests.jsonl")
with input_path.open("w", encoding="utf-8") as f:
    for req in requests:
        f.write(json.dumps(req, ensure_ascii=False) + "\n")

# 2. 上传 JSONL 并创建任务
file_object: FileObject = client.files.create(file=input_path, purpose="batch")
batch: Batch = client.batches.create(
    input_file_id=file_object.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(f"任务已创建: {batch.id}")

# 3. 轮询等待完成
while True:
    batch = client.batches.retrieve(batch.id)
    print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
    if batch.status == "completed":
        break
    elif batch.status in ("failed", "expired", "cancelled"):
        print(f"任务异常终止: {batch.status}")
        exit(1)
    time.sleep(10)

# 4. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
    data: dict = json.loads(line)
    print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")
视频有两种传入方式:
  • base64 内嵌:将视频编码为 base64 直接写入 JSONL,适合小视频。注意 base64 会使体积膨胀约 33%,请关注 100MB 文件大小限制。
  • 文件引用:先通过文件接口上传视频(purpose="video"),然后在 JSONL 中通过 ms://<file_id> 引用,适合大视频或视频复用场景。
以下示例同时提供了两种构建方式,按需选择即可:
import base64
import json
import os
import time
from pathlib import Path

from openai import OpenAI
from openai.types import Batch, FileObject

MODEL = "kimi-k2.6"

client = OpenAI(
    api_key=os.environ.get("MOONSHOT_API_KEY"),
    base_url=os.environ.get("MOONSHOT_BASE_URL", "https://api.moonshot.cn/v1"),
)

PROMPT = "请总结这个视频的主要内容"
SYSTEM = "你是一个视频内容分析助手"


def build_request_base64(custom_id: str, video_path: str) -> dict:
    """方式一:将视频编码为 base64 直接内嵌到 JSONL 中。
    适合小视频,无需额外上传步骤。"""
    with open(video_path, "rb") as f:
        video_data: str = base64.b64encode(f.read()).decode("utf-8")
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM},
                {
                    "role": "user",
                    "content": [
                        {"type": "video_url", "video_url": {"url": f"data:video/mp4;base64,{video_data}"}},
                        {"type": "text", "text": PROMPT},
                    ],
                },
            ],
        },
    }


def build_request_upload(custom_id: str, video_path: str) -> dict:
    """方式二:先上传视频获取 file_id,再通过 ms://<file_id> 引用。
    适合大视频或同一个视频被多个请求复用的场景。"""
    file_object: FileObject = client.files.create(
        file=open(video_path, "rb"),
        purpose="video",
    )
    print(f"视频已上传: {video_path} -> {file_object.id}")
    return {
        "custom_id": custom_id,
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": MODEL,
            "messages": [
                {"role": "system", "content": SYSTEM},
                {
                    "role": "user",
                    "content": [
                        {"type": "video_url", "video_url": {"url": f"ms://{file_object.id}"}},
                        {"type": "text", "text": PROMPT},
                    ],
                },
            ],
        },
    }


# ====== 在这里选择构建方式 ======
build_request = build_request_base64  # 或 build_request_upload
# ================================

# 1. 构造输入文件
videos: list[str] = ["video1.mp4", "video2.mp4", "video3.mp4"]
requests: list[dict] = [build_request(f"video-{i}", path) for i, path in enumerate(videos)]

input_path = Path("video_batch_requests.jsonl")
with input_path.open("w", encoding="utf-8") as f:
    for req in requests:
        f.write(json.dumps(req, ensure_ascii=False) + "\n")

# 2. 上传 JSONL 并创建任务
batch_file: FileObject = client.files.create(file=input_path, purpose="batch")
batch: Batch = client.batches.create(
    input_file_id=batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
)
print(f"任务已创建: {batch.id}")

# 3. 轮询等待完成
while True:
    batch = client.batches.retrieve(batch.id)
    print(f"状态: {batch.status} ({batch.request_counts.completed}/{batch.request_counts.total})")
    if batch.status == "completed":
        break
    elif batch.status in ("failed", "expired", "cancelled"):
        print(f"任务异常终止: {batch.status}")
        exit(1)
    time.sleep(10)

# 4. 处理结果
output = client.files.content(batch.output_file_id)
for line in output.text.strip().split("\n"):
    data: dict = json.loads(line)
    print(f"{data['custom_id']}: {data['response']['body']['choices'][0]['message']['content']}")

扩展建议

  • 根据实际数据量调整 completion_window,较大的数据集建议设置 3d7d
  • 轮询间隔建议 10-60 秒,避免频繁请求
  • 结果处理可以根据需求写入数据库或生成报告
  • 建议对大文件做分批处理,每个文件控制在合理大小