跳转至

2.4 LangGraph 流式输出

在之前的文章中,我们一直在使用 app.invoke(inputs) 方法执行工作流,invoke 方法会把整个流的最终执行结果一次性的返回给调用端,如果工作流执行时间很长,用户需要等待的时间就会很长,体验会很差,目前主流的大模型都会提供SSE流式输出接口,LangChain 本身对于流式输出做了很好的封装,本文我们来讨论一下在LangGraph 中如何使用流式输出。

一、普通的流式输出

from langgraph.graph import add_messages, StateGraph, START, END  
from langchain_core.messages import ToolMessage, HumanMessage, AIMessage, BaseMessage  
from typing import Type, TypedDict, Annotated, List  
from langchain_openai import ChatOpenAI  

llm = ChatOpenAI(  
    model_name="qwen-turbo",  
    temperature=0.7,  
    max_tokens=1024,  
    top_p=1,  
    openai_api_key="sk-xxxx",  
    openai_api_base="https://dashscope.aliyuncs.com/compatible-mode/v1"  
)  


class State(TypedDict):  
    messages: Annotated[List[BaseMessage], add_messages]  


class TestNode:  
    def __init__(self, msg: str):  
        self.msg = msg  

    def __call__(self, state: State):  
        return {"messages": self.msg}  


def chatbot(state: State):  
    return {"messages": [llm.invoke(state["messages"])]}  


graph_builder = StateGraph(State)  

graph_builder.add_node("chatbot", chatbot)  
graph_builder.add_node("n2", TestNode("很高兴认识你"))  

graph_builder.add_edge(START, "chatbot")  
graph_builder.add_edge("chatbot", "n2")  
graph_builder.add_edge("n2", END)  
app = graph_builder.compile()  
app.get_graph().draw_mermaid_png(output_file_path="graph_tool.png")  

inputs = {"messages": [HumanMessage(content="你好")]}  
result = app.invoke(inputs)  
for i in result.get("messages"):  
    i.pretty_print()

image.png

1
2
3
result = app.invoke(inputs)  
for i in result.get("messages"):  
    i.pretty_print()

代码使用 result = app.invoke(inputs) ,result 为整个工作流最终的结果,也就是上面定义的 State 对象。

class State(TypedDict):  
    messages: Annotated[List[BaseMessage], add_messages]  

State 对象有个 messages 属性,我们使用for 循环来遍历 messages 中的元素,得到的输出为

1
2
3
4
5
6
7
8
9
================================ Human Message =================================

你好
================================== Ai Message ==================================

你好!很高兴为你提供帮助。
================================ Human Message =================================

很高兴认识你

pretty_print() 为langchain 中封装的方法,可以将大模型中的消息打印的相对好看一些。

通过上面代码结果,可以看到,最终的结果需要所有的节点都执行完毕才返回。接下来我们看一看如何使用流式输出,一步一步的将每个节点的输出依次打印。

1.1 使用stream进行流式调用

1
2
3
inputs = {"messages": [HumanMessage(content="你好")]}  
for i in app.stream(inputs):  
    print(i)

将上面的 result = app.invoke(inputs) 修改为 app.stream(inputs) ,这个方法会返回一个迭代器,Iterator[Union[dict[str, Any], Any]], 我们可以遍历这个迭代器来分别打印每个节点的结果输出。

