LlamaIndex(四)——LlamaIndex Loading

LlamaIndex 中数据提取的关键是加载和转换。 加载文档后,您可以通过转换和输出节点来处理它们。

一、文档和节点

文档和节点对象是 LlamaIndex 中的核心抽象。文档可以处理很多类型的数据源,例如,PDF,API输出或者数据库检索。通过LlamaIndex可以手动构建数据,也可以通过数据加载器自动创建。默认情况下,文档会存储文本以及一些其他属性:

  • metadata-可以附加到文本的注释字典。
  • relationships-包含与其他文档/节点的关系的字典。

文档还支持存储图像,并且LlamaIndex正在积极地改进其多模态能力。

一个Node代表源文档的一个chunk,无论是文本块、图像块还是其他类型的块。与文档类似,它们包含元数据和与其他节点的关系信息。

节点在 LlamaIndex 中级别最高。可以直接定义节点及其所有属性。也可以选择通过 NodeParser 类将源文档解析为节点。默认情况下,从文档派生的每个节点都会继承该文档的相同元数据(例如,文档中的 file_name字段会传播到每个节点)。

1.1 文档

1.1.1 定义文档

默认情况下,所有的数据加载器通过load_data函数返回 Document 对象。

1
2
3
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data/paul_graham/").load_data()

也可以选择手动构建文档。LlamaIndex 提供了 Document 结构体。

1
2
3
4
from llama_index.core import Document

text_list = [text1, text2, ...]
documents = [Document(text=t) for t in text_list]

1.1.2 自定义文档

本节介绍自定义 Document 对象的各种方法。由于 Document 对象是 TextNode 对象的子类,所有这些设置和细节也适用于 TextNode 对象类。

Metadata

文档还提供了包含有用元数据的功能。使用每个文档上的metadata字典,可以包含额外的信息,以帮助提供查询响应的信息并追踪查询响应的来源。这些信息可以是任何东西,比如文件名或类别。如果你正在与向量数据库集成,一些向量数据库要求keys必须是字符串,值必须是扁平的(要么是 str,要么是 float,或者是 int)。

在每个文档的metadata字典中设置的任何信息都会出现在使用该文档创建的每个源节点的metadata中。此外,这些信息包含在节点中,使索引能够在查询和响应中使用它。默认情况下,metadata被注入到文本中,用于嵌入和大语言模型调用。
有几种设置这个字典的方法:

  1. 在文档构造器中
1
2
3
4
document = Document(
text="text",
metadata={"filename": "<doc_file_name>", "category": "<category>"},
)
  1. 文档创建后
1
document.metadata = {"filename": "<doc_file_name>"}
  1. 使用 SimpleDirectoryReaderfile_metadata 钩子自动设置文件名。这将自动在每个文档上运行钩子以设置元数据字段
1
2
3
4
5
6
7
8
from llama_index.core import SimpleDirectoryReader

filename_fn = lambda filename: {"file_name": filename}

# automatically sets the metadata of each document according to filename_fn
documents = SimpleDirectoryReader(
"./data", file_metadata=filename_fn
).load_data()

自定义文档id

doc_id 用于在索引中高效地刷新文档。当使用 SimpleDirectoryReader 时,可以自动将doc_id设置为每个文档的完整路径

1
2
3
4
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("./data", filename_as_id=True).load_data()
print([x.doc_id for x in documents])

还可以直接设置任何Documentdoc_id

1
document.doc_id = "My new document id!"

1.1.3 高级Metadata自定义

上述提到的关键细节是,默认情况下,你设置的任何元数据都包含在embeddings生成和LLM中。

自定义LLM Metadata文本

通常,文档可能有很多metadata keys,但你可能不想让大语言模型在响应合成期间看到所有的metadata keys。在上面的例子中,可能不希望LLM读取文档的file_name。然而,file_name可能包含有助于生成更好embeddings的信息。这样做的一个关键优势是在不改变LLM最终阅读内容的情况下,可以不影响检索embeddings。

