当前位置: 首页 > news >正文

大模型笔记8 文本数据清洗

帮其他组做了下数据清洗, 分享一些相关经验. (文中数据相关部分经过模糊/改写处理)

目录

格式改写

Jsonl格式

统计词条数目与长度分布

重复性检测

Topic去重

Content元素去重

句子去重

n-gram去重

去除content中空字符串

低质内容检测

多语言检测

content统计时先拼接

多换行标题检测

读取label图形展示

分层抽样

1. 转换成dataframe格式

2.分层抽样

content文字长度

中文比例

多语言分词

Magpie


格式改写

读取文件中数据, 并将其处理为目标结构.

每个topic的字符串末尾包含: 百度百科\n

据此区分topic与content.

处理结果目标格式:

{"topic": 'xx', "content": ["", ""]}

原始数据样例:

{"text": "A_百度百科\n"}

{"text": "A…今后若有类似问题或者升级补丁会提前在官网告知用户。\n"}

{"text": "… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金\n"}

{"text": "B_百度百科\n"}

{"text": "B…今后若有类似问题或者升级补丁会提前在官网告知用户。\n"}

{"text": "B… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金\n"}

处理代码:

import json

with open('data/data.json', 'r', encoding='utf-8') as f:

    lines = [json.loads(line.strip())['text'].strip() for line in f if line.strip()]

results = []

current_topic = None

current_content = []

for line in lines:

    if line.endswith('_百度百科'):

        # 如果已经有前一个topic的内容,保存它

        if current_topic:

            results.append({

                "topic": current_topic.replace('_百度百科', ''),

                "content": current_content

            })

        # 新的 topic

        current_topic = line.replace('_百度百科', '')

        current_content = []

    else:

        current_content.append(line)

# 处理最后一个 topic

if current_topic:

    results.append({

        "topic": current_topic.replace('_百度百科', ''),

        "content": current_content

    })

# 保存到文件

# print(json.dumps(results, ensure_ascii=False, indent=2))

with open('data/processed_data.json', 'w', encoding='utf-8') as f:

    json.dump(results, f, ensure_ascii=False, indent=2)

其中strip 去除头尾字符、空白符(包括\n、\r、\t、' ',即:换行、回车、制表符、空格)因此判断_百度百科的时候不需要再次考虑\n

原始数据是逐行json, 而不是标准的json文件格式, 因此不能直接通过json.load(f)读取文件

改写后样例:

[

  {

    "topic": "A",

    "content": [

      "A…今后若有类似问题或者升级补丁会提前在官网告知用户。",

      "A… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金"

    ]

  },

  {

    "topic": "A",

    "content": [

      "A…今后若有类似问题或者升级补丁会提前在官网告知用户。",

      "A… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金"

    ]

  }

]

Jsonl格式

JSONL 文件格式在处理大量数据、写入文本文件或日志文件以及实时处理数据流等场景中具有优势,因为它可以逐行读取和处理数据,而不需要一次性加载整个数据集合。

直接写入代码:

# 写入 JSONL 文件

with open('processed.jsonl', 'w', encoding='utf-8') as f:

    for entry in data:

        f.write(json.dumps(entry, ensure_ascii=False) + '\n')

调用jsonlines库, 每个条目添加一个uuid:

import json

import jsonlines

import uuid

read_file_path='data/quality_check.json'

save_repeat_path='data/jsonlines.jsonl'

# 读取 JSON 格式文件(上次保存的)

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 写入为 JSON Lines 格式

with jsonlines.open(save_repeat_path, mode='w') as writer:

    for item in data:

        writer.write(item)

Jsonl文件内容的读取:

data = []

with jsonlines.open(save_repeat_path, mode='r') as reader:

    for obj in reader:

        data.append(obj)

print(data[0])

统计词条数目与长度分布

  1. 统计词条数

统计刚刚保存的文件中topic的数量

import json

read_file_path='data/processed_data.json'

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 统计 topic 数量(每个 topic 对应一个对象)

topic_count = len(data)

print(f"共包含 {topic_count} 个 topic")

2.content长度分布

在处理完的文件统计每个 content 的总字符串长度

from collections import Counter

# 统计每个 content 的总字符串长度

lengths = [sum(len(item) for item in entry["content"]) for entry in results]

# 打印一些基本统计信息

print(f"样本总数: {len(lengths)}")

print(f"最小长度: {min(lengths)}")

print(f"最大长度: {max(lengths)}")

print(f"平均长度: {sum(lengths) / len(lengths):.2f}")

# 获取长度分布(以10为一个分组区间,例如 0-9, 10-19,...)

bin_size = 10

