C++编程:ZeroMQ进程间(订阅-发布)通信配置优化

文章目录

    • 0. 概述
    • 1. 发布者同步发送(pub)与订阅者异步接收(sub)
      • 示例代码
      • 相比于传统的**直接阻塞式订阅者(sub)**
      • 可能的副作用:
    • 2. 使用`poll`替代异步接收
      • 示例代码
      • 可能的副作用:
    • 3. 适度增加缓存和队列
      • 示例代码
      • 可能的副作用:
    • 4. 动态的IPC通道管理
      • 示例代码
      • 可能的副作用:
    • 5. 接收消息的超时设置
      • 示例代码
      • 可能的副作用:
    • 6. 增加I/O线程数量
      • 示例代码
      • 可能的副作用:
    • 7. 异步消息发送(使用`dontwait`标志)
      • 示例代码
      • 可能的副作用:
    • 8. 其他可以考虑的优化项
      • 8.1 立即发送(ZMQ_IMMEDIATE)
        • 示例代码
        • 可能的副作用:
      • 8.2 消息压缩(ZMQ_CONFLATE)
        • 示例代码
        • 可能的副作用:
    • 结论

0. 概述

ZeroMQ是一款高性能的进程间通信(IPC)中间件,广泛应用于分布式系统和实时通信中。本文将介绍几种优化ZeroMQ订阅-发布通信的常用方法,并通过代码实例详细展示这些方法的应用。

1. 发布者同步发送(pub)与订阅者异步接收(sub)

使用发布者同步发送和订阅者异步接收是一种高效的通信模式,适合高吞吐量和实时性要求高的系统。发布者同步发送确保消息可靠传输,而订阅者异步接收提高了系统的处理效率。

示例代码

同步发送:

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("ipc:///tmp/pub");// 同步发送消息,确保消息已成功加入队列
zmq::message_t message(data, data_size);
publisher.send(message, zmq::send_flags::none);

异步接收:

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("ipc:///tmp/pub");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);// 非阻塞接收
zmq::message_t message;
if (!subscriber.recv(message, zmq::recv_flags::dontwait)) {// 接收失败后,记录日志并进行阻塞重试std::cerr << "异步接收失败,进行阻塞重试..." << std::endl;if (subscriber.recv(message)) {std::cout << "阻塞重试成功接收到消息。" << std::endl;}
}

相比于传统的直接阻塞式订阅者(sub)

相比于传统的直接阻塞式订阅者(sub),使用发布者同步发送和订阅者异步接收的模式在性能和系统资源利用方面具有显著的优势:

  • 提高系统效率:异步接收模式允许订阅者在没有可用消息时继续执行其他任务,而不会因等待消息而浪费资源。相比之下,直接阻塞式订阅者会导致订阅者线程在没有新消息时长时间空转,可能导致系统资源被不必要地占用。

  • 避免订阅者卡顿:在负载较重或消息频繁的系统中,异步接收可以避免订阅者由于处理消息积压而被阻塞。这种模式允许订阅者更灵活地响应系统负载变化,而直接阻塞式订阅可能因为等待消息而导致订阅者线程被完全锁定。

  • 更好的系统响应性:异步接收允许订阅者在有消息时快速响应,而在没有消息时不会浪费CPU资源。阻塞式订阅则会在等待消息期间锁住CPU资源,降低整体系统的响应速度。

  1. 优化资源利用率:异步接收减少了系统的CPU消耗,尤其是在多任务处理的场景中,资源调度更加高效。而阻塞式接收可能导致CPU资源被长时间占用,无法处理其他任务。

通过异步接收,订阅者可以实现更加灵活高效的消息处理,在应对高并发和频繁消息传递的场景中表现得更加出色。

可能的副作用:

  • 同步发送:可能导致发送延迟,尤其是在消息传递过程繁忙时。
  • 异步接收:由于消息接收失败,可能导致某些消息被忽略。

2. 使用poll替代异步接收

在某些场景中,使用zmq::poll来替代纯异步接收是一种更高效的做法。poll方法允许我们监听多个socket,并在有消息时触发接收操作,从而提高资源利用率。

