LangGraph(三)—— Multi-Agent Workflows

一、什么是Multi-Agent

Multi-Agent 指的是由语言模型驱动的多个独立参与者,这些参与者以特定的方式相互连接。其中每个Agent可以拥有自己的prompt, LLM, tools和其他自定义代码,以便能更好的与其他Agent协作。

因此在 Multi-Agent Workflows中有两个主要问题:

  • 多个独立的Agent分别是什么
  • 这些Agent如何连接

在LangGraph中每个Agent都是一个Node,它们之间的连接被称为Edge,控制流程由条件Edge来管理,它们之间通过添加到图的状态来进行通信。LangGraph中有一个状态机的概念,每个独立的Agent节点都有状态,而Agent之间的连接则相当于转移矩阵。由于状态机本质上也是一个有向图,LangGraph的graph表示方法与状态机的概念相辅相成,使得开发者能够以graph的方式设计和实现复杂的多代理系统。

1.1 Multi-Agent 的好处

  1. 对工具/职责进行分组:可以提供更好的结果。与必须从数十种工具中进行选择,相比一个Agent在专注于一个任务时更有可能成功。

  2. 单独的Prompt:可以提供更好的结果。每个prompt可以有自己的指令和少量示例。每个Agent甚至可以由单独经过微调的大型语言模型(LLM)驱动

  3. 有助于开发概念模型:你可以单独评估和改进每个Agent,而不会破坏更大的应用。

Multi-agent设计允许你将复杂的问题划分为可由专门的Agent和大型语言模型(LLM)程序解决的可行工作单元。

二、Multi-Agent 例子

2.1 Agent Supervisor

如图Agent Supervisor负责将任务路由到各个独立Agent。而每个独立Agent都是一个LangChain Agent,它们有自己的prompt、LLM和Tools,调用每个独立Agent时不仅仅是一次LLM调用,而是一次AgentExecutor 的运行。

Supervisor也可以被认为是一个Agent,它的Tools是其他Agent.

Tools

1
2
3
4
5
6
7
8
9
10
from typing import Annotated, List, Tuple, Union

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langchain_experimental.tools import PythonREPLTool

tavily_tool = TavilySearchResults(max_results=5)

# This executes code locally, which can be unsafe
python_repl_tool = PythonREPLTool()

Utilites

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
# Each worker node will be given a name and some tools.
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
system_prompt,
),
MessagesPlaceholder(variable_name="messages"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
]
)
agent = create_openai_tools_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)
return executor

def agent_node(state, agent, name):
result = agent.invoke(state)
return {"messages": [HumanMessage(content=result["output"], name=name)]}

Agent Supervisor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

members = ["Researcher", "Coder"]
system_prompt = (
"You are a supervisor tasked with managing a conversation between the"
" following workers: {members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH."
)
# Our team supervisor is an LLM node. It just picks the next agent to process
# and decides when the work is completed
options = ["FINISH"] + members
# Using openai function calling can make output parsing easier for us
function_def = {
"name": "route",
"description": "Select the next role.",
"parameters": {
"title": "routeSchema",
"type": "object",
"properties": {
"next": {
"title": "Next",
"anyOf": [
{"enum": options},
],
}
},
"required": ["next"],
},
}
prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
(
"system",
"Given the conversation above, who should act next?"
" Or should we FINISH? Select one of: {options}",
),
]
).partial(options=str(options), members=", ".join(members))

llm = ChatOpenAI(model="gpt-3.5-turbo")

supervisor_chain = (
prompt
| llm.bind_functions(functions=[function_def], function_call="route")
| JsonOutputFunctionsParser()
)

Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END


# The agent state is the input to each node in the graph
class AgentState(TypedDict):
# The annotation tells the graph that new messages will always
# be added to the current states
messages: Annotated[Sequence[BaseMessage], operator.add]
# The 'next' field indicates where to route to next
next: str


research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.")
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

# NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION
code_agent = create_agent(
llm,
[python_repl_tool],
"You may generate safe python code to analyze data and generate charts using matplotlib.",
)
code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

workflow = StateGraph(AgentState)
workflow.add_node("Researcher", research_node)
workflow.add_node("Coder", code_node)
workflow.add_node("supervisor", supervisor_chain)

for member in members:
# We want our workers to ALWAYS "report back" to the supervisor when done
workflow.add_edge(member, "supervisor")
# The supervisor populates the "next" field in the graph state
# which routes to a node or finishes
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
# Finally, add entrypoint
workflow.set_entry_point("supervisor")

graph = workflow.compile()

展示Graph

1
2
3
from IPython.display import Image

Image(graph.get_graph().draw_png())

执行

1
2
3
4
5
6
7
for s in graph.stream(
{"messages": [HumanMessage(content="Write a brief research report on Python, Then Code hello world and print it to the terminal.")]},
{"recursion_limit": 100},
):
if "__end__" not in s:
print(s)
print("----")

输出