bins = [((l // bin_size) * bin_size) for l in lengths]

length_distribution = Counter(bins)

# 打印分布(按长度区间升序排序)

for length_bin in sorted(length_distribution.keys()):

    print(f"{length_bin:>3} - {length_bin + bin_size - 1:>3} : {length_distribution[length_bin]}")

输出样例:

共包含 2 个 topic

样本总数: 2    

最小长度: 163  

最大长度: 163  

平均长度: 163.00

160 - 169 : 2

画图:

import matplotlib.pyplot as plt

plt.hist(lengths, bins=range(0, max(lengths)+10, 10), edgecolor='black')

plt.title("Content 长度分布")

plt.xlabel("总长度")

plt.ylabel("频数")

plt.grid(True)

plt.show()

结果样例:

保存图片:

plt.savefig('length_distribution.png')

将统计信息保存到文件中:

with open(save_stat_path, "w", encoding="utf-8") as f:

    f.write(f"topic总数: {len(lengths)}\n")

    f.write(f"最小长度: {min(lengths)}\n")

    f.write(f"最大长度: {max(lengths)}\n")

    f.write(f"平均长度: {sum(lengths) / len(lengths):.2f}\n")

    f.write("\n长度分布:\n")

    for length_bin in sorted(length_distribution.keys()):

        f.write(f"{length_bin:>3} - {length_bin + bin_size - 1:>3} : {length_distribution[length_bin]}\n")

重复性检测

Topic去重

读取刚刚保存的文件. 去除重复topic及其对应content.

  先判断 topic 是否重复,使用一个 dict 来记录;

  如果 topic 相同,检查其 content 列表中的文本是否重复,合并并去重;

import json

# import re

read_file_path='data/processed_data.json'

save_repeat_path='data/topic_deduplicate.json'

# 读取保存的文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 用 dict 去重 topic

unique_topics = {}

for entry in data:

    topic = entry['topic']

    contents = entry['content']

   

    if topic in unique_topics:

        # 合并内容并去重

        existing_contents = unique_topics[topic]

        combined_contents = list(set(existing_contents + contents))

        unique_topics[topic] = combined_contents

    else:

        unique_topics[topic] = contents

# 重新组织为原始格式

results = [{"topic": topic, "content": contents} for topic, contents in unique_topics.items()]

# 保存去重后的结果

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(results, f, ensure_ascii=False, indent=2)

print("去重完成,共保留 topic 数量:", len(results))

Content元素去重

读取上次保存的文件, 检查同一个topic中的所有字符串元素, 如果content中一个字符串元素被另一个字符串元素包含(是其“子串”), 则去除这个被包含的元素.

遍历每个 topic 的 content 列表;

两两比较其中的字符串(例如 s1 in s2),如果 s1 是 s2 的子串且 s1 != s2,就标记 s1 为要删除的;

删除所有被包含的字符串,保留信息量更多的那个。

import json

# import re

read_file_path='data/topic_deduplicate.json'

save_repeat_path='data/content_deduplicate.json'

# 读取 deduplicated 文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

def remove_contained_contents(contents):

    unique_contents = []

    for i, s1 in enumerate(contents):

        contained = False

        for j, s2 in enumerate(contents):

            if i != j and s1 in s2:

                contained = True

                break

        if not contained:

            unique_contents.append(s1)

        else:#找到需要去除的字符串打印其前10个字符

            print("重复:",s1[0:10])

    return unique_contents

# 对每个 topic 的 content 进行处理

for entry in data:

    entry['content'] = remove_contained_contents(entry['content'])

# 保存最终结果

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(data, f, ensure_ascii=False, indent=2)

print("content中重复字符串元素去除完成")

输出结果例子:

清洗前

  {

    "topic": "A",

    "content": [

      "A",

      "A\n… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金",

      "B。\n…今后若有类似问题或者升级补丁会提前在官网告知用户。"

    ]

  }

清洗后

  {

    "topic": "A",

    "content": [

      "A\n… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金",

      "B。\n…今后若有类似问题或者升级补丁会提前在官网告知用户。"

    ]

  }

句子去重

读取上次保存的文件, 对同一条 content 字符串元素内的句子进行去重

import json

import re

read_file_path='data/processed_data.json'

save_repeat_path='data/repeat_stats.json'

def deduplicate_sentences(text):

    # 按标点切句(保留标点)

    sentences = re.split(r'(?<=[。!?\?])\s*', text.strip())

    seen = set()

    deduped = []

    for s in sentences:

        s = s.strip()

        if s and s not in seen:

            seen.add(s)

            deduped.append(s)

    return ''.join(deduped)

# 读取已保存的文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 去重处理每条 content 中的句子

for item in data:

    item['content'] = [deduplicate_sentences(text) for text in item['content']]

# 保存新文件

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(data, f, ensure_ascii=False, indent=2)

print("句子去重完成")

其中句子分割的re.split(r'(?<=[。!?\?])\s*', text.strip()):通过正向肯定前瞻保留标点符号

表达式部分

说明

(?<=...)

正向肯定前瞻:表示“前面必须是……”,但不会包含在结果中分割符中被删除。

[。!?\?]

匹配一个中文句号(。)、感叹号(!)、问号(?)或英文问号(?)。你也可以加上英文句号 \.。

(?<=[。!?\?])

匹配“这些标点符号的后面位置”,作为切割点。不会丢失这些标点本身。

\s*

匹配标点符号后面可能存在的空格、换行、Tab 等所有空白符,全部删掉。

读取上次保存的文件, 对同一个topic内所有content字符串元素之间的句子进行去重. 保留该句子在其第一次出现的字符串元素中.

跨content句子去重:

import json

import re

def split_sentences(text):

    # 中文及英文常见句子结束符,分割时保留标点

    sentences = re.split(r'(?<=[。!?\?])\s*', text.strip())

    return [s.strip() for s in sentences if s.strip()]

# 读取上次保存的文件

with open('processed_deduplicated.json', 'r', encoding='utf-8') as f:

    data = json.load(f)

# 跨content去重句子

for item in data:

    seen_sentences = set()

    new_content = []

    for content_text in item['content']:

        sentences = split_sentences(content_text)

        deduped_sentences = []

        for s in sentences:

            if s not in seen_sentences:

                seen_sentences.add(s)

                deduped_sentences.append(s)

        # 合并回文本

        new_content.append(''.join(deduped_sentences))

    item['content'] = new_content

# 保存处理后的文件

with open('processed_cross_deduplicated.json', 'w', encoding='utf-8') as f:

    json.dump(data, f, ensure_ascii=False, indent=2)

print("跨content句子去重完成")

句子重复判定方式改成ngram相似度判断

import json

import re

def split_sentences(text):

    sentences = re.split(r'(?<=[。!?\?])\s*', text.strip())

    return [s.strip() for s in sentences if s.strip()]

def ngrams(sentence, n=3):

    return set([sentence[i:i+n] for i in range(len(sentence) - n + 1)])

def jaccard_sim(s1, s2, n=3):

    ngrams1 = ngrams(s1, n)

    ngrams2 = ngrams(s2, n)

    if not ngrams1 or not ngrams2:

        return 0.0

    return len(ngrams1 & ngrams2) / len(ngrams1 | ngrams2)

def is_duplicate(s, seen_sentences, threshold=0.8):

    for prev in seen_sentences:

        if jaccard_sim(s, prev) >= threshold:

            return True

    return False

# 读取文件

with open('processed_deduplicated.json', 'r', encoding='utf-8') as f:

    data = json.load(f)

# 对每个 topic 进行去重

for item in data:

    seen_sentences = []

    new_content = []

    for content_text in item['content']:

        sentences = split_sentences(content_text)

        deduped_sentences = []

        for s in sentences:

            if not is_duplicate(s, seen_sentences):

                seen_sentences.append(s)

                deduped_sentences.append(s)

        new_content.append(''.join(deduped_sentences))

    item['content'] = new_content

# 保存结果

with open('processed_ngram_deduplicated.json', 'w', encoding='utf-8') as f:

    json.dump(data, f, ensure_ascii=False, indent=2)

print("n-gram 相似度去重完成")

用现成的 Python 库 difflib来实现这个代码中句子相似度的判定部分

import difflib

def is_duplicate(s, seen_sentences, threshold=0.85):  # 你可以微调这个阈值

    for prev in seen_sentences:

        similarity = difflib.SequenceMatcher(None, s, prev).ratio()

        if similarity >= threshold:

            return True

    return False

正则划分句子时候收到警告:

FutureWarning: split() requires a non-empty pattern match.

  return _compile(pattern, flags).split(string, maxsplit)

这个警告的根源是来自这句代码:

re.split(r'(?<=[。!?\?])\s*', text.strip())

当传入的 text.strip() 是 空字符串 "" 时,re.split() 会尝试匹配一个空字符串,然后抛出 FutureWarning,因为这样可能在将来的 Python 版本中会变成错误行为。

n-gram去重

对每个 content 内的句子进行 n-gram 重复性检测。检测思路如下:

  1. 切句子: 根据标点符号(比如 “。!?”)将每个 content 文本拆分为句子。
  2. 生成 n-gram: 对每个句子生成指定长度(例如3)的 n-gram 列表。
  3. 计算相似度: 采用 Jaccard 相似度来比较两个句子的 n-gram 重合程度,若超过设定阈值则认为它们重复。

参考:

利用N-Gram模型进行数据清洗的相关示例

利用N-Gram模型进行数据清洗_数据清洗ngram-CSDN博客

import json

import re

read_file_path='data/processed_data.json'

save_repeat_path='data/repeat_stats.json'

def get_ngrams(text, n=3):

    """

    根据给定的 n 值生成文本的 n-gram 列表

    """

    return [text[i:i+n] for i in range(len(text)-n+1)] if len(text) >= n else [text]

def jaccard_similarity(ngrams1, ngrams2):

    """

    计算两个 n-gram 集合的 Jaccard 相似度

    """

    set1 = set(ngrams1)

    set2 = set(ngrams2)

    union = set1.union(set2)

    if not union:

        return 0.0

    intersection = set1.intersection(set2)

    return len(intersection) / len(union)

def detect_duplicate_sentences(content_text, n=3, sim_threshold=0.8):

    """

    对给定的文本按照句子划分,利用 n-gram 方法检测重复句子。

    返回:

      - sentences:分割后的句子列表

      - duplicates:重复句子对的索引及相似度列表

    """

    # 简单根据中文句号、问号、感叹号切分句子

    sentences = re.split(r'[。!?]', content_text)

    # 去掉空白以及冗余的空格

    sentences = [s.strip() for s in sentences if s.strip()]

   

    duplicates = []

    # 分别计算每个句子的 n-gram

    ngrams_list = [get_ngrams(s, n) for s in sentences]

    num = len(sentences)

   

    # 两两比较句子

    for i in range(num):

        for j in range(i+1, num):

            sim = jaccard_similarity(ngrams_list[i], ngrams_list[j])

            if sim >= sim_threshold:

                duplicates.append({

                    "sentence_index_pair": (i, j),

                    "sentence_pair": (sentences[i], sentences[j]),

                    "similarity": sim

                })

    return sentences, duplicates

#文件读取

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 针对每个 topic 下的每个 content 做 n-gram 句子级重复性检测

# 检测参数:n=3, 阈值sim_threshold=0.8(可根据需要调整)

detailed_results = []

for topic_item in data:

    topic_title = topic_item["topic"]

    # processed_contents = []

    # 拼接同话题下content

    all_content=""

    for content in topic_item["content"]:

        all_content+=content

        # processed_contents.append({

        #     "original_content": content,

        #     "sentences": sentences,

        #     "duplicate_sentence_info": duplicates

        # })

    # 针对同话题下所有内容调用检测函数,对内容进行句子分割和重复性检测

    sentences, duplicates = detect_duplicate_sentences(all_content, n=3, sim_threshold=0.8)

   

   

    detailed_results.append({

        "topic": topic_title,

        "content": topic_item["content"],

        "sentences": sentences,

        "duplicate_sentence_info": duplicates

    })

#文件保存

# # 输出检测结果,格式化为 JSON

# print(json.dumps(detailed_results, ensure_ascii=False, indent=2))

# 将处理结果写入文件

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(detailed_results, f, ensure_ascii=False, indent=2)

实际运行时候调整参数:

检测粒度参数n

相似度阈值sim_threshold

去除content中空字符串

读取之前保存的文件, 去除content中为空的字符串元素

import json

read_file_path='data/sentence_deduplicate2.json'

save_repeat_path='data/remove_empty.json'

# 读取之前处理后的文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 去除 content 中的空字符串

for topic in data:

    topic['content'] = [content for content in topic['content'] if content.strip()]

# 保存清洗结果

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(data, f, ensure_ascii=False, indent=2)

print("已清除 content 中的空字符串")

低质内容检测

多语言检测

1.读取上次保存的文件, 检测content中的低质量字符串元素, 标记其低质类别标签, 及其在content列表中的元素序号.

当中文字符比例小于10%时候标记为多语言

结果保存格式例子:

[

  {

    "topic": "sniffer",

    "content": [

      "…elseprintf(“protocolid:%d\\n”,iphead[9])…"

    ],

    "low_quality": [

      {

        "multilingual":0

      }

    ]

  }

]

检测代码:

import json

import re

# import re

read_file_path='data/topic_deduplicate.json'

save_repeat_path='data/quality_check.json'

# 中文字符判断函数

def chinese_ratio(text):

    chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)

    return len(chinese_chars) / len(text) if text else 0

# 读取处理后的 JSON 文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

results = []

for item in data:

    topic = item['topic']

    content = item['content']

    low_quality = []

    for idx, text in enumerate(content):

        ratio = chinese_ratio(text)

        if ratio < 0.1:

            low_quality.append({'multilingual': idx})

    result = {

        "topic": topic,

        "content": content

    }

    # if low_quality:

    #     result["low_quality"] = low_quality

    result["low_quality"] = low_quality

    results.append(result)

# 保存结果到文件

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(results, f, ensure_ascii=False, indent=2)

print("低质量内容分析完成")

运行结果:

[

  {

    "topic": "iPhone闹钟门",

    "content": [

      "A… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金",

      "A…今后若有类似问题或者升级补丁会提前在官网告知用户。"

    ],

    "low_quality": []

  },

  {

    "topic": "sniffer",

    "content": [

      "…elseprintf(“protocolid:%d\\n”,iphead[9])…"

    ],

    "low_quality": [

      {

        "multilingual": 0

      }

    ]

  }

]

content统计时先拼接

import json

import re

import jsonlines

read_file_path='data/jsonlines.jsonl'

save_repeat_path='data/quality_check.jsonl'

# 中文字符判断函数

def count_chinese_ratio(text):

    chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)

    return len(chinese_chars) / len(text) if text else 0

# 未换行标题检测函数

def count_unstructured_titles(text):

    count = 0

    patterns = [

        r'\[[1-9]\d*\]',                          # [1], [2], etc.

        r'第[一二三四五六七八九十百千万零1234567890]+章',     # 第一章, 第二章

        r'\d+(?:\.\d+)+',                          # 1.1.1, 1.1.2

        r'([一二三四五六七八九十]+)',            # (一), (二)

        r'(\d+)',                                # (1), (2)

        r'([A-Z])',                              # (A), (B)

        r'([a-z])',                              # (a), (b)

        r'([ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+)',                    # (Ⅰ), (Ⅱ)

        r'([ⅰⅱⅲⅳⅴⅵⅶⅷⅸⅹ]+)',                    # (i), (ii)

        r'[一二三四五六七八九十]+、',              # 一、, 二、

        r'\d+、',                                 # 1、, 2、

        r'\d+\.',                                 # 1., 2.

        r'[A-Z]\.',                               # A., B.

        r'[a-z]\)',                               # a), b)

        r'[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩ]+\)',                     # Ⅰ), Ⅱ)

        r'[ⅰⅱⅲⅳⅴⅵⅶⅷⅸⅹ]+\)'                      # i), ii)

    ]

    # 组合正则表达式(使用多行模式 ^ 匹配行首)

    combined_pattern = re.compile(

        r'|'.join(patterns),

        flags=re.MULTILINE  # 关键:让 ^ 匹配每行的开头

    )

   

    # 统计匹配次数

#     matches = combined_pattern.findall(text)

    matches = list(re.finditer(combined_pattern, text))

    for m in matches:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1  

    return count

# 判断匹配内容前后是否有换行

def around_has_newline(text, start, end, window=10):

    before = text[max(0, start - 5):start]

    after = text[end:end + window]

    return '\n' in before or '\n' in after

# 读取上一步处理后的 JSON 文件

data = []

with jsonlines.open(read_file_path, mode='r') as reader:

    for obj in reader:

        data.append(obj)

# 统计

results = []

for item in data:

    topic = item['topic']

    contents = item['content']

    combined_content = "".join(item['content'])

    # print("content:",combined_content)

    # low_quality = []

    # 检测中文比例

    chinese_ratio = count_chinese_ratio(combined_content)

    # 检测未换行标题个数

    title_num=count_unstructured_titles(combined_content)

    result = {

        "topic": topic,

        "content": contents,

        'chinese_ratio': chinese_ratio,

        "title_num": title_num

    }

    results.append(result)

# 写入为 JSON Lines 格式

with jsonlines.open(save_repeat_path, mode='w') as writer:

    for item in results:

        writer.write(item)

输出结果:

{"topic": "A", "content": ["A\n… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金", "A\n…今后若有类似问题或者升级补丁会提前在官网告知用户。"], "chinese_ratio": 0.6363636363636364, "title_num": 2}

{"topic": "sniffer", "content": ["…elseprintf(“protocolid:%d\\n”,iphead[9])…"], "chinese_ratio": 0.0, "title_num": 1}

{"topic": "sniffer", "content": ["第1章…第1章…第1章…第1章…"], "chinese_ratio": 0.5, "title_num": 4}

多换行标题检测

增加一种低质标签检测:

针对未换行标题记录个数, 超过3个则记录标签为multiple_titles

未换行标题指出现以下序号的前后未出现换行符\n

[1] ,[2] ...,  第一章、第二章, 3.1.4

检测代码:

# 未换行标题检测函数

def count_unstructured_titles(text):

    count = 0

    # 模式1: [1], [2]...

    matches1 = list(re.finditer(r'\[\d+\]', text))

    for m in matches1:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    # 模式2: 第一章、第二章等

    matches2 = list(re.finditer(r'(第[一二三四五六七八九十百千万零〇]+章)', text))

    for m in matches2:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    # 模式3: 3.1 或 2.4.5 等小节编号

    matches3 = list(re.finditer(r'(?<!\d)(\d+\.\d+(?:\.\d+)*)(?!\d)', text))

    for m in matches3:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    return count

# 判断匹配内容前后是否有换行

def around_has_newline(text, start, end, window=10):

    before = text[max(0, start - 5):start]

    after = text[end:end + window]

    return '\n' in before or '\n' in after

完整代码:

import json

import re

# import re

read_file_path='data/topic_deduplicate.json'

save_repeat_path='data/quality_check.json'

# 中文字符判断函数

def chinese_ratio(text):

    chinese_chars = re.findall(r'[\u4e00-\u9fff]', text)

    return len(chinese_chars) / len(text) if text else 0

# 未换行标题检测函数

def count_unstructured_titles(text):

    count = 0

    # 模式1: [1], [2]...

    matches1 = list(re.finditer(r'\[\d+\]', text))

    for m in matches1:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    # 模式2: 第一章、第二章等

    matches2 = list(re.finditer(r'(第[一二三四五六七八九十百千万零〇]+章)', text))

    for m in matches2:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    # 模式3: 3.1 或 2.4.5 等小节编号

    matches3 = list(re.finditer(r'(?<!\d)(\d+\.\d+(?:\.\d+)*)(?!\d)', text))

    for m in matches3:

        if not around_has_newline(text, m.start(), m.end()):

            count += 1

    return count

# 判断匹配内容前后是否有换行

def around_has_newline(text, start, end, window=10):

    before = text[max(0, start - 5):start]

    after = text[end:end + window]

    return '\n' in before or '\n' in after

# 读取上一步处理后的 JSON 文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

results = []

for item in data:

    topic = item['topic']

    content = item['content']

    low_quality = []

    for idx, text in enumerate(content):

        # 检测中文比例

        ratio = chinese_ratio(text)

        if ratio < 0.1:

            low_quality.append({'multilingual': idx})

        # 检测未换行标题个数

        if count_unstructured_titles(text) > 5:

            low_quality.append({"multiple_titles": idx})

    result = {

        "topic": topic,

        "content": content

    }

    # if low_quality:

    #     result["low_quality"] = low_quality

    result["low_quality"] = low_quality

    results.append(result)

# 保存结果到文件

with open(save_repeat_path, 'w', encoding='utf-8') as f:

    json.dump(results, f, ensure_ascii=False, indent=2)

print("低质量内容分析完成")

运行结果:

  {

    "topic": "sniffer",

    "content": [

      "第一章…第一章…第一章…第一章…第一章…第一章…第一章…"

    ],

    "low_quality": [

      {

        "multiple_titles": 0

      }

    ]

  }

读取label图形展示

读取上次保存的文件, 展示文件中各种 low_quality 标签(如 multilingual、multiple_titles)出现的 频率比例,即:

每种标签出现的次数 / 所有 content

import json

import matplotlib.pyplot as plt

from collections import defaultdict

read_file_path='data/quality_check.json'

# 读取之前保存的标注文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

# 初始化计数器

label_counts = defaultdict(int)

total_contents = 0

# 统计所有内容字符串的数量 + 标签出现次数

for topic in data:

    total_contents += len(topic['content'])

    if 'low_quality' in topic:

        for tag in topic['low_quality']:

            for label in tag:

                label_counts[label] += 1

# 计算比例

labels = list(label_counts.keys())

ratios = [label_counts[label] / total_contents for label in labels]

# 绘图

plt.figure(figsize=(8, 5))

bars = plt.bar(labels, ratios, color='skyblue')

# 添加具体数值标注

for bar, ratio in zip(bars, ratios):

    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height(),

             f'{ratio:.2%}', ha='center', va='bottom', fontsize=10)

