大量数据分批次处理+concurrent.futures.ThreadPoolExecutor多线程处理文件

背景:数据量很大的时候,需要多线程调用接口获取数据但是又不想一次性加载全部的原始数据进列表,可以结合批量加载数据和多线程。

代码实现:

        批量加载:

def load_data_in_batches(file_path, batch_size):if file_path.endswith(".json"):#先读取数据with open(file_path, "r") as file:data_list = json.load(file)else:data_list = []with open(file_path, "r") as f:file_lines = f.read().strip().split("\n")for line in file_lines:line = json.loads(line)data_list.append(line)for i in range(0, len(data_list), batch_size):yield data_list[i:i + batch_size]def prepare_batch_input_data(batch, model, temperature=None): #如果原始数据需要处理一下再发送给接口就在此方法进行 否则不需要此方法input_data_list = []for data in batch:query_content = data["messages"][0]["content"]input_data = {"prompt": [{"role": "user", "content": query_content}],"label": "","model": model,}if temperature is not None:input_data["temperature"] = temperatureinput_data_list.append(input_data)return input_data_list

        多线程调用接口:

import logging
default_logger = logging.getLogger(__name__)
class SSEClient:def __init__(self,data,# url=""auth_token="",logger=None,):if logger is None:self.logger = default_loggerself.url = urlself.headers = {"Authorization": auth_token,"Content-Type": "application/json",}self.data = dataself.send_time = 0  # 用于记录请求发送的时间self.create_time = time.time()  # 创建对象的时间self.first_message_time = 0  # 第一次接收消息的时间self.first_server_response = 0  # 第一次服务器响应的时间self.update_logger_context()def update_logger_context(self):self.logger = logging.LoggerAdapter(self.logger, {"session_id": self.session_id, "task_id": self.session_id})def get_result(self, show_details=False):self.send_time = time.time()  # 记录请求发送的时间# Stream outputresponse = requests.post(self.url,headers=self.headers,json=self.data,stream=True,)if response.headers.get("Content-Type") == "application/json":return {"status": "failed", "message": response.json()}if show_details:print("headers is ", response.headers)text = ""event_type = ""buffer = ""final_text = ""for line in response.iter_lines(decode_unicode=True):buffer += line + "\n"if not self.first_message_time:  # 当收到第一条消息时记录时间self.first_message_time = time.time()self.logger.info(f"First server message response at {self.first_message_time}")if buffer.endswith("\n\n"):for field_line in buffer.split("\n"):if field_line.startswith("event:"):event_type = field_line[len("event:") :].strip()if event_type == "finish":breakelif field_line.startswith("data:"):value = field_line[len("data:") :].replace("\\n", "\n")text += valueif show_details:print(f"Event Type: {event_type}, Text: {text}")final_text += textbuffer = ""text = ""self.message = final_text# 根据您提供的字典结构构建返回结果result = {"response": self.message,"data": copy.deepcopy(self.data),"send_time": self.send_time,"create_time": self.create_time,"first_message_time": self.first_message_time,"get_first_message_time": round(self.first_message_time - self.send_time, 2),"duration": round(time.time() - self.send_time, 2),"status": "success",}self.logger.info(f"Final result is {json.dumps(result)}")return resultdef invoke_model(data):for i in range(0, 1): #如果想一条数据多次调用的话#print(data)if "url" in data:url = data["url"]result = SSEClient(data, url=url).get_result()else:result = SSEClient(data).get_result()# 后面还可以加一些对数据的处理,这里可以直接定义储存,比如:with open(save_path,"a") as f:f.write(result)f.write("\n")def parallel_execution(demo_json_list, n_jobs=4):n_jobs = min(n_jobs, len(demo_json_list))results = [None] * len(demo_json_list)print(len(demo_json_list))with concurrent.futures.ThreadPoolExecutor(max_workers=n_jobs) as executor:# 使用字典来存储future与其在demo_json_list中的索引future_to_idx = {executor.submit(invoke_model, demo): idxfor idx, demo in enumerate(demo_json_list)}for feature, feature_idx in tqdm(future_to_idx.items(), desc="Processing"):try:result = feature.result(timeout=120)results[feature_idx] = resultexcept Exception as e:print(e)results[feature_idx] = None

联合使用:

def main(test_data_list, batch_size):for test_data in test_data_list: #如果有多个数据file_path = test_data["file_path"]temperature = test_data.get("temperature")# Load and process data in batchesfor batch in load_data_in_batches(file_path, batch_size):input_data_list = prepare_batch_input_data(batch, model, temperature)response_list = parallel_execution(input_data_list, n_jobs=n_jobs)data_list = {
[{"file_path":"","temperature":0.1},{"file_path":"","temperature":0.2}]
main(data_list,10)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/150004.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

【活动】人工智能时代,程序员如何保持核心竞争力?需要掌握哪些技能?

人工智能时代,程序员如何保持核心竞争力? 随着人工智能(AI)技术的迅猛发展,程序员面临着前所未有的挑战和机遇。AI不仅改变了软件开发的方式,也重新定义了程序员的角色。在这种背景下,如何保持…

15个RPA+GenAI的典型用例

RPA(机器人流程自动化)和生成式人工智能是数字化转型领域的两种流行工具: 到 2030 年,RPA 的全球市场预计将增长超过 130 亿美元。1.麦肯锡分析的 63 个用例中,预计生成式人工智能每年将增加 2.6 万亿美元 2. 这两种…

C语言长度受限制的字符串函数:(strncpy,strncat,strncmp)

strncpy 重点&#xff1a;1.拷贝num个字符从源字符串到目标空间 2.如果源字符串的长度小于num&#xff0c;则拷贝完源字符串之后&#xff0c;在目标的后边追加0&#xff0c;直到num个 3.这个函数不会拷贝\0。 列子&#xff1a; #include<stdio.h> #include<string…

u-navber自定义导航栏搜索框

效果 代码 <template><view><u-navbar :is-back"false"><view class"navbar"><view class"search"><image src"../../static/my_device/search_icon.png" class"search_image"></i…

三目运算判断字母大小写-C语言

1.问题&#xff1a; 输入一个字符&#xff0c;判别它是否为大写字母&#xff0c;如果是&#xff0c;将它转换成小写&#xff0c;如果不是&#xff0c;不转换。然后输出最后得到的字符&#xff0c;要求使用三目运算符。 2.解答&#xff1a; 用条件表达式来处理&#xff0c;当字…

单域名SSL证书和通配符SSL证书的区别,主要有3点不同

随着互联网的不断发展&#xff0c;网站安全性问题一直备受关注&#xff0c;在保护网站数据安全的过程中&#xff0c;SSL证书一直发挥着至关重要的作用。而在选择SSL证书时&#xff0c;单域名SSL证书和通配符SSL证书是两种常见的选择。本文将详细介绍单域名SSL证书和通配符SSL证…

智源研究院与百度达成战略合作 共建AI产研协同生态

2024年9月24日&#xff0c;北京智源人工智能研究院&#xff08;简称“智源研究院”&#xff09;与北京百度网讯科技有限公司&#xff08;简称“百度”&#xff09;正式签署战略合作协议&#xff0c;双方将充分发挥互补优势&#xff0c;在大模型等领域展开深度合作&#xff0c;共…

《开题报告》基于SpringBoot的交通管理系统的设计与实现+学习文档+答辩讲解视频

开题报告 研究背景 随着城市化进程的加速和机动车保有量的急剧增长&#xff0c;交通管理面临着前所未有的挑战。传统的交通管理方式&#xff0c;如人工监控、纸质记录等&#xff0c;已经难以满足现代交通管理的需求。交通拥堵、违章行为频发、事故处理效率低下等问题日益突出…

柒奶奶火完玖奶奶火,发疯文学号20天涨粉11万!疯狂变现10W+,一文教会你!

今天给大家分享的项目是**AI发疯文学号。**先看一下下面这组图片&#xff0c;点赞都是大几万&#xff0c;一个是柒奶奶另一个是玖奶奶&#xff0c;其实不管是哪个奶奶&#xff0c;都只是发疯文学的载体。 这种账号在小红书涨粉非常快&#xff0c;据说20天就达到了11W&#xff0…

Redis:哨兵机制

在上文主从复制的基础上&#xff0c;如果主节点出现故障该怎么办呢&#xff1f; 在 Redis 主从集群中&#xff0c;哨兵机制是实现主从库自动切换的关键机制&#xff0c;它有效地解决了主从复制模式下故障转移的问题。 哨兵机制&#xff08;Redis Sentinel&#xff09; Redis S…

Linux系统下载各大模型的方法

1. 下载Civitai模型 wget -O xxxx.safetensors "https://civitai.com/api/download/models/xxxx?&tokenxxxxxxxxxx" --content-disposition2. 下载huggingface模型 点击这3个点 选择Clone repository 如果是想下载当前仓库下所有文件&#xff0c;包括好多个GB的…

今年双11哪些东西值得买?分享五款实用耐用的好物,不再乱花钱!

随着一年一度的1111购物节脚步渐近&#xff0c;是否还在为挑选商品而犹豫不决&#xff1f;别担心&#xff0c;我们贴心整理了一份双十一必买好物推荐&#xff0c;专为追求品质生活的您量身打造。跟随这份清单&#xff0c;让您的数字生活更加丰富多彩&#xff0c;无需多虑&#…

自助服务智能终端界面设计,要遵循的7个原则。

自助服务智能终端在银行、医院、政务、公共服务大厅等场景下&#xff0c;为用户提供了诸多方面&#xff0c;因为面对的群体层次不一&#xff0c;所以在设计过程要遵循诸多原则&#xff0c;本文为大家总结了7点。 1. 界面简洁明了&#xff1a; 避免过多的文字和图标&#xff0…

ELK-02-skywalking-v10.0.1安装

文章目录 前言一、下载skywalking二、上传到服务器并解压三、安装jdk21四、修改配置五、启动总结 前言 skywalking-v10.0.1安装。 运用es持久化数据&#xff0c;所以需先完成ELK-01步骤。 一、下载skywalking 下载地址&#xff1a;https://skywalking.apache.org/downloads/ …

python-list

Python 列表 原文:https://www.geeksforgeeks.org/python-list/ 列表就像动态大小的数组&#xff0c;用其他语言声明(C中的 vector 和 Java 中的 ArrayList)。列表不必总是相同的&#xff0c;这使得它成为 Python 中最强大的工具。单个列表可能包含整数、字符串和对象等数据类型…

指针 (2)

目录 1.指针变量的⼤⼩ 2 指针的解引⽤ 3指针-整数 1.指针变量的⼤⼩ 指针变量的大小和编译器的位数有关系&#xff0c;例如vs2022的 x64 就是64位&#xff0c; x86 就是 32位 当两个同时运行一个代码的时候就会有差异。 当我在运行x86的时 总结&#xff1a; 在x86…

java面对对象高级

1.类变量和类方法 1.1static变量 &#xff08;1&#xff09;类变量&#xff1a; 也叫静态变量/静态属性&#xff0c;所有对象共享并且所有对象访问的值是相同的 static变量是同一个类所有对象共享的 static类变量&#xff0c;在类加载的时候就生成了 &#xff08;2&#xff09…

MySQL基础篇 - SQL

01 SQL通用语法 02 SQL分类 03 DDL语句 04 DML语句 05 DQL语句(单表查询) 05_01 学习总览 05_02 基本查询 05_03 条件查询 【应用实例】&#xff1a; 05_04 聚合函数 05_05 分组查询 05_06 排序查询 05_07 分页查询 【boss题目】&#xff1a; 05_08 执行顺序 06 DCL语句 【概…

国家标准和团体标准有什么区别?

国家标准和团体标准的区别主要体现在以下几个方面&#xff1a; 1. 制定标准的主体不同&#xff1a;国家标准是由国家机构通过并公开发布的标准&#xff1b;团体标准是由学会、协会、商会、联合会、产业技术联盟等社会团体协调相关市场主体共同制…

Libtorrent 安装、编译与使用(附 Boost 的编译与使用)

文章目录 Part.I IntroductionChap.I 预备知识Chap.II 所用设备系统与软件Part.II 准备工作Chap.I 编译 Boost 库Chap.II 下载必需文件Part.III 编译与使用 LibtorrentChap.I 运行 Example 和 TestChap.II 使用库文件ReferencePart.I Introduction libtorrent 是 BitTorrent 协…