1
2
3
4
5
6
7
8
9
10
{'supervisor': {'next': 'Researcher'}}
----
{'Researcher': {'messages': [HumanMessage(content='Research Report on Python\n\nPython is a high-level programming language known for its simplicity and readability. It was created by Guido van Rossum and first released in 1991. Python is widely used for various purposes, including web development, data analysis, artificial intelligence, scientific computing, and automation.\n\n1. Features of Python:\n - Easy to learn and understand: Python has a clean and straightforward syntax, making it beginner-friendly.\n - Cross-platform compatibility: Python can run on different operating systems, including Windows, macOS, and Linux.\n - Extensive standard library: Python comes with a vast collection of modules and libraries that provide pre-built functionality for various tasks.\n - Third-party libraries and frameworks: Python has a rich ecosystem with numerous third-party libraries and frameworks, such as Django, Flask, NumPy, Pandas, and TensorFlow, which expand its capabilities.\n - Dynamic typing: Python does not require explicit variable declarations and supports dynamic typing, allowing flexibility in coding.\n - Object-oriented programming (OOP) support: Python supports OOP principles, allowing developers to create reusable and modular code.\n - Interpreted language: Python code is executed line by line by the Python interpreter, eliminating the need for compilation before running.\n\n2. Use Cases of Python:\n - Web Development: Python web frameworks like Django and Flask are popular choices for building scalable 时候只需要一个代理提供的最终答案ding advanced computational capabilities.\n\nNow, let\'s write a simple "Hello World" program in Python and print it to the terminal:\n\n```python\nprint("Hello World")\n```\n\nThis code snippet will print "Hello World" to the terminal when executed.\n\nPython\'s popularity and versatility make it a preferred language for developers across various domains. Its simplicity, extensive library support, and community-driven nature contribute to its widespread adoption and continuous growth.', name='Researcher')]}}
----
{'supervisor': {'next': 'Coder'}}
----
{'Coder': {'messages': [HumanMessage(content='The Python code to print "Hello World" to the terminal is:\n\n```python\nprint(\'Hello World\')\n```\n\nWhen executed, it will display "Hello World" as the output.', name='Coder')]}}
----
{'supervisor': {'next': 'FINISH'}}
----

LangSmith流程展示:

2.2 Multi Agent 协作

与Agent Supervisor不同,Multi Agent 协作中不同的Agent在一个共享的消息草稿上进行协作。它们所做的所有工作对彼此都是可见的。这样做的好处是其他Agent可以看到完成的所有单独步骤。但是,有时传递所有这些信息可能会过于冗长且不必要,有时候只需要一个代理提供的最终答案。之所以称其为协作,是因为消息草稿的共享特性。

在这种情况下,独立Agent实际上只是一个 LLM 调用。 具体来说,它们是一个特定的提示模板(使用特定的系统消息以特定的方式格式化输入)加上 LLM 调用。

控制状态转换的主要是Router,但它是一个基于规则的Router。 基本上,在每次大型语言模型(LLM)调用之后,它会查看输出,如果调用了一个工具,那么它就会调用那个工具;如果没有调用任何工具,而LLM响应“最终答案”,则它会返回给用户。否则(如果没有调用任何工具,且LLM没有响应“最终答案”),它就会转向另一个LLM。

Agents

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import json

from langchain_core.messages import (
AIMessage,
BaseMessage,
ChatMessage,
FunctionMessage,
HumanMessage,
)
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import END, StateGraph
from langgraph.prebuilt.tool_executor import ToolExecutor, ToolInvocation


def create_agent(llm, tools, system_message: str):
"""Create an agent."""
functions = [format_tool_to_openai_function(t) for t in tools]

prompt = ChatPromptTemplate.from_messages(
[
(
"system",
"You are a helpful AI assistant, collaborating with other assistants."
" Use the provided tools to progress towards answering the question."
" If you are unable to fully answer, that's OK, another assistant with different tools "
" will help where you left off. Execute what you can to make progress."
" If you or any of the other assistants have the final answer or deliverable,"
" prefix your response with FINAL ANSWER so the team knows to stop."
" You have access to the following tools: {tool_names}.\n{system_message}",
),
MessagesPlaceholder(variable_name="messages"),
]
)
prompt = prompt.partial(system_message=system_message)
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
return prompt | llm.bind_functions(functions)

Tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain_core.tools import tool
from typing import Annotated
from langchain_experimental.utilities import PythonREPL
from langchain_community.tools.tavily_search import TavilySearchResults

tavily_tool = TavilySearchResults(max_results=5)

# Warning: This executes code locally, which can be unsafe when not sandboxed

repl = PythonREPL()


@tool
def python_repl(
code: Annotated[str, "The python code to execute to generate your chart."]
):
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
return f"Succesfully executed:\n```python\n{code}\n```\nStdout: {result}"

Graph State

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import operator
from typing import Annotated, List, Sequence, Tuple, TypedDict, Union

from langchain.agents import create_openai_functions_agent
from langchain.tools.render import format_tool_to_openai_function
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict


# This defines the object that is passed between each node
# in the graph. We will create different nodes for each agent and tool
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add]
sender: str

Agent Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import functools


# Helper function to create a node for a given agent
def agent_node(state, agent, name):
result = agent.invoke(state)
# We convert the agent output into a format that is suitable to append to the global state
if isinstance(result, FunctionMessage):
pass
else:
result = HumanMessage(**result.dict(exclude={"type", "name"}), name=name)
return {
"messages": [result],
# Since we have a strict workflow, we can
# track the sender so we know who to pass to next.
"sender": name,
}


llm = ChatOpenAI(model="gpt-4-turbo")

# Research agent and node
research_agent = create_agent(
llm,
[tavily_tool],
system_message="You should provide accurate data for the Chart Generator to use.",
)
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")

