Dify 工作流 API 调用:从第一次请求到流式响应

写在前面

如果你已经在 Dify 的画布上拖拽过节点、连过线、点过”运行”按钮,那你对工作流的搭建应该不陌生了。但工作流建好之后,怎么让它在自己的业务系统里跑起来?总不能每次都打开 Dify 界面手动点吧——这时候就需要用到工作流 API。

这篇笔记会以”用 API 调用一个已发布的工作流”为主线,从拿到 API Key 开始,一路走到发起请求、处理响应、上传文件、跑通代码,把整个调用链路串起来。所有例子都围绕同一个场景展开,方便你对照着一步步操作。

参考来源:Dify 官方 API 文档 - https://docs.dify.ai/api-reference


一、调用之前:两件必须先做的事

在写第一行代码之前,有两件事必须先搞定,否则请求发出去只会收到报错。

第一件:拿到 API Key。 进入你的工作流应用,点左侧菜单的”API 访问”,在”API 密钥”页面创建一个 Key。这个 Key 就是你的通行证,每次请求都要带上它。

有一点要特别注意:工作流的 API Key 和知识库的 API Key 不是一回事。工作流 API Key 只能调用对应的工作流,知识库 API Key 则能操作所有知识库,权限范围不同。所以拿到 Key 之后要确认你用的是哪一个,别混了。

另外,API Key 务必保存在服务器端,不要写死在前端代码里,更不要提交到代码仓库。这把钥匙一旦泄露,别人就能随意调用你的工作流。

第二件:发布工作流。 工作流在编辑器里保存不等于发布,只有点了右上角的”发布”按钮,它才能通过 API 调用。如果你改了工作流的逻辑但忘了重新发布,API 调用的还是旧版本,这个问题很容易踩坑。


二、第一次调用:发起一个最简单的请求

准备工作做好了,现在来发第一个请求。工作流 API 的端点只有一个:

1
POST /v1/workflows/run

所有调用都走这一个地址,不管是传文本、传文件,还是流式响应,区别全在请求体里。

一个最简单的调用长这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
curl --request POST \
--url http://你的服务地址/v1/workflows/run \
--header 'Authorization: Bearer {你的API_KEY}' \
--header 'Content-Type: application/json' \
--data '{
"inputs": {
"name": "张三",
"age": 28,
"gender": "男"
},
"response_mode": "blocking",
"user": "user_001"
}'

请求头里的 Authorization: Bearer {API_KEY} 是认证方式,每个请求都必须带。请求体里有三个必填参数,逐个说一下。

inputs 是工作流的输入,对应你在”开始”节点里定义的变量。变量名必须和开始节点里的一模一样——如果你在开始节点定义了一个叫 name 的变量,这里就得传 "name": "张三",写成 "姓名": "张三" 就会报错。怎么确认变量名?打开工作流编辑器,看开始节点的配置就行。

response_mode 决定响应方式,有两种选择:blocking(阻塞)和 streaming(流式)。这个概念很重要,下一节专门讲。

user 是调用者的标识,由你自己定义,只要保证在同一个应用内唯一就行。Dify 会根据这个标识隔离数据和文件,不同 user 的文件互不可见。


三、阻塞还是流式:两种响应模式的抉择

response_mode 是调用工作流时最容易纠结的参数。用点外卖来打个比方:

阻塞模式(blocking) 就像你在餐厅点餐后坐在座位上等,厨房做完一整道菜端上来,你才开始吃。好处是你拿到的是完整结果,处理起来简单;坏处是如果这道菜要做很久,你就一直干等着,中间没有任何反馈。而且 Dify 云端有 100 秒超时限制,工作流执行超过 100 秒,请求就会被中断。

流式模式(streaming) 就像吃火锅,食材一盘盘上,你边涮边吃。工作流每执行完一步就给你推一个事件,你可以实时看到进度,LLM 生成的内容也能逐字输出,体验更流畅。而且流式模式没有超时限制,长时间运行的工作流也能正常跑完。

两种模式怎么选?一个简单的原则:工作流执行快(几秒内)、不需要实时展示进度,用阻塞模式;工作流耗时长、或者需要把 LLM 的输出逐字展示给用户,用流式模式。

流式模式的调用只需要把 response_mode 改成 "streaming",其他参数不变:

1
2
3
4
5
6
7
8
9
10
11
12
13
curl --request POST \
--url http://你的服务地址/v1/workflows/run \
--header 'Authorization: Bearer {你的API_KEY}' \
--header 'Content-Type: application/json' \
--data '{
"inputs": {
"name": "张三",
"age": 28,
"gender": "男"
},
"response_mode": "streaming",
"user": "user_001"
}'

但流式模式的响应格式和阻塞模式完全不同,需要专门处理,后面会详细讲。


四、文件上传:最容易出错的环节

如果你的工作流开始节点里定义了文件类型的变量,那调用时就需要传文件。这是整个 API 调用中最容易出错的环节,因为文件传法和普通变量不一样。

先理清一个前提:文件变量在 Dify 里有两种传入方式——远程 URL本地文件上传。用寄快递来比喻:远程 URL 就像你告诉快递员”去那个仓库取”,快递员自己去拿;本地文件上传就像你亲手把包裹交给快递员,他直接带走。

方式一:远程 URL

最简单的情况,如果你的文件已经有一个可访问的 URL,直接传就行:

1
2
3
4
5
6
7
8
{
"inputs": {
"name": "张三",
"word": "https://example.com/document.pdf"
},
"response_mode": "blocking",
"user": "user_001"
}

如果开始节点定义的变量类型是”文件”(而不是文本),格式要稍微变一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"inputs": {
"documents": [
{
"type": "document",
"transfer_method": "remote_url",
"url": "https://example.com/file.pdf"
}
]
},
"response_mode": "blocking",
"user": "user_001"
}

这里 type 是文件类型(document、image、audio、video),transfer_method 是传递方式,url 是文件地址。三个字段缺一不可。

方式二:本地文件上传

如果文件在本地,没有公网 URL,就需要分两步走:先上传文件拿到 ID,再在工作流调用中引用这个 ID。

第一步:上传文件。 这个接口用的是 multipart/form-data 格式,和前面的 JSON 格式不同:

1
2
3
4
curl -X POST 'http://你的服务地址/v1/files/upload' \
--header 'Authorization: Bearer {你的API_KEY}' \
--form 'file=@"/path/to/你的文件.docx"' \
--form 'user=user_001'

注意 user 的值要和后续调用工作流时传的 user 一致,否则 Dify 找不到这个文件。

上传成功后,返回里最重要的就是文件 ID:

1
2
3
4
5
6
{
"id": "0b6d1f80-4e90-463e-9889-d9b600ca8aac",
"name": "word表格读取测试.docx",
"size": 16708,
"extension": "docx"
}

第二步:在工作流调用中引用文件 ID。 把上传返回的 id 填到 upload_file_id 里:

1
2
3
4
5
6
7
8
9
10
11
{
"inputs": {
"word": {
"type": "document",
"transfer_method": "local_file",
"upload_file_id": "0b6d1f80-4e90-463e-9889-d9b600ca8aac"
}
},
"response_mode": "blocking",
"user": "user_001"
}

这里最容易犯的错有两个:一是变量名和开始节点定义的不一致,二是 user 和上传文件时的不一致。两个都会导致文件找不到,调用失败。

多个文件怎么传

如果工作流接受文件列表,把多个文件对象放在数组里就行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
{
"inputs": {
"documents": [
{
"type": "document",
"transfer_method": "remote_url",
"url": "https://example.com/file1.pdf"
},
{
"type": "image",
"transfer_method": "local_file",
"upload_file_id": "0b6d1f80-4e90-463e-9889-d9b600ca8aac"
}
]
},
"response_mode": "blocking",
"user": "user_001"
}

远程 URL 和本地文件可以混用,每个文件独立指定类型和传递方式。

文件类型支持哪些格式

type 字段决定了 Dify 怎么处理这个文件,不同类型支持的格式也不同:document 类型支持 TXT、PDF、DOCX、XLSX、PPTX 等文档格式;image 类型支持 JPG、PNG、GIF、WEBP 等图片格式;audio 类型支持 MP3、WAV、M4A 等音频格式;video 类型支持 MP4、MOV 等视频格式。如果文件格式不在上述范围内,可以用 custom 类型。


五、响应里有什么:读懂返回结果

请求发出去了,工作流跑完了,返回的结果长什么样?这取决于你选的响应模式。

阻塞模式的响应

