LangChain (Part 7): Callbacks

LangChain module architecture diagram [1]

1. The Essential Callback System

The callback system lets us hook into the various stages of an LLM application, which is very useful for logging, monitoring, streaming, and more. LangChain provides a number of built-in callback handlers, which can be found in the langchain/callbacks module.

1.1 The Most Basic Callback

StdOutCallbackHandler is the most basic handler: it logs every event to stdout. When the verbose flag on an object is set to True, StdOutCallbackHandler is invoked even if it was not explicitly passed in.

The callbacks argument is available in two different places on most objects across the API (Model, Chain, Agent, Tool, etc.):

  • Callbacks can be attached in the constructor (but they do not carry across objects)
  • They can also be bound per request via an object's invoke() / ainvoke() / batch() / abatch() instance methods, in which case they apply only to that request and all the sub-requests it contains; the callbacks are passed through the config argument
from langchain_core.callbacks import StdOutCallbackHandler
from langchain.chains import LLMChain
from langchain_openai import OpenAI
from langchain_core.prompts import PromptTemplate

handler = StdOutCallbackHandler()
llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")

# Set callbacks when constructing the chain
chain = LLMChain(llm=llm, prompt=prompt, callbacks=[handler])
chain.invoke({"number": 2})

# verbose=True triggers StdOutCallbackHandler implicitly
chain = LLMChain(llm=llm, prompt=prompt, verbose=True)
chain.invoke({"number": 2})

# Set callbacks at invocation time (scoped to this request only)
chain = LLMChain(llm=llm, prompt=prompt)
chain.invoke({"number": 2}, {"callbacks": [handler]})


config = {
    "callbacks": [handler]
}

# LCEL: pipe the prompt into the LLMChain above; the LLMChain applies
# its own prompt again, which produces the nested output shown below
chain = prompt | chain
chain.invoke({"number": 2}, config=config)

> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 =

> Finished chain.


> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 =

> Finished chain.


> Entering new LLMChain chain...
Prompt after formatting:
1 + 2 =

> Finished chain.


> Entering new RunnableSequence chain...


> Entering new PromptTemplate chain...

> Finished chain.


> Entering new LLMChain chain...
Prompt after formatting:
1 + text='1 + 2 = ' =

> Finished chain.

> Finished chain.
{'number': StringPromptValue(text='1 + 2 = '), 'text': "') \n\n\n1 + 2 = 3"}

1.2 Async Callbacks

When using the async API, it is recommended to use AsyncCallbackHandler to avoid blocking the event loop. Running an llm/chain/tool/agent through the async methods with a synchronous CallbackHandler will still work, but if your CallbackHandler is not thread-safe it can cause problems.

import asyncio
from typing import Any, Dict, List

from langchain.callbacks.base import AsyncCallbackHandler, BaseCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_core.outputs import LLMResult
from langchain_openai import ChatOpenAI


class MyCustomSyncHandler(BaseCallbackHandler):
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(f"Sync handler being called in a `thread_pool_executor`: token: {token}")


class MyCustomAsyncHandler(AsyncCallbackHandler):
    """Async callback handler that can be used to handle callbacks from langchain."""

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when the LLM starts running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        class_name = serialized["name"]
        print("Hi! I just woke up. Your llm is starting")

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when the LLM ends running."""
        print("zzzz....")
        await asyncio.sleep(0.3)
        print("Hi! I just woke up. Your llm is ending")


# To enable streaming, we pass in `streaming=True` to the ChatModel constructor
# Additionally, we pass in a list with our custom handler
chat = ChatOpenAI(
    max_tokens=25,
    streaming=True,
    callbacks=[MyCustomSyncHandler(), MyCustomAsyncHandler()],
)

# Top-level await works in a notebook; in a script, wrap this in asyncio.run(...)
await chat.agenerate([[HumanMessage(content="Tell me a joke")]])
zzzz....
Hi! I just woke up. Your llm is starting
Sync handler being called in a `thread_pool_executor`: token:
Sync handler being called in a `thread_pool_executor`: token: Why
Sync handler being called in a `thread_pool_executor`: token: couldn
Sync handler being called in a `thread_pool_executor`: token: 't
Sync handler being called in a `thread_pool_executor`: token: the
Sync handler being called in a `thread_pool_executor`: token: bicycle
Sync handler being called in a `thread_pool_executor`: token: find
Sync handler being called in a `thread_pool_executor`: token: its
Sync handler being called in a `thread_pool_executor`: token: way
Sync handler being called in a `thread_pool_executor`: token: home
Sync handler being called in a `thread_pool_executor`: token: ?
Sync handler being called in a `thread_pool_executor`: token: Because
Sync handler being called in a `thread_pool_executor`: token: it
Sync handler being called in a `thread_pool_executor`: token: lost
Sync handler being called in a `thread_pool_executor`: token: its
Sync handler being called in a `thread_pool_executor`: token: bearings
Sync handler being called in a `thread_pool_executor`: token: !
Sync handler being called in a `thread_pool_executor`: token:
zzzz....
Hi! I just woke up. Your llm is ending

1.3 Multiple Custom Callback Handlers (Writing to a File)

from typing import Any, Dict, List, Union, Optional, TextIO, cast

from langchain.agents import AgentType, initialize_agent, load_tools
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.agents import AgentAction
from langchain_openai import OpenAI
from langchain_core.utils.input import print_text

from loguru import logger

logfile = "output.log"
logger.add(logfile, colorize=True, enqueue=True)


# Custom CallbackHandler
class MyCustomHandlerOne(BaseCallbackHandler):
    def __init__(
        self, filename: str, mode: str = "a", color: Optional[str] = None
    ) -> None:
        """Initialize callback handler."""
        self.file = cast(TextIO, open(filename, mode, encoding="utf-8"))
        self.color = color

    def __del__(self) -> None:
        """Destructor to cleanup when done."""
        self.file.close()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        print_text(f"on_llm_start {serialized['name']}", end="\n", file=self.file)

    def on_llm_new_token(self, token: str, **kwargs: Any) -> Any:
        print_text(f"on_new_token {token}", end="\n", file=self.file)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> Any:
        """Run when LLM errors."""

    def on_chain_start(
        self, serialized: Dict[str, Any], inputs: Dict[str, Any], **kwargs: Any
    ) -> Any:
        print_text(f"on_chain_start {serialized['name']}", end="\n", file=self.file)

    def on_tool_start(
        self, serialized: Dict[str, Any], input_str: str, **kwargs: Any
    ) -> Any:
        print_text(f"on_tool_start {serialized['name']}", end="\n", file=self.file)

    def on_agent_action(self, action: AgentAction, **kwargs: Any) -> Any:
        print_text(f"on_agent_action {action}", end="\n", file=self.file)


class MyCustomHandlerTwo(BaseCallbackHandler):
    def __init__(
        self, filename: str, mode: str = "a", color: Optional[str] = None
    ) -> None:
        """Initialize callback handler."""
        self.file = cast(TextIO, open(filename, mode, encoding="utf-8"))
        self.color = color

    def __del__(self) -> None:
        """Destructor to cleanup when done."""
        self.file.close()

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> Any:
        print_text(
            f"on_llm_start (I'm the second handler!!) {serialized['name']}",
            end="\n",
            file=self.file,
        )


handler1 = MyCustomHandlerOne(logfile)
handler2 = MyCustomHandlerTwo(logfile)

# handler2 is bound to the LLM constructor, so it only sees LLM events
llm = OpenAI(temperature=0, streaming=True, callbacks=[handler2])
tools = load_tools(["llm-math"], llm=llm)
agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION)

config = {
    "callbacks": [handler1]
}
# handler1 is passed at invocation time, so it logs the entire invoke run
agent.invoke("What is 2 raised to the 0.235 power?", config=config)

Output

{'input': 'What is 2 raised to the 0.235 power?',
'output': '1.1769067372187674'}

Inspect the file contents:

from ansi2html import Ansi2HTMLConverter
from IPython.display import HTML, display

with open("output.log", "r") as f:
    content = f.read()

conv = Ansi2HTMLConverter()
html = conv.convert(content, full=True)

display(HTML(html))
on_chain_start AgentExecutor
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token I
on_new_token should
on_new_token use
on_new_token a
on_new_token calculator
on_new_token to
on_new_token solve
on_new_token this
on_new_token .

on_new_token Action
on_new_token :
on_new_token Calculator
on_new_token

on_new_token Action
on_new_token Input
on_new_token :
on_new_token
on_new_token 2
on_new_token ^
on_new_token
on_new_token 0
on_new_token .
on_new_token 235
on_new_token

on_new_token
on_agent_action tool='Calculator' tool_input='2 ^ 0.235\n' log=' I should use a calculator to solve this.\nAction: Calculator\nAction Input: 2 ^ 0.235\n'
on_tool_start Calculator
on_chain_start LLMMathChain
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token

on_new_token ```text
on_new_token

on_new_token 2
on_new_token **
on_new_token
on_new_token 0
on_new_token .
on_new_token 235
on_new_token

on_new_token ```

on_new_token ...
on_new_token num
on_new_token expr
on_new_token .evaluate
on_new_token ("
on_new_token 2
on_new_token **
on_new_token
on_new_token 0
on_new_token .
on_new_token 235
on_new_token ")
on_new_token ...

on_new_token
on_chain_start LLMChain
on_llm_start OpenAI
on_llm_start (I'm the second handler!!) OpenAI
on_new_token I
on_new_token now
on_new_token know
on_new_token the
on_new_token final
on_new_token answer
on_new_token .

on_new_token Final Answer:
on_new_token
on_new_token 1.
on_new_token 1769067372187674
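
Besides StdOutCallbackHandler and custom handlers, LangChain ships other ready-made handlers for common chores such as token accounting. Below is a minimal sketch using get_openai_callback (an illustration assuming an OpenAI model and the langchain_community package; it is not part of the original example):

from langchain_community.callbacks import get_openai_callback
from langchain_openai import OpenAI

llm = OpenAI()

# The context manager attaches a handler that tallies token usage and cost
with get_openai_callback() as cb:
    llm.invoke("1 + 2 = ")
print(cb.total_tokens, cb.prompt_tokens, cb.completion_tokens, cb.total_cost)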

For more on callbacks, see: https://python.langchain.com/docs/modules/callbacks/

2. LangSmith

LangSmith is a unified platform for debugging, testing, evaluating, and monitoring large language model (LLM) applications, built by the LangChain company.

The LangSmith platform [2]

Overview of LangSmith platform features

First, request an API_KEY on the LangSmith website; new accounts come with some free quota. You can then connect to LangSmith via environment variables:

import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_API_KEY"] = "......" # replace dots with your api key

After that, simply run your LangChain code as usual and the corresponding trace will appear in LangSmith; the Project defaults to default.
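
Below is a minimal sketch (assuming OPENAI_API_KEY is also set; the LANGCHAIN_PROJECT variable and the project name are optional illustrations, not from the original post):

import os
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

# Optional: route traces into a named project instead of "default"
os.environ["LANGCHAIN_PROJECT"] = "callback-demo"  # hypothetical project name

prompt = ChatPromptTemplate.from_template("1 + {number} = ")
llm = ChatOpenAI()
chain = prompt | llm

# No callback wiring needed: with LANGCHAIN_TRACING_V2=true,
# this invocation is traced to LangSmith automatically
chain.invoke({"number": 2})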

Preview of the LangSmith site

LangSmith core features

  • Debugging: LangSmith lets you inspect the model inputs and outputs at every step of an event chain. This makes it easy to experiment with new chains and prompts and to track down the root cause of problems such as unexpected results, errors, or latency. You can also inspect latency and token usage to pinpoint call performance issues.

  • Testing: LangSmith can track data samples or upload custom datasets. You can then run chains and prompts against a dataset, checking inputs and outputs by hand or through automated tests. Many teams find that manual inspection helps build an intuition for LLM interactions, which in turn leads to better optimization ideas.

  • Evaluation: LangSmith integrates seamlessly with open-source evaluation modules, supporting both rule-based evaluation and LLM self-evaluation. LLM-assisted evaluation has the potential to cut costs substantially.

  • Monitoring: LangSmith can proactively track performance metrics, chain performance, debugging issues, user interaction experience, and more, enabling continuous product improvement (see the sketch after this list for programmatic access to traces).

  • Unified platform: LangSmith brings all of the above together, so teams don't have to assemble a toolbox themselves and can focus on building their core application.
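
As a small illustration of reading traces back programmatically, here is a sketch assuming the standalone langsmith SDK is installed and LANGCHAIN_API_KEY is set (the field names are from the SDK's Run model):

from langsmith import Client

client = Client()

# List the most recent runs in a project and print a few fields
for run in client.list_runs(project_name="default", limit=5):
    print(run.name, run.run_type, run.start_time)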

For more features, see the official docs: https://docs.smith.langchain.com/

3. Langfuse

Langfuse is an open-source LLM engineering platform that helps teams collaboratively debug, analyze, and iterate on their LLM applications.

To use Langfuse, you can either sign up for the officially hosted platform or self-host it yourself as follows:

# Clone the Langfuse repository
git clone https://github.com/langfuse/langfuse.git
cd langfuse

# Start the server and database
docker compose up

If you run into errors, try modifying docker-compose.yml as follows:

version: "3.5"

services:

  db:
    image: postgres
    restart: always
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=postgres
    ports:
      - 5432:5432
    volumes:
      - database_data:/var/lib/postgresql/data

  langfuse-server:
    image: ghcr.io/langfuse/langfuse:latest
    depends_on:
      - db
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres
      - NEXTAUTH_SECRET=mysecret
      - SALT=mysalt
      - NEXTAUTH_URL=http://localhost:3000
      - TELEMETRY_ENABLED=${TELEMETRY_ENABLED:-true}
      - LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES=${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-false}

volumes:
  database_data:
    driver: local

Once it's up, open http://localhost:3000 in your browser, register an account, and log in. Then create a project, create an API KEY under that project's settings, and copy the corresponding code:

from langfuse.callback import CallbackHandler

langfuse_handler = CallbackHandler(
    public_key="********************",
    secret_key="*******************",
    host="http://localhost:3000"
)

Or:

import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "*******************"
os.environ["LANGFUSE_SECRET_KEY"] = "*******************"
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"
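
Either way, you can check that the handler can reach the server before running anything. A small sketch; auth_check() is what the Langfuse docs describe for this purpose:

# Verify that the credentials and host are valid
assert langfuse_handler.auth_check()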

Run the code:

from langchain_openai import OpenAI
from langchain_core.prompts import PromptTemplate
from langfuse.callback import CallbackHandler


langfuse_handler = CallbackHandler()
llm = OpenAI()
prompt = PromptTemplate.from_template("1 + {number} = ")

config = {
    "callbacks": [langfuse_handler]
}

chain = prompt | llm
chain.invoke({"number": 2}, config=config)
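
The Langfuse SDK batches events and sends them asynchronously in the background, so in short-lived scripts it can help to flush before the process exits. A sketch; flush() is what the Langfuse docs recommend here:

# Ensure all buffered events reach the server before the script exits
langfuse_handler.flush()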

Then go back to http://localhost:3000; under Tracing → Traces you can see the corresponding trace:

Langfuse preview

Langfuse core features

  • Development: convenient observability and logging for debugging, letting developers observe and debug their applications and manage and deploy prompts.

  • Monitoring & analytics: insight into application usage, including tracking of cost, latency, and quality, plus collection of scores and user feedback.

  • Testing: test application behavior and performance before releasing a new version, helping developers ensure the new version's stability.

For more features, see the official docs: https://langfuse.com/docs

4. LangServe

LangServe deploys a Chain or any Runnable as a REST API service.

# Install LangServe
!pip install "langserve[all]"

Server side

#!/usr/bin/env python
from fastapi import FastAPI
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langserve import add_routes
import uvicorn

app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple api server using Langchain's Runnable interfaces",
)

model = ChatOpenAI()
prompt = ChatPromptTemplate.from_template("Tell a joke about {topic}")
add_routes(
    app,
    prompt | model,
    path="/joke",
)

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=9999)

Client side

import requests

response = requests.post(
    "http://localhost:9999/joke/invoke",
    json={"input": {"topic": "Xiao Ming"}}
)
print(response.json()["output"]["content"])

Output

Xiao Ming went to buy shoes. The clerk asked what size he wanted, and Xiao Ming said, "I want a pair that can run." The clerk paused, then handed him a pair of sneakers. Xiao Ming took them and said, "Thanks, I'm off to chase a girl." The clerk burst out laughing.
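
Instead of raw requests, langserve also provides a RemoteRunnable client; add_routes additionally exposes /joke/batch, /joke/stream, and an interactive playground at /joke/playground. A minimal client sketch:

from langserve import RemoteRunnable

# Call the served chain as if it were a local Runnable
joke_chain = RemoteRunnable("http://localhost:9999/joke/")
print(joke_chain.invoke({"topic": "Xiao Ming"}).content)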

For more features, see: https://python.langchain.com/docs/langserve/

LangChain, LangServe, and LangSmith [3]