plt.title('Low-quality content ratio')

plt.ylabel('ratio')

plt.ylim(0, 1)

plt.grid(axis='y', linestyle='--', alpha=0.5)

plt.tight_layout()

plt.show()

示例运行结果

去重比例:

import json

import matplotlib.pyplot as plt

from collections import defaultdict

read_file_path='data/quality_check.json'

format_file_path='data/processed_data.json'

# 读取之前保存标注的文件

with open(read_file_path, 'r', encoding='utf-8') as f:

    data = json.load(f)

with open(format_file_path, 'r', encoding='utf-8') as f:

    format_data = json.load(f)

# 初始化计数器

label_counts = defaultdict(int)

total_contents = 0

total_contents_before_deduplicate = 0

# 统计所有内容字符串的数量 + 标签出现次数

for topic in data:

    total_contents += len(topic['content'])

    if 'low_quality' in topic:

        for tag in topic['low_quality']:

            for label in tag:

                label_counts[label] += 1

# 统计去重前内容字符串的数量 +

for topic in format_data:

    total_contents_before_deduplicate += len(topic['content'])

print("total_contents:",total_contents)

print("total_contents_before_deduplicate:",total_contents_before_deduplicate)

# 计算比例

labels = list(label_counts.keys())

ratios = [label_counts[label] / total_contents for label in labels]

