业务数据批量插入数据库实践

业务数据如何存储一直以来都是项目开发中的一个比较重要的话题。我们要从资源的利用率,业务场景和技术实现多个方面考虑存储的问题。“抛开业务谈技术就是耍流氓”,所有技术架构都要站在实际的业务场景中分析。比如个人端的产品,这种就属于读多写少的业务,对于这种系统的架构就要更多的考虑增加缓存来减少数据库读压力;B端的产品,基本上属于写多读少的业务,客户数量不多但是单个客户的qps会非常高,要减少这种业务对数据库的压力,数据写入一般要采用批处理。物联网类型的产品,基本上也是属于写多读少的场景,一个设备会有几百上千的测点数据上报,并且上报的频率非常均匀,对于这种类型的系统我们要保证数据能够及时写入数据库并且不能对数据库产生太大的压力,一般我们就采用批量写库方案。下面就针对这种写多读少的物联网系统设计方案分享一下个人的经验。
我最近在做的一个系统是接收储能系统上报的数据,设备每10s上报一轮数据,云端接收到数据后做一些基本的处理判断逻辑,就将数据存入数据库。业务端展示数据分为历史数据和实时数据,历史数据用于其他系统或业务部门分析,实时数据需要做一些预警和最新数据展示。所以我设计的系统机构图大致如下:
系统架构图
系统使用kafka消息队列将接收数据和处理数据的两个服务进行解耦,在这个系统中,最为核心的服务就是数据服务,它要负责数据的处理、入库、查询等多个功能。在批量写数据库时我们要考虑两个方面:一个是批量插入数据多少条比较合适,另外一个就是如果数据积累比较缓慢最长等待多久就要执行一次批量写。
综合以上的分析,我设计的批量写入数据库采用 缓存队列+定时任务 方式实现,这样既能满足批量写入数据库的要求,又能满足及时更新数据库。
既然数据有缓存,就存在缓存数据丢失的风险,规避数据丢失的问题,行业内采用比较多的方案就是WAL(Write-Ahead Logging)。对于我的系统就是数据服务从kafka消费到数据后先写本地日志,再将数据放入缓存队列,然后通知kafka消费数据成功;数据在缓存队列中积累到批量写入数据库的条数或到达一定时间后就插入数据库。在这个过程中如果服务宕机导致缓存队列的数据没有入库,我们也可以通过本地日志找回数据,再次启动服务后重新加载插入数据库。
对于日志中的数据,会有一个标识标记当前已经入库的位置,当缓存队列的数据写入数据库后,更新日志标记点,标记点之前的数据都是写入数据库成功的,标记点之后的数据是目前还在缓存中并未写入数据库。如果程序正常退出,会有一个钩子函数清理缓存队列,将队列中的所有数据都写入数据库,清理标记点;如果程序异常退出,那么标记点之后的数据都是不安全的,在程序下一次正常启动时检查这个标记点,将标记点之后的数据重新加载进程序并写入数据库。这里的标记点通过在项目部署目录下创建一个文件来实现:程序每次写入数据库后更新这个标记点并将它写入文件,程序正常退出就通过钩子函数删除这个文件。下次程序启动后检查文件是否存在:不存在就是正常退出不用做任何处理;存在就是异常退出,这时就要读取标记点的值,把它之后的所有数据重新写入数据库。
通过上面的分析,整个写入数据的流程图如下:
处理数据业务流程
对于标记点的设计结构如下:
标记点结构
由于设备上报的数据有多种类型,那么缓存队列就存在多个,每个队列对应的标记点就有多个,这里选择数据存入缓存队列的时间戳作为标记,占用4个字节,每次写入数据库成功后就将这个时间戳写入到文件,多个缓存队列就依次向后写入文件,为了提高写文件性能,使用mmap将文件映射到内存中,通过修改内存中的值达到写文件的目的。

以上分析了整个项目的架构和实现逻辑,接下来就通过代码实现上面的这些逻辑。
talk is cheap, show me the code

一、项目结构:

使用SpringBoot3.2 + kafka3.6 + jdk17。
maven依赖如下:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>provided</scope><version>1.18.30</version>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId>
</dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>
<!-- redis相关依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency>
<!-- 数据库相关依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version><exclusions><exclusion><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>3.0.3</version>
</dependency>
<dependency><groupId>com.mysql</groupId><artifactId>mysql-connector-j</artifactId><version>8.0.33</version>
</dependency>

二、日志记录实现