示例代码

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("ipc:///tmp/pub");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);// 设置poll项
zmq::pollitem_t items[] = {{ static_cast<void*>(subscriber), 0, ZMQ_POLLIN, 0 }
};while (true) {zmq::poll(items, 1, -1); // 无限等待直到有事件if (items[0].revents & ZMQ_POLLIN) {zmq::message_t message;subscriber.recv(message);std::cout << "接收到的消息: " << message.to_string() << std::endl;}
}

可能的副作用:

  • poll的等待时间不当设置可能导致过度占用CPU资源,尤其是当等待时间过短时。

3. 适度增加缓存和队列

通过调整发送和接收的高水位标记,可以减少在高负载下的消息丢失情况。合适的缓存设置确保系统在消息处理上更加稳定。

示例代码

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("ipc:///tmp/pub");int sndhwm = 10000; // 发送高水位标记
int rcvhwm = 10000; // 接收高水位标记
publisher.setsockopt(ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm));
publisher.setsockopt(ZMQ_RCVHWM, &rcvhwm, sizeof(rcvhwm));

可能的副作用:

  • 增加水位标记将占用更多内存,可能在资源紧张的环境中导致性能下降。

4. 动态的IPC通道管理

为每个Topic动态创建独立的IPC通道,能够有效提高消息隔离性,减少不同Topic之间的干扰。这适用于多主题并发发布的场景。

示例代码

zmq::context_t context(1);
std::vector<zmq::socket_t> publishers;for (int i = 0; i < num_topics; ++i) {zmq::socket_t pub(context, ZMQ_PUB);std::string ipc_address = "ipc:///tmp/topic" + std::to_string(i) + "_ipc";pub.bind(ipc_address);publishers.push_back(std::move(pub));
}

可能的副作用:

  • 管理多个IPC通道增加了系统的复杂度,每个IPC通道也会消耗额外的系统资源。

5. 接收消息的超时设置

通过设置接收消息的超时时间,避免订阅者长时间阻塞于消息接收操作,从而提升系统的响应性。

示例代码

zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("ipc:///tmp/pub");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);  // 订阅所有消息int timeout = 5000; // 5秒超时
subscriber.setsockopt(ZMQ_RCVTIMEO, &timeout, sizeof(timeout));zmq::message_t message;
if (!subscriber.recv(message)) {std::cerr << "接收超时,未接收到消息。" << std::endl;
}

可能的副作用:

  • 超时设置过短时,可能会因为网络延迟或负载增加而丢失一些消息。

6. 增加I/O线程数量

通过增加I/O线程数量,ZeroMQ可以在多核CPU的环境下提升并发处理能力。更多的I/O线程能帮助系统处理大量的消息。

示例代码

zmq::context_t context(4); // 使用4个I/O线程
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("ipc:///tmp/pub");

可能的副作用:

  • 增加线程数量会占用更多的CPU资源,在有限资源的环境下,其他任务的响应时间可能会受到影响。

7. 异步消息发送(使用dontwait标志)

通过dontwait标志进行异步消息发送,发布者可以避免在消息队列满时阻塞操作。这在高频率发送的场景中尤为适用。

示例代码

zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("ipc:///tmp/pub");zmq::message_t message(data, data_size);
bool sent = publisher.send(message, zmq::send_flags::dontwait);
if (!sent) {std::cerr << "异步发送失败。" << std::endl;
}

可能的副作用:

  • 如果队列满了,消息将无法发送并可能丢失,导致重要数据的丢失。

8. 其他可以考虑的优化项

8.1 立即发送(ZMQ_IMMEDIATE)

立即发送确保在接收方连接未完全建立时,也能立刻传输消息,适用于对响应时间要求极高的场景。

示例代码
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.setsockopt(ZMQ_IMMEDIATE, 1);
publisher.bind("ipc:///tmp/pub");zmq::message_t message(data, data_size);
publisher.send(message, zmq::send_flags::none);
可能的副作用:
  • 如果接收方连接不稳定,消息可能会被丢弃。

8.2 消息压缩(ZMQ_CONFLATE)

只保留最新的消息,适用于只关心最新状态更新的场景,特别是状态类数据更新的应用中。

示例代码
zmq::socket_t subscriber(context, ZMQ_SUB);
subscriber.connect("ipc:///tmp/pub");
subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber.setsockopt(ZMQ_CONFLATE, 1);zmq::message_t message;
while (subscriber.recv(message)) {// 处理最新的消息
}
可能的副作用:
  • 旧消息将被丢弃,对于需要完整历史记录的场景,这不是一个合适的选项。