可以这样排除它:

1
document.excluded_llm_metadata_keys = ["file_name"]

然后,可以使用 get_content() 函数并指定 MetadataMode.LLM 来测试LLM实际上最终会读取什么:

1
2
3
from llama_index.core.schema import MetadataMode

print(document.get_content(metadata_mode=MetadataMode.LLM))

自定义Embedding Metadata文本

类似于自定义LLM可见的元数据,也可以自定义Embedding模型可见的元数据。在这种情况下,你可以特别排除Embedding模型可见的元数据,以防你不希望特定文本影响Embedding 。

1
document.excluded_embed_metadata_keys = ["file_name"]

然后,可以使用 get_content() 函数并指定 MetadataMode.EMBED 来测试Embedding 模型实际上最终会读取什么:

1
2
3
from llama_index.core.schema import MetadataMode

print(document.get_content(metadata_mode=MetadataMode.EMBED))

自定义metadata格式

元数据在发送到LLM或Embedding模型时被注入到每个文档/节点的实际文本中。默认情况下,此元数据的格式由三个属性控制:

  1. Document.metadata_seperator -> default = "\n"

在连接元数据的所有key/value字段时,此字段控制每个key/value对之间的分隔符。

  1. Document.metadata_template -> default = "{key}: {value}"

此属性控制元数据中每个key/value对的格式。两个变量 key 和 value 字符串键是必需的。

  1. Document.text_template -> default = {metadata_str}\n\n{content}

当元数据使用 metadata_seperatormetadata_template 转换为字符串时,此模板控制该元数据与你的文档/节点的文本内容连接后的格式。元数据和内容字符串key是必需的。

1.1.4 总结

创建一个使用所有这些功能的简短示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from llama_index.core import Document
from llama_index.core.schema import MetadataMode

document = Document(
text="This is a super-customized document",
metadata={
"file_name": "super_secret_document.txt",
"category": "finance",
"author": "LlamaIndex",
},
excluded_llm_metadata_keys=["file_name"],
metadata_seperator="::",
metadata_template="{key}=>{value}",
text_template="Metadata: {metadata_str}\n-----\nContent: {content}",
)

print(
"The LLM sees this: \n",
document.get_content(metadata_mode=MetadataMode.LLM),
)
print(
"The Embedding model sees this: \n",
document.get_content(metadata_mode=MetadataMode.EMBED),
)

输出

1
2
3
4
5
6
7
8
The LLM sees this: 
Metadata: category=>finance::author=>LlamaIndex
-----
Content: This is a super-customized document
The Embedding model sees this:
Metadata: file_name=>super_secret_document.txt::category=>finance::author=>LlamaIndex
-----
Content: This is a super-customized document

1.2 节点

Nodes代表源文档的chunk,无论是文本块、图像还是更多。它们还包含元数据以及与其他节点和索引结构的关系信息。

在 LlamaIndex 中,节点是最高级别的。你可以选择直接定义节点及其所有属性。你也可以选择通过 NodeParser 类将源文档“解析”成节点。

自动构建

1
2
3
4
5
from llama_index.core.node_parser import SentenceSplitter

parser = SentenceSplitter()

nodes = parser.get_nodes_from_documents(documents)

手动构建 Node 对象

1
2
3
4
5
6
7
8
9
10
11
12
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo

node1 = TextNode(text="<text_chunk>", id_="<node_id>")
node2 = TextNode(text="<text_chunk>", id_="<node_id>")
# set relationships
node1.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
node_id=node2.node_id
)
node2.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(
node_id=node1.node_id
)
nodes = [node1, node2]

RelatedNodeInfo 类还可以存储额外的元数据:

1
2
3
node2.relationships[NodeRelationship.PARENT] = RelatedNodeInfo(
node_id=node1.node_id, metadata={"key": "val"}
)

定制node_id