写日志部分,就不单独实现了,采用logback框架写日志,单个日志文件大小设置为1GB,日志滚动策略采用大小+日期方式。配置文件在resource目录下的 logback-spring.xml 文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration><include resource="org/springframework/boot/logging/logback/defaults.xml" /><property name="LOG_PATH" value="logs"/><property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd HH:mm:ss.SSS}|%t|%-5p|%c|%m%n"/><!-- 业务日志:记录系统接收到的数据,用于数据丢失处理 --><appender name="redo" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/redo.log</file><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%m%n</pattern><charset>UTF-8</charset></encoder><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><FileNamePattern>${LOG_PATH}/redo-%i.log.%d{yyyy-MM-dd}</FileNamePattern><maxHistory>7</maxHistory><maxFileSize>1GB</maxFileSize><totalSizeCap>40GB</totalSizeCap></rollingPolicy></appender><logger name="redo" level="INFO" additivity="false"><appender-ref ref="redo"/></logger><!-- 控制台 --><appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"><encoder><pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}|%thread|[%-5level]|%logger{36}.%method|%msg%n</pattern><charset>UTF-8</charset></encoder></appender><!-- 错误日志:输出所有错误日志信息 --><appender name="FILE-ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/error/error.log</file><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/error/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern><maxHistory>30</maxHistory><maxFileSize>500MB</maxFileSize><totalSizeCap>10GB</totalSizeCap></rollingPolicy><encoder><pattern>${FILE_LOG_PATTERN}</pattern><charset>utf8</charset></encoder><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>ERROR</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- WARN日志:输出所有WARN日志信息 --><appender name="FILE-WARN" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/warn/warn.log</file><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/warn/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern><maxHistory>30</maxHistory><maxFileSize>500MB</maxFileSize><totalSizeCap>10GB</totalSizeCap></rollingPolicy><encoder><pattern>${FILE_LOG_PATTERN}</pattern><charset>utf8</charset></encoder><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>WARN</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 消息日志:输出所有消息日志信息 --><appender name="FILE-INFO" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/info/info.log</file><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/info/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern><maxHistory>30</maxHistory><maxFileSize>500MB</maxFileSize><totalSizeCap>10GB</totalSizeCap></rollingPolicy><encoder><pattern>${FILE_LOG_PATTERN}</pattern><charset>utf8</charset></encoder><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>INFO</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 调试日志:输出所有消息日志信息 --><appender name="FILE-DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender"><file>${LOG_PATH}/debug/debug.log</file><rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"><fileNamePattern>${LOG_PATH}/debug/%d{yyyy-MM-dd}-%i.log.gz</fileNamePattern><maxHistory>30</maxHistory><maxFileSize>500MB</maxFileSize><totalSizeCap>10GB</totalSizeCap></rollingPolicy><encoder><pattern>${FILE_LOG_PATTERN}</pattern><charset>utf8</charset></encoder><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>DEBUG</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><root level="INFO"><appender-ref ref="CONSOLE"/><appender-ref ref="FILE-ERROR"/><appender-ref ref="FILE-WARN"/><appender-ref ref="FILE-INFO"/><appender-ref ref="FILE-DEBUG"/></root>
</configuration>

日志格式为: 日志时间|消息主题|消息内容 ,消息主题就是kafka的主题,时间戳是接收到消息的时间戳,这个时间戳用于异常退出时下次启动服务恢复消息时判断数据是在标记点前后的依据,消息内容就是具体消费的信息。示例数据如下:

2024-09-23 15:03:19.378|batch-message|{"data":"{\"id\":\"1727074999121\",\"code\":\"code1--1\",\"name\":\"name1--1\",\"random\":\"0eaf04363d5a49419967d500dc149e5a\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work1Data"}
2024-09-23 15:03:19.734|batch-message|{"data":"{\"id\":\"1727074999359\",\"code\":\"code2--1\",\"name\":\"name2--1\",\"random\":\"89b8b315f4e84fedb5af799ce4860c56\",\"createTime\":\"2024-09-23 15:03:19\"}","dataType":"Work2Data"}

三、消费KAFKA数据

