目录

使用LangChain自定义模型的流式输出

最近使用阿里云模型服务灵积,上面有很多开源的大模型提供接口调用,阿里自家通义千问也在,申请体验了一下,结果并不那么尽如人意,参看之前的文章,本文先不讨论大模型质量问题,本文讨论一下如果通过LangChain来调用灵积接口,并且通过gradio构建一个web服务demo。

开通阿里云灵积服务,并且申请使用通义千问模型,生成API-KEY, 这些前面的文章都有介绍,耐心等待审核通过。

自定义模型

通过langchain文档, https://python.langchain.com/docs/modules/model_io/models/llms/custom_llm ,想要自定义大语言模型,需要继承langchain.llms.base.LLM 类,并且重写_llm_type , _call, _identifying_params 方法。我们先来实现一下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from langchain.llms.base import LLM  
from typing import Optional, List, Any, Mapping  
from langchain.callbacks.manager import CallbackManagerForLLMRun  
from http import HTTPStatus  
import dashscope  
from dashscope import Generation  
import json  
  
dashscope.api_key = "api-key"  
  

class DashLLM(LLM):  
    model:str = "qwen-turbo"  
  
    @property  
    def _llm_type(self) -> str:  
        return "dashllm"  
  
    def _call(  
            self,  
            prompt: str,  
            stop: Optional[List[str]] = None,  
            run_manager: Optional[CallbackManagerForLLMRun] = None,  
            **kwargs: Any,  
    ) -> str:  
        if stop is not None:  
            raise ValueError("stop kwargs are not permitted.")  
        response = Generation.call(  
            model=self.model,  
            prompt=prompt  
        )  
        if response.status_code != HTTPStatus.OK:  
            return f"请求失败,失败信息为:{response.message}"  
        return response.output.text  
  
  
    @property  
    def _identifying_params(self) -> Mapping[str, Any]:  
        """Get the identifying parameters."""  
        return {"model": self.model}  
  
if __name__ == '__main__':  
    qw = DashLLM()  
    print(qw.predict("北京有什么好吃的?"))

代码解释:

1)_llm_type 方法返回一个字符串,表示该自定义模型的名字。
2)_identifying_params 需要返回一个字典,这里还不需要返回的内容,所以先随便返回点内容。
3)_identifying_params_llm_type 需要使用@property 装饰,表示这两个方法可以是属性。
4)_call 方法,需要返回一个字符串,这里使用Generation.call方法,从灵积平台获取AI响应,其结构为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
{
    "status_code": 200,
    "request_id": "89858455-c1a5-9de1-916c-f43d7477a334",
    "code": "",
    "message": "",
    "output": {
        "text": "xxx",
        "finish_reason": "stop",
        "choices": null
    },
    "usage": {
        "input_tokens": 27,
        "output_tokens": 73
    }
}

response.output.text 为AI生成的内容,我们最终将这个内容返回。
5)dashscope.api_key = "api-key" 记得设置api-key。

运行上面的代码,可以得到AI的输出: 北京有很多美味的特色小吃,例如烤鸭、焦圈、豆汁、炒肝、麻豆腐、锅包肉等等。此外,北京还有各种各样的餐馆,提供各种精美的菜肴,例如烤鸭、涮羊肉、麻辣烫等等。

使用gradio构建chat聊天窗口

上面我们只能在代码上写死用户的输入,更多的时候,我们需要构建一个web应用,让用户可以在浏览器上和AI进行交流,接下来我们使用gradio的ChatInterface组件来搭建一个聊天窗口。新建gradioqwen.py 文件,写入以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from langchain.llms.base import LLM  
from typing import Optional, List, Any, Mapping  
from langchain.callbacks.manager import CallbackManagerForLLMRun  
from http import HTTPStatus  
import dashscope  
from dashscope import Generation  
from langchain.schema import AIMessage, HumanMessage  
import gradio as gr  
  
dashscope.api_key = "api-key"  
  
  
class DashLLM(LLM):  
    """DashLLM 类与上面的代码是一样的,这里省略了"""
    
  
  
qwllm = DashLLM()  
  
# 注释1  
def qwen_response(message, history):  
    messages = []  
    for msg in history:  
        messages.append(HumanMessage(content=msg[0]))  
        messages.append(AIMessage(content=msg[1]))  
    messages.append(HumanMessage(content=message))  
    response = qwllm.predict_messages(messages)  
    return response.content  
  
# 注释2  
gr.ChatInterface(qwen_response).launch(server_port=8888)

代码解释:

1) DashLLM这个自定义类代码和之前是一样的,没有任何修改。
2)在注释1处,新定义了qwen_response函数,这个函数接收两个参数,第一个是本次用户提问的问题,第二个历史消息列表,格式为:

1
2
3
4
[
	["用户消息1" "AI回答1"], 
	["用户消息2" "AI回答2"]
]

3) 使用 qwllm.predict_messages 方法与AI进行交互,langchain 会将历史消息列表转换为字符串,格式为:

1
2
3
Human: 你好
AI: 你好!很高兴为你提供帮助。
Human: 北京有什么好吃的