# Chart Generator
chart_agent = create_agent(
llm,
[python_repl],
system_message="Any charts you display will be visible by the user.",
)
chart_node = functools.partial(agent_node, agent=chart_agent, name="Chart_Generator")

Tool Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
tools = [tavily_tool, python_repl]
tool_executor = ToolExecutor(tools)


def tool_node(state):
"""This runs tools in the graph

It takes in an agent action and calls that tool and returns the result."""
messages = state["messages"]
# Based on the continue condition
# we know the last message involves a function call
last_message = messages[-1]
# We construct an ToolInvocation from the function_call
tool_input = json.loads(
last_message.additional_kwargs["function_call"]["arguments"]
)
# We can pass single-arg inputs by value
if len(tool_input) == 1 and "__arg1" in tool_input:
tool_input = next(iter(tool_input.values()))
tool_name = last_message.additional_kwargs["function_call"]["name"]
action = ToolInvocation(
tool=tool_name,
tool_input=tool_input,
)
# We call the tool_executor and get back a response
response = tool_executor.invoke(action)
# We use the response to create a FunctionMessage
function_message = FunctionMessage(
content=f"{tool_name} response: {str(response)}", name=action.tool
)
# We return a list, because this will get added to the existing list
return {"messages": [function_message]}

条件Edge

1
2
3
4
5
6
7
8
9
10
11
12
# Either agent can decide to end
def router(state):
# This is the router
messages = state["messages"]
last_message = messages[-1]
if "function_call" in last_message.additional_kwargs:
# The previus agent is invoking a tool
return "call_tool"
if "FINAL ANSWER" in last_message.content:
# Any agent decided the work is done
return "end"
return "continue"

Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
workflow = StateGraph(AgentState)

workflow.add_node("Researcher", research_node)
workflow.add_node("Chart_Generator", chart_node)
workflow.add_node("call_tool", tool_node)

workflow.add_conditional_edges(
"Researcher",
router,
{"continue": "Chart_Generator", "call_tool": "call_tool", "end": END},
)
workflow.add_conditional_edges(
"Chart_Generator",
router,
{"continue": "Researcher", "call_tool": "call_tool", "end": END},
)

workflow.add_conditional_edges(
"call_tool",
# Each agent node updates the 'sender' field
# the tool calling node does not, meaning
# this edge will route back to the original agent
# who invoked the tool
lambda x: x["sender"],
{
"Researcher": "Researcher",
"Chart_Generator": "Chart_Generator",
},
)
workflow.set_entry_point("Researcher")
graph = workflow.compile()

Graph 预览

1
2
3
from IPython.display import Image

Image(graph.get_graph().draw_png())