# 去重比例

labels.append("Duplicate content ratio")

ratios.append(total_contents / total_contents_before_deduplicate)

# 绘图

plt.figure(figsize=(8, 5))

bars = plt.bar(labels, ratios, color='skyblue')

# 添加具体数值标注

for bar, ratio in zip(bars, ratios):

    plt.text(bar.get_x() + bar.get_width() / 2, bar.get_height(),

             f'{ratio:.2%}', ha='center', va='bottom', fontsize=10)

plt.title('Low-quality content ratio')

plt.ylabel('ratio')

plt.ylim(0, 1)

plt.grid(axis='y', linestyle='--', alpha=0.5)

plt.tight_layout()

plt.show()

分层抽样

针对不同比例的多语言问题和标题个数抽样看具体情况

目标是寻找不同处理方式(抛弃, 重写, 原样保留)的合适阈值

分几个频次, 分别筛取所有落在这个范围内的样本, 最后从这些样本中抽样显示

参考分层抽样的库和代码:

https://zhuanlan.zhihu.com/p/384322508

from sklearn.model_selection import train_test_split

stratified_sample, _ = train_test_split(population, test_size=0.9, stratify=population[['label']])

print (stratified_sample)

但是这些库是基于结构化数据的, 有两个思路, 一个是基于非结构化数据找库或者自己写, 一个是把非结构化数据转换为结构化的.