每个节点都有一个 node_id 属性,如果没有手动指定,则会自动生成。这个 ID 可以用于多种目的,包括能够更新存储中的节点,通过 IndexNode 定义节点之间的关系等。

也可以直接获取和设置任何 TextNodenode_id

1
2
print(node.node_id)
node.node_id = "My new node_id!"

1.3 元数据提取(Metadata Extraction)

可以使用LLM通过元数据提取器模块来自动化元数据提取。元数据提取器模块包括以下feature extractors

  • SummaryExtractor - 自动提取一组节点的摘要
  • QuestionsAnsweredExtractor - 提取每个节点能够回答的一组问题
  • TitleExtractor - 提取每个节点上下文的标题
  • EntityExtractor - 提取每个节点内容中提到的实体(例如地名、人名、事物名)

然后,将元数据提取器与节点解析器串联起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from llama_index.core.extractors import (
TitleExtractor,
QuestionsAnsweredExtractor,
)
from llama_index.core.node_parser import TokenTextSplitter

text_splitter = TokenTextSplitter(
separator=" ", chunk_size=512, chunk_overlap=128
)
title_extractor = TitleExtractor(nodes=5)
qa_extractor = QuestionsAnsweredExtractor(questions=3)

# assume documents are defined -> extract nodes
from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
transformations=[text_splitter, title_extractor, qa_extractor]
)

nodes = pipeline.run(
documents=documents,
in_place=True,
show_progress=True,
)

或者插入到索引中:

1
2
3
4
5
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_documents(
documents, transformations=[text_splitter, title_extractor, qa_extractor]
)

其他一些例子:https://docs.llamaindex.ai/en/stable/module_guides/loading/documents_and_nodes/usage_metadata_extractor/#resources

二、SimpleDirectoryReader

SimpleDirectoryReader 是将本地文件数据加载到 LlamaIndex 的最简单方式。对于生产用例,可以要使用 LlamaHub 上提供的 Readers。

2.1 支持的文件类型

默认情况下,SimpleDirectoryReader 会尝试读取它找到的任何文件,并将它们全部视为文本。除了纯文本之外,它还明确支持以下文件类型,这些文件类型会根据文件扩展名自动被检测:

  • .csv - 逗号分隔值
  • .docx - Microsoft Word
  • .epub - EPUB 电子书格式
  • .hwp - Hangul Word Processor
  • .ipynb - Jupyter Notebook
  • .jpeg, .jpg - JPEG 图像
  • .mbox - MBOX 电子邮件存档
  • .md - Markdown
  • .mp3, .mp4 - 音频和视频
  • .pdf - 便携式文档格式
  • .png - 便携式网络图形
  • .ppt, .pptm, .pptx - Microsoft PowerPoint

JSON Loader可以支持JSON格式的文件。

2.2 使用

最基本的使用方式是传递一个 input_dir,它将加载该目录下所有支持的文件:

1
2
3
4
from llama_index.core import SimpleDirectoryReader

reader = SimpleDirectoryReader(input_dir="./data/paul_graham/")
documents = reader.load_data()

如果从目录加载较多文件,可以使用multiprocessing来加载文档,使用多进程时,在 Windows 和 Linux/MacOS 机器上存在差异,这在multiprocessing文档中有所解释[1]。最终,Windows 用户可能会看到较少或没有性能提升,而 Linux/MacOS 用户在加载完全相同的文件集时会看到这些提升。

1
2
...
documents = reader.load_data(num_workers=4)

2.2.1 从子目录中读取

默认情况下,SimpleDirectoryReader 只会读取目录顶层的文件。要读取子目录中的文件,设置 recursive=True

1
SimpleDirectoryReader(input_dir="path/to/directory", recursive=True)

2.2.2 在加载时迭代文件

使用 iter_data() 方法,在加载文件时对文件进行迭代和处理:

1
2
3
4
5
reader = SimpleDirectoryReader(input_dir="path/to/directory", recursive=True)
all_docs = []
for docs in reader.iter_data():
# <do something with the documents per file>
all_docs.extend(docs)

2.2.3 限制加载的文件

可以传递一个文件路径列表:

1
2
3
SimpleDirectoryReader(
input_dir="path/to/directory", required_exts=[".pdf", ".docx"]
)

使用 exclude 传递一个要排除的文件路径列表:

1
2
3
SimpleDirectoryReader(
input_dir="path/to/directory", exclude=["path/to/file1", "path/to/file2"]
)

还可以将 required_exts 设置为一个文件扩展名列表,以仅加载具有这些扩展名的文件:

1
2
3
SimpleDirectoryReader(
input_dir="path/to/directory", required_exts=[".pdf", ".docx"]
)

还可以使用 num_files_limit 设置要加载的文件的最大数量:

1
SimpleDirectoryReader(input_dir="path/to/directory", num_files_limit=100)

2.2.4 指定文件编码

SimpleDirectoryReader 默认是 utf-8 编码的,但您可以使用 encoding 参数来覆盖它:

1
SimpleDirectoryReader(input_dir="path/to/directory", encoding="latin-1")

2.2.5 读取metadata

可以通过 file_metadata 传递来指定一个函数,该函数将读取每个文件并提取附加到每个文件的 Document 对象的元数据:

1
2
3
4
5
def get_meta(file_path):
return {"foo": "bar", "file_path": file_path}


SimpleDirectoryReader(input_dir="path/to/directory", file_metadata=get_meta)

该函数应该接受单个参数,即文件路径,并返回一个元数据字典。

2.2.6 扩展到其他文件类型

可以通过将文件扩展名字典传递给 BaseReader 实例作为 file_extractor 来扩展 SimpleDirectoryReader 以读取其他文件类型。BaseReader 应该读取文件并返回一个 Document 对象列表。例如,要添加对 .myfile文件的自定义支持:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from llama_index.core import SimpleDirectoryReader
from llama_index.core.readers.base import BaseReader
from llama_index.core import Document


class MyFileReader(BaseReader):
def load_data(self, file, extra_info=None):
with open(file, "r") as f:
text = f.read()
# load_data returns a list of Document objects
return [Document(text=text + "Foobar", extra_info=extra_info or {})]


reader = SimpleDirectoryReader(
input_dir="./data", file_extractor={".myfile": MyFileReader()}
)

documents = reader.load_data()
print(documents)

这个映射将覆盖指定的文件类型的默认文件提取器,因此如果想要重新覆盖的类型,需要将它们重新添加回去。

2.2.7 支持外部文件系统

与其他模块一样,SimpleDirectoryReader 接受一个可选的 fs 参数,可以用来遍历远程文件系统。

fsspec 协议实现的任何文件系统对象。 fsspec 协议具有针对各种远程文件系统的开源实现,包括 AWS S3、Azure BlobDataLake、Google Drive、SFTP 等。

以下是连接到 S3 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
from s3fs import S3FileSystem

s3_fs = S3FileSystem(key="...", secret="...")
bucket_name = "my-document-bucket"

reader = SimpleDirectoryReader(
input_dir=bucket_name,
fs=s3_fs,
recursive=True, # recursively searches all subdirectories
)

documents = reader.load_data()
print(documents)

可以查看完整的例子

三、Data Connectors

数据连接器(也称为Reader)从不同的数据源和数据格式中提取数据,将其转换为简单的Document表示形式(文本和简单的元数据)。

数据连接器通过 LlamaHub 提供,LlamaHub 是一个开源仓库。

3.1 使用

每个数据加载器都包含一个Usage部分,展示了如何使用该加载器。使用每个加载器的核心是一个 download_loader 函数,该函数将加载器文件下载到在应用程序中可以使用的模块。

1
2
3
4
5
6
7
8
9
10
from llama_index.core import VectorStoreIndex, download_loader

