LangGraph (Part 2) — Human-in-the-loop

1. Human-in-the-loop

In complex LLM applications, some degree of human oversight, approval, or editing is important. LangGraph provides two Human-in-the-loop mechanisms: Interrupt and Authorize.

1.1 Interrupt

Interrupt is the simplest form of control. In LangGraph, the user can interrupt execution before or after a given action runs, and the state is saved (the state before the action runs, or after it completes). The user can then:

  • Resume from the interruption as if it never happened
  • Send new input to the application, which cancels any pending future steps and starts processing the new input
  • Do nothing, in which case nothing else will run
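The interrupt-and-resume cycle can be sketched with a toy checkpointer in plain Python. This only illustrates the idea, not LangGraph's actual implementation; every name below is made up:

```python
# Toy illustration of interrupt-before-action: the state is checkpointed
# before the pending step runs, and the caller decides how to proceed.
# All names here are hypothetical; this is not LangGraph's API.

checkpoints = {}  # thread_id -> (state, pending_step)

def run_until_action(thread_id, state, pending_step):
    # Instead of executing the step, save a checkpoint and stop.
    checkpoints[thread_id] = (state, pending_step)
    return "interrupted"

def resume(thread_id, new_input=None):
    state, pending_step = checkpoints.pop(thread_id)
    if new_input is not None:
        # New input cancels the pending step and is processed instead.
        return state + [new_input]
    # Otherwise resume as if the interruption never happened.
    return state + [pending_step()]

# Option 1: resume the pending tool call.
run_until_action("t1", ["hi"], lambda: "tool-result")
print(resume("t1"))  # ['hi', 'tool-result']

# Option 2: send new input, cancelling the pending step.
run_until_action("t2", ["hi"], lambda: "tool-result")
print(resume("t2", new_input="changed my mind"))  # ['hi', 'changed my mind']
```

Doing nothing is the third option: the checkpoint simply stays in storage and no further step runs.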

Set up the tools and model

Use the Tavily search tool.

from langchain_community.tools.tavily_search import TavilySearchResults
from langgraph.prebuilt import ToolExecutor
from langchain_openai import ChatOpenAI
from langchain_core.utils.function_calling import convert_to_openai_function

tools = [TavilySearchResults(max_results=1)]
tool_executor = ToolExecutor(tools)
# A model that supports OpenAI function calling
model = ChatOpenAI(temperature=0, streaming=True)

# Convert the tools into the OpenAI function calling format
functions = [convert_to_openai_function(t) for t in tools]
model = model.bind_functions(functions)

Define the graph nodes

from langgraph.prebuilt import ToolInvocation
import json
from langchain_core.messages import FunctionMessage


# Routing logic for the conditional edge
def should_continue(messages):
    last_message = messages[-1]
    # No function_call means we are done
    if "function_call" not in last_message.additional_kwargs:
        return "end"
    # Otherwise, continue
    else:
        return "continue"


# Call the model
def call_model(messages):
    response = model.invoke(messages)
    # The returned message will be appended to the existing message list
    return response


# Execute the tool
def call_tool(messages):
    # If we got here, the last message must involve a function call
    last_message = messages[-1]
    # Construct a ToolInvocation from the function_call
    action = ToolInvocation(
        tool=last_message.additional_kwargs["function_call"]["name"],
        tool_input=json.loads(
            last_message.additional_kwargs["function_call"]["arguments"]
        ),
    )
    # Invoke the tool via tool_executor and get the result
    response = tool_executor.invoke(action)
    # Wrap the result in a FunctionMessage
    function_message = FunctionMessage(content=str(response), name=action.tool)
    # The returned message will be appended to the existing message list
    return function_message

Define the graph

from langgraph.graph import MessageGraph, END

# Define the graph
workflow = MessageGraph()

# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("action", call_tool)

# Entry point
workflow.set_entry_point("agent")

# Conditional edge
workflow.add_conditional_edges(
    # The source node: this edge is taken after "agent" is called
    "agent",
    # The function that decides which node to call next
    should_continue,
    # A mapping from should_continue's return value to the next node;
    # END is a special node that marks the end of the graph
    {
        # If a tool call is pending, continue
        "continue": "action",
        # Otherwise, finish
        "end": END,
    },
)

# After the tool runs, go back to the agent
workflow.add_edge("action", "agent")
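Before wiring up the interrupt, it helps to see what this graph does when run. Its control flow is roughly equivalent to the following plain-Python loop — a sketch of the agent/action cycle only, not how LangGraph executes it internally; the stub functions below are made up for illustration:

```python
# A rough plain-Python equivalent of the compiled graph's control flow.
# `call_model_stub` and `call_tool_stub` stand in for the real nodes.

def run_graph(messages, call_model_stub, call_tool_stub, should_continue):
    while True:
        messages = messages + [call_model_stub(messages)]  # "agent" node
        if should_continue(messages) == "end":             # conditional edge
            return messages
        messages = messages + [call_tool_stub(messages)]   # "action" node
        # edge: action -> agent, so the loop repeats

# Stub nodes: the model asks for one tool call, then finishes.
def fake_model(messages):
    return "tool_call" if "tool_result" not in messages else "final_answer"

def fake_tool(messages):
    return "tool_result"

def fake_should_continue(messages):
    return "continue" if messages[-1] == "tool_call" else "end"

print(run_graph(["hi"], fake_model, fake_tool, fake_should_continue))
# ['hi', 'tool_call', 'tool_result', 'final_answer']
```

The interrupt in the next step pauses this loop right before the "action" line runs.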

Configure the interrupt

from langgraph.checkpoint.sqlite import SqliteSaver
from IPython.display import Image

memory = SqliteSaver.from_conn_string(":memory:")  # where checkpoints are saved
# Compile into a LangChain Runnable
app = workflow.compile(checkpointer=memory, interrupt_before=["action"])  # interrupt before "action" runs
Image(app.get_graph().draw_png())  # preview the graph

Graph preview image

Interact with the graph

from langchain_core.messages import HumanMessage

thread = {"configurable": {"thread_id": "2"}}
inputs = [HumanMessage(content="hi! I'm bob")]
for event in app.stream(inputs, thread):
    for v in event.values():
        print(v)

Output

content='Hello Bob! How can I assist you today?' response_metadata={'finish_reason': 'stop'} id='run-e0454f42-6f8c-4d16-b776-4673ce0388d0-0'
inputs = [HumanMessage(content="what is my name?")]
for event in app.stream(inputs, thread):
    for v in event.values():
        print(v)

Output

content='Your name is Bob.' response_metadata={'finish_reason': 'stop'} id='run-367d9a0d-9f6f-4340-ba38-67a122f21284-0'

Interrupt

inputs = [HumanMessage(content="what's the weather in sf now?")]
for event in app.stream(inputs, thread):
    for v in event.values():
        print(v)

Output

content='' additional_kwargs={'function_call': {'arguments': '{\n  "query": "weather in San Francisco now"\n}', 'name': 'tavily_search_results_json'}} response_metadata={'finish_reason': 'function_call'} id='run-f7bb00ac-7e77-456b-bb2b-725bd448034a-0'

Execution was interrupted before the tool ran.

Resume

Passing None to stream resumes execution from the point of interruption.

for event in app.stream(None, thread):
    for v in event.values():
        print(v)

Output

content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America\', \'lat\': 37.78, \'lon\': -122.42, \'tz_id\': \'America/Los_Angeles\', \'localtime_epoch\': 1714575630, \'localtime\': \'2024-05-01 8:00\'}, \'current\': {\'last_updated_epoch\': 1714575600, \'last_updated\': \'2024-05-01 08:00\', \'temp_c\': 10.0, \'temp_f\': 50.0, \'is_day\': 1, \'condition\': {\'text\': \'Sunny\', \'icon\': \'//cdn.weatherapi.com/weather/64x64/day/113.png\', \'code\': 1000}, \'wind_mph\': 6.9, \'wind_kph\': 11.2, \'wind_degree\': 280, \'wind_dir\': \'W\', \'pressure_mb\': 1017.0, \'pressure_in\': 30.02, \'precip_mm\': 0.0, \'precip_in\': 0.0, \'humidity\': 80, \'cloud\': 0, \'feelslike_c\': 9.3, \'feelslike_f\': 48.7, \'vis_km\': 16.0, \'vis_miles\': 9.0, \'uv\': 4.0, \'gust_mph\': 11.4, \'gust_kph\': 18.4}}"}]' name='tavily_search_results_json' id='39700dba-93a9-46bf-8ecf-a96d9e3a1619'
content='The current weather in San Francisco is sunny with a temperature of 10°C (50°F). The wind is blowing from the west at a speed of 6.9 mph. The humidity is 80% and there are no clouds in the sky.' response_metadata={'finish_reason': 'stop'} id='run-e0056cb7-3eb6-4923-87d2-30968762c170-0'

LangSmith trace, before the interrupt:

LangSmith trace, after resuming:

1.2 Authorize

The second approach is Authorize. We can specify up front that the application should hand control back to us every time a particular actor is about to be invoked. Once configured, the application pauses before calling a tool and asks for confirmation, at which point the user can:

  • Accept the tool call
  • Reject the tool call
  • Do nothing
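The accept/reject gate is just a wrapper that asks for confirmation before executing. A minimal stdlib sketch (not LangGraph's API; the prompt function is injected so it can be replaced in tests, but in an interactive session it would simply be `input`):

```python
# A generic approval gate for tool calls, sketched in plain Python.
# `ask` is injected for testability; interactively it would be `input`.

def approve_tool_call(action, execute, ask=input):
    answer = ask(f"[y/n] continue with: {action}? ")
    if answer.strip().lower() == "n":
        # Rejection stops execution before the tool is ever touched.
        raise ValueError(f"tool call rejected by user: {action}")
    return execute(action)

# Accepting runs the tool.
result = approve_tool_call("search('weather')",
                           execute=lambda a: f"ran {a}",
                           ask=lambda prompt: "y")
print(result)  # ran search('weather')

# Rejecting raises before the tool runs.
try:
    approve_tool_call("search('weather')",
                      execute=lambda a: f"ran {a}",
                      ask=lambda prompt: "n")
except ValueError:
    print("rejected")
```

The `execute_tools` node defined later in this section follows exactly this pattern, with `tool_executor.invoke` as the execute step.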

Set up the tools and model

from langchain import hub
from langchain.agents import create_openai_functions_agent
from langchain_openai.chat_models import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

tools = [TavilySearchResults(max_results=1)]

# prompt
prompt = hub.pull("hwchase17/openai-functions-agent")

# LLM
llm = ChatOpenAI(model="gpt-3.5-turbo-1106", streaming=True)

# OpenAI Functions agent
agent_runnable = create_openai_functions_agent(llm, tools, prompt)

Define the graph state

from typing import TypedDict, Annotated, List, Union
from langchain_core.agents import AgentAction, AgentFinish
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
    # The input string
    input: str
    # The list of previous messages in the conversation
    chat_history: list[BaseMessage]
    # The agent's outcome; None to start with
    agent_outcome: Union[AgentAction, AgentFinish, None]
    # List of (action, observation) pairs; operator.add means updates to this
    # field append to the existing list instead of overwriting it
    intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add]
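The `operator.add` annotation makes `intermediate_steps` accumulate across node calls: each returned value is combined with the existing state by list concatenation rather than replacing it. A minimal stdlib illustration of that reducer semantics (this mimics the behavior, not LangGraph's implementation; `apply_update` is a made-up helper):

```python
import operator

# Simulate how a state field annotated with operator.add is merged:
# each update is combined with the current value via the reducer.

state = {"intermediate_steps": []}

def apply_update(state, update, reducer=operator.add):
    return {"intermediate_steps": reducer(state["intermediate_steps"],
                                          update["intermediate_steps"])}

state = apply_update(state, {"intermediate_steps": [("action_1", "obs_1")]})
state = apply_update(state, {"intermediate_steps": [("action_2", "obs_2")]})
print(state["intermediate_steps"])
# [('action_1', 'obs_1'), ('action_2', 'obs_2')]
```

Without the reducer, the second update would have overwritten the first, and the agent would lose its record of earlier tool calls.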

Define the nodes

from langchain_core.agents import AgentFinish
from langgraph.prebuilt.tool_executor import ToolExecutor

# Invokes tools and returns their results
tool_executor = ToolExecutor(tools)


# Define the agent
def run_agent(data):
    agent_outcome = agent_runnable.invoke(data)
    return {"agent_outcome": agent_outcome}


# Define the tool-execution function
def execute_tools(data):
    # Get the agent's most recent outcome
    agent_action = data["agent_outcome"]
    response = input(f"[y/n] continue with: {agent_action}?")  # human approval required
    if response == "n":
        raise ValueError
    output = tool_executor.invoke(agent_action)
    return {"intermediate_steps": [(agent_action, str(output))]}


# Routing logic for the conditional edge
def should_continue(data):
    # If the agent's outcome is an AgentFinish, we are done
    if isinstance(data["agent_outcome"], AgentFinish):
        return "end"
    # Otherwise, continue
    else:
        return "continue"

Define the graph

from langgraph.graph import END, StateGraph

# Define the graph
workflow = StateGraph(AgentState)

# Define the two nodes we will cycle between
workflow.add_node("agent", run_agent)
workflow.add_node("action", execute_tools)

# Entry point
workflow.set_entry_point("agent")

# Conditional edge
workflow.add_conditional_edges(
    # The source node: this edge is taken after "agent" is called
    "agent",
    # The function that decides which node to call next
    should_continue,
    # A mapping from should_continue's return value to the next node;
    # END is a special node that marks the end of the graph
    {
        # If a tool call is pending, continue
        "continue": "action",
        # Otherwise, finish
        "end": END,
    },
)


# After the tool runs, go back to the agent
workflow.add_edge("action", "agent")

# Compile into a LangChain Runnable
app = workflow.compile()

Authorize

inputs = {"input": "what is the weather in sf", "chat_history": []}
for s in app.stream(inputs):
    print(list(s.values())[0])
    print("----")

Output

{'agent_outcome': AgentActionMessageLog(tool='tavily_search_results_json', tool_input={'query': 'weather in San Francisco'}, log="\nInvoking: `tavily_search_results_json` with `{'query': 'weather in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"weather in San Francisco"}', 'name': 'tavily_search_results_json'}}, response_metadata={'finish_reason': 'function_call'}, id='run-43937c2a-a184-44e8-9307-9c9f00405545-0')])}
----
[y/n] continue with: tool='tavily_search_results_json' tool_input={'query': 'weather in San Francisco'} log="\nInvoking: `tavily_search_results_json` with `{'query': 'weather in San Francisco'}`\n\n\n" message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"weather in San Francisco"}', 'name': 'tavily_search_results_json'}}, response_metadata={'finish_reason': 'function_call'}, id='run-43937c2a-a184-44e8-9307-9c9f00405545-0')]?y (the trailing "y" is the human approval input)
{'intermediate_steps': [(AgentActionMessageLog(tool='tavily_search_results_json', tool_input={'query': 'weather in San Francisco'}, log="\nInvoking: `tavily_search_results_json` with `{'query': 'weather in San Francisco'}`\n\n\n", message_log=[AIMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"weather in San Francisco"}', 'name': 'tavily_search_results_json'}}, response_metadata={'finish_reason': 'function_call'}, id='run-43937c2a-a184-44e8-9307-9c9f00405545-0')]), '[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America\', \'lat\': 37.78, \'lon\': -122.42, \'tz_id\': \'America/Los_Angeles\', \'localtime_epoch\': 1714575155, \'localtime\': \'2024-05-01 7:52\'}, \'current\': {\'last_updated_epoch\': 1714574700, \'last_updated\': \'2024-05-01 07:45\', \'temp_c\': 10.0, \'temp_f\': 50.0, \'is_day\': 1, \'condition\': {\'text\': \'Sunny\', \'icon\': \'//cdn.weatherapi.com/weather/64x64/day/113.png\', \'code\': 1000}, \'wind_mph\': 6.9, \'wind_kph\': 11.2, \'wind_degree\': 280, \'wind_dir\': \'W\', \'pressure_mb\': 1017.0, \'pressure_in\': 30.02, \'precip_mm\': 0.0, \'precip_in\': 0.0, \'humidity\': 80, \'cloud\': 0, \'feelslike_c\': 9.6, \'feelslike_f\': 49.4, \'vis_km\': 16.0, \'vis_miles\': 9.0, \'uv\': 4.0, \'gust_mph\': 11.4, \'gust_kph\': 18.4}}"}]')]}
----
{'agent_outcome': AgentFinish(return_values={'output': 'The current weather in San Francisco is sunny with a temperature of 10°C (50°F). The wind is blowing at 6.9 mph from the west. The humidity is at 80%, and there is no precipitation. The visibility is 16.0 km (9.0 miles), and the UV index is 4.0.'}, log='The current weather in San Francisco is sunny with a temperature of 10°C (50°F). The wind is blowing at 6.9 mph from the west. The humidity is at 80%, and there is no precipitation. The visibility is 16.0 km (9.0 miles), and the UV index is 4.0.')}

LangSmith trace:

The approval happens at the action node, but it is not visible in the LangSmith trace.

Official resources

More detailed explanations of the code examples:

OpenGPTS:https://github.com/langchain-ai/opengpts

The OpenGPTS demo linked from the official blog doesn't work well; it's better to clone the code and set it up yourself with Docker for testing.


LangGraph (Part 2) — Human-in-the-loop
https://mztchaoqun.com.cn/posts/D33_LangGraph_Human-in-the-loop/
Author: mztchaoqun
Published: August 19, 2024
License