Dict数据转pandas参考:

Stratified Sampling in Pandas | GeeksforGeeks

不成比例的采样: 使用 pandas groupby,根据学生的成绩(即 A、B、C)将学生分成几组,并使用样本函数从每个年级组中随机抽样 2 名学生

df.groupby('Grade', group_keys=False).apply(lambda x: x.sample(2))

每层个数统计(可以从test文件看)

之前重新保存的低质量统计结果格式样例为:

{"topic": "A", "content": ["A\n… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金", "B\n…今后若有类似问题或者升级补丁会提前在官网告知用户。"], "chinese_ratio": 0.6363636363636364, "title_num": 2}

{"topic": "sniffer", "content": ["…elseprintf(“protocolid:%d\\n”,iphead[9])…"], "chinese_ratio": 0.0, "title_num": 1}

{"topic": "sniffer", "content": ["第1章…第1章…第1章…第1章…"], "chinese_ratio": 0.5, "title_num": 4}

读取之前保存的文件, 并根据chinese_ratio进行分层采样.

文件格式:

{"topic": "A", "content": ["A。\n… 参考资料 1. iPhone陷闹钟失声门 苹果称不能保证未来没问题 2. 苹果吸金有术 用户体验点石成金", "2011年1月1日,炙手可热的iPhone遭遇“闹钟门”,导致很多上班族迟到。\n…今后若有类似问题或者升级补丁会提前在官网告知用户。"], "chinese_ratio": 0.6363636363636364, "title_num": 2}

{"topic": "sniffer", "content": ["…elseprintf(“protocolid:%d\\n”,iphead[9])…"], "chinese_ratio": 0.0, "title_num": 1}

{"topic": "sniffer", "content": ["第1章…第1章…第1章…第1章…"], "chinese_ratio": 0.5, "title_num": 4}

改结构化可以先遍历所有json对象, 用list把它们分别存起来.

参考test:

total_num = 1

uniq_set = []

rule_res = []

seq_len = []

chinese_ratio = []

possible_igline_num = []

for ljson in data[:2000]:

    topic = ljson['topic']

   

    total_num += 1

   

    if topic not in uniq_set:

        uniq_set.append(topic)

        text = '\n'.join(ljson['content'])

        seq_len.append(len(text))

   

        crat = filter_text_based_on_chinese_ratio(text)

        num = count_qing_lines(text)

        chinese_ratio.append(crat)

        possible_igline_num.append(num)

    else:

        print("重复topic:",topic)