结论

通过合理配置ZeroMQ,可以显著提高IPC通信的性能和效率。发布者同步发送与订阅者异步接收的组合提供了可靠性和效率,poll机制则提供了一种高效的消息处理方式。其他诸如增加I/O线程、设置超时、异步消息发送等优化措施,适用于不同的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1527648.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

基于R语言的统计分析基础:操作XML文件与YAML文件

XML文件简介 在处理从文本文件中读取数据的任务时&#xff0c;用户承担着至关重要的责任&#xff0c;即需要充分理解和明确指定在创建该文件时所遵循的一系列约定和规范。这些约定涵盖了多个方面&#xff0c;包括但不限于&#xff1a; 注释字符&#xff1a;识别并忽略文件中用…

kubeadm 初始化 k8s 证书过期解决方案

概述 在使用 kubeadm 初始化的 Kubernetes 集群中&#xff0c;默认情况下证书的有效期为一年。当证书过期时&#xff0c;集群中的某些组件可能会停止工作&#xff0c;导致集群不可用。本文将详细介绍如何解决 kubeadm 初始化的 Kubernetes 集群证书过期的问题&#xff0c;并提…

CSP-J基础之常见的竞赛题库

文章目录 CSP-J基础之常见的竞赛题库1. 可达 (KEDA)2. 洛谷 (Luogu)3. Codeforces 洛谷账号的注册总结 CSP-J基础之常见的竞赛题库 在备战CSP-J&#xff08;Certified Software Professional Junior&#xff09;及其他信息学竞赛时&#xff0c;选手们常需要借助在线题库来进行…

android framework工程师遇到成长瓶颈迷茫怎么办?千里马经验分享

背景 近来有一些framework老司机粉丝朋友发来了一些framework工作中的一些疑问&#xff0c;具体描述如下&#xff1a; 这个同学遇到的问题&#xff0c;其实就是大部分framework开发者工作比较久后遇到的一个上升瓶颈问题。 具体总结有以下几个瓶颈问题 1、framework属于系…

Clion不识别C代码或者无法跳转C语言项目怎么办?

如果是中文会显示&#xff1a; 此时只需要右击项目&#xff0c;或者你的源代码目录&#xff0c;将这个项目或者源码目录标记为项目源和头文件即可。 英文如下&#xff1a;

STM32内部闪存FLASH(内部ROM)、IAP

1 FLASH简介 1 利用程序存储器的剩余空间来保存掉电不丢失的用户数据 2 通过在程序中编程(IAP)实现程序的自我更新 &#xff08;OTA&#xff09; 3在线编程&#xff08;ICP把整个程序都更新掉&#xff09; 1 系统的Bootloader写死了&#xff0c;只能用串口下载到指定的位置&a…

【MacOS】mac定位服务中删除已经卸载的软件

mac定位服务中删除已经卸载的软件 网上的帖子真不靠谱 直接右键 WeTypeSettings &#xff0c;查找位置&#xff0c;丢废纸篓即可&#xff01;会提示你卸载的&#xff01;

VLAN原理学习笔记

以太网是一种基于CSMA/CD的数据网络通信技术&#xff0c;其特征是共享通信介质。当主机数目较多时会导致安全隐患、广播泛滥、性能显著下降甚至造成网络不可用。 在这种情况下出现了VLAN (Virtual Local Area Network)技术解决以上问题。 1、VLAN快速配置 Vlan:Virtual Local…

C和指针:结构体(struct)和联合(union)

结构体和联合 结构体 结构体包含一些数据成员&#xff0c;每个成员可能具有不同的类型。 数组的元素长度相同&#xff0c;可以通过下标访问(转换为指针)。但是结构体的成员可能长度不同&#xff0c;所以不能用下标来访问它们。成员有自己的名字&#xff0c;可以通过名字访问…

springboot流浪天使乐园管理系统

基于springbootvue实现的流浪天使乐园管理系统&#xff08;源码L文ppt&#xff09;4-039 第4章 系统设计 4.1 总体功能设计 一般个人用户和管理者都需要登录才能进入流浪天使乐园管理系统&#xff0c;使用者登录时会在后台判断使用的权限类型&#xff0c;包括一般使用者…