from llama_index.readers.google import GoogleDocsReader

gdoc_ids = ["1wf-y2pd9C878Oh-FmLH7Q_BQkljdm6TQal-c1pUfrec"]
loader = GoogleDocsReader()
documents = loader.load_data(document_ids=gdoc_ids)
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
query_engine.query("Where did the author go to school?")

3.2 LlamaParse

LlamaParse 是由 LlamaIndex 创建的 API,用于高效地解析和表示文件,以便使用 LlamaIndex 框架进行高效的检索和上下文增强。LlamaParse 直接与 LlamaIndex 集成。

首先,从 https://cloud.llamaindex.ai 获取一个 api-key。运行以下代码来解析你的第一个 PDF 文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import nest_asyncio

nest_asyncio.apply()

from llama_parse import LlamaParse

parser = LlamaParse(
api_key="llx-...", # can also be set in your env as LLAMA_CLOUD_API_KEY
result_type="markdown", # "markdown" and "text" are available
verbose=True,
)

# sync
documents = parser.load_data("./my_file.pdf")

# sync batch
documents = parser.load_data(["./my_file1.pdf", "./my_file2.pdf"])

# async
documents = await parser.aload_data("./my_file.pdf")

# async batch
documents = await parser.aload_data(["./my_file1.pdf", "./my_file2.pdf"])

也可以将解析器集成为 SimpleDirectoryReader 的默认 PDF 加载器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import nest_asyncio

nest_asyncio.apply()

from llama_parse import LlamaParse
from llama_index.core import SimpleDirectoryReader

parser = LlamaParse(
api_key="llx-...", # can also be set in your env as LLAMA_CLOUD_API_KEY
result_type="markdown", # "markdown" and "text" are available
verbose=True,
)

file_extractor = {".pdf": parser}
documents = SimpleDirectoryReader(
"./data", file_extractor=file_extractor
).load_data()

更多例子:https://docs.llamaindex.ai/en/stable/module_guides/loading/connector/llama_parse/#examples

Data Connectors Module Guides:https://docs.llamaindex.ai/en/stable/module_guides/loading/connector/modules/

四、Node Parsers

Node parsers 是一种简单的抽象,它们接受一组文档列表,并将它们分解成 Node 对象,使得每个节点都是父文档的一个特定部分。当文档被分解成节点时,它的所有属性(即元数据、文本和元数据模板等)都会被继承到子节点中。

4.1 使用

4.1.1 独立使用

Node parsers 可以独立使用:

1
2
3
4
5
6
7
8
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter

node_parser = SentenceSplitter(chunk_size=1024, chunk_overlap=20)

nodes = node_parser.get_nodes_from_documents(
[Document(text="long text")], show_progress=False
)

4.1.2 Transformation

Node parsers 可以包含在任何转换集中,与提取管道一起使用。

1
2
3
4
5
6
7
8
9
from llama_index.core import SimpleDirectoryReader
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter

documents = SimpleDirectoryReader("./data").load_data()

pipeline = IngestionPipeline(transformations=[TokenTextSplitter(), ...])

nodes = pipeline.run(documents=documents)

4.1.3 Index

transformations或全局设置中设置,在.from_documents() 构建索引时自动使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from llama_index.core import SimpleDirectoryReader, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter

documents = SimpleDirectoryReader("./data").load_data()

# global
from llama_index.core import Settings

Settings.text_splitter = SentenceSplitter(chunk_size=1024, chunk_overlap=20)

# per-index
index = VectorStoreIndex.from_documents(
documents,
transformations=[SentenceSplitter(chunk_size=1024, chunk_overlap=20)],
)

4.2 Node Parser Modules

4.2.1 基于文件的节点解析器

基于文件的节点解析器可以根据不同的内容类型(如JSON、Markdown等)创建节点。最简单的流程是将FlatFileReaderSimpleFileNodeParser结合使用,以自动为每种内容类型使用最佳的节点解析器。考虑文本的实际长度,可以将基于文件的节点解析器与基于文本的节点解析器串联起来。