手动分层中文比例采样:

import jsonlines

import numpy as np

import random

# 参数设置

filename = 'processed_with_ratio.jsonl'   # 你的 jsonl 文件名

n_bins = 5                                # 分成几层(bins)

sample_ratio = 0.5                        # 每层采样的比例(0~1 之间)

random.seed(42)                           # 保证可复现

# Step 1: 读取数据

data = []

with jsonlines.open(filename, mode='r') as reader:

    for item in reader:

        data.append(item)

# Step 2: 分层采样

# 将chinese_ratio按等距分bin(如0~0.2,0.2~0.4等)

bin_edges = np.linspace(0, 1, n_bins + 1)  # eg: [0. , 0.2, 0.4, ..., 1.0]

binned_data = [[] for _ in range(n_bins)]

# 分配每个item到对应bin

for item in data:

    ratio = item.get('chinese_ratio', 0.0)

    # 确保最大值归到最后一组

    for i in range(n_bins):

        if bin_edges[i] <= ratio < bin_edges[i + 1] or (i == n_bins - 1 and ratio == 1.0):

            binned_data[i].append(item)

            break

# Step 3: 每层采样

sampled_data = []

for bin_items in binned_data:

    n = int(len(bin_items) * sample_ratio)

    sampled = random.sample(bin_items, min(n, len(bin_items)))

    sampled_data.extend(sampled)

# Step 4: 打印或保存结果

print(f"原始数量: {len(data)}, 采样后数量: {len(sampled_data)}")

# 保存采样结果(可选)

with jsonlines.open('sampled_by_chinese_ratio.jsonl', mode='w') as writer:

    writer.write_all(sampled_data)

对比决定还是换格式比较靠谱

1. 转换成dataframe格式

读取之前保存的文件, 转换成dataframe格式. 其中content列表中的元素拼接为一个字符串:

import jsonlines

import pandas as pd

read_file_path='data/quality_check.jsonl'

save_repeat_path='data/df_data.csv'


 

# 读取 JSONL 格式文件(上次保存的)

data = []

with jsonlines.open(read_file_path, mode='r') as reader:

    for obj in reader:

        # 拼接 content 中的所有段落为一个字符串

        obj['content'] = ''.join(obj['content'])

        data.append(obj)

# print(data[0])

#转换为 DataFrame

df = pd.DataFrame(data)

# print(df.head())

#保存为 CSV

df.to_csv(save_repeat_path, index=False)

# 读取 CSV 文件检查

df = pd.read_csv(save_repeat_path, dtype={'chinese_ratio': 'float32'})#节省内存

# 显示前几行数据

print(df.head())

2.分层抽样

content文字长度

content文字长度, chinese_ratio, 以及title_num分别做分布图

读取刚刚保存的csv文件, 针对content字符串长度做分布图

import pandas as pd

import matplotlib.pyplot as plt

import seaborn as sns

read_file_path='data/df_data.csv'

# save_repeat_path='data/df_data.csv'

# 读取 CSV 文件

df = pd.read_csv(read_file_path)

# 计算 content 字符串长度

df['content_length'] = df['content'].astype(str).apply(len)

# 横轴范围( 0 到 25000)

x_min = 0

x_max = 200

# 绘制分布图

plt.figure(figsize=(10, 6))

# sns.histplot(df['content_length'], bins=30, kde=True, color='skyblue', edgecolor='black')

sns.histplot(

    df['content_length'],

    bins=30,

    binrange=(x_min, x_max),

    kde=True,

    color='skyblue',

    edgecolor='black'

)

plt.title('Content Length Distribution')

plt.xlabel('Content Length (characters)')

plt.ylabel('Frequency')

plt.xlim(x_min, x_max)

plt.grid(True)

plt.tight_layout()

plt.show()

分布图例子:

content_length的最大值:

df['content_length'].max()

层内样本数量, 上界以上的样本占比

1. 统计content_length 为0-50样本数量, 50以上的样本占比

def count_sample_num(lb,ub):

    # 统计 0-50 范围内的样本数

    sample_num = df[(df['content_length'] > lb) & (df['content_length'] <= ub)].shape[0]

    # 统计 >50 的占比

    total = df.shape[0]

    count_above_ub = df[df['content_length'] > ub].shape[0]

    ratio_above_ub = count_above_ub / total * 100

    # 输出结果

    print(f"content_length 在 {lb} 到 {ub} 之间的样本数量: {sample_num}")

    print(f"content_length 大于 {ub} 的样本占比: {ratio_above_ub:.2f}%")

count_sample_num(0,50)

输出结果

content_length 在 0 到 50 之间的样本数量: 2

content_length 大于 50 的样本占比: 33.33%

2.在该区间内抽取3个样本, 注意处理样本比欲抽样个数小的情况

#避免文字过早截断

pd.set_option('display.max_colwidth', 1000)

def content_length_samples(lb,ub,sample_n):

    # 过滤出 content_length 在 (0, 50] 的样本

    subset = df[(df['content_length'] > lb) & (df['content_length'] <= ub)]

    # 检查子集大小,确保样本数大于等于 3

    sample_size = min(sample_n, subset.shape[0])

    # 随机抽取 3 个样本

    sampled = subset.sample(n=sample_size, random_state=42)

    # 显示结果

    print(f"---content长度 在 {lb} 到 {ub} 之间的样本---")

    print("抽取样本:")

    print(sampled[['topic', 'content']])

    # 统计 0-50 范围内的样本数

    sample_num = subset.shape[0]

    # 统计 >50 的占比

    total = df.shape[0]

    count_above_ub = df[df['content_length'] > ub].shape[0]

    ratio_above_ub = count_above_ub / total * 100

    # 输出结果

    print(f"样本数量: {sample_num}")

    print(f"样本占比: {ratio_above_ub:.2f}%")

content_length_samples(0,50,3)

中文比例

content文字长度, chinese_ratio, 以及title_num分别做分布图

读取刚刚保存的csv文件, 针对content字符串长度做分布图

x_min = 0

x_max = 1

# 绘制分布图

plt.figure(figsize=(10, 6))

sns.histplot(

    # (df['chinese_ratio'] * 100).map(lambda x: f"{x:.2f}%"),

    df['chinese_ratio'],

    bins=30,

    binrange=(x_min, x_max),

    color='skyblue',

    edgecolor='black'

)