以太网交换机工作原理学习笔记

在网络中传输数据时需要遵循一些标准&#xff0c;以太网协议定义了数据帧在以太网上的传输标准&#xff0c;了解以太网协议是充分理解数据链路层通信的基础。以太网交换机是实现数据链路层通信的主要设备&#xff0c;了解以太网交换机的工作原理也是十分必要的。 1、以太网协议…

Qt/C++编写的Onvif调试助手调试神器工具/支持云台控制/预置位设置等/有手机版本

一、功能特点 广播搜索设备&#xff0c;支持IPC和NVR&#xff0c;依次返回。可选择不同的网卡IP进行对应网段设备的搜索。依次获取Onvif地址、Media地址、Profile文件、Rtsp地址。可对指定的Profile获取视频流Rtsp地址&#xff0c;比如主码流地址、子码流地址。可对每个设备设…

ESP32_获取心知天气

目录 前言 一、获取心知天气API 二、编写代码 1.下载代码 2.代码讲解 1.安装Arduino.Json库 2.输入WIFI名称和密码 3.添加API 4.关于API的补充 三.数据的打印和处理 1.获取的数据 2.数据输出 总结 前言 环境&#xff1a;Arduino 芯片&#xff1a;ESP32 软件&…

基于springboot+vue实现的农家乐管理系统

基于springbootvue实现的山庄农家乐管理系统前后端分离项目&#xff08;文末查看源码lw&#xff09;4-10 系统角色&#xff1a; 管理员、用户 主要功能&#xff1a; &#xff08;1&#xff09;用户关键功能包含用户注册登陆、个人信息修改、首页、农家乐、美食信息、民宿信息…

【LeetCode】20.有效的括号

题目要求 解题思路 利用栈来解决本道题&#xff0c;左括号进栈&#xff0c;右括号出栈。需要判断第一个字符是右括号的情况 代码实现 class Solution { public:bool isValid(string s) {//利用栈来解决stack<char> st;for(auto& e:s){//是左括号就进if(e(||e[||…

SpringBoot开启多端口探究--基于多ApplicationContext

文章目录 前情提要一、思路概要二、具体实现三、其他问题父子关系部分依赖 总结 前情提要 前面探讨了management端口开启&#xff0c;grpc端口开启&#xff0c;本文继续探讨在SpringApplication中开启多个端口的方式之多ApplicationContext, 相比management端口基于多WebServe…

java计算机毕设课设—停车管理信息系统(附源码、文章、相关截图、部署视频)

这是什么系统&#xff1f; 资源获取方式在最下方 java计算机毕设课设—停车管理信息系统(附源码、文章、相关截图、部署视频) 停车管理信息系统是为了提升停车场的运营效率和管理水平而设计的综合性平台。系统涵盖用户信息管理、车位管理、收费管理、违规车辆处理等多个功能…

基于Spring Boot的火车订票管理系统

你好呀&#xff0c;我是计算机学姐码农小野&#xff01;如果有相关需求&#xff0c;可以私信联系我。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;JAVA语言 Spring Boot框架 工具&#xff1a;IDEA/Eclipse、Navicat、Tomcat 系统展示 首页 管理…

【动手学深度学习】05 线性代数(个人向笔记)

1. 线性代数 向量的一些公式 ∣ ∣ a ∣ ∣ ||a|| ∣∣a∣∣ 表示向量 a 的范数&#xff0c;课上没有讲范数的概念 其中第一条为求向量的二范数 第四条表示如果a为标量&#xff0c;那么向量 ∣ ∣ a ⋅ b ∣ ∣ ||ab|| ∣∣a⋅b∣∣ 的长度等于 ∣ a ∣ ⋅ ∣ ∣ b ∣ ∣…

Ifream实现微前端效果

记得有人曾问过我&#xff0c;老旧的项目内容很多&#xff0c;项目卡&#xff0c;想要改造成类似微前端&#xff0c;领导想要快速&#xff0c;又不想系统重构、而且是不同子系统的协同&#xff0c;要怎么做&#xff1f;对方不想做太大的改造&#xff0c;所以想用ifream的方式动…