SimpleFileNodeParser

1
2
3
4
5
6
7
8
from llama_index.core.node_parser import SimpleFileNodeParser
from llama_index.readers.file import FlatReader
from pathlib import Path

md_docs = FlatReader().load_data(Path("./test.md"))

parser = SimpleFileNodeParser()
md_nodes = parser.get_nodes_from_documents(md_docs)

HTMLNodeParser

此节点解析器使用beautifulsoup解析原始HTML。默认情况下,它将解析选定的HTML标签子集,但设置可以改变。

默认标签为:["p", "h1", "h2", "h3", "h4", "h5", "h6", "li", "b", "i", "u", "section"]

1
2
3
4
from llama_index.core.node_parser import HTMLNodeParser

parser = HTMLNodeParser(tags=["p", "h1"]) # optional list of tags
nodes = parser.get_nodes_from_documents(html_docs)

JSONNodeParser

JSONNodeParser解析原始JSON

1
2
3
4
5
from llama_index.core.node_parser import JSONNodeParser

parser = JSONNodeParser()

nodes = parser.get_nodes_from_documents(json_docs)

MarkdownNodeParser

MarkdownNodeParser解析原始markdown文本。

1
2
3
4
5
from llama_index.core.node_parser import MarkdownNodeParser

parser = MarkdownNodeParser()

nodes = parser.get_nodes_from_documents(markdown_docs)

4.2.2 Text-Splitters

CodeSplitter
根据编写的语言分割原始代码文本。支持语言的完整列表

1
2
3
4
5
6
7
8
9
from llama_index.core.node_parser import CodeSplitter

splitter = CodeSplitter(
language="python",
chunk_lines=40, # lines per chunk
chunk_lines_overlap=15, # lines overlap between chunks
max_chars=1500, # max chars per chunk
)
nodes = splitter.get_nodes_from_documents(documents)

LangchainNodeParser

还可以使用节点解析器包装任何现有的langchain文本分割器。

1
2
3
4
5
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llama_index.core.node_parser import LangchainNodeParser

parser = LangchainNodeParser(RecursiveCharacterTextSplitter())
nodes = parser.get_nodes_from_documents(documents)

SentenceSplitter

SentenceSplitter尝试在考虑句子边界的同时分割文本。

1
2
3
4
5
6
7
from llama_index.core.node_parser import SentenceSplitter

splitter = SentenceSplitter(
chunk_size=1024,
chunk_overlap=20,
)
nodes = splitter.get_nodes_from_documents(documents)

SentenceWindowNodeParser
SentenceWindowNodeParser与其他节点解析器类似,但它将所有文档分割成单独的句子。结果节点还在元数据中包含了每个节点周围window的句子。请注意,此元数据对LLM或Embedding模型不可见。

这在生成具有非常特定范围的Embedding时最有用。然后,结合MetadataReplacementNodePostProcessor使用,可以在将节点发送到LLM之前,用其周围上下文替换句子。

以下是使用默认设置设置解析器的示例。在实践中,通常只需要调整句子窗口的大小。

1
2
3
4
5
6
7
8
9
10
11
import nltk
from llama_index.core.node_parser import SentenceWindowNodeParser

node_parser = SentenceWindowNodeParser.from_defaults(
# how many sentences on either side to capture
window_size=3,
# the metadata key that holds the window of surrounding sentences
window_metadata_key="window",
# the metadata key that holds the original sentence
original_text_metadata_key="original_sentence",
)

与MetadataReplacementNodePostProcessor结合使用的完整示例

SemanticSplitterNodeParser

Semantic chunking是Greg Kamradt在他的视频教程中提出的一个新概念,关于5个embedding chunking级别的教程:https://youtu.be/8OJC21T2SL4?t=1933