阻塞模式返回的是一个完整的 JSON,包含工作流的执行结果和元信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"task_id": "d291496b-702b-4b98-8d6c-a03f3da36c54",
"workflow_run_id": "0b919f8b-4d12-4ad7-991c-09004090ed59",
"data": {
"id": "0b919f8b-4d12-4ad7-991c-09004090ed59",
"workflow_id": "0c7b25cf-2c99-486b-aa41-a4d22a489f26",
"status": "succeeded",
"outputs": {
"person": {"name": "张三", "age": 28, "gender": "男"},
"doc": "这是文档内容"
},
"error": null,
"elapsed_time": 2.69,
"total_tokens": 150,
"total_steps": 5
}
}

最核心的字段是 data.outputs,这就是工作流的输出结果,结构和你在工作流里定义的输出变量一致。data.status 告诉你执行状态:succeeded 是成功,failed 是失败,失败时看 data.error 里的错误信息。data.elapsed_timedata.total_tokens 分别是执行耗时和 Token 消耗,做性能优化时很有用。

顶层的 task_idworkflow_run_id 是这次执行的身份标识,需要停止任务或查询执行详情时会用到。

流式模式的响应

流式模式返回的不是一次性 JSON,而是一连串 SSE(Server-Sent Events)事件。每个事件都是一行 data: {...} 格式的数据,工作流每走一步就推一个事件过来:

1
2
3
4
5
6
7
8
9
data: {"event": "workflow_started", "task_id": "xxx", "data": {...}}

data: {"event": "node_started", "data": {"node_type": "start", "title": "Start", ...}}

data: {"event": "node_finished", "data": {"node_type": "llm", "outputs": {...}, ...}}

data: {"event": "text_chunk", "data": {"text": "你好"}}

data: {"event": "workflow_finished", "data": {"status": "succeeded", "outputs": {...}}}

这些事件类型就像工作流执行过程的”实况转播”:workflow_started 是开场哨,node_startednode_finished 是每个节点的进出记录,text_chunk 是 LLM 正在生成的文字片段,workflow_finished 是终场哨。

其中最常用的是 text_chunkworkflow_finished:前者让你实现逐字输出的打字机效果,后者给你最终的完整结果。如果你想调试某个节点的输入输出,就关注 node_finished 事件,它里面包含该节点的完整输入和输出。


六、用 Python 跑起来

前面用 curl 演示了各种调用方式,实际开发中肯定是用代码来调。下面给出两个 Python 示例,分别对应阻塞模式和流式模式。

阻塞模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import requests

API_KEY = "your_api_key_here"
API_URL = "http://你的服务地址/v1/workflows/run"

def call_workflow(name, age, gender):
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}

payload = {
"inputs": {"name": name, "age": age, "gender": gender},
"response_mode": "blocking",
"user": "user_python_client"
}

response = requests.post(API_URL, headers=headers, json=payload)

if response.status_code == 200:
result = response.json()
if result["data"]["status"] == "succeeded":
return result["data"]["outputs"]
else:
print(f"工作流执行失败: {result['data']['error']}")
return None
else:
print(f"请求失败: {response.status_code}, {response.text}")
return None

# 调用
result = call_workflow("张三", 28, "男")
print(result)

流式模式

流式模式的关键是逐行读取 SSE 事件,根据事件类型做不同处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import requests
import json

API_KEY = "your_api_key_here"
API_URL = "http://你的服务地址/v1/workflows/run"

def call_workflow_streaming(name, age, gender):
headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}

payload = {
"inputs": {"name": name, "age": age, "gender": gender},
"response_mode": "streaming",
"user": "user_streaming_client"
}

response = requests.post(API_URL, headers=headers, json=payload, stream=True)

for line in response.iter_lines():
if not line:
continue
line_str = line.decode("utf-8")
if not line_str.startswith("data: "):
continue

data = json.loads(line_str[6:])
event = data.get("event")

if event == "text_chunk":
# 逐字输出 LLM 生成的内容
print(data["data"]["text"], end="", flush=True)
elif event == "workflow_finished":
# 工作流执行完毕,拿到最终结果
print("\n\n工作流执行完成!")
print(f"最终输出: {data['data']['outputs']}")
break

# 调用
call_workflow_streaming("张三", 28, "男")

流式处理的套路是固定的:用 stream=True 打开流式连接,逐行读取,跳过空行和非 data: 开头的行,解析 JSON 后按 event 类型分发处理。上面的代码只处理了 text_chunkworkflow_finished 两种事件,如果你需要跟踪每个节点的执行情况,加上 node_startednode_finished 的处理逻辑就行。

带文件上传的调用

