目录
概述
预备知识
定义自定义文档构建器
下载数据集
解析和迭代数据集
将数据集写入 JSONL 格式
使用文档构建器加载数据集
使用现有工具统一 Unicode 格式
设计自定义数据集过滤器
编辑所有个人识别信息
添加指令提示
整合管线
概述
出于演示目的,本文重点介绍一个涉及电子邮件分类的玩具示例。目标是整理一个基于文本的小型数据集,其中每个记录都包含电子邮件(主题和正文)以及该电子邮件的预定义分类标签。
为此,我们使用了 Enron 电子邮件数据集,将每封电子邮件标记为八个类别之一。此数据集可在 Hugging Face 上公开获取,并包含约 1400 条记录。
数据管护流程涉及以下高级步骤:
- 定义下载器、迭代器和提取器类,将数据集转换为 JSONL 格式。
- 使用现有工具统一 Unicode 表示。
- 定义自定义数据集过滤器,以删除空白或过长的电子邮件。
- 编辑数据集中的所有个人识别信息 (PII)。
- 为每条记录添加指令提示。
- 整合整个管线。
在消费级硬件上执行此策展制作流程需要不到 5 分钟的时间。要访问本教程的完整代码,请参阅 NVIDIA/NeMo-Curator GitHub 资源库。
预备知识
开始之前,您必须安装 NeMo Curator 框架。按照 NeMo Curator GitHub README 文件中的说明安装该框架。
接下来,运行以下命令以验证安装并安装任何其他依赖项:
$ python -c "import nemo_curator; print(nemo_curator);" $ pip3 install requests
定义自定义文档构建器
整理数据集的第一步是实现文档构建器,以便下载并迭代数据集。
下载数据集
实现DocumentDownloader
类获取数据集的 URL,并使用requests
库。
import requests from nemo_curator.download.doc_builder import DocumentDownloaderclass EmailsDownloader(DocumentDownloader):def __init__(self, download_dir: str):super().__init__()if not os.path.isdir(download_dir):os.makedirs(download_dir)self._download_dir = download_dirprint("Download directory: ", self._download_dir)def download(self, url: str) -> str:filename = os.path.basename(url)output_file = os.path.join(self._download_dir, filename)if os.path.exists(output_file):print(f"File '{output_file}' already exists, skipping download.")return output_fileprint(f"Downloading Enron emails dataset from '{url}'...")response = requests.get(url)with open(output_file, "wb") as file:file.write(response.content)return output_file
下载的数据集是一个文本文件,每个条目大致遵循以下格式:
“<s>[system instruction prompts]Subject:: [email subject] Body:: [email body][category label] <s>”
您可以使用正则表达式轻松地将这种格式分解为其组成部分。要记住的关键是,条目由“<s> … <s>”
并且始终以指令提示开始。此外,示例分隔符令牌和系统提示令牌与 Llama 2 标记器系列兼容。
由于您可能会将这些数据与不支持特殊令牌的其他分词器或模型一起使用,因此最好在解析期间丢弃这些指令和令牌。在本文的稍后部分中,我们将展示如何使用 NeMo Curator 将指令提示或特殊令牌添加到每个条目中DocumentModifier
实用程序。
解析和迭代数据集
实现DocumentIterator
和DocumentExtractor
用于提取电子邮件主题、正文和类别 (类) 标签的类:
from nemo_curator.download.doc_builder import (DocumentExtractor,DocumentIterator, )class EmailsIterator(DocumentIterator):def __init__(self):super().__init__()self._counter = -1self._extractor = EmailsExtractor()# The regular expression pattern to extract each email.self._pattern = re.compile(r"\"<s>.*?<s>\"", re.DOTALL)def iterate(self, file_path):self._counter = -1file_name = os.path.basename(file_path)with open(file_path, "r", encoding="utf-8") as file:lines = file.readlines()# Ignore the first line which contains the header.file_content = "".join(lines[1:])# Find all the emails in the file.it = self._pattern.finditer(file_content)for email in it:self._counter += 1content = email.group().strip('"').strip()meta = {"filename": file_name,"id": f"email-{self._counter}",}extracted_content = self._extractor.extract(content)# Skip if no content extractedif not extracted_content:continuerecord = {**meta, **extracted_content}yield recordclass EmailsExtractor(DocumentExtractor):def __init__(self):super().__init__()# The regular expression pattern to extract subject/body/label into groups.self._pattern = re.compile(r"Subject:: (.*?)\nBody:: (.*?)\n.*\[/INST\] (.*?) <s>", re.DOTALL)def extract(self, content: str) -> Dict[str, str]:matches = self._pattern.findall(content)if not matches:return Nonematches = matches[0]return {"subject": matches[0].strip(),"body": matches[1].strip(),"category": matches[2].strip(),}
迭代器使用正则表达式,\"<s>.*?<s>\"
然后,它将字符串传递给提取器,提取器使用正则表达式"Subject:: (.*?)\nBody:: (.*?)\n.*\[/INST\] (.*?) <s>"
此表达式使用分组运算符(.*?)
提取主题、正文和类别。
这些提取的部分以及有用的元数据(例如每封电子邮件的唯一 ID)存储在字典中,并返回给调用者。
现在,您可以将此数据集转换为 JSONL 格式,这是 NeMo Curator 支持的多种格式之一
将数据集写入 JSONL 格式
数据集以纯文本文件的形式下载。DocumentIterator
和DocumentExtractor
用于迭代记录的类,将其转换为 JSONL 格式,并将每条记录作为一行存储在文件中。
import jsondef download_and_convert_to_jsonl() -> str:"""Downloads the emails dataset and converts it to JSONL format.Returns:str: The path to the JSONL file."""# Download the dataset in raw format and convert it to JSONL.downloader = EmailsDownloader(DATA_DIR)output_path = os.path.join(DATA_DIR, "emails.jsonl")raw_fp = downloader.download(DATASET_URL)iterator = EmailsIterator()# Parse the raw data and write it to a JSONL file.with open(output_path, "w") as f:for record in iterator.iterate(raw_fp):json_record = json.dumps(record, ensure_ascii=False)f.write(json_record + "\n")return output_path
数据集中每条记录的信息都写入多个 JSON 字段:
subject
body
category
- Metadata:
id
filename
这一点很有必要,因为 NeMo Curator 中的许多数据管护操作必须知道要在每个记录中操作哪个字段。这一结构允许 NeMo Curator 操作轻松地定位不同的数据集信息。
使用文档构建器加载数据集
在 NeMo Curator 中,数据集表示为类型对象DocumentDataset
.这提供了从磁盘加载各种格式的数据集的辅助工具。使用以下代码加载数据集并开始使用:
from nemo_curator.datasets import DocumentDataset # define `filepath` to be the path to the JSONL file created above. dataset = DocumentDataset.read_json(filepath, add_filename=True)
您现在拥有了定义自定义数据集策管线和准备数据所需的一切。
使用现有工具统一 Unicode 格式
通常最好修复数据集中的所有 Unicode 问题,因为从在线来源抓取的文本可能包含不一致或 Unicode 错误。
为了修改文档,NeMo Curator 提供了一个DocumentModifier
界面以及Modify
辅助程序,用于定义如何修改每个文档中的给定文本。有关实现您自己的自定义文档修改器的更多信息,请参阅文本清理和统一在上一篇文章中看到的部分内容。
在本示例中,应用UnicodeReformatter
到数据集。由于每条记录都有多个字段,因此请对数据集中的每个相关字段应用一次操作。这些操作可以通过Sequential
类:
Sequential([Modify(UnicodeReformatter(), text_field="subject"),Modify(UnicodeReformatter(), text_field="body"),Modify(UnicodeReformatter(), text_field="category"), ])
设计自定义数据集过滤器
在许多 PEFT 用例中,优化数据集涉及过滤掉可能无关紧要或质量较低的记录,或者那些具有特定不合适属性的记录。在电子邮件数据集中,有些电子邮件过长或为空。出于演示目的,通过实现自定义,从数据集中删除所有此类记录DocumentFilter
类:
from nemo_curator.filters import DocumentFilterclass FilterEmailsWithLongBody(DocumentFilter):"""If the email is too long, discard."""def __init__(self, max_length: int = 5000):super().__init__()self.max_length = max_lengthdef score_document(self, text: str) -> bool:return len(text) <= self.max_lengthdef keep_document(self, score) -> bool:return scoreclass FilterEmptyEmails(DocumentFilter):"""Detects empty emails (either empty body, or labeled as empty)."""def score_document(self, text: str) -> bool:return (not isinstance(text, str) # The text is not a stringor len(text.strip()) == 0 # The text is emptyor "Empty message" in text # The email is labeled as empty)def keep_document(self, score) -> bool:return score
我们FilterEmailsWithLongBody
class 会计算所提供文本中的字符数,并返回True
如果长度是可以接受的,或False
否则。您必须在body
每个记录的字段。
我们FilterEmptyEmails
类检查给定文本的类型和内容,以确定其是否为空电子邮件,并返回True
如果电子邮件被视为空白,或者False
否则。您必须在所有相关字段中明确应用此过滤器:subject
, body
以及category
每条记录的字段。
返回值与类的命名一致,可提高代码的可读性。但是,由于目标是丢弃空电子邮件,因此必须反转此过滤器的结果。换言之,如果过滤器返回,则丢弃记录True
并在过滤器返回时保留记录False
.这可以通过提供相关标志来完成ScoreFilter
辅助程序:
Sequential([# Apply only to the `body` field.ScoreFilter(FilterEmailsWithLongBody(), text_field="body", score_type=bool),# Apply to all fields, also invert the action.ScoreFilter(FilterEmptyEmails(), text_field="subject", score_type=bool, invert=True),ScoreFilter(FilterEmptyEmails(), text_field="body", score_type=bool, invert=True),ScoreFilter(FilterEmptyEmails(), text_field="category", score_type=bool, invert=True), ])
指定标志invert=True
来指示ScoreFilter
丢弃过滤器返回的文档True
.通过指定 score_type=bool
为每个过滤器明确指定返回类型,以避免在执行期间进行类型推理。
编辑所有个人识别信息
接下来,定义处理步骤,以编辑每个记录主题和正文中的所有个人识别信息 (PII)。此数据集包含许多 PII 实例,例如电子邮件、电话或传真号码、姓名和地址。
借助 NeMo Curator,您可以轻松指定要检测的个人身份信息(PII)类型以及对每次检测采取的操作。使用特殊令牌替换每个检测:
def redact_pii(dataset: DocumentDataset, text_field) -> DocumentDataset:redactor = Modify(PiiModifier(supported_entities=["ADDRESS","EMAIL_ADDRESS","LOCATION","PERSON","URL","PHONE_NUMBER",],anonymize_action="replace",device="cpu",),text_field=text_field,)return redactor(dataset)
您可以将这些运算应用到subject
和body
使用 Pythonfunctools.partial
辅助程序:
from functools import partialredact_pii_subject = partial(redact_pii, text_field="subject") redact_pii_body = partial(redact_pii, text_field="body")Sequential([redact_pii_subject,redact_pii_body,] )
添加指令提示
数据管护流程的最后一步是向每条记录添加指令提示,并确保每个类别的值都以句点终止。通过实现相关的DocumentModifier
类:
from nemo_curator.modifiers import DocumentModifierclass AddSystemPrompt(DocumentModifier):def modify_document(self, text: str) -> str:return SYS_PROMPT_TEMPLATE % textclass AddPeriod(DocumentModifier):def modify_document(self, text: str) -> str:return text + "."
在代码示例中,SYS_PROMPT_TEMPLATE
变量包含一个格式字符串,可用于在文本周围添加指令提示。这些修改器可以链接在一起:
Sequential([Modify(AddSystemPrompt(), text_field="body"),Modify(AddPeriod(), text_field="category"), ])
整合管线
在实现管线的每个步骤后,是时候将所有内容放在一起并按顺序对数据集应用每个操作了。您可以使用Sequential
将类到链式管理操作结合在一起:
curation_steps = Sequential([## Unify the text encoding to Unicode.#Modify(UnicodeReformatter(), text_field="subject"),Modify(UnicodeReformatter(), text_field="body"),Modify(UnicodeReformatter(), text_field="category"),## Filtering#ScoreFilter(FilterEmptyEmails(), text_field="subject", score_type=bool, invert=True),ScoreFilter(FilterEmptyEmails(), text_field="body", score_type=bool, invert=True),ScoreFilter(FilterEmptyEmails(), text_field="category", score_type=bool, invert=True),ScoreFilter(FilterEmailsWithLongBody(), text_field="body", score_type=bool),## Redact personally identifiable information (PII).#redact_pii_subject,redact_pii_body,## Final modifications.#Modify(AddSystemPrompt(), text_field="body"),Modify(AddPeriod(), text_field="category"),] )dataset = curation_steps(dataset) dataset = dataset.persist() dataset.to_json("/output/path", write_to_filename=True)
NeMo Curator 使用 Dask 以分布式方式处理数据集。由于 Dask 操作是延迟评估的,因此您必须调用.persist
用于指示 Dask 应用操作的函数。处理完成后,您可以通过调用.to_json
并提供输出路径。