执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
for s in graph.stream(
{
"messages": [
HumanMessage(
content="Fetch the UK's GDP over the past 5 years,"
" then draw a line graph of it."
" Once you code it up, finish."
)
],
},
# Maximum number of steps to take in the graph
{"recursion_limit": 150},
):
print(s)
print("----")

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{'Researcher': {'messages': [HumanMessage(content='', additional_kwargs={'function_call': {'arguments': '{"query":"UK GDP past 5 years"}', 'name': 'tavily_search_results_json'}}, response_metadata={'token_usage': {'completion_tokens': 23, 'prompt_tokens': 221, 'total_tokens': 244}, 'model_name': 'gpt-4-turbo', 'system_fingerprint': 'fp_ea6eb70039', 'finish_reason': 'function_call', 'logprobs': None}, name='Researcher', id='run-ff1edf83-d634-4016-8884-a72f4ace82a3-0', invalid_tool_calls=[], tool_calls=[])], 'sender': 'Researcher'}}
----
{'call_tool': {'messages': [FunctionMessage(content='tavily_search_results_json response: [{\'url\': \'https://www.macrotrends.net/global-metrics/countries/GBR/united-kingdom/gdp-gross-domestic-product\', \'content\': "U.K. gdp for 2021 was $3,141.51B, a 16.45% increase from 2020. U.K. gdp for 2020 was $2,697.81B, a 5.39% decline from 2019. U.K. gdp for 2019 was $2,851.41B, a 0.69% decline from 2018. GDP at purchaser\'s prices is the sum of gross value added by all resident producers in the economy plus any product taxes and minus any subsidies not included in ..."}, {\'url\': \'https://www.ons.gov.uk/economy/grossdomesticproductgdp/timeseries/ihyp/pn2\', \'content\': \'Preliminary estimate of GDP time series (PGDP), released on 27 April 2018\\nPublications that use this data\\nContact details for this data\\nFooter links\\nHelp\\nAbout ONS\\nConnect with us\\nAll content is available under the Open Government Licence v3.0, except where otherwise stated Year on Year growth: CVM SA %\\nDownload full time series as:\\nDownload filtered time series as:\\nTable\\nNotes\\nFollowing a quality review it has been identified that the methodology used to estimate elements of purchased software within gross fixed capital formation (GFCF) has led to some double counting from 1997 onwards. GDP quarterly national accounts time series (QNA), released on 22 December 2023\\nIHYP: UK Economic Accounts time series (UKEA), released on 22 December 2023\\nIHYP: GDP first quarterly estimate time series\\n(PN2), released on 10 November 2023\\nIHYP: Year on Year growth: CVM SA %\\nSource dataset: GDP first quarterly estimate time series (PN2)\\nContact: Niamh McAuley\\nRelease date: 10 November 2023\\nView previous versions\\n %\\nFilters\\nCustom time period\\nChart\\nDownload this time seriesGross Domestic Product:\'}, {\'url\': \'https://www.macrotrends.net/global-metrics/countries/GBR/united-kingdom/gdp-growth-rate\', \'content\': \'U.K. gdp growth rate for 2021 was 8.67%, a 19.03% increase from 2020. U.K. gdp growth rate for 2020 was -10.36%, a 12% decline from 2019. U.K. gdp growth rate for 2019 was 1.64%, a 0.24% increase from 2018. Annual percentage growth rate of GDP at market prices based on constant local currency. Aggregates are based on constant 2010 U.S. dollars.\'}, {\'url\': \'https://data.worldbank.org/indicator/NY.GDP.MKTP.KD.ZG?locations=GB\', \'content\': \'GDP growth (annual %) - United Kingdom. World Bank national accounts data, and OECD National Accounts data files. License : CC BY-4.0. LineBarMap. Also Show Share Details. Label. 1961 - 2022.\'}, {\'url\': \'https://www.ons.gov.uk/economy/grossdomesticproductgdp\', \'content\': \'Quarter on Quarter growth: CVM SA %\\nChained Volume Measures (CVM)\\nGross Domestic Product: q-on-q4 growth rate CVM SA %\\nChained Volume Measures (CVM)\\nGross Domestic Product at market prices: Current price: Seasonally adjusted £m\\nCurrent Prices (CP)\\nGross Domestic Product: quarter on quarter growth rate: CP SA %\\nCurrent Prices (CP)\\nGross Domestic Product: q-on-q4 growth quarter growth: CP SA %\\nCurrent Prices (CP)\\nDatasets related to Gross Domestic Product (GDP)\\n A roundup of the latest data and trends on the economy, business and jobs\\nTime series related to Gross Domestic Product (GDP)\\nGross Domestic Product: chained volume measures: Seasonally adjusted £m\\nChained Volume Measures (CVM)\\nGross Domestic Product: Hide\\nData and analysis from Census 2021\\nGross Domestic Product (GDP)\\nGross domestic product (GDP) estimates as the main measure of UK economic growth based on the value of goods and services produced during a given period. Contains current and constant price data on the value of goods and services to indicate the economic performance of the UK.\\nEstimates of short-term indicators of investment in non-financial assets; business investment and asset and sector breakdowns of total gross fixed capital formation.\\n Monthly gross domestic product by gross value added\\nThe gross value added (GVA) tables showing the monthly and annual growths and indices as published within the monthly gross domestic product (GDP) statistical bulletin.\\n\'}]', name='tavily_search_results_json')]}}
----
{'Researcher': {'messages': [HumanMessage(content="Based on the search results, here are the GDP values for the UK over the past five years:\n\n- 2023: Data not explicitly given, but the GDP for 2022 will be used as a reference for the latest year.\n- 2022: Data not explicitly given, but given trends and the GDP from 2021, we can estimate or use the 2021 data.\n- 2021: $3,141.51 billion\n- 2020: $2,697.81 billion\n- 2019: $2,851.41 billion\n- 2018: Approximately $2,870 billion (estimated from the 2019 value noting a 0.69% decline)\n\nNow, let's create the line graph based on these values. Since the exact figure for 2022 and 2023 is not specified, I will use the value from 2021 as a placeholder for both to illustrate the trend. This assumption is to maintain continuity in the graph but should be updated with exact figures when available. \n\nHere is the data ready for the graph:\n\n- 2018: $2,870 billion\n- 2019: $2,851.41 billion\n- 2020: $2,697.81 billion\n- 2021: $3,141.51 billion\n- 2022: $3,141.51 billion (estimated, needs exact data)\n- 2023: $3,141.51 billion (estimated, needs exact data)\n\nThis dataset will be used to plot the line graph.", response_metadata={'token_usage': {'completion_tokens': 324, 'prompt_tokens': 1293, 'total_tokens': 1617}, 'model_name': 'gpt-4-turbo', 'system_fingerprint': 'fp_3450ce39d5', 'finish_reason': 'stop', 'logprobs': None}, name='Researcher', id='run-79fd4e6f-74ce-410a-90c2-e211bc383d7a-0', invalid_tool_calls=[], tool_calls=[])], 'sender': 'Researcher'}}
----
{'Chart_Generator': {'messages': [HumanMessage(content='', additional_kwargs={'function_call': {'arguments': '{"code":"import matplotlib.pyplot as plt\\n\\nyears = [2018, 2019, 2020, 2021, 2022, 2023]\\ngdp_values = [2870, 2851.41, 2697.81, 3141.51, 3141.51, 3141.51] # in billions\\n\\nplt.figure(figsize=(10, 5))\\nplt.plot(years, gdp_values, marker=\'o\')\\nplt.title(\'UK GDP Over the Past 5 Years\')\\nplt.xlabel(\'Year\')\\nplt.ylabel(\'GDP (in billions USD)\')\\nplt.grid(True)\\nplt.xticks(years)\\nplt.yticks(range(2600, 3200, 100))\\nplt.show()"}', 'name': 'python_repl'}}, response_metadata={'token_usage': {'completion_tokens': 182, 'prompt_tokens': 1637, 'total_tokens': 1819}, 'model_name': 'gpt-4-turbo', 'system_fingerprint': 'fp_ea6eb70039', 'finish_reason': 'function_call', 'logprobs': None}, name='Chart_Generator', id='run-1bb9abd1-52df-44a8-8c0b-d5227f8e9a2a-0', invalid_tool_calls=[], tool_calls=[])], 'sender': 'Chart_Generator'}}
----
{'call_tool': {'messages': [FunctionMessage(content="python_repl response: Succesfully executed:\n```python\nimport matplotlib.pyplot as plt\n\nyears = [2018, 2019, 2020, 2021, 2022, 2023]\ngdp_values = [2870, 2851.41, 2697.81, 3141.51, 3141.51, 3141.51] # in billions\n\nplt.figure(figsize=(10, 5))\nplt.plot(years, gdp_values, marker='o')\nplt.title('UK GDP Over the Past 5 Years')\nplt.xlabel('Year')\nplt.ylabel('GDP (in billions USD)')\nplt.grid(True)\nplt.xticks(years)\nplt.yticks(range(2600, 3200, 100))\nplt.show()\n```\nStdout: ", name='python_repl')]}}
----
{'Chart_Generator': {'messages': [HumanMessage(content="Here is the line graph depicting the UK's GDP over the past 5 years:\n\n![UK GDP Over the Past 5 Years](sandbox:/mnt/data/plot1.png)", response_metadata={'token_usage': {'completion_tokens': 37, 'prompt_tokens': 1812, 'total_tokens': 1849}, 'model_name': 'gpt-4-turbo', 'system_fingerprint': 'fp_46a93fa712', 'finish_reason': 'stop', 'logprobs': None}, name='Chart_Generator', id='run-1ef41caf-107f-441c-8ea6-ae96cc5e3b39-0', invalid_tool_calls=[], tool_calls=[])], 'sender': 'Chart_Generator'}}
----
{'Researcher': {'messages': [HumanMessage(content="FINAL ANSWER\n\nThe line graph above illustrates the UK's GDP over the past 5 years. It shows a decline in GDP from 2018 to 2020, followed by a significant recovery in 2021. The values for 2022 and 2023 are estimated based on the 2021 data, as the exact figures were not available at the time of the search. For the most accurate and up-to-date data, it is recommended to refer to official sources like the UK's Office for National Statistics or the World Bank.", response_metadata={'token_usage': {'completion_tokens': 111, 'prompt_tokens': 1842, 'total_tokens': 1953}, 'model_name': 'gpt-4-turbo', 'system_fingerprint': 'fp_ea6eb70039', 'finish_reason': 'stop', 'logprobs': None}, name='Researcher', id='run-8099f53f-9c23-4f69-9bf4-6a80a678c52b-0', invalid_tool_calls=[], tool_calls=[])], 'sender': 'Researcher'}}
----