4)最终调用qwllm._call 方法,将生成的字符串传进来,与AI进行交互。
5)在注释2处,使用gr.ChatInterface(qwen_response).launch(server_port=8888) 来启动服务,监听8888端口,gr.ChatInterface 组件传入注释1处的函数名,注意这里是函数名,不能写成qwen_response() 。

使用 python gradioqwen.py 运行脚本,运行成功以后,在浏览器打开 https://127.0.0.1:8888,即可与通义千问进行聊天了。

https://yyxbloguse.oss-cn-beijing.aliyuncs.com/img/202309151612831.png

但是还是之前的问题,这个模型对于历史对话支持不好。

https://yyxbloguse.oss-cn-beijing.aliyuncs.com/img/202309151618958.png

使用流式接口返回

先抛开聊天质量不谈,我们来看下这个聊天应用,这个应用每次都是等待AI将内容全部返回才展示到页面上,有时如果生成的内容较多,那么用户等待的时间就会比较长,此时我们需要使用流式返回,让AI一边生成内容,一边将内容通过返回,这样就会缩短用户的首次等待时长,提升一点用户体验。

在langchain的官方文档中,并没有提及如何在自定义模型上使用流式返回,我通过查看源代码发现,在自定义模型类中,需要重写_stream 方法来实现流式返回。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from langchain.llms.base import LLM  
from typing import Optional, List, Any, Mapping, Iterator  
from langchain.callbacks.manager import CallbackManagerForLLMRun  
from langchain.schema.output import GenerationChunk  
from http import HTTPStatus  
import dashscope  
from dashscope import Generation  
from langchain.schema import AIMessage, HumanMessage  
import gradio as gr  
  
dashscope.api_key = "api-key"  
  
  
class DashLLM(LLM):  
    model: str = "qwen-turbo"  
	"""_llm_type , _call, _identifying_params 这三个函数不变,省略"""

	# 注释1
    def _stream(  
            self,  
            prompt: str,  
            stop: Optional[List[str]] = None,  
            run_manager: Optional[CallbackManagerForLLMRun] = None,  
            **kwargs: Any,  
    ) -> Iterator[GenerationChunk]:  
        responses = Generation.call(  
            model=self.model,  
            prompt=prompt,  
            stream=True  
        )  
        for response in responses:  
            if response.status_code == HTTPStatus.OK:  
                print(response)  
                text = response.output.text  
                yield GenerationChunk(  
                    text=text,  
                    generation_info=response.usage  
                )  
            else:  
                yield GenerationChunk(  
                    text=f"响应失败,失败信息为: {response.message}"  
                )  
  
  
qwllm = DashLLM()  
  
  
def qwen_response(message, history):  
    messages = []  
    for msg in history:  
        messages.append(HumanMessage(content=msg[0]))  
        messages.append(AIMessage(content=msg[1]))  
    messages.append(HumanMessage(content=message)) 
    # 注释2 
    for msg in qwllm.stream(messages):  
        yield msg  
  
# 注释3  
gr.ChatInterface(qwen_response).queue().launch(server_port=8888)

代码解释:

1)在注释1处,为DashLLM类中添加 _stream 方法,这个方法用于以流式调用时自定义模型类调用的方法,这个方法需要返回GenerationChunk类型的Iterator,也就是这里需要使用yield 关键字将结果返回,注意这里不能使用return,如果使用return则循环将结束。使用Generation.call 方法时也需要添加 stream=True 参数。
2)GenerationChunk 类有两个主要的属性,text,是AI生成的内容,也是后面需要在网页上展示的,generation_info为生成的额外信息,字典类型,用户可以自己随意定义,灵积平台返回的数据格式如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
{
	"status_code": 200, 
	"request_id": "e609c3e4-75d2-964f-b91b-8ff13d5716bb", 
	"code": "", 
	"message": "", 
	"output": {
		"text": "你好!", 
		"finish_reason": "null", 
		"choices": null
	}, 
	"usage": {
		"input_tokens": 23, 
		"output_tokens": 3
	}
}

我们需要将GenerationChunk的text属性值设置为response.output.text值,将generation_info属性值设置为response.usage的值。最后使用yield返回。
3)在注释2处,修改之前的qwen_response函数,之前是调用qwllm.predict_messages方法,这里需要修改为 qwllm.stream 方法,注意,这里是stream方法,而不是直接调用DashLLM类中的 _stream 方法。stream方法为DashLLM父类中的方法,langchain会将messages列表自动转换为字符串,然后在父类中的stream方法会调用子类的_stream 方法,由于_stream 方法返回的是 Iterator[GenerationChunk], 所以这里也需要使用for循环来遍历输出,注意这里也是需要使用yield返回,不能使用return。
4)在注释3处,使用gr.ChatInterface(qwen_response).queue().launch(server_port=8888) 来启动服务,这里和之前非流式的调用中间多了一个.queue()

启动服务,我们就可以体验到流式打字机效果的回复了!

https://yyxbloguse.oss-cn-beijing.aliyuncs.com/img/202309151718024.gif