与使用固定块大小分割文本不同,语义分割器会使用embedding相似性自适应地选择句子之间的断点。这确保了一个“块”包含了语义上相关联的句子。

  • 该正则表达式主要用于英语句子
  • 可能需要调整断点百分位阈值。
1
2
3
4
5
6
7
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.embeddings.openai import OpenAIEmbedding

embed_model = OpenAIEmbedding()
splitter = SemanticSplitterNodeParser(
buffer_size=1, breakpoint_percentile_threshold=95, embed_model=embed_model
)

完整示例

TokenTextSplitter

TokenTextSplitter试图根据原始Token计数分割成一致的块大小。

1
2
3
4
5
6
7
8
from llama_index.core.node_parser import TokenTextSplitter

splitter = TokenTextSplitter(
chunk_size=1024,
chunk_overlap=20,
separator=" ",
)
nodes = splitter.get_nodes_from_documents(documents)

4.2.3 Relation-Based Node Parsers

HierarchicalNodeParser

此节点解析器将节点分块成层次化的节点。这意味着单个输入将被分块成几个层次的块大小,每个节点都包含对其父节点的引用。

当与AutoMergingRetriever结合使用时,这使我们能够自动地用其父节点替换检索到的节点,当大多数子节点被检索到时。这个过程为响应合成提供了更完整的上下文给LLM。

1
2
3
4
5
from llama_index.core.node_parser import HierarchicalNodeParser

node_parser = HierarchicalNodeParser.from_defaults(
chunk_sizes=[2048, 512, 128]
)

与AutoMergingRetriever结合使用的完整示例

五、Ingestion Pipeline

IngestionPipeline使用了一个Transformations的概念,这些转换应用于输入数据。这些转换应用于您的输入数据,生成的节点要么被返回,要么被插入到给定的向量数据库中。每个节点+转换对都会被缓存,这样在后续运行中(如果缓存被保留)使用相同的节点+转换组合时,就可以使用缓存结果,从而节省时间。

要查看IngestionPipeline的实际应用示例,请查看 RAG CLI

5.1 Ingestion Pipeline

最简单的使用方式是实例化一个IngestionPipeline,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# create the pipeline with transformations
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
]
)

# run the pipeline
nodes = pipeline.run(documents=[Document.example()])

5.1.1 连接向量数据库

在运行IngestionPipeline时,您还可以选择自动将生成的节点插入到远程向量存储中。然后,您可以稍后从该向量存储构建索引。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
vector_store=vector_store,
)

# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])

# Create your index
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

在上述示例中,Embedding是在管道的一部分计算的。如果您将管道连接到向量存储,Embedding必须是您管道的一个阶段,否则您后续实例化索引将会失败。如果您没有连接到向量存储,只是生成节点列表,您可以从管道中省略嵌入。

IngestionPipeline中,每个节点 + 转换组合都会被哈希并缓存。这在后续使用相同数据的运行中节省了时间。

5.1.2 本地缓存管理

一旦有了pipeline,您可能想要存储和加载缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# save
pipeline.persist("./pipeline_storage")

# load and restore state
new_pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
],
)
new_pipeline.load("./pipeline_storage")

# will run instantly due to the cache
nodes = pipeline.run(documents=[Document.example()])

# delete all context of the cache
cache.clear()

5.1.3 远程缓存管理

支持多个远程存储后端用于缓存

  • RedisCache
  • MongoDBCache
  • FirestoreCache

以下是使用 RedisCache 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.core.ingestion.cache import RedisCache


pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TitleExtractor(),
OpenAIEmbedding(),
],
cache=IngestionCache(
cache=RedisCache(
redis_uri="redis://127.0.0.1:6379", collection="test_cache"
)
),
)

# Ingest directly into a vector db
nodes = pipeline.run(documents=[Document.example()])

这里不需要 persist 步骤,因为一切都在指定的远程集合中缓存。

5.1.4 异步支持