LangSmith流程展示:

2.3 层级Agent Teams

层级Agent Teams与Agent Supervisor类似,但节点中的Agent实际上是其他 LangGraph 对象。这比使用 LangChain AgentExecutor 作为Agent Runtime更灵活。称其为分层团队,因为子Agent在某种程度上可以被视为团队。这里每个LangGraph Agent是一个独立的Agent了。

Tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from typing import Annotated, List, Tuple, Union

import matplotlib.pyplot as plt
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.tools import tool
from langsmith import trace

tavily_tool = TavilySearchResults(max_results=5)


@tool
def scrape_webpages(urls: List[str]) -> str:
"""Use requests and bs4 to scrape the provided web pages for detailed information."""
loader = WebBaseLoader(urls)
docs = loader.load()
return "\n\n".join(
[
f'<Document name="{doc.metadata.get("title", "")}">\n{doc.page_content}\n</Document>'
for doc in docs
]
)

Document writing team tools

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Dict, Optional

from langchain_experimental.utilities import PythonREPL
from typing_extensions import TypedDict

_TEMP_DIRECTORY = TemporaryDirectory()
WORKING_DIRECTORY = Path(_TEMP_DIRECTORY.name)


@tool
def create_outline(
points: Annotated[List[str], "List of main points or sections."],
file_name: Annotated[str, "File path to save the outline."],
) -> Annotated[str, "Path of the saved outline file."]:
"""Create and save an outline."""
with (WORKING_DIRECTORY / file_name).open("w") as file:
for i, point in enumerate(points):
file.write(f"{i + 1}. {point}\n")
return f"Outline saved to {file_name}"


@tool
def read_document(
file_name: Annotated[str, "File path to save the document."],
start: Annotated[Optional[int], "The start line. Default is 0"] = None,
end: Annotated[Optional[int], "The end line. Default is None"] = None,
) -> str:
"""Read the specified document."""
with (WORKING_DIRECTORY / file_name).open("r") as file:
lines = file.readlines()
if start is not None:
start = 0
return "\n".join(lines[start:end])


@tool
def write_document(
content: Annotated[str, "Text content to be written into the document."],
file_name: Annotated[str, "File path to save the document."],
) -> Annotated[str, "Path of the saved document file."]:
"""Create and save a text document."""
with (WORKING_DIRECTORY / file_name).open("w") as file:
file.write(content)
return f"Document saved to {file_name}"


