We have now walked through the entire RAG flow and gained some first-hand experience with it, but you may still have questions about the purpose and usage of some of the components in the pipeline, such as RunnablePassthrough. In this section we take a closer look at these key components.
The core idea behind how RAG is modeled here is to decompose one complex task into a combination of small, simple tasks; this divide-and-conquer strategy is very useful when dealing with large and complicated problems. Let's start with a very simple toy pipeline:
```python
from langchain_core.runnables import Runnable

# First task: append " a" to the input
class TaskA(Runnable):
    def invoke(self, input, context=None):
        return input + " a"

# Second task: append " beautiful" to the input
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        return input + " beautiful"

# Third task: append " day!" to the input
class TaskDay(Runnable):
    def invoke(self, input, context=None):
        return input + " day!"

# Create task instances
task_a = TaskA()
task_beautiful = TaskBeautiful()
task_day = TaskDay()

# Runnable overloads the "|" operator, so instances can be chained
rag_chain = task_a | task_beautiful | task_day
output = rag_chain.invoke("what")
print(output)
```
The code above prints "what a beautiful day!". As you can see, the Runnable class overloads the "|" operator, which lets us compose Runnable instances into a chain in which the output of each Runnable becomes the input of the next one. We also need to make sure that every Runnable instance implements the invoke interface so that data can be passed along the connected instances.
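Conceptually, the chained invoke does nothing more than call each step's invoke in order and feed each output into the next step. The following plain-Python sketch reuses the task instances defined above; the helper invoke_in_sequence is hypothetical and only illustrates the data flow, it is not part of LangChain:

```python
# Hypothetical helper illustrating what a "|" chain does; not LangChain API
def invoke_in_sequence(steps, value):
    for step in steps:
        value = step.invoke(value)  # each step's output becomes the next step's input
    return value

print(invoke_in_sequence([task_a, task_beautiful, task_day], "what"))  # what a beautiful day!
```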
Coming back to the components used in the RAG pipeline, RunnablePassthrough essentially behaves like the following class:
```python
class DoNothing(Runnable):
    def invoke(self, input, context=None):
        return input
```
Let's look at an example:
```python
from langchain_core.runnables import RunnablePassthrough

passthrough = RunnablePassthrough()
input_data = {"msg": "this is a test"}
output_data = passthrough.invoke(input_data)
print(output_data)
```
Running the code above prints: {'msg': 'this is a test'}
RunnablePassthrough is usually used in a pipeline as a placeholder or a connector that joins two functional stages, but with assign it can also make simple additions to its input data:
```python
# Original data
input_data = {"message": "Hello, World!", "status": "initial"}

# assign adds a "result" key computed from the input
passthrough = RunnablePassthrough.assign(result=lambda x: "processed")

# Invoke with the input data
output_data = passthrough.invoke(input_data)
print(output_data)
```
Here we need to make sure that the input data is a dictionary and that each argument passed to assign is a callable (a lambda in this case). Running the code above yields:
{'message': 'Hello, World!', 'status': 'initial', 'result': 'processed'}
Runnable objects can also chain plain functions together, like this:
```python
def task1(input1):
    return f"task1:{input1}"

def task2(input1):
    return f"task2:{input1}"

def context(input):
    return f"context: {input}"

rag_chain = RunnablePassthrough().assign(context=context) | task1 | task2
res = rag_chain.invoke({"invoke": "input for invoke"})
print(res)
The invoke call sends the data to the RunnablePassthrough object, which passes it through while assign adds a "context" key computed from the input; the resulting dictionary is then sent to task1, and task1's output becomes task2's input. Running the code above prints:
task2:task1:{'invoke': 'input for invoke', 'context': "context: {'invoke': 'input for invoke'}"}
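To make the intermediate values explicit, here is a hand-traced, plain-Python equivalent of that chain, reusing the functions defined above (the stepN names are just illustrative):

```python
# Hand-traced equivalent of RunnablePassthrough().assign(context=context) | task1 | task2
step0 = {"invoke": "input for invoke"}         # input to the chain
step1 = {**step0, "context": context(step0)}   # passthrough + assign adds the "context" key
step2 = task1(step1)                           # "task1:{...}"
step3 = task2(step2)                           # "task2:task1:{...}"
print(step3)
```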
Another component worth knowing is RunnableMap, which lets us combine several Runnable objects and run them, in parallel where possible, on the same input. A RunnableMap behaves like a dictionary whose keys are branches or steps, each running independently on the same input. This is very useful whenever several steps or functions need to process the same input independently. Let's look at an example:
```python
from langchain_core.runnables import Runnable, RunnableMap

# First task: label the input with "task1"
class Task1(Runnable):
    def invoke(self, input, context=None):
        return f"task1 for {input}"

# Second task: label the input with "task2"
class Task2(Runnable):
    def invoke(self, input, context=None):
        return f"task2 for {input}"

pipeline = RunnableMap({
    "task1": Task1(),
    "task2": Task2(),
})

input_data = "input data"
output_data = pipeline.invoke(input_data)
print(output_data)
```
Every value in the mapping passed to RunnableMap must implement the invoke interface; calling pipeline.invoke triggers the invoke of each value in the mapping on the same input, in parallel. The output of the code above is:
{'task1': 'task1 for input data', 'task2': 'task2 for input data'}
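Because a RunnableMap is itself a Runnable, its result dictionary can feed the next step of a chain. A small sketch, reusing the pipeline defined above (the summarize function is a hypothetical example, not part of LangChain):

```python
# Pipe the RunnableMap's result dictionary into a follow-up function
def summarize(results: dict) -> str:
    return " | ".join(f"{key}={value}" for key, value in results.items())

chain = pipeline | summarize
print(chain.invoke("input data"))
# task1=task1 for input data | task2=task2 for input data
```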
The last component we need to understand is RunnableParallel, which is essentially the same as RunnableMap (in langchain_core, RunnableMap is an alias for RunnableParallel), so we can treat them interchangeably. The one feature worth highlighting is assign on a RunnableParallel. Let's look at a code example:
```python
from langchain_core.runnables import Runnable, RunnableParallel
import time

# First task: append " a" after a one-second delay
class TaskA(Runnable):
    def invoke(self, input, context=None):
        time.sleep(1)
        return input + " a"

# Second task: append " beautiful" after a two-second delay
class TaskBeautiful(Runnable):
    def invoke(self, input, context=None):
        time.sleep(2)
        return input + " beautiful"

# Third task: append " day" after a three-second delay
class TaskDay(Runnable):
    def invoke(self, input, context=None):
        time.sleep(3)
        return input + " day"

# Task used with assign: it receives the dictionary produced by RunnableParallel
class TaskAssign(Runnable):
    def invoke(self, input, context=None):
        time.sleep(3)
        print(f"input: {input}")
        return {**input}

start = time.time()
parallel_tasks = RunnableParallel({
    "taskA": TaskA(),
    "taskBeautiful": TaskBeautiful(),
    "TaskDay": TaskDay(),
}).assign(taskAssign=TaskAssign())
output = parallel_tasks.invoke("what")
print(f"output for rag run parallelly: {output}")
end = time.time()
print(f"time for rag run parallel: {end - start}")
```
The output of the code above is:
input: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}
output for rag run parallelly: {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day', 'taskAssign': {'taskA': 'what a', 'taskBeautiful': 'what beautiful', 'TaskDay': 'what day'}}
time for rag run parallel: 6.011621713638306
As the output shows, when assign is used on a RunnableParallel, the tasks in the mapping passed to RunnableParallel are executed first and in parallel (which is why the total time is roughly 3 + 3 seconds rather than 1 + 2 + 3 + 3); the resulting dictionary is then passed as the input to TaskAssign, and the final output is the RunnableParallel dictionary plus one extra entry whose key is "taskAssign" and whose value is TaskAssign's return value.
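Setting the concurrency aside, the data flow of RunnableParallel(...).assign(...) can be sketched in plain Python, reusing the task classes above. The helper parallel_then_assign is hypothetical and written only for illustration; the real implementation runs the mapping's tasks concurrently:

```python
# Hypothetical, sequential sketch of RunnableParallel(mapping).assign(key=runnable)
def parallel_then_assign(mapping, assign_key, assign_runnable, value):
    # every runnable in the mapping receives the same input
    intermediate = {key: r.invoke(value) for key, r in mapping.items()}
    # the assigned runnable receives the intermediate dict, and its result is merged in
    return {**intermediate, assign_key: assign_runnable.invoke(intermediate)}

print(parallel_then_assign(
    {"taskA": TaskA(), "taskBeautiful": TaskBeautiful(), "TaskDay": TaskDay()},
    "taskAssign", TaskAssign(), "what"))
```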
Finally, let's rebuild the RAG pipeline from the previous section as follows:
```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel, RunnablePassthrough

# retriver, format_docs, prompt, and llm are the objects defined in the previous section

def task_from_docs(input):
    print(f"task from docs: {input}")
    print(f"input['context']: {input['context']}")
    return format_docs(input["context"])

rag_chain_from_docs = RunnablePassthrough.assign(context=task_from_docs)

rag_chain_with_source = RunnableParallel({
    "context": retriver,
    "question": RunnablePassthrough(),
}).assign(answer=rag_chain_from_docs)

'''
Invoking rag_chain_with_source triggers the invoke of retriver and RunnablePassthrough in parallel;
the resulting dictionary is then sent to the rag_chain_from_docs pipeline, which passes the
dictionary to the task_from_docs function and stores its output under the "context" key.
'''
res = rag_chain_with_source.invoke("How does RAG compare with fine-tuning")
print(f"context of res {res['context']}")
print(f"question of res {res['question']}")

# A dictionary like res is what rag_chain_from_docs receives and passes to task_from_docs;
# task_from_docs extracts the value of "context" from that dictionary and passes it to format_docs
rag_chain = rag_chain_with_source | prompt | llm | StrOutputParser()
response = rag_chain.invoke("How does RAG compare with fine-tuning")
print(f"final rag response: {response}")
```
Running the code above produces the following result:
Retrieval-Augmented Generation (RAG) and fine-tuning are two different approaches to enhancing the capabilities of Large Language Models (LLMs). RAG combines the strengths of LLMs with internal data, allowing organizations to access and utilize their own data effectively. It enhances the accuracy and relevance of responses by fetching specific information from databases in real time, thus expanding the model's knowledge beyond its initial training data. RAG is particularly useful for organizations that need to leverage their proprietary data or the latest information that was not included in the model's training set.

On the other hand, fine-tuning involves adjusting the weights and biases of a pre-trained model based on new training data. This process permanently alters the model's behavior and is suitable for teaching the model specialized tasks or adapting it to specific domains. However, fine-tuning can be complex and costly, especially as data sources evolve, and it may lead to issues like overfitting.

In summary, RAG is more about integrating real-time data retrieval to enhance model responses, while fine-tuning is about permanently modifying the model based on new data. Each approach has its advantages and limitations, and the choice between them depends on the specific needs and resources of the organization.
With the knowledge above, the code is easy to follow. Invoking rag_chain_with_source produces a dictionary with context and question keys: the value of context is the result of retriver.invoke("How does RAG compare with fine-tuning"), and the value of question is the result of RunnablePassthrough().invoke("How does RAG compare with fine-tuning"), i.e. the original string itself. That dictionary is then used as the input of task_from_docs, which takes the value of the context key and passes it to format_docs. The output of rag_chain_with_source is then sent to the prompt template, which fills the question and the context into a prompt; the prompt is sent to ChatGPT, and the parsed text of its reply is the final result.
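To see that flow end to end without a vector store, here is a self-contained toy sketch that uses hypothetical stand-ins for retriver, format_docs, prompt, and llm (all of these stand-in bodies are invented for illustration) and keeps only the core retrieve-then-prompt path of the chain above:

```python
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough

# Hypothetical stand-ins for the previous section's objects
fake_docs = ["RAG retrieves external data at query time.",
             "Fine-tuning changes the model's weights with new training data."]
retriver = RunnableLambda(lambda question: fake_docs)          # stands in for the vector-store retriever
format_docs = lambda docs: "\n\n".join(docs)                   # plays the role of the previous section's helper
prompt = RunnableLambda(lambda d: f"Context:\n{format_docs(d['context'])}\n\nQuestion: {d['question']}")
llm = RunnableLambda(lambda p: f"(model answer for a prompt of {len(p)} characters)")  # stands in for ChatGPT

chain = RunnableParallel({"context": retriver, "question": RunnablePassthrough()}) | prompt | llm
print(chain.invoke("How does RAG compare with fine-tuning"))
```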