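Table of Contents
- 0. Overview
- 1. Flowchart
- 2. Code Walkthrough
- 2.1 Time Jump Detection
- 2.2 Getting the Current Time as a String
- 2.3 Setting Thread Priority
- 2.4 Log Rotation
- 3. Build and Run
- 3.1 Build
- 3.2 Run
- 3.3 Sample Log File
- 4. Complete Code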
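0. Overview
An earlier article covered the single-threaded version of this tool: "C++ Programming: A Simple High-Precision Timestamp Logger (Single-Threaded)".
This article presents the multithreaded version of the high-precision timestamp logger. It provides the following features: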
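- Configurable interval and filename: the time interval and output filename are set through command-line arguments and default to 20 ms and timestamps.txt.
- High-precision timing: the loop is paced with std::chrono::steady_clock and the high-resolution sleep function clock_nanosleep, with the aim of keeping the interval accurate and avoiding the timing drift that std::this_thread::sleep_for can introduce.
- Thread-safe queues: timestamps are buffered in thread-safe queues, which reduces frequent file I/O.
- Log rotation: when the log file grows beyond 50 MB, it is rotated automatically.
- Time jump detection: the program detects time jumps (a timestamp that is smaller than the one before it) and marks the event with [TIME_JUMP]. By default only the 10 timestamps before and after a jump are recorded, so the log is not flooded with normal entries.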
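One common way to keep a periodic loop from drifting is to sleep until an absolute deadline and advance that deadline by a fixed step, instead of sleeping for a relative duration each time. The following is a minimal sketch of that idea, not the program's own loop; the helper names addNanoseconds and sleepUntil are hypothetical.

```cpp
#include <cerrno>
#include <cstdint>
#include <time.h>

// Hypothetical helper: advance a timespec by `ns` nanoseconds (ns >= 0),
// keeping tv_nsec normalized to the range [0, 1e9).
inline timespec addNanoseconds(timespec t, int64_t ns) {
  const int64_t kNsPerSec = 1000000000LL;
  const int64_t total = static_cast<int64_t>(t.tv_nsec) + ns;
  t.tv_sec += static_cast<time_t>(total / kNsPerSec);
  t.tv_nsec = static_cast<long>(total % kNsPerSec);
  return t;
}

// Hypothetical helper: sleep until the absolute CLOCK_MONOTONIC deadline,
// retrying when a signal interrupts the sleep. Returns 0 on success or the
// error number reported by clock_nanosleep.
inline int sleepUntil(const timespec &deadline) {
  int rc;
  do {
    rc = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline, nullptr);
  } while (rc == EINTR);
  return rc;
}
```

A loop built on these helpers would read clock_gettime(CLOCK_MONOTONIC, ...) once, then in each iteration advance the deadline by the period and call sleepUntil. Because every deadline is derived from the first reading, the time spent on work inside the loop does not accumulate into the schedule.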
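The program follows the producer-consumer pattern. The main thread produces timestamps, and two dedicated threads handle detection and log writing, which keeps the work efficient and thread-safe.
- Consumer Thread: takes log messages from LogQueue and writes them to the file, and monitors the file size to perform log rotation.
- Detection Thread: takes timestamps from TimestampQueue, checks them for time jumps, and pushes the relevant information to LogQueue.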
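The hand-off between these threads can be pictured with a small generic queue. This is a minimal sketch under assumptions: the class name BlockingQueue and its close() method are hypothetical, and it only illustrates the mutex plus condition-variable technique that queues of this kind typically rely on.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical generic queue: producers call push(), consumers call pop().
template <typename T>
class BlockingQueue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push_back(std::move(value));
    }
    condVar_.notify_one();  // wake one waiting consumer
  }

  // Blocks until an item is available or close() has been called.
  // Returns false only when the queue is closed and fully drained.
  bool pop(T &out) {
    std::unique_lock<std::mutex> lock(mutex_);
    condVar_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  // Marks the queue as closed and wakes every waiting consumer.
  void close() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      closed_ = true;
    }
    condVar_.notify_all();
  }

private:
  std::deque<T> queue_;
  std::mutex mutex_;
  std::condition_variable condVar_;
  bool closed_ = false;
};
```

Items pushed before close() are still delivered, so a consumer can simply loop on pop() until it returns false.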
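1. Flowchart
2. Code Walkthrough
2.1 Time Jump Detection
Detecting time jumps (a timestamp that is smaller than its predecessor) helps identify system clock adjustments and other anomalies. The code below also treats an unexpectedly long gap between consecutive timestamps as a jump. When a jump is detected, the program adds a [TIME_JUMP] marker to the log and records the timestamps before and after the jump.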
// Detection thread function to handle time jump detection
void detectionFunction(TimestampQueue &timestampQueue, LogQueue &logQueue,
                       int intervalMs, bool selectiveLogging) {
  constexpr const char *threadName = "detection_t";
  pthread_setname_np(pthread_self(), threadName);
  int newPriority = increaseThreadPriority(0);
  fprintf(stdout, "%s thread priority: %d\n", threadName, newPriority);

  uint64_t lastTimestampUs = 0;
  uint64_t secondLastTimestampUs = 0;
  bool inJumpMode = false;
  int jumpRemaining = 0;
  std::deque<uint64_t> preJumpTimestamps;
  const size_t preJumpBufferSize = 10;
  const int jumpRecordCount = 10;

  while (true) {
    uint64_t currentTimestampUs;
    if (timestampQueue.pop(currentTimestampUs)) {
      bool timeJump = false;
      if (lastTimestampUs != 0 && secondLastTimestampUs != 0 &&
          selectiveLogging) {
        if (currentTimestampUs < lastTimestampUs) {
          // Time regression detected
          timeJump = true;
        } else {
          // Check if interval is unexpectedly long. The interval is measured
          // against the previous timestamp: a span of two intervals would
          // always exceed 1.5x the expected period.
          uint64_t expectedIntervalUs =
              static_cast<uint64_t>(intervalMs) * 1000;
          uint64_t actualIntervalUs = currentTimestampUs - lastTimestampUs;
          uint64_t thresholdUs =
              static_cast<uint64_t>(expectedIntervalUs * 1.5);
          if (actualIntervalUs > thresholdUs) {
            // Print threshold and actual interval for debugging
            fprintf(stderr,
                    "thresholdUs: %" PRIu64 ", actualIntervalUs: %" PRIu64
                    "\n",
                    thresholdUs, actualIntervalUs);
            timeJump = true;
          }
        }
      }

      // Update timestamps
      secondLastTimestampUs = lastTimestampUs;
      lastTimestampUs = currentTimestampUs;

      // Update pre-jump buffer
      if (preJumpTimestamps.size() >= preJumpBufferSize) {
        preJumpTimestamps.pop_front();
      }
      preJumpTimestamps.push_back(currentTimestampUs);

      if (selectiveLogging && timeJump && !inJumpMode) {
        // Time jump detected, enter jump mode
        inJumpMode = true;
        jumpRemaining = jumpRecordCount;

        // Prepare jump message
        LogMessage jumpMsg;
        jumpMsg.type = LogMessage::Type::TIME_JUMP;
        jumpMsg.preJumpTimestamps.assign(preJumpTimestamps.begin(),
                                         preJumpTimestamps.end());
        jumpMsg.timestampUs = currentTimestampUs;
        logQueue.push(jumpMsg);
      } else if (inJumpMode) {
        // In jump mode, log subsequent timestamps
        LogMessage tsMsg;
        tsMsg.type = LogMessage::Type::TIMESTAMP;
        tsMsg.timestampUs = currentTimestampUs;
        logQueue.push(tsMsg);
        jumpRemaining--;
        if (jumpRemaining <= 0) {
          inJumpMode = false;
        }
      }
    } else if (timestampQueue.shouldExit()) {
      fprintf(stdout, "Detection thread exiting\n");
      break;
    }
  }
}
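The decision at the heart of the detection loop can be isolated into a small pure function, which makes it easy to reason about and to unit test. This is a sketch under assumptions: JumpKind and classifyStep are hypothetical names, and the 1.5x factor simply mirrors the threshold used in the snippet above.

```cpp
#include <cstdint>

// Hypothetical result type for one step between consecutive timestamps.
enum class JumpKind { None, Backward, Gap };

// Hypothetical helper: classify the step from previousUs to currentUs.
// expectedIntervalUs is the nominal sampling period in microseconds.
// A step back in time is Backward; a forward step longer than 1.5x the
// period is Gap; anything else is None.
inline JumpKind classifyStep(uint64_t previousUs, uint64_t currentUs,
                             uint64_t expectedIntervalUs) {
  if (currentUs < previousUs) {
    return JumpKind::Backward;
  }
  const uint64_t thresholdUs = expectedIntervalUs + expectedIntervalUs / 2;
  return (currentUs - previousUs > thresholdUs) ? JumpKind::Gap
                                                : JumpKind::None;
}
```

With a 20 ms period the threshold is 30000 microseconds, so a step of exactly 30000 is still treated as normal and a step of 31000 is reported as a gap.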
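2.2 Getting the Current Time as a String
The current wall-clock time is read from std::chrono::system_clock as microseconds since the epoch and then formatted as a string with microsecond precision.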
// Function to format timestamp in microseconds to string
std::string formatTimestamp(uint64_t timestampUs) {
  time_t seconds = timestampUs / 1000000;
  suseconds_t microseconds = timestampUs % 1000000;
  struct tm tmInfo;
  localtime_r(&seconds, &tmInfo);
  char buffer[64];
  strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tmInfo);
  std::ostringstream oss;
  oss << buffer << "." << std::setw(6) << std::setfill('0') << microseconds;
  return oss.str();
}
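Because formatTimestamp calls localtime_r, its output depends on the time zone of the machine. When logs from several machines need to be compared, a UTC variant can be handy. The sketch below is an illustration only; formatTimestampUtc is a hypothetical name, and it uses gmtime_r and snprintf instead of the stream formatting shown above.

```cpp
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <string>
#include <time.h>

// Hypothetical UTC variant: formats microseconds since the epoch as
// "YYYY-MM-DD HH:MM:SS.uuuuuu" in UTC.
inline std::string formatTimestampUtc(uint64_t timestampUs) {
  const time_t seconds = static_cast<time_t>(timestampUs / 1000000ULL);
  const unsigned microseconds =
      static_cast<unsigned>(timestampUs % 1000000ULL);
  struct tm tmInfo;
  gmtime_r(&seconds, &tmInfo);  // thread-safe conversion to UTC
  char datePart[32];
  strftime(datePart, sizeof(datePart), "%Y-%m-%d %H:%M:%S", &tmInfo);
  char buffer[48];
  // %06u zero-pads the fractional part to six digits
  snprintf(buffer, sizeof(buffer), "%s.%06u", datePart, microseconds);
  return std::string(buffer);
}
```

For example, formatTimestampUtc(0) yields "1970-01-01 00:00:00.000000" regardless of the local time zone.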
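2.3 Setting Thread Priority
To improve responsiveness and the accuracy of the recorded times, the program tries to give its threads a real-time scheduling policy and priority. This requires superuser privileges, and a poorly chosen priority can make the system unstable. It is therefore recommended to run the program as root and to adjust priorities with care.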
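// Function to set thread priority (placeholder, actual implementation depends on requirements)
int increaseThreadPriority(int val = 0) {
  // The logic for setting the thread priority can be implemented here.
  // For now this is a placeholder that returns the default priority.
  (void)val; // unused in the placeholder
  return 0;
}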
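Note: the increaseThreadPriority function shown here is a placeholder. The complete code in Section 4 contains an implementation based on pthread_setschedparam; adapt it to your own requirements and system privileges.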
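When the program may run without root privileges, it can be useful to attempt the real-time policy and report the failure instead of aborting. The sketch below illustrates that approach; tryEnableRoundRobin is a hypothetical helper, not part of the program.

```cpp
#include <pthread.h>
#include <sched.h>

// Hypothetical helper: try to move the calling thread to SCHED_RR at the
// lowest real-time priority. Returns 0 on success, or the error number
// returned by pthread_setschedparam (typically EPERM when the process
// lacks the required privileges).
inline int tryEnableRoundRobin() {
  sched_param param{};
  param.sched_priority = sched_get_priority_min(SCHED_RR);
  return pthread_setschedparam(pthread_self(), SCHED_RR, &param);
}
```

A caller can log the returned error with strerror and continue at the default priority, which keeps the tool usable for quick tests at the cost of less predictable timing.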
2.4 Log Rotation
When the log file reaches 50 MB, the program closes the current log file, renames it with a timestamp suffix, and then creates a new log file.
// Rotate log file if necessary
// (Excerpt from the consumer loop; getFileSize() stands for a file-size
// query, which the complete code performs with stat().)
size_t fileSize = getFileSize(filename);
if (fileSize >= maxSize) {
  // Build a timestamp string to use as the suffix of the rotated file
  std::string timestamp = formatTimestamp(message.timestampUs);
  std::replace(timestamp.begin(), timestamp.end(), ' ', '_');
  std::replace(timestamp.begin(), timestamp.end(), ':', '-');
  std::replace(timestamp.begin(), timestamp.end(), '.', '-');
  std::string newFilename = filename + "." + timestamp;

  // Close current log file. The stream is unusable after fclose even if it
  // reports an error, so the rotation continues either way.
  if (fclose(logFile) != 0) {
    fprintf(stderr, "Failed to close log file before rotation: %s\n",
            strerror(errno));
  }

  // Rename the log file
  if (rename(filename.c_str(), newFilename.c_str()) != 0) {
    fprintf(stderr, "Failed to rotate log file: %s\n", strerror(errno));
    // Attempt to reopen the log file even if rename failed
  } else {
    fprintf(stdout, "Rotated log file to %s\n", newFilename.c_str());
  }

  // Open a new log file
  logFile = fopen(filename.c_str(), "a");
  if (!logFile) {
    fprintf(stderr, "Failed to open new log file after rotation: %s\n",
            strerror(errno));
    return;
  }
}
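The file-name part of the rotation can be pulled out into a small function so that it is easy to check on its own. This is a sketch; makeRotatedName is a hypothetical name, and the character replacements are the same ones used in the excerpt above.

```cpp
#include <algorithm>
#include <string>

// Hypothetical helper: build the rotated file name from the log file name
// and a formatted timestamp such as "2024-04-27 12:34:56.789123".
// Spaces become '_' and ':' and '.' become '-', so the suffix contains no
// characters that are awkward in file names.
inline std::string makeRotatedName(const std::string &filename,
                                   std::string timestamp) {
  std::replace(timestamp.begin(), timestamp.end(), ' ', '_');
  std::replace(timestamp.begin(), timestamp.end(), ':', '-');
  std::replace(timestamp.begin(), timestamp.end(), '.', '-');
  return filename + "." + timestamp;
}
```

For the inputs "timestamps.txt" and "2024-04-27 12:34:56.789123" the result is "timestamps.txt.2024-04-27_12-34-56-789123". Because each rotated file gets a distinct suffix, earlier rotations are not overwritten.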
3. Build and Run
3.1 Build
g++ -std=c++11 -pthread -o time_jump_check_multi time_jump_check_multi.cpp -O2 -g
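3.2 Run
Because the program sets thread priorities, it may need superuser privileges. Running it with sudo is recommended so that it can apply the real-time scheduling policy to its threads.
- Default settings (20 ms interval, output file timestamps.txt, 2-hour run): sudo ./time_jump_check_multi
- Custom settings (for example, 100 ms interval, output file output.txt, 1-hour run): sudo ./time_jump_check_multi -i 100 -f output.txt -t 3600
Parameters:
- -i, --interval <milliseconds>: sets the time interval in milliseconds; the default is 20 ms.
- -f, --file <filename>: sets the output filename; the default is timestamps.txt.
- -t, --time <seconds>: sets the run time in seconds; the default is 7200 seconds (2 hours).
- --disable_selective_logging: disables selective logging.
- -h, --help: shows the help message.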
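Numeric options such as -i and -t are easier to validate with strtol than with atoi, because strtol reports trailing garbage and out-of-range values. The following sketch shows one way to do that; parsePositiveInt is a hypothetical helper and is not part of the program.

```cpp
#include <cerrno>
#include <climits>
#include <cstdlib>

// Hypothetical helper: parse a strictly positive decimal integer.
// Returns true and stores the value in `out` only if the whole string is a
// number in the range [1, INT_MAX]; otherwise returns false and leaves
// `out` untouched.
inline bool parsePositiveInt(const char *text, int &out) {
  if (text == nullptr || *text == '\0') {
    return false;
  }
  errno = 0;
  char *end = nullptr;
  const long value = std::strtol(text, &end, 10);
  if (errno == ERANGE || *end != '\0') {
    return false;  // overflow or trailing non-digit characters
  }
  if (value <= 0 || value > INT_MAX) {
    return false;
  }
  out = static_cast<int>(value);
  return true;
}
```

With this helper an input such as "20ms" is rejected, whereas atoi would silently accept it as 20.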
3.3 Sample Log File
The following is an excerpt from the log file timestamps.txt, with a time jump marked:
2024-04-27 12:34:56.789123
2024-04-27 12:34:56.809456
2024-04-27 12:34:56.829789
2024-04-27 12:34:56.749012 [TIME_JUMP]
2024-04-27 12:34:56.769345
2024-04-27 12:34:56.789678
...
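In the example above, the fourth line records a time jump event, marked with [TIME_JUMP], together with the timestamps before and after the jump.
4. Complete Code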
// g++ -o time_jump_check_multi time_jump_check_multi.cpp -O2 -g -pthread -std=c++11
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <deque>
#include <errno.h>
#include <functional>
#include <inttypes.h>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <pthread.h>
#include <sched.h>
#include <sstream>
#include <string>
#include <sys/stat.h>
#include <thread>
#include <time.h>
#include <unistd.h>
#include <vector>

const int DEFAULT_INTERVAL_MS = 20;              // Time interval in milliseconds
const char *DEFAULT_FILENAME = "timestamps.txt"; // Output filename
const int DEFAULT_RUN_TIME_SECONDS = 7200;       // Run time in seconds
const size_t MAX_FILE_SIZE = 50 * 1024 * 1024;   // 50MB
const size_t PRE_JUMP_BUFFER_SIZE = 10;
const int JUMP_RECORD_COUNT = 10;
const int LOG_EVERY_N = 1000;

// Struct to store command-line arguments
struct Arguments {
  int intervalMs = DEFAULT_INTERVAL_MS;          // Time interval in milliseconds
  std::string filename = DEFAULT_FILENAME;       // Output filename
  int runTimeSeconds = DEFAULT_RUN_TIME_SECONDS; // Run time in seconds
  bool disableSelectiveLogging = false; // Flag to disable selective logging
};

// Reads the scheduling parameters of the current thread and raises its
// priority by `val` levels
int increaseThreadPriority(int val = 0) {
  pthread_t currentThread = pthread_self();
  sched_param schedParam;
  int policy;

  // Get the scheduling policy and parameters of the current thread.
  // pthread_* calls return the error number directly instead of setting errno.
  int err = pthread_getschedparam(currentThread, &policy, &schedParam);
  if (err != 0) {
    fprintf(stderr, "Error: pthread_getschedparam failed: %s\n",
            strerror(err));
    exit(EXIT_FAILURE);
  }

  // If the current policy is not a real-time policy, try to switch to
  // SCHED_RR at the minimum priority
  if (policy != SCHED_RR && policy != SCHED_FIFO) {
    policy = SCHED_RR;
    schedParam.sched_priority = sched_get_priority_min(policy);
    err = pthread_setschedparam(currentThread, policy, &schedParam);
    if (err != 0) {
      fprintf(stderr, "Error: pthread_setschedparam to SCHED_RR failed: %s\n",
              strerror(err));
      exit(EXIT_FAILURE);
    }
  }

  // Get the updated scheduling policy and parameters
  err = pthread_getschedparam(currentThread, &policy, &schedParam);
  if (err != 0) {
    fprintf(stderr, "Error: pthread_getschedparam failed: %s\n",
            strerror(err));
    exit(EXIT_FAILURE);
  }
  int currentPriority = schedParam.sched_priority;

  // Get the priority range under the current scheduling policy
  int minPriority = sched_get_priority_min(policy);
  int maxPriority = sched_get_priority_max(policy);
  if (minPriority == -1 || maxPriority == -1) {
    fprintf(stderr, "Error: sched_get_priority_min/max failed: %s\n",
            strerror(errno));
    exit(EXIT_FAILURE);
  }

  // Calculate the new priority by raising the current one by `val` levels
  int newPriority = currentPriority + val;

  // Ensure that the new priority is within the valid range
  if (newPriority > maxPriority) {
    newPriority = maxPriority;
  } else if (newPriority < minPriority) {
    newPriority = minPriority;
  }

  // Set new priority
  schedParam.sched_priority = newPriority;
  err = pthread_setschedparam(currentThread, policy, &schedParam);
  if (err != 0) {
    fprintf(stderr, "Error: pthread_setschedparam failed: %s\n",
            strerror(err));
    exit(EXIT_FAILURE);
  }
  return newPriority;
}

// Function to parse command-line arguments
Arguments parseArguments(int argc, char *argv[]) {
  Arguments args;
  for (int i = 1; i < argc; ++i) {
    std::string arg = argv[i];
    if ((arg == "-i" || arg == "--interval") && i + 1 < argc) {
      args.intervalMs = std::atoi(argv[++i]);
      if (args.intervalMs <= 0) {
        // Report the default that is actually applied, not the invalid input
        fprintf(stderr,
                "Invalid interval. Using default value of %d milliseconds.\n",
                DEFAULT_INTERVAL_MS);
        args.intervalMs = DEFAULT_INTERVAL_MS;
      }
    } else if ((arg == "-f" || arg == "--file") && i + 1 < argc) {
      args.filename = argv[++i];
    } else if ((arg == "-t" || arg == "--time") && i + 1 < argc) {
      args.runTimeSeconds = std::atoi(argv[++i]);
      if (args.runTimeSeconds <= 0) {
        fprintf(stderr,
                "Invalid run time. Using default value of %d seconds.\n",
                DEFAULT_RUN_TIME_SECONDS);
        args.runTimeSeconds = DEFAULT_RUN_TIME_SECONDS;
      }
    } else if (arg == "--disable_selective_logging") {
      args.disableSelectiveLogging = true;
    } else if (arg == "-h" || arg == "--help") {
      fprintf(stdout, "Usage: %s [options]\n", argv[0]);
      fprintf(stdout, "Options:\n");
      fprintf(stdout,
              "  -i, --interval <milliseconds>  Set the time interval, "
              "default is %d milliseconds\n",
              DEFAULT_INTERVAL_MS);
      fprintf(stdout,
              "  -f, --file <filename>          Set the output filename, "
              "default is %s\n",
              DEFAULT_FILENAME);
      fprintf(stdout,
              "  -t, --time <seconds>           Set the run time in seconds, "
              "default is %d seconds (2 hours)\n",
              DEFAULT_RUN_TIME_SECONDS);
      fprintf(stdout, "  --disable_selective_logging    Disable selective "
                      "logging feature\n");
      fprintf(stdout,
              "  -h, --help                     Show this help message\n");
      exit(0);
    } else {
      fprintf(stderr, "Unknown argument: %s\n", arg.c_str());
      fprintf(stderr, "Use -h or --help to see available options.\n");
      exit(EXIT_FAILURE);
    }
  }
  return args;
}

// Struct to represent different types of log messages
struct LogMessage {
  enum class Type { TIMESTAMP, TIME_JUMP } type;
  uint64_t timestampUs;                    // Valid for TIMESTAMP and TIME_JUMP
  std::vector<uint64_t> preJumpTimestamps; // Valid for TIME_JUMP
};

// Thread-safe queue for log messages
class LogQueue {
public:
  void push(const LogMessage &message) {
    std::unique_lock<std::mutex> lock(mutex_);
    queue_.push_back(message);
    condVar_.notify_one();
  }

  // Blocks until a message is available or the exit flag is set.
  // Returns false only when the queue is empty and exit was requested.
  bool pop(LogMessage &message) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty() && !exitFlag_) {
      condVar_.wait(lock);
    }
    if (!queue_.empty()) {
      message = queue_.front();
      queue_.pop_front();
      return true;
    }
    return false;
  }

  void setExit() {
    std::unique_lock<std::mutex> lock(mutex_);
    exitFlag_ = true;
    condVar_.notify_all();
  }

  // Reads the flag under the mutex so the access is synchronized
  bool shouldExit() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return exitFlag_;
  }

private:
  std::deque<LogMessage> queue_;
  mutable std::mutex mutex_;
  std::condition_variable condVar_;
  bool exitFlag_ = false;
};

// Thread-safe queue for timestamps
class TimestampQueue {
public:
  void push(uint64_t timestampUs) {
    std::unique_lock<std::mutex> lock(mutex_);
    queue_.push_back(timestampUs);
    condVar_.notify_one();
  }

  // Blocks until a timestamp is available or the exit flag is set.
  // Returns false only when the queue is empty and exit was requested.
  bool pop(uint64_t &timestampUs) {
    std::unique_lock<std::mutex> lock(mutex_);
    while (queue_.empty() && !exitFlag_) {
      condVar_.wait(lock);
    }
    if (!queue_.empty()) {
      timestampUs = queue_.front();
      queue_.pop_front();
      return true;
    }
    return false;
  }

  void setExit() {
    std::unique_lock<std::mutex> lock(mutex_);
    exitFlag_ = true;
    condVar_.notify_all();
  }

  // Reads the flag under the mutex so the access is synchronized
  bool shouldExit() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return exitFlag_;
  }

private:
  std::deque<uint64_t> queue_;
  mutable std::mutex mutex_;
  std::condition_variable condVar_;
  bool exitFlag_ = false;
};

// Function to format timestamp in microseconds to string
std::string formatTimestamp(uint64_t timestampUs) {
  time_t seconds = timestampUs / 1000000;
  suseconds_t microseconds = timestampUs % 1000000;
  struct tm tmInfo;
  localtime_r(&seconds, &tmInfo);
  char buffer[64];
  strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tmInfo);
  std::ostringstream oss;
  oss << buffer << "." << std::setw(6) << std::setfill('0') << microseconds;
  return oss.str();
}

// Function to get the current timestamp in microseconds since epoch using
// system_clock
uint64_t getSystemTimestampUs() {
  auto now = std::chrono::system_clock::now();
  auto epoch = now.time_since_epoch();
  return static_cast<uint64_t>(
      std::chrono::duration_cast<std::chrono::microseconds>(epoch).count());
}

// Consumer thread function to handle log writing and rotation
void consumerFunction(LogQueue &logQueue, const std::string &filename,
                      size_t maxSize) {
  constexpr const char *threadName = "consumer_t";
  pthread_setname_np(pthread_self(), threadName);
  int newPriority = increaseThreadPriority(0);
  fprintf(stdout, "%s thread priority: %d\n", threadName, newPriority);

  FILE *logFile = fopen(filename.c_str(), "a");
  if (!logFile) {
    fprintf(stderr, "Failed to open log file for appending: %s\n",
            filename.c_str());
    return;
  }

  while (true) {
    LogMessage message;
    if (logQueue.pop(message)) {
      if (message.type == LogMessage::Type::TIME_JUMP) {
        // Write jump header
        fprintf(logFile, "\n--- TIME JUMP DETECTED ---\n");
        // Write pre-jump timestamps
        for (const auto &ts : message.preJumpTimestamps) {
          std::string tsStr = formatTimestamp(ts);
          fprintf(logFile, "%s\n", tsStr.c_str());
        }
        // Write current jump timestamp with tag
        std::string jumpStr =
            formatTimestamp(message.timestampUs) + " [TIME_JUMP]";
        fprintf(logFile, "%s\n", jumpStr.c_str());
      } else if (message.type == LogMessage::Type::TIMESTAMP) {
        // Write regular timestamp
        std::string tsStr = formatTimestamp(message.timestampUs);
        fprintf(logFile, "%s\n", tsStr.c_str());
      }
      fflush(logFile);

      // Rotate log file if necessary
      struct stat statBuf;
      if (stat(filename.c_str(), &statBuf) != 0) {
        fprintf(stderr, "Failed to get file status for %s: %s\n",
                filename.c_str(), strerror(errno));
        continue;
      }
      if (static_cast<size_t>(statBuf.st_size) >= maxSize) {
        // Use a timestamp suffix, as described in Section 2.4, so that
        // earlier rotated files are not overwritten
        std::string timestamp = formatTimestamp(message.timestampUs);
        std::replace(timestamp.begin(), timestamp.end(), ' ', '_');
        std::replace(timestamp.begin(), timestamp.end(), ':', '-');
        std::replace(timestamp.begin(), timestamp.end(), '.', '-');
        std::string newFilename = filename + "." + timestamp;

        // Close current log file. The stream is unusable after fclose even
        // if it reports an error, so the rotation continues either way.
        if (fclose(logFile) != 0) {
          fprintf(stderr, "Failed to close log file before rotation: %s\n",
                  strerror(errno));
        }

        // Rename the log file
        if (rename(filename.c_str(), newFilename.c_str()) != 0) {
          fprintf(stderr, "Failed to rotate log file: %s\n", strerror(errno));
          // Attempt to reopen the log file even if rename failed
        } else {
          fprintf(stdout, "Rotated log file to %s\n", newFilename.c_str());
        }

        // Open a new log file
        logFile = fopen(filename.c_str(), "a");
        if (!logFile) {
          fprintf(stderr, "Failed to open new log file after rotation: %s\n",
                  strerror(errno));
          return;
        }
      }
    } else if (logQueue.shouldExit()) {
      break;
    }
  }

  // Close log file
  if (fclose(logFile) != 0) {
    fprintf(stderr, "Failed to close log file: %s\n", strerror(errno));
  }
}

// Detection thread function to handle time jump detection
void detectionFunction(TimestampQueue &timestampQueue, LogQueue &logQueue,
                       int intervalMs, bool selectiveLogging) {
  constexpr const char *threadName = "detection_t";
  pthread_setname_np(pthread_self(), threadName);
  int newPriority = increaseThreadPriority(0);
  fprintf(stdout, "%s thread priority: %d\n", threadName, newPriority);

  uint64_t lastTimestampUs = 0;
  uint64_t secondLastTimestampUs = 0;
  uint64_t totalTimestamps = 0;
  bool inJumpMode = false;
  int jumpRemaining = 0;
  std::deque<uint64_t> preJumpTimestamps;

  while (true) {
    uint64_t currentTimestampUs;
    if (timestampQueue.pop(currentTimestampUs)) {
      bool timeJump = false;
      if (lastTimestampUs != 0 && secondLastTimestampUs != 0 &&
          selectiveLogging) {
        if (currentTimestampUs < lastTimestampUs) {
          // Time regression detected
          timeJump = true;
        } else {
          // Check if interval is unexpectedly long. The interval is measured
          // against the previous timestamp: a span of two intervals would
          // always exceed 1.5x the expected period.
          uint64_t expectedIntervalUs =
              static_cast<uint64_t>(intervalMs) * 1000;
          uint64_t actualIntervalUs = currentTimestampUs - lastTimestampUs;
          uint64_t thresholdUs =
              static_cast<uint64_t>(expectedIntervalUs * 1.5);
          if (actualIntervalUs > thresholdUs) {
            // Print threshold and actual interval for debugging
            fprintf(stderr,
                    "thresholdUs: %" PRIu64 ", actualIntervalUs: %" PRIu64
                    "\n",
                    thresholdUs, actualIntervalUs);
            timeJump = true;
          }
        }
      }

      // Update timestamps
      secondLastTimestampUs = lastTimestampUs;
      lastTimestampUs = currentTimestampUs;

      // Update pre-jump buffer
      if (preJumpTimestamps.size() >= PRE_JUMP_BUFFER_SIZE) {
        preJumpTimestamps.pop_front();
      }
      preJumpTimestamps.push_back(currentTimestampUs);

      if (selectiveLogging && timeJump && !inJumpMode) {
        // Time jump detected, enter jump mode
        inJumpMode = true;
        jumpRemaining = JUMP_RECORD_COUNT;

        // Prepare jump message
        LogMessage jumpMsg;
        jumpMsg.type = LogMessage::Type::TIME_JUMP;
        jumpMsg.preJumpTimestamps.assign(preJumpTimestamps.begin(),
                                         preJumpTimestamps.end());
        jumpMsg.timestampUs = currentTimestampUs;
        logQueue.push(jumpMsg);
      } else if (inJumpMode) {
        // In jump mode, log subsequent timestamps
        LogMessage tsMsg;
        tsMsg.type = LogMessage::Type::TIMESTAMP;
        tsMsg.timestampUs = currentTimestampUs;
        logQueue.push(tsMsg);
        jumpRemaining--;
        if (jumpRemaining <= 0) {
          inJumpMode = false;
        }
      }

      // Regular logging: log every LOG_EVERY_N timestamps. This runs only
      // after a successful pop, so currentTimestampUs is always initialized.
      totalTimestamps++;
      if (totalTimestamps % LOG_EVERY_N == 0) {
        LogMessage tsMsg;
        tsMsg.type = LogMessage::Type::TIMESTAMP;
        tsMsg.timestampUs = currentTimestampUs;
        logQueue.push(tsMsg);
      }
    } else if (timestampQueue.shouldExit()) {
      fprintf(stdout, "Detection thread exiting\n");
      break;
    }
  }
}

int main(int argc, char *argv[]) {
  // Parse command-line arguments
  Arguments args = parseArguments(argc, argv);

  std::string filename = args.filename;
  int runTime = args.runTimeSeconds;
  bool selectiveLogging = !args.disableSelectiveLogging;

  // Output parameter information
  fprintf(stdout, "Time Interval: %d milliseconds\n", args.intervalMs);
  fprintf(stdout, "Output File: %s\n", filename.c_str());
  fprintf(stdout, "Run Time: %d seconds\n", runTime);
  fprintf(stdout, "Selective Logging: %s\n",
          (selectiveLogging ? "Enabled" : "Disabled"));

  // Initialize queues
  LogQueue logQueue;
  TimestampQueue timestampQueue;

  // Start consumer thread
  std::thread consumerThread(consumerFunction, std::ref(logQueue), filename,
                             MAX_FILE_SIZE);

  // Start detection thread
  // The detection thread handles time jump detection
  std::thread detectionThread(detectionFunction, std::ref(timestampQueue),
                              std::ref(logQueue), args.intervalMs,
                              selectiveLogging);

  constexpr const char *threadName = "main_t";
  int newPriority = increaseThreadPriority(1);
  fprintf(stdout, "%s thread priority: %d\n", threadName, newPriority);

  // Compute the end time of the run
  auto endTime =
      std::chrono::steady_clock::now() + std::chrono::seconds(runTime);

  // Sleep interval, split so that tv_nsec stays below one second even for
  // intervals of 1000 ms or more
  struct timespec req;
  req.tv_sec = args.intervalMs / 1000;                                // s
  req.tv_nsec = static_cast<long>(args.intervalMs % 1000) * 1000000L; // ns

  // Main loop: record timestamps at intervals and push to timestampQueue
  while (std::chrono::steady_clock::now() < endTime) {
    // use CLOCK_MONOTONIC clock
    clock_nanosleep(CLOCK_MONOTONIC, 0, &req, NULL);

    // Get current timestamp in microseconds
    uint64_t currentTimestampUs = getSystemTimestampUs();

    // Push timestamp to the timestamp queue
    timestampQueue.push(currentTimestampUs);
  }

  // Signal the detection thread to exit
  timestampQueue.setExit();
  if (detectionThread.joinable()) {
    detectionThread.join();
  }

  // Signal the consumer thread to exit
  logQueue.setExit();
  if (consumerThread.joinable()) {
    consumerThread.join();
  }

  // Print termination message
  fprintf(stdout, "Program has ended.\n");
  return 0;
}