@tool
def edit_document(
file_name: Annotated[str, "Path of the document to be edited."],
inserts: Annotated[
Dict[int, str],
"Dictionary where key is the line number (1-indexed) and value is the text to be inserted at that line.",
],
) -> Annotated[str, "Path of the edited document file."]:
"""Edit a document by inserting text at specific line numbers."""

with (WORKING_DIRECTORY / file_name).open("r") as file:
lines = file.readlines()

sorted_inserts = sorted(inserts.items())

for line_number, text in sorted_inserts:
if 1 <= line_number <= len(lines) + 1:
lines.insert(line_number - 1, text + "\n")
else:
return f"Error: Line number {line_number} is out of range."

with (WORKING_DIRECTORY / file_name).open("w") as file:
file.writelines(lines)

return f"Document edited and saved to {file_name}"


# Warning: This executes code locally, which can be unsafe when not sandboxed

repl = PythonREPL()


@tool
def python_repl(
code: Annotated[str, "The python code to execute to generate your chart."]
):
"""Use this to execute python code. If you want to see the output of a value,
you should print it out with `print(...)`. This is visible to the user."""
try:
result = repl.run(code)
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}"
return f"Succesfully executed:\n```python\n{code}\n```\nStdout: {result}"

Helper Utilities

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from typing import Any, Callable, List, Optional, TypedDict, Union

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import Runnable
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI

from langgraph.graph import END, StateGraph


def create_agent(
llm: ChatOpenAI,
tools: list,
system_prompt: str,
) -> str:
"""Create a function-calling agent and add it to the graph."""
system_prompt += "\nWork autonomously according to your specialty, using the tools available to you."
" Do not ask for clarification."
" Your other team members (and other teams) will collaborate with you with their own specialties."
" You are chosen for a reason! You are one of the following team members: {team_members}."
prompt = ChatPromptTemplate.from_messages(
[
(
"system",
system_prompt,
),
MessagesPlaceholder(variable_name="messages"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
]
)
agent = create_openai_functions_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools)
return executor


def agent_node(state, agent, name):
result = agent.invoke(state)
return {"messages": [HumanMessage(content=result["output"], name=name)]}


def create_team_supervisor(llm: ChatOpenAI, system_prompt, members) -> str:
"""An LLM-based router."""
options = ["FINISH"] + members
function_def = {
"name": "route",
"description": "Select the next role.",
"parameters": {
"title": "routeSchema",
"type": "object",
"properties": {
"next": {
"title": "Next",
"anyOf": [
{"enum": options},
],
},
},
"required": ["next"],
},
}
prompt = ChatPromptTemplate.from_messages(
[
("system", system_prompt),
MessagesPlaceholder(variable_name="messages"),
(
"system",
"Given the conversation above, who should act next?"
" Or should we FINISH? Select one of: {options}",
),
]
).partial(options=str(options), team_members=", ".join(members))
return (
prompt
| llm.bind_functions(functions=[function_def], function_call="route")
| JsonOutputFunctionsParser()
)

Research Team

supervisor agent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import functools
import operator

from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI
import functools


# Research team graph state
class ResearchTeamState(TypedDict):
# A message is added after each team member finishes
messages: Annotated[List[BaseMessage], operator.add]
# The team members are tracked so they are aware of
# the others' skill-sets
team_members: List[str]
# Used to route work. The supervisor calls a function
# that will update this every time it makes a decision
next: str


llm = ChatOpenAI(model="gpt-4-turbo")

search_agent = create_agent(
llm,
[tavily_tool],
"You are a research assistant who can search for up-to-date info using the tavily search engine.",
)
search_node = functools.partial(agent_node, agent=search_agent, name="Search")

research_agent = create_agent(
llm,
[scrape_webpages],
"You are a research assistant who can scrape specified urls for more detailed information using the scrape_webpages function.",
)
research_node = functools.partial(agent_node, agent=research_agent, name="Web Scraper")

supervisor_agent = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following workers: Search, Web Scraper. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["Search", "Web Scraper"],
)

Research Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
research_graph = StateGraph(ResearchTeamState)
research_graph.add_node("Search", search_node)
research_graph.add_node("Web Scraper", research_node)
research_graph.add_node("supervisor", supervisor_agent)

# Define the control flow
research_graph.add_edge("Search", "supervisor")
research_graph.add_edge("Web Scraper", "supervisor")
research_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{"Search": "Search", "Web Scraper": "Web Scraper", "FINISH": END},
)


research_graph.set_entry_point("supervisor")
chain = research_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str):
results = {
"messages": [HumanMessage(content=message)],
}
return results


research_chain = enter_chain | chain

Research Graph预览

1
Image(chain.get_graph().draw_png())

Document Writing Team

doc writing supervisor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import operator
from pathlib import Path


# Document writing team graph state
class DocWritingState(TypedDict):
# This tracks the team's conversation internally
messages: Annotated[List[BaseMessage], operator.add]
# This provides each worker with context on the others' skill sets
team_members: str
# This is how the supervisor tells langgraph who to work next
next: str
# This tracks the shared directory state
current_files: str