如果工作流需要文件输入,先上传再调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def upload_file(file_path, user):
"""上传本地文件,返回文件 ID"""
upload_url = "http://你的服务地址/v1/files/upload"
headers = {"Authorization": f"Bearer {API_KEY}"}

with open(file_path, "rb") as f:
files = {"file": f}
data = {"user": user}
response = requests.post(upload_url, headers=headers, files=files, data=data)

if response.status_code == 201:
return response.json()["id"]
else:
print(f"上传失败: {response.text}")
return None

def call_workflow_with_file(name, file_path):
user = "user_001"
file_id = upload_file(file_path, user)
if not file_id:
return None

headers = {
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json"
}

payload = {
"inputs": {
"name": name,
"word": {
"type": "document",
"transfer_method": "local_file",
"upload_file_id": file_id
}
},
"response_mode": "blocking",
"user": user
}

response = requests.post(API_URL, headers=headers, json=payload)
return response.json()["data"]["outputs"] if response.status_code == 200 else None

# 调用
result = call_workflow_with_file("张三", "/path/to/document.docx")
print(result)

七、其他常用接口

除了核心的工作流调用接口,Dify 还提供了几个辅助接口,在特定场景下会用到。

停止正在执行的工作流POST /workflows/tasks/{task_id}/stop。只对流式模式有效,task_id 从流式事件的返回中获取。比如你发现工作流跑偏了,想中途叫停,就用这个接口。

获取工作流执行详情GET /workflows/runs/{workflow_run_id}。工作流跑完了想回看某次执行的详细输入输出,用这个接口查。workflow_run_id 从调用时的返回中获取。

获取工作流参数信息GET /parameters。返回工作流的输入参数名称、类型和默认值。在动态构建调用界面时很有用,不用硬编码参数名。

获取工作流日志GET /workflows/logs。查看工作流的历史执行记录,按时间倒序返回。

这几个接口的调用方式和主接口一样,带上 Authorization 头就行,这里不再赘述。


八、常见错误与踩坑指南

调用 API 的过程中,有几个错误特别常见,提前了解能少走弯路。

400 - invalid_param:参数有问题。最常见的原因是 inputs 里的变量名和开始节点定义的不一致,或者必填参数漏了。检查变量名时,直接打开工作流编辑器对照开始节点的配置。

400 - provider_not_initialize:模型凭据没配置。工作流里用到了某个模型(比如 GPT-4),但你的 Dify 里没配这个模型的 API Key。去”模型供应商”页面检查一下。

400 - provider_quota_exceeded:模型调用额度用完了。充值或者换一个有额度的模型。

401:API Key 不对或过期。重新确认 Key 是否正确,注意区分工作流 Key 和知识库 Key。

404:接口地址写错了,或者工作流没发布。

遇到报错时,先看 HTTP 状态码定位大类,再看返回的 error 字段里的具体信息,基本就能找到原因。


九、几个实用建议

关于 API 地址:文中的例子用的是 http://你的服务地址/v1。如果你用的是 Dify 云端,地址是 https://api.dify.ai/v1;如果是本地部署,换成你自己的服务地址。路径里的 /v1 不能少。

关于错误处理:生产环境调用时,务必加上超时和重试。网络抖动、服务短暂不可用都是常有的事,裸调用一旦超时整个链路就断了。Python 里用 requests 时可以设 timeout 参数,配合简单的重试逻辑就行。

关于并发:批量调用时控制好并发量,别一次性发太多请求把服务压垮。尤其是涉及 LLM 调用的工作流,每个请求都要消耗模型 Token,并发太高容易触发限流。

关于调试:调不通的时候,先在 Dify 界面上手动跑一遍工作流,确认工作流本身没问题;再用 curl 发一个最简单的请求,排除代码层面的干扰;最后逐步加上文件上传、流式处理等复杂逻辑。分步排查比一口气写完再调效率高得多。


写在最后

Dify 工作流 API 的核心其实就是一件事:POST /v1/workflows/run。所有复杂度都集中在这个接口的请求体里——输入变量怎么传、文件怎么上传、响应模式怎么选。理清了这三个问题,调用就不难了。

建议你照着这篇笔记,用自己的工作流和 API Key 把流程跑一遍:先发一个最简单的阻塞请求,确认能跑通;再试流式模式,感受逐字输出的效果;最后加上文件上传,把完整链路走通。亲手操作一遍,比看十遍文档都管用。

如果遇到接口参数不确定的情况,最权威的参考永远是官方文档,不同版本的 Dify 接口可能有细微差异,以你实际使用的版本为准。