plt.title('Chinese Ratio Distribution')

plt.xlabel('Chinese Ratio')

plt.ylabel('Frequency')

plt.xlim(x_min, x_max)

plt.grid(True)

plt.tight_layout()

plt.show()

抽样:

def nonchinese_ratio_samples(lb,ub,sample_n):

    label='nonchinese_ratio'

    df[label]=1-df['chinese_ratio']

    # 过滤出 content_length 在 (0, 50] 的样本

    subset = df[(df[label] > lb) & (df[label] <= ub)]

    # 检查子集大小,确保样本数大于等于 3

    sample_size = min(sample_n, subset.shape[0])

    # 随机抽取 n 个样本

    sampled = subset.sample(n=sample_size, random_state=42)

    # 显示结果

    print(f"---content长度 在 {lb} 到 {ub} 之间的样本---")

    print("抽取样本:")

    print(sampled[['topic', 'content']])

    # 统计 范围内的样本数

    sample_num = subset.shape[0]

    # 统计上界外的占比

    total = df.shape[0]

    count_above_ub = df[df[label] > ub].shape[0]

    ratio_above_ub = count_above_ub / total * 100

    # 输出结果

    print(f"区间内样本数量: {sample_num}")

    print(f"区间内样本占比: {sample_num/total* 100:.2f}%")

    print(f"大于{ub}样本占比: {ratio_above_ub:.2f}%")

nonchinese_ratio_samples(0,1,3)

保存抽样样本中的主要几列

columns_to_save = ['uuid', 'topic', 'content', 'chinese_ratio']

df[columns_to_save].to_csv('selected_columns.csv', index=False)

多语言分词

统计语言占比时, 词统计比字符统计更精确, 但是数据包含多语言, 因此需要考虑多语言分词

LangSegment可以看看Stanza库

统计一个字符串的words数量, 其中包含多种语言(中英日韩)和代码

将分词, 从而每种语言包含的:

对一个字符串进行word segmentation, 从而统计其包含的words数量, 字符串中包含多种语言(中英日韩)和代码

pip install LangSegment

from LangSegment import getTexts

from collections import Counter

# 示例文本,包含中文、日文、韩文和英文

text = "你的名字叫<ja>佐々木?</ja>吗?韩语中的오빠读什么呢?あなたの体育の先生は誰ですか? 此次发布会带来了四款iPhone 15系列机型和三款Apple Watch等一系列新品,这次的iPad Air采用了LCD屏幕"

# 获取分词结果

segments = getTexts(text)

# 统计每种语言的词数

lang_counts = Counter()

for seg in segments:

    lang = seg['lang']

    words = seg['text'].split()

    lang_counts[lang] += len(words)

# 输出每种语言的词数

for lang, count in lang_counts.items():

    print(f"{lang}: {count} words")

Nltk版本:

import re

import jieba

import nltk

from nltk.tokenize import word_tokenize

nltk.download('punkt')

def count_multilingual_words(text):

    # Segment Chinese text

    chinese_words = jieba.lcut(text)

    # Tokenize English and other languages

    other_words = word_tokenize(text)

    # Combine and deduplicate words

    combined_words = set(chinese_words + other_words)

    return len(combined_words)

# Example usage

text = "我爱Python编程。Hello, world!"

print("Word count:", count_multilingual_words(text))

split-lang的功能是将不同语言进行分割, 没有split_by_lang这个库与单词分割的功能, 其代码如下

pip install split-lang

from split_lang import LangSplitter

lang_splitter = LangSplitter()

text = "你喜欢看アニメ吗"

substr = lang_splitter.split_by_lang(

    text=text,

)

for index, item in enumerate(substr):

    print(f"{index}|{item.lang}:{item.text}")

区分不同语言后分割单词:

pip install jieba kiwipiepy

pip install sudachipy sudachidict_core

除了中英日韩, 其他文字用字符数统计词数

Kiwipiepy是Kiwi(韩国智能单词标识符)的Python版本

MeCab, Nagisa, konoha环境配置存在问题, 换成sudachipy

from split_lang import LangSplitter

import jieba

from kiwipiepy import Kiwi

from konoha import WordTokenizer

# 初始化分词器

lang_splitter = LangSplitter()

kiwi = Kiwi()

ja_tokenizer = WordTokenizer("nagisa")  # 使用 Nagisa 作为日语分词器

text = "你喜欢看アニメ吗?Hello, world! 안녕하세요. مرحبا بالعالم"

segments = lang_splitter.split_by_lang(text=text)

word_count = 0

for segment in segments:

    lang = segment.lang

    content = segment.text.strip()

    if not content or lang == 'pun':

        continue  # 跳过空字符串和标点

    if lang == 'zh':

        tokens = list(jieba.cut(content))

    elif lang == 'ja':

        tokens = [token.surface for token in ja_tokenizer.tokenize(content)]

    elif lang == 'ko':

        tokens = [token.form for token in kiwi.tokenize(content)]

    elif lang == 'en':

        tokens = content.split()

    else:

        # 对于其他语言,按字符数统计词数

        tokens = list(content)

    word_count += len(tokens)

print(f"总词数:{word_count}")

from split_lang import LangSplitter

import jieba

from kiwipiepy import Kiwi

from sudachipy import dictionary, tokenizer

# 初始化语言分段器和分词器

lang_splitter = LangSplitter()

kiwi = Kiwi()

sudachi_tokenizer = dictionary.Dictionary().create()

sudachi_mode = tokenizer.Tokenizer.SplitMode.C  # 使用最细粒度的分词模式

# 示例多语言文本

text = "你喜欢看アニメ吗?Hello, world! 안녕하세요. مرحبا بالعالم"

# 进行语言分段

segments = lang_splitter.split_by_lang(text=text)

# 初始化词数统计

word_count = 0

# 遍历每个语言段落

for segment in segments:

    lang = segment.lang

    content = segment.text.strip()

    if not content or lang == 'pun':

        continue  # 跳过空字符串和标点

    if lang == 'zh':

        tokens = list(jieba.cut(content))

    elif lang == 'ja':

        tokens = [m.surface() for m in sudachi_tokenizer.tokenize(content, sudachi_mode)]

    elif lang == 'ko':

        tokens = [token.form for token in kiwi.tokenize(content)]

    elif lang == 'en':

        tokens = content.split()

    else:

        # 对于其他语言,按字符数统计词数

        tokens = list(content)

    word_count += len(tokens)