数据是通过消费kafka队列接收消息,如果消息被成功接收并且写入本地日志和缓存队列,那么就提交ack给kafka,实现代码如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.xingo.common.JacksonUtils;
import org.xingo.consumer.batch.BatchWorkFactory;
import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;
import org.xingo.domain.Work1Data;
import org.xingo.domain.Work2Data;import java.util.List;/*** @Author xingo* @Date 2024/9/13*/
@Slf4j
@Component
public class KafkaConsumer {Logger redoLogger = LoggerFactory.getLogger("redo");@Autowiredprivate BatchWorkFactory batchWorkFactory;/*** 数据消费者*/@KafkaListener(topics = "batch-message")public void linsten01(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {if(records != null) {for (ConsumerRecord<String, String> record : records) {// 写本地日志long ts = System.currentTimeMillis();System.out.println(record.value());redoLogger.info("{}|{}", record.topic(), record.value());// 解析消息内容,保存到本地缓存队列try {JsonNode json = JacksonUtils.getObjectMapper().readTree(record.value());String dataType = json.get("dataType").asText();if("Work1Data".equals(dataType)) {Work1Data work1Data = JacksonUtils.parseObject(json.get("data").asText(), Work1Data.class);work1Data.setTs(ts);batchWorkFactory.get(BatchWork1.class).add(work1Data);} else if("Work2Data".equals(dataType)) {Work2Data work2Data = JacksonUtils.parseObject(json.get("data").asText(), Work2Data.class);work2Data.setTs(ts);batchWorkFactory.get(BatchWork2.class).add(work2Data);}} catch (JsonProcessingException e) {log.error("解析数据异常", e);}}//提交offset消费成功ack.acknowledge();}}
}

这里面json反序列化使用到了Jackson工具类:

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Locale;
import java.util.TimeZone;/*** json工具** @Author xingo* @Date 2023/12/15*/
public class JacksonUtils {private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();static {// Long类型处理,避免前端处理长整型时精度丢失SimpleModule module1 = new SimpleModule();module1.addSerializer(Long.class, ToStringSerializer.instance);module1.addSerializer(Long.TYPE, ToStringSerializer.instance);JavaTimeModule module2 = new JavaTimeModule();// java8日期处理module2.addSerializer(LocalDateTime.class,new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));module2.addSerializer(LocalDate.class,new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));module2.addSerializer(LocalTime.class,new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));module2.addDeserializer(LocalDateTime.class,new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));module2.addDeserializer(LocalDate.class,new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));module2.addDeserializer(LocalTime.class,new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));OBJECT_MAPPER// 添加modules.registerModules(module1, module2, new Jdk8Module())// 日期类型不转换为时间戳.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false).configure(SerializationFeature.WRITE_DURATIONS_AS_TIMESTAMPS, false)// 反序列化的时候如果多了其他属性,不抛出异常.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)// 如果是空对象的时候,不抛异常.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)// 空对象不序列化.setSerializationInclusion(JsonInclude.Include.NON_NULL)// 日期格式化.setDateFormat(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"))// 设置时区.setTimeZone(TimeZone.getTimeZone("GMT+8"))// 驼峰转下划线// .setPropertyNamingStrategy(PropertyNamingStrategies.SNAKE_CASE)// 语言.setLocale(Locale.SIMPLIFIED_CHINESE);}/*** 反序列化* @param json      json字符串* @param clazz     发序列化类型* @return* @param <T>*/public static <T> T parseObject(String json, Class<T> clazz) {if(json == null) {return null;}try {return OBJECT_MAPPER.readValue(json, clazz);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public static <T> T parseObject(String json, TypeReference<T> type) {if(json == null) {return null;}try {return OBJECT_MAPPER.readValue(json, type);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public static <T> T parseObject(byte[] bytes, TypeReference<T> type) {if(bytes == null) {return null;}try {return OBJECT_MAPPER.readValue(bytes, type);} catch (Exception e) {e.printStackTrace();}return null;}/*** 反序列化列表* @param json* @return* @param <T>*/public static <T> List<T> parseArray(String json) {if(json == null) {return null;}try {TypeReference<List<T>> type = new TypeReference<List<T>>(){};return OBJECT_MAPPER.readValue(json, type);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}/*** 写为json串* @param obj   对象* @return*/public static String toJSONString(Object obj) {if(obj == null) {return null;}try {return OBJECT_MAPPER.writeValueAsString(obj);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}public static byte[] toJSONBytes(Object obj) {if(obj == null) {return null;}try {return OBJECT_MAPPER.writeValueAsBytes(obj);} catch (Exception e) {e.printStackTrace();}return null;}/*** 获取jackson对象* @return*/public static ObjectMapper getObjectMapper() {return OBJECT_MAPPER;}
}

四、数据缓存队列

缓存数据的队列有两种数据结构可选:数组和链表,链表适用于数据量不定的场景,随机增减节点非常方便;数组长度一定,随机增减性能比较低,对于顺序读写性能比链表要好。这里的缓存主要是尾部添加,一次读取入库的场景,综合来看,数组性能会好一些,由于缓存数据条数不一定,只有最大长度限制,选择ArrayList这个结构非常方便。至于并发问题,在读写时加锁处理。
对于数据缓存这部分逻辑,我们定义一个抽象类叫BatchWork,它主要用于公共处理逻辑的抽取,对于不同数据结构的对象,只需要继承这个父类,并且实现该对象批量处理数据的逻辑即可。
批处理抽象父类定义如下:

import lombok.extern.slf4j.Slf4j;
import org.xingo.domain.WorkData;import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;/*** 批处理任务父类* * @Author xingo* @Date 2024/9/13*/
@Slf4j
public abstract class BatchWork<T extends WorkData> {/*** 最大时间间隔,超过这个时间间隔还没有执行数据批量插入就执行定时任务*/public static final int DIFF_TIMESTAMP = 30_000;/*** 执行时间间隔:30s*/public static final int period = 30;    // 执行的间隔时间/*** 批量插入数据最大条数*/protected short batchSize = 1000;/*** 最近一次执行数据插入时间戳*/protected long lastTimestamp = 0L;/*** 任务名称*/protected BatchWorkEnum batchWork;/*** 系统操作线程池:构建一个核心池大小是8、线程池最大线程数是16的线程池,可以根据主机CPU核数进行调整*/public static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(8, 16, 5, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(8), new ThreadFactory() {private final AtomicInteger number = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("batch-savedb-" + number.getAndIncrement());return thread;}});/*** 定时任务线程池:定时检查缓存队列,通过判断缓存队列最后一次执行时间来判断是否要将缓存队列数据插入数据库表中*/private static final ScheduledExecutorService taskExecutor = Executors.newScheduledThreadPool(16, new ThreadFactory() {private final AtomicInteger number = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("batch-work-" + number.getAndIncrement());return thread;}});/*** 缓存数据列表*/private List<T> datas = new ArrayList<>(batchSize);/*** 锁对象*/private final ReentrantLock lock = new ReentrantLock();/*** 启动定时执行任务方法,在项目启动时启动*/public void run() {if(batchWork == null) {throw new RuntimeException("属性 [batchWork] 不能为空");}// 最后更新时间与当前时间差大于时间间隔,将执行一次批处理任务Runnable task = () -> {long diff = System.currentTimeMillis() - lastTimestamp;if(diff >= DIFF_TIMESTAMP) {batch();}};Random random = new Random();long initialDelay = random.nextInt(10, 30);     // 初始延迟时间// 为了让任务尽可能均匀分布,所有要批量处理的任务初始时间随机生成taskExecutor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);BatchWorkFactory.WORK_NUMS.incrementAndGet();log.info("{}批量任务开始执行", batchWork);}/*** 服务关闭时执行*/public void clear() {log.info("项目关闭开始清理{}批量任务", batchWork);this.batch();BatchWorkFactory.WORK_NUMS.decrementAndGet();log.info("{}批量任务已结束", batchWork);}/*** 添加数据方法:将要插入数据添加到缓存队列* @param data*/public void add(T data) {lock.lock();try {datas.add(data);} finally {lock.unlock();}this.cache(data);// 添加数据后判断缓存数据条数,如果缓存条数超过了批处理阈值,执行一次批处理,结合定时任务就实现了“最大缓存条数或时间间隔”内插入数据的逻辑if(datas.size() >= batchSize) {batch();}}/*** 批量新增数据:将一批数据添加到缓存队列* @param batch*/public void batchAdd(List<T> batch) {lock.lock();try {datas.addAll(batch);} finally {lock.unlock();}if(datas.size() >= batchSize) {batch();}}/*** 批量处理方法:具体子类要实现这个方法*/public abstract void batchInsertDb(List<T> datas);/*** 数据缓存:缓存最新一条数据* @param data*/public abstract void cache(T data);/*** 批处理方法:* 将当前缓存的批处理数据赋值给其他对象,当前缓存列表重新申请一个空间接收数据*/private void batch() {log.info("调用{}批量处理方法", batchWork);if(!datas.isEmpty()) {List<T> copyList = null;lock.lock();try {if(!datas.isEmpty()) {// 定义一个局部变量指向原有的数组,原有的数组重新申请空间用于接收数据copyList = datas;datas = new ArrayList<>(batchSize);}} finally {lock.unlock();}if(copyList != null && !copyList.isEmpty()) {final List<T> dbList = copyList;// 插入数据是比较耗时的过程,这里放入异步线程池慢慢执行,主线程继续接收数据threadPool.execute(() -> {log.info("{}写入数据库开始|{}", batchWork, dbList.size());long s = System.currentTimeMillis();this.batchInsertDb(dbList);lastTimestamp = System.currentTimeMillis();long _ts = dbList.get(dbList.size() - 1).getTs();log.info("{}写入数据库完成|{}|{}|{}ms", batchWork, dbList.size(), new Timestamp(_ts), (lastTimestamp - s));// 插入数据完成后,更新标记点MarkPointLog.markPoint(batchWork, _ts);});}}}}

demo项目模拟了两类数据批处理:
批处理任务1:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.domain.Work1Data;
import org.xingo.common.JacksonUtils;
import org.xingo.mapper.Work1Mapper;import java.util.List;
import java.util.concurrent.TimeUnit;/*** 批处理子类1* * @Author xingo* @Date 2024/9/13*/
@Slf4j
@Service
public class BatchWork1 extends BatchWork<Work1Data> {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate Work1Mapper work1Mapper;public BatchWork1() {this.batchWork = BatchWorkEnum.Work1;}@Overridepublic void batchInsertDb(List<Work1Data> datas) {try {work1Mapper.batchInsert(datas);} catch (Exception e) {log.error("批量插入数据异常", e);// 这里是批量插入数据如果异常,转为单条插入数据,一般的异常都是数据库主键冲突导致的datas.forEach(data -> {try {work1Mapper.insert(data);} catch (Exception ex) {log.error("单条插入数据异常", ex);}});}log.info("批量处理数据|{}|{}", this.batchWork, datas.size());}@Overridepublic void cache(Work1Data data) {// 缓存最新一条数据供其他业务使用redisTemplate.opsForValue().set("data:work1", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);}
}

批处理任务2:

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.xingo.consumer.batch.BatchWork;
import org.xingo.consumer.batch.BatchWorkEnum;
import org.xingo.common.JacksonUtils;
import org.xingo.domain.Work2Data;
import org.xingo.mapper.Work2Mapper;import java.util.List;
import java.util.concurrent.TimeUnit;/*** 批处理子类2* * @Author xingo* @Date 2024/9/13*/
@Slf4j
@Service
public class BatchWork2 extends BatchWork<Work2Data> {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate Work2Mapper work2Mapper;public BatchWork2() {this.batchWork = BatchWorkEnum.Work2;}@Overridepublic void batchInsertDb(List<Work2Data> datas) {try {work2Mapper.batchInsert(datas);} catch (Exception e) {log.error("批量插入数据异常", e);datas.forEach(data -> {try {work2Mapper.insert(data);} catch (Exception ex) {log.error("单条插入数据异常", ex);}});}log.info("批量处理数据|{}|{}", this.batchWork, datas.size());}@Overridepublic void cache(Work2Data data) {redisTemplate.opsForValue().set("data:work2", JacksonUtils.toJSONString(data), 30, TimeUnit.SECONDS);}
}

这里只有两个批处理任务,如果批处理任务有更多个,那么就需要一个统一的地方管理这些任务:
定义一个工厂类管理批量任务,创建、销毁、获取这些批量任务都通过这个工厂类来实现:

import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AssignableTypeFilter;
import org.springframework.stereotype.Component;import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** 批量任务工厂** @Author xingo* @Date 2024/9/13*/
@Slf4j
@Component
public class BatchWorkFactory {/*** 批量任务集合*/private final Map<Class<? extends BatchWork>, BatchWork> map = new HashMap<>();@Autowiredprivate ApplicationContext applicationContext;@Autowiredprivate MarkPointLog markPointLog;/*** 工厂初始化是否完成*/private boolean ok = false;/*** 任务启动完成*/public static final AtomicInteger WORK_NUMS = new AtomicInteger(0);@PostConstructpublic void run() {// 构建bean缓存:通过spring提供的类扫描目录下的所有子类加载到集合中做统一管理ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);provider.addIncludeFilter(new AssignableTypeFilter(BatchWork.class));Set<BeanDefinition> components = provider.findCandidateComponents("org/xingo/consumer/batch/impl");for (BeanDefinition component : components) {try {Class<? extends BatchWork> clazz = (Class<? extends BatchWork>) Class.forName(component.getBeanClassName());BatchWork bean = applicationContext.getBean(clazz);map.put(clazz, bean);log.info("初始化加载类实例信息|{}|{}", component.getBeanClassName(), bean);} catch (Exception e) {log.error("加载类异常", e);}}// 初始标记服务markPointLog.initMarkPointLog();// 批处理任务启动方法map.values().forEach(BatchWork::run);try {// 标记任务启动成功while (WORK_NUMS.get() != map.size()) {TimeUnit.MICROSECONDS.sleep(5);}ok = true;} catch (Exception e) {e.printStackTrace();}log.info("批处理任务全部启动");}/*** 正常关闭处理*/@PreDestroypublic void clear() {try {// 批处理任务关闭方法map.values().forEach(BatchWork::clear);// 等待批量任务全部处理完成while (WORK_NUMS.get() != 0) {TimeUnit.SECONDS.sleep(1);log.info("等待批处理任务结束,剩余{}个任务关闭", WORK_NUMS.get());}// 清理标记文件markPointLog.destroyMarkPointLog();log.info("批处理任务全部关闭");} catch (Exception e) {log.error("清理本地批处理任务异常", e);}}/*** 获取批量执行任务类* @param workName* @return*/public BatchWork get(Class<? extends BatchWork> workName) {if(ok) {return map.get(workName);} else {int cnt = 0;while (!ok) {try {TimeUnit.MILLISECONDS.sleep(3);if(cnt++ >= 10000) {break;}} catch (Exception e) {log.error("获取批处理任务异常", e);}}return map.get(workName);}}}

在BathWork抽象类中,数据批量插入数据库后要记录下当前插入数据的时间点工具类:MarkPointLog:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.math.BigDecimal;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** 数据标记点,每次插入数据后就要更新标记点* * @Author xingo* @Date 2024/9/13*/
@Slf4j
@Component
public class MarkPointLog {@Value("${user.dir}")private String dirPath;@Autowiredprivate LoadDataService loadDataService;/*** 标记服务正常关闭文件路径:* 服务正常关闭时会将文件删除,* 下次启动时如果文件存在,表示上次是异常关闭的,那么就会处理历史消息重新发布一次处理*/private String filePath = null;/*** 写文件*/private static MappedByteBuffer buffer = null;/*** 锁对象*/private static final ReentrantLock lock = new ReentrantLock();/*** 初始化*/public void initMarkPointLog() {filePath = dirPath.endsWith(File.separator) ? (dirPath + "work") : (dirPath + File.separator + "work");File file = new File(filePath);if(file.exists()) {RandomAccessFile rw = null;try {long minTs = System.currentTimeMillis();rw = new RandomAccessFile(file, "r");SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");for (BatchWorkEnum mark : BatchWorkEnum.values()) {long ts = rw.readLong();if(ts < minTs) {minTs = ts;}log.error("系统异常关闭最后提交数据时间戳|{}|{}|{}", mark, ts, datetimeFormat.format(ts));}final long min = minTs;new Thread(() -> loadDataService.loadData(min), "loadLogWork").start();} catch (Exception e) {log.error("读取状态文件异常", e);} finally {if(rw != null) {try {rw.close();} catch (IOException e) {log.error("读取状态文件异常", e);}}}}// 映射运行状态文件FileChannel channel = null;try {channel = new RandomAccessFile(file, "rw").getChannel();int size = BatchWorkEnum.values().length * 8;buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, size);buffer.putLong(0);} catch (Exception e) {e.printStackTrace();} finally {try {if (channel != null) {channel.close();}} catch (IOException e) {e.printStackTrace();}}}/*** 清理标记点文件*/public void destroyMarkPointLog() {if(filePath != null) {File file = new File(filePath);if(file.exists()) {boolean rs = file.delete();log.info("删除标识文件完成|{}|{}", filePath, rs);}}}/*** 最大标记点* @param workEnum* @param ts*/public static void markPoint(BatchWorkEnum workEnum, long ts) {lock.lock();try {int position = workEnum.getIdx() * 8;buffer.position(position);buffer.putLong(ts);buffer.force();} finally {lock.unlock();}}}

当数据在缓存队列中还未插入数据库之前系统宕机了,这就存在数据丢失风险,那么在系统启动后就要重新消费这部分数据:

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;/*** @Author xingo* @Date 2024/9/23*/
@Slf4j
@Component
public class LoadDataService {@Value("${user.dir}")private String dirPath;@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 加载数据到系统等待再次入库处理** @param timestamp*/public void loadData(long timestamp) {// 最小时间再减去一个时间间隔,认为这段时间内的数据都是不安全的long startTimestamp = timestamp - BatchWork.DIFF_TIMESTAMP;long endTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(5);String logPath = dirPath + File.separator + "logs";File kafkaLogs = new File(logPath);File file1 = null, file2 = null;String yesterday = LocalDate.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));// 查找当前的日志文件if(kafkaLogs.exists() && kafkaLogs.isDirectory()) {File[] files = kafkaLogs.listFiles();if(files != null) {for(File file : files) {String fname = file.getName();if(fname.equals("redo.log")) {log.info("匹配日志文件|{}", fname);file1 = file;} else if(fname.endsWith(yesterday)) {log.info("匹配历史日志文件|{}", fname);file2 = file;}}}}if(file1 != null) {sendDataToQueue(file1, startTimestamp, endTimestamp);}if(file2 != null) {sendDataToQueue(file2, startTimestamp, endTimestamp);}}/*** 一行数据最大长度,用于数据查找时回退*/private int LINE_MAX_SIZE = 1000;/*** 发送数据到消息* @param file* @param startTimestamp* @param endTimestamp*/private void sendDataToQueue(File file, long startTimestamp, long endTimestamp) {// 查找数据:二分法查找要加载的数据RandomAccessFile raf = null;SimpleDateFormat datetimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");try {raf = new RandomAccessFile(file, "rw");long start = 0L;long end = file.length();long point = 0L;String line = null;// 2024-09-23 10:20:25.387|batch-message|{"data":{"id":"1727058025075","code":"code1--1727058025075","name":"name1--1727058025075","random":"bdfa8ca9d5084c25b9267f34e7bff6cf","createTime":"2024-09-23 10:20:25"},"dataType":"Work1Data"}boolean find = false;while (true) {if(end - start < 60) {break;}point = (start + end) / 2;raf.seek(Math.max(0, point - LINE_MAX_SIZE));line = raf.readLine();if (StringUtils.isBlank(line)) {break;}String[] arr = line.split("\\|");if (arr.length != 3) {line = raf.readLine();if (line == null || "".equals(line)) {break;}arr = line.split("\\|");} else {try {datetimeFormat.parse(arr[0]);} catch (ParseException e) {line = raf.readLine();if (line == null || "".equals(line)) {break;}arr = line.split("\\|");}}long time = datetimeFormat.parse(arr[0]).getTime();if (time < startTimestamp) {start = point;} else {find = true;break;}}if (find) {raf.seek(start);line = raf.readLine();String[] arr = line.split("\\|");if (arr.length != 3) {line = raf.readLine();} else {try {datetimeFormat.parse(arr[0]);} catch (ParseException e) {line = raf.readLine();}}while (StringUtils.isNotBlank(line)) {arr = line.split("\\|");long time = datetimeFormat.parse(arr[0]).getTime();if(time > endTimestamp) {break;}// 这里将可能丢失的数据重新发送到消息队列再次消费持久化if(time >= startTimestamp) {kafkaTemplate.send(arr[1], arr[2]);log.info("加载可能丢失的数据|{}", arr[2]);}line = raf.readLine();}}} catch (Exception e) {log.error("读取文件异常", e);} finally {try {if(raf != null) {raf.close();}} catch (IOException e) {log.error("关闭文件异常", e);}}}
}

是否加载数据主要是根据项目目录下释放存在一个标记文件:work。这个文件在项目启动时创建,在项目正常关闭时删除这个文件。在数据批量插入成功后会把标记点也写入到这个文件中,由于存在多个缓存队列,每个缓存队列需要写入到这个文件的不同位置,这里需要定义一个枚举类记录每个缓存队列写入到这个文件的位置:

import org.xingo.consumer.batch.impl.BatchWork1;
import org.xingo.consumer.batch.impl.BatchWork2;/*** 批量任务名** @Author xingo* @Date 2024/9/13*/
public enum BatchWorkEnum {/*** 批量任务1*/Work1((byte) 0, BatchWork1.class),/*** 批量任务2*/Work2((byte) 1, BatchWork2.class),;/*** 批量任务队列的序列号,是一个从0开始递增的值;* 它主要用于批量数据入库后最新数据时间点记录文件查找和写入时使用* 如果新增缓存队列这个序号递增,不能更改已经存在的序号* 如果要调整或删除已有的序号,要保证系统正常退出后再操作,否则会有数据丢失风险*/private byte idx;/*** 批量任务对应的类*/private Class<? extends BatchWork> work;BatchWorkEnum(byte idx, Class<? extends BatchWork> work) {this.idx = idx;this.work = work;}public byte getIdx() {return idx;}public Class<? extends BatchWork> getWork() {return work;}
}

五、数据对象处理

上面这些逻辑都完成了,最后一步就是数据持久化,我这里使用mysql数据库做持久化测试,数据对象和持久化代码如下:
统一的父类:

import java.io.Serializable;/*** @Author xingo* @Date 2024/9/13*/
public class WorkData implements Serializable {/*** 数据放入缓存队列时间*/protected long ts;public Long getTs() {return ts;}public void setTs(Long ts) {this.ts = ts;}
}

两个实体类:

实体类1:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.time.LocalDateTime;/*** @Author xingo* @Date 2024/9/13*/
@Data
@TableName("t_work1")
public class Work1Data extends WorkData {private Long id;private String code;private String name;private String random;private LocalDateTime createTime;
}

实体类2:

import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;import java.time.LocalDateTime;/*** @Author xingo* @Date 2024/9/13*/
@Data
@TableName("t_work2")
public class Work2Data extends WorkData {private Long id;private String code;private String name;private String random;private LocalDateTime createTime;
}

xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work1Mapper"><insert id="batchInsert" parameterType="hashmap">INSERT INTO t_work1 (id, code, name, random, ts, create_time) VALUES<foreach collection="datas" item="item" index="index" separator=",">(#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})</foreach></insert>
</mapper>
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.xingo.mapper.Work2Mapper"><insert id="batchInsert" parameterType="hashmap">INSERT INTO t_work2 (id, code, name, random, ts, create_time) VALUES<foreach collection="datas" item="item" index="index" separator=",">(#{item.id}, #{item.code}, #{item.name}, #{item.random}, #{item.ts}, #{item.createTime})</foreach></insert>
</mapper>

Mapper接口:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work1Data;import java.util.List;/*** @Author xingo* @Date 2024/9/23*/
@Mapper
public interface Work1Mapper extends BaseMapper<Work1Data> {void batchInsert(List<Work1Data> datas);
}
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.xingo.domain.Work2Data;import java.util.List;/*** @Author xingo* @Date 2024/9/23*/
@Mapper
public interface Work2Mapper extends BaseMapper<Work2Data> {void batchInsert(List<Work2Data> datas);
}

数据库建表SQL语句:

CREATE TABLE `t_work1`  (`id` bigint NOT NULL,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`create_time` datetime NULL DEFAULT NULL,`ts` bigint NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;CREATE TABLE `t_work2`  (`id` bigint NOT NULL,`code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`name` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`random` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,`create_time` datetime NULL DEFAULT NULL,`ts` bigint NULL DEFAULT NULL,PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;

六、测试验证

  1. 正常情况:验证正常插入情况非常简单,编写一个测试方法不断的向kafka的主题中写入数据,观察数据消费和入库情况,通过对比发送数据和接收数据是否存在差异来确定服务是否有问题。
  2. 正常退出:在正常运行情况下,对批量处理服务执行 kill -15 关闭程序,正常退出情况下项目根目录下的 work 文件会被清除,那么下次程序启动时就不需要重新加载日志数据。
  3. 异常退出:在正常运行情况下对批量处理服务执行kill -9 命令杀死进程,观察项目根目录下的 work 文件还存在,并没有被清除,这就表明程序在上次退出时是异常退出的,那么在程序再次启动时会重新加载日志数据,避免程序异常导致数据丢失的风险。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149398.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

fiddler抓包11_列表显示服务器IP (配置文件)

请求列表默认不显示服务器IP字段&#xff0c;也无法从定制列窗口添加&#xff0c;可以修改CustomRules.js实现。 ① 菜单栏“Rules”&#xff08;规则&#xff09; - “Customize Rules...”&#xff08;自定义规则&#xff09;&#xff0c;打开CustomRules.js文件。 &#xf…

基于stm32的跑步机控制系统设计-设计说明书

设计摘要&#xff1a; 随着人们对健康和健身的关注增加&#xff0c;跑步机逐渐成为室内健身的主要设备之一。本文提出了一种基于STM32的跑步机控制系统设计&#xff0c;旨在实现对跑步机的运行速度、倾斜角度和运动模式等参数的精确控制&#xff0c;提供更好的健身体验。 首先…

vue3开发中易遗漏的常见知识点

文章目录 组件样式的特性Scoped CSS之局部样式的泄露Scoped CSS之深度选择器CSS Modules在CSS中使用v-bind 非props属性继承组件通信父子组件的相互通信props/$emit父组件传递数据给子组件子组件传递数据给父组件 非父子组件的相互通信Provide/inject全局事件总线 组件插槽作用…

LVGL 控件之消息框(lv_msgbox)

目录 一、概述二、消息框1、创建消息框2、获取消息框的组成部分3、关闭消息框部件4、消息框部件事件5、API 函数 一、概述 消息框部件是由多个小部件构建而成的&#xff0c;包括&#xff1a;lv_obj、lv_btn、lv_label 和 lv_btnmatrix 部件&#xff0c;示意图如下所示&#xf…

简单题83. 删除排序链表中的重复元素 (Java)20240920

问题描述&#xff1a; java&#xff1a; /*** Definition for singly-linked list.* public class ListNode {* int val;* ListNode next;* ListNode() {}* ListNode(int val) { this.val val; }* ListNode(int val, ListNode next) { this.val val; th…

【LLM学习之路】9月23日24日 第十、十一天 Attention代码解读

【LLM学习之路】9月23日24日 第十、十一天 Attention代码解读 Transformer模型大致分为三类 纯 Encoder 模型&#xff08;例如 BERT&#xff09;&#xff0c;又称自编码 (auto-encoding) Transformer 模型&#xff1b;纯 Decoder 模型&#xff08;例如 GPT&#xff09;&#…

基于python的django微博内容网络分析系统,实现文本划分词结构

本项目旨在开发一个基于Python的Django框架的微博内容网络分析系统&#xff0c;聚焦于微博文本的分词处理、名词提取和主成分分析。该系统通过数据收集与预处理、分词及结构化文本分析&#xff0c;为舆情监测、话题分析和用户行为研究提供了一体化的解决方案。 主要功能包括&a…

lkhgjfjghkbhjk

&#x1f4e2;博客主页&#xff1a;https://blog.csdn.net/2301_779549673 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01; &#x1f4e2;本文由 JohnKi 原创&#xff0c;首发于 CSDN&#x1f649; &#x1f4e2;未来很长&#…

LED驱动电路

LED驱动电路简介 摘要&#xff1a; LED照明是今年来快速兴起发展的一种新型光源&#xff0c;它的许多良好特点使得它的应用面越来越广。LED的单向导电特性使人一般认为应该用直流驱动&#xff0c;但是对直流恒压和限流的装置在保证比较好的限流特性时&#xff0c;自身功耗是很…

路政通 | OPENAIGC开发者大赛高校组AI创新之星奖

在第二届拯救者杯OPENAIGC开发者大赛中&#xff0c;涌现出一批技术突出、创意卓越的作品。为了让这些优秀项目被更多人看到&#xff0c;我们特意开设了优秀作品报道专栏&#xff0c;旨在展示其独特之处和开发者的精彩故事。 无论您是技术专家还是爱好者&#xff0c;希望能带给…

深圳某局联想SR850服务器黄灯 不开机维修

深圳 福田区1台Lenovo Thinksystem SR850 四路服务器黄灯问题现场处理。 1&#xff1a;型号&#xff1a;联想SR850 机架式2U服务器 2&#xff1a;故障&#xff1a;能通电&#xff0c;开机按钮快闪&#xff0c;随后叹号警告灯常亮 3&#xff1a;用户自行折腾无果后找到我们tech …

QT客户端发送HTTP请求此时服务器到底收到了哪些数据?

一个Http请求包括 请求行 请求头 空行 请求体 下面是示例&#xff1a; 1,2,3,4分别代表上面的四个部分&#xff0c;我只是做了一些解析&#xff0c;具体可以结合代码 1. post / HTTP/1.1 2.GET请求头包括Host(主机名),user-agent&#xff08;客户端标识符&#xff09;&am…

C++类和对象(中)【下篇】

&#x1f31f;个人主页&#xff1a;落叶 &#x1f31f;当前专栏: C专栏 目录 赋值运算符重载 运算符重载 赋值运算符重载 日期类实现 运算符重载<和运算符重载 运算符重载进行复用 运算符重载< 运算符重载> 运算符重载> 运算符重载! 获取某年某月的天数…

解决方案 | 镭速助力动漫游戏行业突破跨网文件交换瓶颈

在数字化浪潮推动下&#xff0c;动漫游戏行业蓬勃发展。随着高清技术的普及和云游戏的兴起&#xff0c;动漫游戏行业对动画的画质要求越来越高&#xff0c;数据量呈现爆炸式增长。然而&#xff0c;行业内的跨网文件交换难题也日益凸显&#xff0c;成为制约行业发展的瓶颈。 行业…

pyqt瀑布流布局

最近研究瀑布流布局&#xff0c;发现都是收费的&#xff0c;所以只能自己写算法写布局。 所以啥都不说直接上代码 ImageLabel 参考 pyqt5 QLabel显示网络图片或qfluentwidgets官网 代码 import math import sys from pathlib import Pathfrom PyQt5.Qt import * from qflue…

erlang学习:Linux命令学习4

顺序控制语句学习 if&#xff0c;else对文件操作 判断一个文件夹是否存在&#xff0c;如果存在则进行删除&#xff0c;如果不存在则创建该文件夹&#xff0c;并复制一份该脚本后&#xff0c;删除该脚本 if [ -d "/erlangtest/testdir"]; then echo "删除文件夹…

JavaWeb--小白笔记07-2:超链接以及servlet对表单数据的完整处理

一.超链接 Html使用标签<a></a>来设置超链接&#xff0c;<a>有一个属性href"" 必须加进去&#xff0c;里面就是链接地址 注意&#xff1a;链接里必须包含https://前缀 <a></a>里面可以是一个字&#xff0c;一个词或者一副图...点击…

27 C 语言标准库 <stdio.h> 中的两个重要字符串函数:sprintf、sscanf

目录 1 sprintf 1.1 函数原型 1.2 功能说明 1.3 案例演示 1.4 注意事项 2 sscanf 2.1 函数原型 2.2 功能说明 2.3 案例演示 2.4 注意事项 1 sprintf 1.1 函数原型 sprintf 函数是 C 语言标准库中的一个函数&#xff0c;用于将格式化的数据写入字符串。其函数原型定义…

【软件测试】详解测试中常用的几种测试方法

目录 一、集成测试二、 系统测试三、验收测试四、回归测试 总结 一、集成测试 术语 集成测试是继组件测试之后的又一个层次。集成测试假定交给这个层次的测试对象已经经过了组件测试&#xff0c;并且任何组件内部的缺陷都已经尽可能地被纠正。 集成 开发人员、测试人员和专…

【裸机装机系列】14.kali(ubuntu)-linux装机在分区时采用manual手动形式该怎么做

推荐阅读&#xff1a; 1.kali(ubuntu)-为什么弃用ubuntu&#xff0c;而选择基于debian的kali操作系统 如果在装机的时候选则了manual手动模式&#xff0c;可以根据以下步骤一步步做: 1> 在“partition disks”这个地方选择了manual,也就是手动自己分区的方式 点击"c…