# This will be run before each worker agent begins work
# It makes it so they are more aware of the current state
# of the working directory.
def prelude(state):
written_files = []
if not WORKING_DIRECTORY.exists():
WORKING_DIRECTORY.mkdir()
try:
written_files = [
f.relative_to(WORKING_DIRECTORY) for f in WORKING_DIRECTORY.rglob("*")
]
except:
pass
if not written_files:
return {**state, "current_files": "No files written."}
return {
**state,
"current_files": "\nBelow are files your team has written to the directory:\n"
+ "\n".join([f" - {f}" for f in written_files]),
}


llm = ChatOpenAI(model="gpt-4-turbo")

doc_writer_agent = create_agent(
llm,
[write_document, edit_document, read_document],
"You are an expert writing a research document.\n"
# The {current_files} value is populated automatically by the graph state
"Below are files currently in your directory:\n{current_files}",
)
# Injects current directory working state before each call
context_aware_doc_writer_agent = prelude | doc_writer_agent
doc_writing_node = functools.partial(
agent_node, agent=context_aware_doc_writer_agent, name="Doc_Writer"
)

note_taking_agent = create_agent(
llm,
[create_outline, read_document],
"You are an expert senior researcher tasked with writing a paper outline and"
" taking notes to craft a perfect paper.{current_files}",
)
context_aware_note_taking_agent = prelude | note_taking_agent
note_taking_node = functools.partial(
agent_node, agent=context_aware_note_taking_agent, name="Note_Taker"
)

chart_generating_agent = create_agent(
llm,
[read_document, python_repl],
"You are a data viz expert tasked with generating charts for a research project."
"{current_files}",
)
context_aware_chart_generating_agent = prelude | chart_generating_agent
chart_generating_node = functools.partial(
agent_node, agent=context_aware_note_taking_agent, name="Chart_Generator"
)

doc_writing_supervisor = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following workers: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["Doc_Writer", "Note_Taker", "Chart_Generator"],
)

Document Writing Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Create the graph here:
# Note that we have unrolled the loop for the sake of this doc
authoring_graph = StateGraph(DocWritingState)
authoring_graph.add_node("Doc_Writer", doc_writing_node)
authoring_graph.add_node("Note_Taker", note_taking_node)
authoring_graph.add_node("Chart_Generator", chart_generating_node)
authoring_graph.add_node("supervisor", doc_writing_supervisor)

# Add the edges that always occur
authoring_graph.add_edge("Doc_Writer", "supervisor")
authoring_graph.add_edge("Note_Taker", "supervisor")
authoring_graph.add_edge("Chart_Generator", "supervisor")

# Add the edges where routing applies
authoring_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{
"Doc_Writer": "Doc_Writer",
"Note_Taker": "Note_Taker",
"Chart_Generator": "Chart_Generator",
"FINISH": END,
},
)

authoring_graph.set_entry_point("supervisor")
chain = authoring_graph.compile()


# The following functions interoperate between the top level graph state
# and the state of the research sub-graph
# this makes it so that the states of each graph don't get intermixed
def enter_chain(message: str, members: List[str]):
results = {
"messages": [HumanMessage(content=message)],
"team_members": ", ".join(members),
}
return results


# We re-use the enter/exit functions to wrap the graph
authoring_chain = (
functools.partial(enter_chain, members=authoring_graph.nodes)
| authoring_graph.compile()
)

Document Writing Graph 预览

1
Image(chain.get_graph().draw_png())

Add Layers

Supervisor Node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langchain_openai.chat_models import ChatOpenAI


llm = ChatOpenAI(model="gpt-4-turbo")

supervisor_node = create_team_supervisor(
llm,
"You are a supervisor tasked with managing a conversation between the"
" following teams: {team_members}. Given the following user request,"
" respond with the worker to act next. Each worker will perform a"
" task and respond with their results and status. When finished,"
" respond with FINISH.",
["Research team", "Paper writing team"],
)

Graph

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Top-level graph state
class State(TypedDict):
messages: Annotated[List[BaseMessage], operator.add]
next: str


def get_last_message(state: State) -> str:
return state["messages"][-1].content


def join_graph(response: dict):
return {"messages": [response["messages"][-1]]}


# Define the graph.
super_graph = StateGraph(State)
# First add the nodes, which will do the work
super_graph.add_node("Research team", get_last_message | research_chain | join_graph)
super_graph.add_node(
"Paper writing team", get_last_message | authoring_chain | join_graph
)
super_graph.add_node("supervisor", supervisor_node)

# Define the graph connections, which controls how the logic
# propagates through the program
super_graph.add_edge("Research team", "supervisor")
super_graph.add_edge("Paper writing team", "supervisor")
super_graph.add_conditional_edges(
"supervisor",
lambda x: x["next"],
{
"Paper writing team": "Paper writing team",
"Research team": "Research team",
"FINISH": END,
},
)
super_graph.set_entry_point("supervisor")
super_graph = super_graph.compile()

Graph预览

1
Image(super_graph.get_graph().draw_png())

执行

1
2
3
4
5
6
7
8
9
10
11
12
13
for s in super_graph.stream(
{
"messages": [
HumanMessage(
content="Write a brief research report on the North American sturgeon. Include a chart."
)
],
},
{"recursion_limit": 150},
):
if "__end__" not in s:
print(s)
print("---")

输出