print(f"总词数:{word_count}")

split() 保留了标点符号,可能导致单词统计不准确。因为字符串中包含代码片段, 因此使用正则r'\b\w+\b'分割

其中\b:​表示单词边界(word boundary),用于匹配单词的开始或结束位置。

\w+:​匹配一个或多个字母、数字或下划线,等价于 [A-Za-z0-9_]。

tokens = re.findall(r'\b\w+\b', content)

english_words = re.findall(r'\b[a-zA-Z]+\b', content)

lang == 'pun'代表为字符, x代表无法被识别

lang== 'x'时, 抽取其中英文字符, 正则r'\b\w+\b'分割单词, 抽取后剩余的部分计算字符数

lang== 'x'时, 抽取其中英文字符, 正则r'\b\w+\b'分割单词后计算单词数, 抽取后剩余的部分计算字符数

import re

from split_lang import LangSplitter

import jieba

from kiwipiepy import Kiwi

from sudachipy import dictionary, tokenizer

# 初始化分词器

lang_splitter = LangSplitter()

kiwi = Kiwi()

sudachi_tokenizer = dictionary.Dictionary().create()

sudachi_mode = tokenizer.Tokenizer.SplitMode.C  # 最细粒度

# 示例文本

text = "你喜欢看アニメ吗?Hello, world! 안녕하세요. مرحبا بالعالم"

# 进行语言分段

segments = lang_splitter.split_by_lang(text=text)

# 初始化词数统计

word_count = 0

# 遍历每个语言段落

for segment in segments:

    lang = segment.lang

    content = segment.text.strip()

    if not content or lang == 'pun':

        continue  # 跳过空字符串和标点

    if lang == 'zh':

        tokens = list(jieba.cut(content))

    elif lang == 'ja':

        tokens = [m.surface() for m in sudachi_tokenizer.tokenize(content, sudachi_mode)]

    elif lang == 'ko':

        tokens = [token.form for token in kiwi.tokenize(content)]

    elif lang == 'en':

        tokens = content.split()

    elif lang == 'x':

        # 判断是否为英文字符

        if re.search(r'[A-Za-z]', content):

            tokens = re.findall(r'\b\w+\b', content)

        else:

            tokens = list(content)

    else:

        tokens = list(content)

    word_count += len(tokens)

print(f"总词数:{word_count}")

# 提取英文单词

english_words = re.findall(r'\b\w+\b', text)

# 统计英文单词数

english_word_count = len(english_words)

# 移除英文单词后的剩余部分

remaining_text = text

for word in english_words:

    remaining_text = remaining_text.replace(word, '')

# 移除空格后的剩余部分

remaining_text = remaining_text.replace(' ', '')

# 统计剩余字符数

remaining_char_count = len(remaining_text)

Magpie

Magpie:

https://arxiv.org/html/2406.08464v2

https://zhuanlan.zhihu.com/p/717562402

直接从对齐的LLM中提取高质量指令数据来大规模合成高质量的指令数据

Our key observation is that aligned LLMs like Llama-3-Instruct can generate a user query when we input only the pre-query templates up to the position reserved for user messages, thanks to their auto-regressive nature.

We use this method to prompt Llama-3-Instruct and generate 4 million instructions along with their corresponding responses.

仅将pre-query模板输入到为用户消息保留的位置时,诸如Llama-3-Instruction之类的aligned LLM可以生成用户查询,这要归功于其auto-regressive性质。

仅输入左侧模板直到用户消息预留位置时,能够生成用户查询

对齐LLM的典型输入包含三个关键组成部分:预查询模板、查询和后查询模板。例如,对Llama-2-chat的输入可以是“[INST] Hi! [/INST]”,其中[INST]是预查询模板,[/INST]是后查询模板。

我们使用这种方法来提示Llama-3-Instruct模型,并生成400万个说明及其相应的响应。

We further introduce extensions of Magpie for filtering, generating multi-turn, preference optimization, domain-specific and multilingual datasets.

我们进一步介绍了Magpie的扩展,用于过滤,生成多转,偏好优化,域特异性和多语言数据集。

该图说明了如何使用系统提示来控制生成数学和中文指令。

用户prompt部分到end_head, 没有后续文字和eot_id, llm就会自动生成了

该系统提示为中文转换为:“您是AI助手,旨在为用户的中文问题提供准确,简洁的答案。”用户在中文中的回答翻译为:“我想了解AI的一些基本概念和历史发展趋势。”

http://www.xdnf.cn/news/186013.html

相关文章:

  • C语言数据结构—数组(cpu内存与指针)
  • CSS选择器
  • 6.图的OJ题(1-10,未完)
  • shell命令二
  • “八股训练营”学习总结
  • Linux学习——FTP(功能实现)
  • windows anaconda 配置 ipynb 环境
  • 《电商业务分析终极框架:从数据到决策的标准化路径》
  • OpenLayers WebGL与3D渲染 (进阶一)
  • 五分钟讲清数据需求怎么梳理!
  • 数据库关系模型的总结
  • 软件功能设计视角下的能源管理系统功能清单构建与实践
  • Redis高可用架构全解析:主从复制、哨兵模式与集群实战指南
  • 2025系统架构师---黑板架构风格
  • 风控策略引擎架构设计全解析:构建智能实时决策系统
  • 探索大语言模型(LLM):自监督学习——从数据内在规律中解锁AI的“自学”密码
  • MLLM之Bench:LEGO-Puzzles的简介、安装和使用方法、案例应用之详细攻略
  • OpenSSH 漏洞 CVE-2025-26465 和 CVE-2025-26466 可引发中间人攻击和 DoS 攻击
  • 毫米波振荡器设计知识笔记
  • BeautifulSoup的详细使用说明
  • 迈锐思C1pro插件安装包【附百度网盘链接】
  • 信创系统 sudoers 权限配置实战!从小白到高手
  • Spring 与 ActiveMQ 的深度集成实践(三)
  • ARP协议(地址解析协议)
  • Unreal Niagara制作Scratch随模型发射粒子特效
  • Make学习二:makefile组成要素
  • 基于STM32、HAL库的ADS1115模数转换器ADC驱动程序设计
  • 驱动开发硬核特训 · Day 22(上篇): 电源管理体系完整梳理:I2C、Regulator、PMIC与Power-Domain框架
  • ByeCode,AI无代码开发平台,拖拽式操作构建应用
  • OpenFeign 自定义拦截器