C++使用开源ConcurrentQueue库处理自定义业务数据类

ConcurrentQueue开源库介绍

ConcurrentQueue是一个高性能的、线程安全的并发队列库。它旨在提供高效、无锁的数据结构,适用于多线程环境中的数据交换。concurrentqueue 支持多个生产者和多个消费者,并且提供了多种配置选项来优化性能和内存使用。

ConcurrentQueue使用

0x01 使用场景说明

我的数据平台在接收到四种不同的业务数据时,需要按数据分类写进RocketMQ。

0x02 自定义类用来存放和区分数据流

  • 设计BusinessFlowMsg
  1. 该类有定义消息的类型
  2. 该类中设计ST_BusinessSign结构体消息头,用来区分消息和获取消息体的长度
  3. BusinessFlowMsg类可以存放的数据长度为512KB
#ifndef BUSINESSFLOWMSG_HPP
#define BUSINESSFLOWMSG_HPP#include <string.h>#define MSG_ROCKETMQ_PNG 0x01
#define MSG_ROCKETMQ_AIS 0x02
#define MSG_ROCKETMQ_ROUTE 0x03
#define MSG_ROCKETMQ_VOYAGE 0x04#pragma pack(push)
#pragma pack(1)// 消息头
typedef struct s_BusinessSign
{int sign; // 业务标识unsigned int length; // 消息体的长度
}ST_BusinessSign;#pragma pack(pop)class BusinessFlowMsg
{
public:BusinessFlowMsg() = default;~BusinessFlowMsg() = default;char* get_data(){return _data;}int data_size(){return businessSign.length;}char* get_body(){return _data + sizeof(ST_BusinessSign);}int body_size(){return data_size() - sizeof(ST_BusinessSign);}ST_BusinessSign* header(){return &businessSign;}bool set_data(const char* data, int length, int sign){if(length > (max_body_len + sizeof(ST_BusinessSign))){return false;}businessSign.sign = sign;businessSign.length = length;memcpy(_data + sizeof(ST_BusinessSign), data, length);return true;}private:enum{max_body_len = 512 * 1024 // 512KB};ST_BusinessSign businessSign;char _data[max_body_len];
};#endif // BUSINESSFLOWMSG_HPP

0x03 创建PngUnit类模拟接到不同的业务数据

  • PngUnit类型创建了四个线程来模拟不同的数据流。
  • PngUnit类多线程中并未使用互斥锁,因为ConcurrentQueue是一个线程安全的并发队列库,事实证明确实如此。
#ifndef PNGUNIT_H
#define PNGUNIT_H#include <thread>
#include <mutex>class PngUnit
{
public:PngUnit();~PngUnit() = default;void start();void sendPNG(int sign);void sendAIS(int sign);void sendRoute(int sign);void sendVoyage(int sign);private:std::thread m_th_png;std::thread m_th_ais;std::thread m_th_route;std::thread m_th_voyage;// std::mutex queue_mutex;  // 互斥锁
};#endif // PNGUNIT_H

#include "pngunit.h"
#include <unistd.h>
#include "rocketmqutils.h"
#include "BusinessFlowMsg.hpp"
#include "json11/json11.hpp"PngUnit::PngUnit()
{}void PngUnit::start()
{// m_th = std::thread([this](){//     sendPNG(100);// });if(m_th_png.joinable()){printf("[%s:%d] %s\n", __FILE__, __LINE__, "m_th_png is running");return;}m_th_png = std::thread(std::bind(&PngUnit::sendPNG, this, MSG_ROCKETMQ_PNG));m_th_ais= std::thread(std::bind(&PngUnit::sendAIS, this, MSG_ROCKETMQ_AIS));m_th_route= std::thread(std::bind(&PngUnit::sendRoute, this, MSG_ROCKETMQ_ROUTE));m_th_voyage= std::thread(std::bind(&PngUnit::sendVoyage, this, MSG_ROCKETMQ_VOYAGE));
}void PngUnit::sendPNG(int sign)
{while (true){BusinessFlowMsg pngMsg;const char* pngFile = "1234567890ABCDEF";int fileLen = strlen(pngFile) + 1;pngMsg.set_data(pngFile, fileLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(pngMsg)){printf("Failed to set PNG message data");}}sleep(1);}
}void PngUnit::sendAIS(int sign)
{json11::Json::object obj = {{"message", "AIS"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg aisMsg;int jsonStrLen = jsonStr.size();aisMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(aisMsg)){printf("Failed to set AIS message data");}}sleep(2);}
}void PngUnit::sendRoute(int sign)
{json11::Json::object obj = {{"message", "Route"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg routeMsg;int jsonStrLen = jsonStr.size();routeMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(routeMsg)){printf("Failed to set ROUTE message data");}}sleep(3);}
}void PngUnit::sendVoyage(int sign)
{json11::Json::object obj = {{"message", "Voyage"},{"response", "success"}};std::string jsonStr = json11::Json(obj).dump();while (true){BusinessFlowMsg voyageMsg;int jsonStrLen = jsonStr.size();voyageMsg.set_data(jsonStr.c_str(), jsonStrLen, sign);{// std::lock_guard<std::mutex> lock(queue_mutex);if(!RocketMQUtils::Instance()->g_businessQueue.enqueue(voyageMsg)){printf("Failed to set VOYAGE message data");}}sleep(4);}
}

0x04 创建RocketMQUtils类,在ConcurrentQueue队列中获取数据写进RocketMQ

  • RocketMQUtils类是一个单例类
#ifndef ROCKETMQUTILS_H
#define ROCKETMQUTILS_H#include <thread>
#include <concurrentqueue/moodycamel/concurrentqueue.h>
#include "BusinessFlowMsg.hpp"class RocketMQUtils
{
public:static RocketMQUtils* Instance();private:RocketMQUtils();~RocketMQUtils()=default;RocketMQUtils(const RocketMQUtils &) = delete;RocketMQUtils& operator=(const RocketMQUtils &) = delete;RocketMQUtils(RocketMQUtils &&) = delete;RocketMQUtils& operator=(RocketMQUtils &&) = delete;public:void start();void push();void poll();bool write(char *data, int len, int sign);public:moodycamel::ConcurrentQueue<BusinessFlowMsg> g_businessQueue;private:static RocketMQUtils* _instance;std::thread _pushThread;
};#endif // ROCKETMQUTILS_H

#include "rocketmqutils.h"
#include <unistd.h>RocketMQUtils * RocketMQUtils::_instance = nullptr;RocketMQUtils *RocketMQUtils::Instance()
{if(_instance == nullptr){_instance = new RocketMQUtils();}return _instance;
}RocketMQUtils::RocketMQUtils()
{}void RocketMQUtils::start()
{if(_pushThread.joinable()){return;}_pushThread = std::thread(&RocketMQUtils::push, this);
}void RocketMQUtils::push()
{while (true){BusinessFlowMsg busiMsg;if(g_businessQueue.try_dequeue(busiMsg)){write(busiMsg.get_body(), busiMsg.header()->length, busiMsg.header()->sign);}else{printf("[%s:%d] %s\n", __FILE__, __LINE__, "g_businessQueue is empty");sleep(2);}}
}void RocketMQUtils::poll()
{}bool RocketMQUtils::write(char *data, int len, int sign)
{std::string msg(data, len);if (sign == MSG_ROCKETMQ_PNG){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_AIS){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_ROUTE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else if (sign == MSG_ROCKETMQ_VOYAGE){printf("[%s:%d] [%d] %s\n", __FILE__, __LINE__, sign, msg.c_str());}else{printf("[%s:%d] data sign error\n", __FILE__, __LINE__);}
}

0x05 使用演示

#include <iostream>
#include <memory>
#include "rocketmqutils.h"
#include "pngunit.h"using namespace std;int main()
{cout << "==Start==" << endl;RocketMQUtils* rocketmq = RocketMQUtils::Instance();rocketmq->start();std::shared_ptr<PngUnit> ptrPngUnit = std::make_shared<PngUnit>();ptrPngUnit->start();getchar();cout << "==Over==" << endl;return 0;
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/10872.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

中仕公考:2025年省考可以开始准备了!

“各省公务员考试”&#xff0c;是选拔和招录公务员的一种重要方式。该考试由各省级主管部门统一安排&#xff0c;编制归属于各个省份。 考试时间 各省的考试时间有所不同&#xff0c;但通常省联考的时间一般安排在3-5月之间。 户籍限制 部分岗位对考生的户籍有限制&#x…

保姆级教程,免费短链平台

神行短链 开源代码: https://github.com/EASTCATV/openShortLink.git 保姆级教程,5分钟打造属于自己的短链 免费短链平台 免费使用 短链生成 免费使用 地址: short.godsdo.com short.godsdo.com 打包命令 sbt clean && sbt packagedocker run -d \ --name shot…

三十六、Python基础语法(JSON操作)

JSON&#xff08;JavaScript Object Notation&#xff09;是一种基于文本&#xff0c;轻量级的数据交换格式。它易于人阅读和编写&#xff0c;同时也易于机器解析和生成&#xff0c;在自动化测试中经常用来存放测试数据。 JSON的特点&#xff1a; 基于文本&#xff0c;不包含图…

linux基础-完结(详讲补充)

linux基础-完结 一、Linux目录介绍 二、基础命令详细讲解 1. ls&#xff08;列出目录内容&#xff09; 2. cd&#xff08;更改目录&#xff09; 3. clear&#xff08;清除终端屏幕&#xff09; 4. pwd(显示你当前所在的目录) 5. vim(文本编辑器) 6. touch&#xff08;创…

【SAP】关于权限的继承

关于权限的父role和子role的权限继承&#xff0c;既可以 从子role主动去父role那里“取”。从父role“推”到子role 我自己之前一直用的是方法1&#xff0c;但由于子role很多&#xff0c;一个一个手工维护花了不少时间。 后来得知有方法2&#xff0c;特此测试。 我准备了父R…

信息安全数学基础(46)域和Galois理论

域详述 定义&#xff1a; 域是一个包含加法、减法、乘法和除法&#xff08;除数不为零&#xff09;的代数结构&#xff0c;其中加法和乘法满足交换律、结合律&#xff0c;并且乘法对加法满足分配律。同时&#xff0c;域中的元素&#xff08;通常称为数&#xff09;在加法和乘法…

时序约束进阶五:Set_Max_Delay与Set_Min_Delay详解

目录 一、背景 二、Max/Min_delay约束 2.1 约束设置参数 2.2 约束说明 三、场景说明 3.1 路径分段 3.1.1 无效的约束对象 3.1.2 设计代码 3.2 有效的约束对象 3.3 datapath only 3.3.1 工程设计 3.3.2 datapath only报告 3.4 clock group约束优先级 3.4.1 MAX/MIN…

搭建实验仪器知识库:从产品手册到智慧资源的飞跃

在科研、教学及工业生产领域&#xff0c;实验仪器作为探索未知、验证理论、提升效率的重要工具&#xff0c;其重要性不言而喻。然而&#xff0c;随着技术的不断进步和仪器的日益复杂化&#xff0c;如何高效、准确地使用这些仪器成为了科研人员、技术人员及学生面临的共同挑战。…

OA项目 python + vue3

准备工作 创建django项目 在setting.py进行数据库的配置&#xff1a; DATABASES {default: {ENGINE: django.db.backends.mysql,NAME: , #数据库名字USER: , #连接的数据库的用户名PASSWORD: ,HOST: 127.0.0.1,PORT: 3306,} }安装app&#xff1a; rest_framwork: 关闭csrf…

婚礼纪 9.5.57 | 解锁plus权益的全能结婚助手,一键生成结婚请柬

婚礼纪是一款结婚服务全能助手&#xff0c;深受9000万新人信赖的一站式结婚服务平台。解锁plus权益后&#xff0c;用户可以享受部分VIP会员功能。应用提供了丰富的结婚筹备工具和服务&#xff0c;包括一键生成结婚请柬、婚礼策划、婚纱摄影、婚宴预订等。婚礼纪旨在为新人提供全…

树形结构数据

树形结构数据 树形结构数据是一种基础且强大的数据结构&#xff0c;广泛应用于计算机科学和软件开发的各个领域。它模拟了自然界中树的层级关系&#xff0c;通过节点和它们之间的连接来组织数据。在本文中&#xff0c;我们将深入探讨树形结构数据的概念、特点、类型以及它们在…

洛古---越狱问题【快速幂】

今天和大家讲一个洛古的算法题&#xff0c;我觉得还是比较有含金量的&#xff0c;今天给大家分享一下 题目描述 监狱有 &#x1d45b;n个房间&#xff0c;每个房间关押一个犯人&#xff0c;有 &#x1d45a; 种宗教&#xff0c;每个犯人会信仰其中一种。如果相邻房间的犯人的宗…

【论文笔记】Parameter-Efficient Transfer Learning for NLP

&#x1f34e;个人主页&#xff1a;小嗷犬的个人主页 &#x1f34a;个人网站&#xff1a;小嗷犬的技术小站 &#x1f96d;个人信条&#xff1a;为天地立心&#xff0c;为生民立命&#xff0c;为往圣继绝学&#xff0c;为万世开太平。 基本信息 标题: Parameter-Efficient Tran…

《PyTorch深度学习快速入门教程》学习笔记(第20周)

目录 摘要 Abstract 1. 池化层原理 2. 二维池化层 3. 二维最大池化 4. 填充、步幅与多个通道 5. 平均池化层 6. 理论总结 7. 池化层处理数据 8. 池化层处理图片 摘要 本周报的目的在于汇报《PyTorch深度学习快速入门教程》课程第六周的学习成果&#xff0c;主要聚焦于…

C# 实现对指定句柄的窗口进行键盘输入的实现

在C#中实现对指定句柄的窗口进行键盘操作&#xff0c;可以通过多种方式来实现。以下是一篇详细的指南&#xff0c;介绍如何在C#中实现这一功能。 1. 使用Windows API函数 在C#中&#xff0c;我们可以通过P/Invoke调用Windows API来实现对指定窗口的键盘操作。以下是一些关键的…

c语言简单编程练习10

1、typedef和#define的区别 在用作数据类型替换时的区别&#xff1a; #include <stdio.h> #include <unistd.h>typedef char * A; //typedef需要&#xff1b; #define B char *int main(int argc, char *argv[]) {A a,b;B c,d;printf("a_size%ld\n"…

题目讲解15 合并两个排序的链表

原题链接&#xff1a; 合并两个排序的链表_牛客题霸_牛客网 思路分析&#xff1a; 第一步&#xff1a;写一个链表尾插数据的方法。 typedef struct ListNode ListNode;//申请结点 ListNode* BuyNode(int x) {ListNode* node (ListNode*)malloc(sizeof(ListNode));node->…

【freertos】FreeRTOS任务管理

FreeRTOS任务管理 一、任务的创建和删除1、函数xTaskCreate2、函数xTaskCreateStatic3、函数xTaskCreateRestricted4、函数vTaskDelete 二、任务的挂起和恢复1、函数vTaskSuspend2、函数vTaskResume3、函数vTaskResumeFromISR4、函数vTaskSuspendAll5、函数xTaskResumeAll 三、…

FreeRTOS 20:互斥量(互斥信号量)操作

互斥信号量其实就是一个拥有优先级继承的二值信号量&#xff0c;在同步的应用中&#xff08;任务与任务或中断与任务之间的同步&#xff09;二值信号量最适合。互斥信号量适合用于那些需要互斥访问的应用中。在互斥访问中互斥信号量相当于一把钥匙&#xff0c; 当任务想要访问共…

MongoDB笔记03-MongoDB索引

文章目录 一、前言1.1 概述1.2 MongoDB索引使用B-Tree还是BTree&#xff1f;1.3 B 树和 B 树的对比1.4 总结 二、索引的类型2.1 单字段索引2.2 复合索引2.3 其他索引 三、索引的管理操作3.1 索引的查看3.2 索引的创建3.2.1 单字段索引3.2.2 复合索引 3.3 索引的移除3.3.1 指定索…