1
2
3
4
5
6
7
8
9
10
{'supervisor': {'next': 'Research team'}}
---
{'Research team': {'messages': [HumanMessage(content='### Research Report on North American Sturgeon\n\n#### Overview\nSturgeons are ancient fish known for their distinctive anatomical features such as a heterocercal caudal fin, rows of bony scutes along the body, and a protrusible, toothless mouth with tactile barbels. These fish are bottom-dwellers and primarily feed on invertebrates and smaller fish. Sturgeons are notable for their significant sizes, long lifespans, and slow reproduction rates, which unfortunately make them highly susceptible to overexploitation and environmental changes.\n\n#### Species of North American Sturgeon\n1. **White Sturgeon (Acipenser transmontanus)** - Also known as the Pacific sturgeon, this is the largest freshwater fish in North America. It can weigh up to 1,800 pounds and measure up to 20 feet in length. This species is found from Alaska down to Mexico along the western coast.\n\n2. **Shortnose Sturgeon (Acipenser brevirostrum)** - Found on the eastern coast, from New Brunswick to Florida, this species is smaller, reaching up to 4 feet and weighing about 50 pounds.\n\n3. **Green Sturgeon (Acipenser medirostris)** - This species inhabits the western coast from Mexico to Alaska. It can grow up to 7 feet long and weighs up to 350 pounds.\n\n#### Conservation Status\nNorth American sturgeons are under significant threat due to factors such as overfishing, habitat loss (particularly from dam construction), and pollution. The White Sturgeon has a stable population overall, but localized populations, such as those in the Kootenai River, have been listed as endangered. The Green and Shortnose sturgeons face similar challenges, with the latter being listed under the Endangered Species Act.\n\n#### International Protection\nAll sturgeon species are protected under the Convention on International Trade in Endangered Species of Wild Fauna and Flora (CITES), which regulates international trade and aims to prevent species extinction due to excessive commercial exploitation.\n\n#### Chart: North American Sturgeon Species Overview\n\n| Species | Length (feet) | Weight (pounds) | Distribution | Status |\n|-------------------|---------------|-----------------|-------------------------------------|------------------|\n| White Sturgeon | Up to 20 | Up to 1,800 | Western coast from Mexico to Alaska | Not endangered* |\n| Shortnose Sturgeon| Up to 4 | Up to 50 | Eastern coast from New Brunswick to Florida | Endangered |\n| Green Sturgeon | Up to 7 | Up to 350 | Western coast from Mexico to Alaska | Threatened |\n\n*Note: Local populations such as those in the Kootenai River are considered endangered.\n\n#### Conclusion\nThe North American sturgeons, with their impressive sizes and ancient lineage, are crucial components of their aquatic ecosystems. However, they are facing significant threats that require concerted conservation efforts to ensure their survival and the health of their habitats.', name='Search')]}}
---
{'supervisor': {'next': 'Paper writing team'}}
---
{'Paper writing team': {'messages': [HumanMessage(content='The research report on North American Sturgeon has been successfully created and saved. If you need further analysis, edits, or additional information, feel free to let me know!', name='Doc_Writer')]}}
---
{'supervisor': {'next': 'FINISH'}}
---

LangSmith流程展示:

三、第三方应用

基于LangGraph构建的 Multi-Agent 架构

3.1 GPT-Newspaper

GPT-Newspaper 是一个创新的自治Agent,旨在创建根据用户偏好量身定制的个性化报纸。GPT-Newspaper 通过利用人工智能的力量,根据个人的品味和兴趣来策划、撰写、设计和编辑内容,从而彻底改变了我们消费新闻的方式。该架构由六个专门的子Agent组成。其中有一个关键步骤——一个作者与评论者之间的循环,这个循环增加了一个有益的循环过程。

3.2 Crew AI example

CrewAI 结合 LangChain 和 LangGraph 自动化检查电子邮件和创建草稿的过程。CrewAI 对自主 AI 代理进行编排,使它们能够高效地协作并执行复杂任务。

YouTube视频:https://youtu.be/OzYdPqzlcPo

四、其他框架

LangGraph 并不是第一个支持Multi-Agent工作流的框架。这些框架之间的大部分差异主要在于它们引入的mental model和概念。

4.1 Autogen

Autogen 可能是第一个Multi-Agent框架。LangGraph 和 Autogen 在mental model上的最大区别在于Agent的构建方式。LangGraph 倾向于明确定义不同的Agent和转换概率,并将其表示为图。Autogen 则更多地将其框架用于“对话”。LangGraph官方认为这种“图”框架更直观,并为构建更复杂和有特定意见的工作流(真正想要控制节点之间的转换概率)提供了更好的开发者体验。它还支持那些没有被“对话”明确捕获的工作流。

Autogen 和 LangGraph 之间的另一个关键区别是,LangGraph 完全集成到了 LangChain 生态系统中,这意味着你可以充分利用所有 LangChain 集成和 LangSmith 的可观测性。

4.2 CrewAI

CrewAI 最近成为创建 Multi-Agent Teams 的流行方式。与LangGraph相比,CrewAI是一个更高级别的框架。LangGraph官方正在积极与CrewAI团队合作,将LangGraph集成到CrewAI中。CrewAI已经实现了一个非常棒的更高级别的开发者体验(DevEx)。

官方资源

代码示例更详细说明:


LangGraph(三)—— Multi-Agent Workflows
https://mztchaoqun.com.cn/posts/D34_LangGraph_Multi-Agent/
作者
mztchaoqun
发布于
2024年8月25日
许可协议