1
2
3
4
5
# AIMessage 本身会有很多属性,这里用不到,先省略,使用 ... 代替,后面的代码同理
{'chatbot': {'messages': [AIMessage(content='你好!很高兴为你提供帮助。'...]}} 


{'n2': {'messages': '很高兴认识你'}}

通过返回结果,我们可以看出,返回的迭代对象,每个元素为一个字典对象,字典的key 为节点的 name, 值为该节点返回的内容。注意,这里只会返回 State 中定义的属性,如果节点中返回了额外的值,这里是不会返回的,如

1
2
3
4
5
6
class TestNode:  
    def __init__(self, msg: str):  
        self.msg = msg  

    def __call__(self, state: State):  
        return {"messages": self.msg, "timestamp": time.time()}

假如 TestNode 中返回了 timestampapp.stream(inputs) 的返回值中也不会包含 timestamp 值的。

1.2 StreamMode 不同参数的含义

stream 方法还有一个 stream_mode 参数,该参数有以下几个值可选

  • 'values': Emit all values of the state for each step.
  • 'updates': Emit only the node name(s) and updates
    that were returned by the node(s) after each step.- 'debug': Emit debug events for each step.
  • 'messages': Emit LLM messages token-by-token.
  • 'custom': Emit custom output write: StreamWriter kwarg of each node.

我们来分别看看各值的效果

values

values 为当每个节点执行结束以后,State 对象更新以后的值,如上面的State定义

class State(TypedDict):  
    messages: Annotated[List[BaseMessage], add_messages]

State 中有个messages 属性,这个属性的更新方法为 add_messages, 当__START__ 节点执行结束以后,__START__ 节点可以理解为调用 app.stream(inputs) 以后,当首次传入 inputs 以后,此时的State 对象经过 add_messages 方法更新以后的对象。此时State对象为

{'messages': [HumanMessage(content='你好', additional_kwargs={}, response_metadata={}, id='20122092-434c-42c5-a127-145a2c0540a6')]}

之后执行 chatbot 节点,该节点调用大模型,

def chatbot(state: State):  
    return {"messages": [llm.invoke(state["messages"])]}

返回大模型的回复,经过 add_messages 以后,将大模型回复的内容追加到 State 对象的 messages 中,此时的State对象为:(此处打印进行了省略和格式化)

1
2
3
4
5
6
7
{'messages': 
    [
        HumanMessage(content='你好', additional_kwargs={}, ...), 

        AIMessage(content='你好!有什么我能帮助你的吗?', ...})
    ]
}

该对象的messages 属性中此时包含两个元素 1. __START__ 节点用户输入的内容 HumanMessage(content='你好', ...) 2. chatbot 节点大模型返回的内容 AIMessage(content='你好!有什么我能帮助你的吗?' ...)

同理,在经过第三个节点 n2 以后,此时State对象的messages 为

  1. __START__ 节点用户输入的内容 HumanMessage(content='你好', ...)
  2. chatbot 节点大模型返回的内容 AIMessage(content='你好!有什么我能帮助你的吗?' ...)
  3. n2 节点返回的内容 HumanMessage(content='很高兴认识你', ...)

整个graph 的运行结果为, (做了简化处理)

{'messages': [
    HumanMessage(content='你好', ...)
]}


{'messages': [
    HumanMessage(content='你好', ...), 
    AIMessage(content='你好!有什么我能帮助你的吗?', ...)
]}


{'messages': [
    HumanMessage(content='你好', ...), 
    AIMessage(content='你好!有什么我能帮助你的吗?', ...), 
    HumanMessage(content='很高兴认识你', ...)
]}

updates

updates 为更新,这种模式下,会显示哪个节点更新了哪些内容,但是并不会将State所有的值返回,只返回更新的内容。

1
2
3
4
{'chatbot': {'messages': [AIMessage(content='你好!很高兴为你提供帮助。'...)]}}


{'n2': {'messages': '很高兴认识你'}}

第一次经过 __START__ 节点时,由于此时并没有更新操作,所以执行完 __START__ 节点以后, 不会有什么输出。

执行完 chatbot 节点以后,return {"messages": [llm.invoke(state["messages"])]} , 返回了大模型的输出,此时是需要更新State 中的 messages 的,更新的内容为 {'messages': [AIMessage(content='你好!很高兴为你提供帮助。'...)]}, 也就是chatbot 节点返回的内容。

同理执行完 n2 节点以后,返回的是 {'messages': '很高兴认识你'}

updates 模式下,可以理解为返回的是需要更新State的内容或者节点本身返回的内容。

实际应用时主要用到的是 values 和 updates ,还有几个模式,我们简单看一下

debug

debug 模式,会将每个节点执行时的输入和输出返回,以及一些运行时的信息,方便我们进行调试

{
    'type': 'task', 
    'timestamp': '2024-12-24T03:37:21.438331+00:00', 
    'step': 1, 
    'payload': {
        'id': '76b98a43-0d50-f270-b5c5-c09296f0b1ea', 
        'name': 'chatbot', 
        'input': {
            'messages': [
                HumanMessage(content='你好', ...)
            ]
        }, 
        'triggers': ['start:chatbot']
    }
}


{
    'type': 'task_result', 
    'timestamp': '2024-12-24T03:37:22.169144+00:00', 
    'step': 1, 
    'payload': {
        'id': '76b98a43-0d50-f270-b5c5-c09296f0b1ea', 
        'name': 'chatbot', 
        'error': None, 
        'result': [
            ('messages', [
                AIMessage(content='你好!很高兴为你提供帮助。', ...)
            ])
        ], 
        'interrupts': []
    }
}
...