IngestionPipeline还支持异步操作

1
nodes = await pipeline.arun(documents=documents)

5.1.5 Document Management

将文档存储 (docstore) 附加到IngestionPipeline将启用文档管理。使用 document.doc_idnode.ref_doc_id作为基准点,IngestionPipeline将积极寻找重复文档。

它的工作原理是:

  • 存储一个 doc_id -> document_hash 的映射

  • 如果附加了向量存储:

    • 如果检测到重复的 doc_id,并且哈希已更改,则文档将被重新处理并更新插入

    • 如果检测到重复的 doc_id 并且哈希未更改,则跳过该节点

  • 如果没有附加向量存储:

    • 检查每个节点的所有现有哈希

    • 如果发现重复项,则跳过该节点

    • 否则,处理该节点

注意:如果我们没有附加向量存储,我们只能检查并删除重复的输入。

1
2
3
4
5
6
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
transformations=[...], docstore=SimpleDocumentStore()
)

完整例子 1,2

5.1.6 Parallel Processing

IngestionPipelinerun 方法可以使用并行执行。它通过使用 multiprocessing.Pool 将节点批次分配到多个处理器中来实现。

要使用并行处理执行,请将 num_workers设置为您想要使用的进程数量:

1
2
3
4
5
6
from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)

更多:https://docs.llamaindex.ai/en/stable/module_guides/loading/ingestion_pipeline/#modules

5.2 Transformations

变换是一种输入节点列表并返回节点列表的操作。实现变换基类(Transformation base class)的每个组件都具有同步的 __call__() 定义和异步的 acall() 定义。

目前,以下组件是Transformation对象:

  • 文本分割器(TextSplitter)
  • 节点解析器(NodeParser)
  • 元数据提取器(MetadataExtractor)
  • Embeddings模型(支持的Embeddings模型列表

5.2.1 使用

变换最好与IngestionPipeline一起使用,但也可以直接使用。

1
2
3
4
5
6
7
8
9
10
11
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor

node_parser = SentenceSplitter(chunk_size=512)
extractor = TitleExtractor()

# use transforms directly
nodes = node_parser(documents)

# or use a transformation in async
nodes = await extractor.acall(nodes)

5.2.2 与索引结合使用

变换可以传递到索引或整体全局设置中,并在调用索引上的 from_documents()insert() 时使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from llama_index.core import VectorStoreIndex
from llama_index.core.extractors import (
TitleExtractor,
QuestionsAnsweredExtractor,
)
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter

transformations = [
TokenTextSplitter(chunk_size=512, chunk_overlap=128),
TitleExtractor(nodes=5),
QuestionsAnsweredExtractor(questions=3),
]

# global
from llama_index.core import Settings

Settings.transformations = [text_splitter, title_extractor, qa_extractor]

# per-index
index = VectorStoreIndex.from_documents(
documents, transformations=transformations
)

5.2.3 自定义变换

通过实现基类来实现任何自定义变换。以下自定义变换将从文本中移除任何特殊字符或标点符号。

1
2
3
4
5
6
7
8
9
10
11
12
13
import re
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.schema import TransformComponent


class TextCleaner(TransformComponent):
def __call__(self, nodes, **kwargs):
for node in nodes:
node.text = re.sub(r"[^0-9A-Za-z ]", "", node.text)
return nodes

可以直接使用或在任何IngestionPipeline中使用。

1
2
3
4
5
6
7
8
9
10
# use in a pipeline
pipeline = IngestionPipeline(
transformations=[
SentenceSplitter(chunk_size=25, chunk_overlap=0),
TextCleaner(),
OpenAIEmbedding(),
],
)

nodes = pipeline.run(documents=[Document.example()])

官方资源


LlamaIndex(四)——LlamaIndex Loading
https://mztchaoqun.com.cn/posts/D17_LlamaIndex_Loading/
作者
mztchaoqun
发布于
2024年4月18日
许可协议