messages

messages 模式下,只会将大模型进行token_by_token流式返回

(
    AIMessageChunk(content='', ...), 
    {
        'langgraph_step': 1, 
        'langgraph_node': 'chatbot', 
        'langgraph_triggers': ['start:chatbot'], 
        'langgraph_path': ('__pregel_pull', 'chatbot'), 
        'langgraph_checkpoint_ns': 'chatbot:64c7c860-f07b-e943-95ca-e576e3984f88',
        'checkpoint_ns': 'chatbot:64c7c860-f07b-e943-95ca-e576e3984f88', 
        'ls_provider': 'openai', 
        'ls_model_name': 
        'qwen-turbo', 
        'ls_model_type': 
        'chat', 
        'ls_temperature': 0.7, 
        'ls_max_tokens': 1024
    }
)


(
    AIMessageChunk(content='你好', ...), 
    {...}
)

(
    AIMessageChunk(content='!', ...), 
    {...}
)

(
    AIMessageChunk(content='很高兴', ...), 
    {...}
)

...

custom

custom 为自定义消息事件,后面我们再详细讲解。

stream_mode 总结

  1. values 为每个节点执行完毕以后,State 对象更新以后的值
  2. updates 为每个节点执行完毕以后,需要更新的内容或者可以理解为节点返回的内容,但是只包含State 中定义的属性。
  3. debug 模式会将节点执行的输入和输出返回
  4. messages 模式只会返回大模型的流式输出

常用的模式为 updates和values, 如果我们只关心每个节点执行结束以后State 的值,那么需要使用 values 模式,如果想要获取每个节点的返回值,则需要使用 updates。

另外,stream_mode 也可以是列表形式,比如你既关心每个节点的返回,也关心State 更新以后的值,

for i in app.stream(inputs, stream_mode=["updates", "values"]):  
    print(i, "\n\n")
1
2
3
4
5
6
('values', {'messages': [HumanMessage(content='你好', ...)]})


('updates', {'chatbot': {'messages': [AIMessage(content='你好!很高兴为你提供帮助。'...)]}})

...

这种模式下,每个流式数据为一个tuple 1. 第一个元素为事件(模式)名称,如上面的 updates 或者 values 2. 第二个元素为事件的数据,和上面介绍的数据一样的。

stream_mode 默认为 updates, 虽然查看源代码,注释中有提到说默认是 values, 但是从代码的执行结果来看,默认的行为是 updates。

二、大模型的流式输出

上面介绍了使用 stream 方法来进行节点流式输出,并且通过 stream_mode 来控制流式输出的模式,LangGraph 作为一个大模型应用开发框架,我们会使用它开发很多基于LLM 的应用,这些应用不免会使用到大模型,为了提升用户体验,一般大模型会提供流式的输出。

通过上面的章节,stream_mode 为 messages 时,虽然可以将大模型进行 token_by_token 的输出,但是却不能同时输出节点的更新或者State 的values, 当然, 我们可以使用stream_mode=["updates", "messages"] 的方式来同时输出。

1
2
3
4
5
6
7
8
inputs = {"messages": [HumanMessage(content="你好")]}  
for i in app.stream(inputs, stream_mode=["updates", "messages"]):  
    if event == "messages":  
        message_data, graph_data = data  
        print(event, {graph_data.get("langgraph_node"): message_data.to_json()})
    else:  
        print(event, data)  
    print("\n")
messages {'chatbot': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessageChunk'], 'kwargs': {'content': '', 'type': 'AIMessageChunk', 'id': 'run-b70f5aa4-46b9-4470-a485-c3230437ffe7', 'tool_calls': [], 'invalid_tool_calls': []}}}



messages {'chatbot': {'lc': 1, 'type': 'constructor', 'id': ['langchain', 'schema', 'messages', 'AIMessageChunk'], 'kwargs': {'content': '你好', 'type': 'AIMessageChunk', 'id': 'run-b70f5aa4-46b9-4470-a485-c3230437ffe7', 'tool_calls': [], 'invalid_tool_calls': []}}}

...

updates {'chatbot': {'messages': [AIMessage(content='你好!有什么我能帮助你的吗?', additional_kwargs={}, response_metadata={'finish_reason': 'stop', 'model_name': 'qwen-turbo'}, id='run-b70f5aa4-46b9-4470-a485-c3230437ffe7')]}}



updates {'n2': {'messages': '很高兴认识你'}}

使用 stream_mode 为 ["updates", "messages"] 可以同时返回大模型的token_by_token 输出以及节点执行结束以后需要updates的输出。此方式虽然可以暂时解决同时输出大模型流式和节点流式,但是局限性也很明显,这里推荐使用另外一个方法,astream_events, 很奇怪的是,这个方法是异步的,但是 LangChain 却没有定义 stream_events 方法,一般 LangChain 会同时定义同步和异步方法,如 invokeainvoke, streamastream, 但是这个方法只有astream_events。且官方文档中介绍,在LangGraph 中其实没有必要使用的,

Use the astream_events API to access custom data and intermediate outputs from LLM applications built entirely with LCEL.

While this API is available for use with LangGraph as well, it is usually not necessary when working with LangGraph, as the stream and astream methods provide comprehensive streaming capabilities for LangGraph graphs.

但是我觉得这个方法还是很灵活的,所以这里也简单的介绍一下它的使用方法吧。

astream_events

我们可以将State 的更新与大模型的执行等操作理解为“事件” events, 每一种操作都是一种事件,如当节点开始执行,节点执行结束,收到大模型的流式输出,调用工具等等,这些可以理解为事件,LangChain 内部定义了很多事件,详情可参考langchain官方文档 https://python.langchain.com/docs/how_to/streaming/#event-reference

当使用 astream_events,会得到很多输出,这里应该只关注我们想要的事件,比如节点执行结束,和大模型流式输出。修改一下原来的代码,改为异步调用

async def main():  
    inputs = {"messages": [HumanMessage(content="你好")]}  
    async for event in app.astream_events(inputs, version="v2"):  
        kind = event.get("event")  
        data = event.get("data")  
        name = event.get("name")  
        if name == "_write":  
            continue  
        if kind == "on_chain_end":  
            ydata = {  
                "kind": kind,  
                "name": name,  
                "data": data  
            }  
        elif kind == "on_chat_model_stream":  
            ydata = {  
                "kind": kind,  
                "name": name,  
                "node": event.get("metadata").get("langgraph_node"),  
                "data": event["data"]["chunk"].content  
            }  
        else:  
            continue  
        print(ydata)

此时,我们可以很大程度的自定义数据的输出

1
2
3
4
5
6
7
8
9
{'kind': 'on_chain_end', 'name': '__start__', 'data': {'output': {'messages': [HumanMessage(content='你好', additional_kwargs={}, response_metadata={})]}, 'input': {'messages': [HumanMessage(content='你好', additional_kwargs={}, response_metadata={})]}}}

{'kind': 'on_chat_model_stream', 'name': 'ChatOpenAI', 'node': 'chatbot', 'data': ''}
{'kind': 'on_chat_model_stream', 'name': 'ChatOpenAI', 'node': 'chatbot', 'data': '你好'}
{'kind': 'on_chat_model_stream', 'name': 'ChatOpenAI', 'node': 'chatbot', 'data': '!'}

...
{'kind': 'on_chain_end', 'name': 'chatbot', 'data': {'output': {'messages': [AIMessage(content='你好!有什么我能帮助你的吗?', ...)]}}}
{'kind': 'on_chain_end', 'name': 'n2', 'data': {'output': {'messages': '很高兴认识你'}, 'input': .....}}}

三、注意事项

需要使用 python >= 3.11 , 在 python 3.11 以下版本,stream 方法不会对大模型进行流式输出

When using python 3.8, 3.9, or 3.10, please ensure you manually pass the RunnableConfig through to the llm when invoking it like so: llm.ainvoke(..., config). The stream method collects all events from your nested code using a streaming tracer passed as a callback. In 3.11 and above, this is automatically handled via contextvar's; prior to 3.11, asyncio's tasks lacked proper contextvar support, meaning that the callbacks will only propagate if you manually pass the config through. We do this in the call_model method below.

四、总结

本文内容比较多,主要围绕LangGraph 流式输出话题进行 1. 使用 stream 方法流式输出各节点的执行结果 2. 详细介绍了 stream 方法中 stream_mode 的几个参数及其流式输出效果 3. 使用 astream_events 方法同时处理多个事件,达到即流式输出节点结果,同时流式输出大模型调用 4. 注意使用流式的话需要python >= 3.11 版本

下期预告

我们利用目前介绍的点,边,图构建一个旅行助手应用