zmq pub/sub使用详解_zmq pub sub-程序员宅基地

技术标签: pub/sub  并发发布  详解  zmq  

关于zmq的基本简介,请参考ZeroMQ基础入门

pub/sub模式介绍

发布/订阅模式,全称为Publish/Subscribe,支持多个发布者/多订阅者,使用在消息单向传输的应用场景,消息总是从发布者发送到订阅者。

一般的使用流程为:

pub/sub

pub端:

  • 创建context
  • 创建socket,设置ZMQ_PUB模式
  • bind端口
  • 循环发布消息send

socket特性:

特性
兼容的对端套接字 ZMQ_SUB
方向性 单向
发送/接收模式 仅发送
进入路由策略
流出路由策略 扇出(呈扇形发出)
ZMQ_HWM 选项行为 丢弃

sub端:

  • 创建context
  • 创建socket,设置ZMQ_SUB模式
  • connect到pub端
  • setsockopt设置ZMQ_SUBSCRIBE订阅的消息
  • 循环接收recv
特性
兼容的对端套接字 ZMQ_PUB
方向性 单向
发送/接收模式 仅接收
进入路由策略 平等排队
流出路由策略
ZMQ_HWM 选项行为 丢弃
注意事项

  • pub端socket不能使用recv函数,同样,sub端不能使用send函数
  • 当pub端由于到达了高水位而使ZMQ_PUB套接字进入静默模式的时候,所有发送到这个有问题的订阅者的消息都会被丢弃,直到静默模式终止
  • pub端socket的zmq_send()函数永远不会阻塞
  • sub端刚创建socket后是无法订阅到任何消息的,必须使用setsockopt设置订阅的消息后才能接收到
  • sub端是根据参数前缀进行过滤的。订阅的内容以pub端发出的内容从头开始匹配为过滤条件,完全匹配订阅内容的消息会被sub接收,如订阅内容为"a"时,所有以a开头的消息都会接收
  • 当订阅内容为"",长度为0时为订阅所有内容,因为所有消息都匹配成功
  • 如果存在某个pub没有被任何sub连接,则该pub会丢弃所有的消息
  • 如果采用tcp的连接方式,sub很慢时,消息将会堆积在pub,可以通过设置ZMQ_HWM选项来保护发布者,对于重要的消息,也可以写入硬盘等待发送
  • pub和sub谁bind谁connect并无严格要求(虽本质并无区别),但仍建议pub使用bind,sub使用connect,在实际测试中,使用sub绑定而pub connect时,sub端无法接收到消息
  • 一个显著的问题是,“slow joiner”可能导致发布者的第一笔消息总是丢失,下文会进一步说明该问题
  • 一个订阅者(subcriber)可以链接超过一个发布者(publisher)。数据到达后将交叉存取(公平队列),以保证发布者之间的数据不会淹没
  • 从ZeroMQ v3.x版本开始,使用tcp://或者ipc://协议连接时会在发布者进行消息过滤,使用epgm://协议仍在订阅者过滤;在ZeroMQ v2.x,所有消息过滤都发生在订阅者
关于slow joiner问题

一般描述为,即使是先启动订阅者,再启动发布者,订阅者也有可能会丢失前一部分数据。你无法得知SUB是何时开始接收消息的。

这是因为订阅者向发布者建立连接也是耗费时间的,虽然时间极短,但不为0。这个时间内发布者发布的内容将没有订阅者能够接收。

几种处理方式:

  • 不处理

对于可以容忍数据丢失的应用来说,不必理会丢失的数据。比如接收天气信息的应用,你可能只关注最新的天气信息。

  • 使用延时发布策略

因为数据丢失的原因是发布者在没有稳定接收者的情况下仍然发送了数据,所以可以让发布者等待一段时间再发送数据。

最简单的方式是sleep一段时间。

缺点是:

  1. 不知道sleep多久合适
  2. 即使sleep也不能保证sub者一定就准备好了
  3. 在程序中加sleep不是一种优雅的设计
  • 通知发布者模式

发布者可以在确保订阅者已经启动成功的情况下再发送数据,只需要订阅者在准备好后通知发布者。

这个不难实现,订阅者在准备好后,首先使用req/rep模式向发布者发送一个特定的请求,发布者接收请求并应答,然后再发布真正的数据。

这种方式增加了简单一步操作,但保证了数据的完整。

多线程发送问题

注意:context是线程安全的,但socket非线程安全。在多个线程中使用同一个socket会导致程序崩溃(不提倡使用锁,它会导致竞争并减慢性能,不符合zmq的设计理念)。

  1. 使用proxy

一种比较常见的场景是,发布者使用多个线程来发布不同的数据,所有的数据通过一个endpoint发送。

这与动态发现问题类似,对于一个应用场景,可能会随时增加发布者或者订阅者,构建一个合适的系统可以减小编码和出错的机会。

在这里插入图片描述

如图所示,使用一个proxy可以轻松解决这个问题。

  • 增加了中间层。中间层是静态的,它不会经常变动
  • pub可以随意增减,它们都connect到xsub
  • sub也可以随意增减,它们都connect到xpub
  • xsub收到的所有数据会通过proxy转发到xpub,从而发布到各sub

对于这种情况,可以在多个发布线程中分别创建socket,connect到xsub,从而避免多线程共用socket。

这种情况下,当发布线程较多时,会导致socket堆积,最终导致系统文件描述符过多而失败。可以使用线程池,每个线程使用自己的socket。

还有一种使用代理的情况是:

在这里插入图片描述

这里proxy类似于一个桥,连接了两个不同的网络。也可以作为协议网关,用于连接两个使用不同协议的网络。

结合天气服务,实现一个Proxy的例程如下:

//
//  Weather proxy device C++
//
// Olivier Chamoux <[email protected]>
//

#include "zhelpers.hpp"

int main (int argc, char *argv[])
{
    
    zmq::context_t context(1);

    //  This is where the weather server sits
    zmq::socket_t frontend(context, ZMQ_SUB);
    frontend.connect("tcp://192.168.55.210:5556");

    //  This is our public endpoint for subscribers
    zmq::socket_t backend (context, ZMQ_PUB);
    backend.bind("tcp://10.1.1.0:8100");

    //  Subscribe on everything
    frontend.setsockopt(ZMQ_SUBSCRIBE, "", 0);

    //  Shunt messages out to our own subscribers
    while (1) {
    
        while (1) {
    
            zmq::message_t message;
            int more;
            size_t more_size = sizeof (more);

            //  Process all parts of the message
            frontend.recv(&message);
            frontend.getsockopt( ZMQ_RCVMORE, &more, &more_size);
            backend.send(message, more? ZMQ_SNDMORE: 0);
            if (!more)
                break;      //  Last message part
        }
    }
    return 0;
}
  1. 使用push/pull

使用push/pull建立一个发布端模型,pull接收到所有数据通过proxy转到pub,再发送出去。

这种方式是使用xpub/xsub的变体。因为实际使用中,在单进程内使用sub绑定,pub连接的方式无法接收到数据,而push/pull是一个可行的替代。

启动代理:

// 代码片断
void StartPubProxy(string& port)
{
    
    try
    {
    
        // 面向client的socket,多线程发来所有数据
        zmq::socket_t frontend(m_ctx, ZMQ_PULL);
        frontend.bind("inproc://#0");

        // 面向services的socket,提供对外端口,并实际发布数据
        zmq::socket_t backend(m_ctx, ZMQ_PUB);
        string strBind = "tcp://*:" + port;
        backend.bind(strBind);

        // 创建proxy
        zmq::proxy(static_cast<void*>(frontend), static_cast<void*>(backend), nullptr);
    }
    catch(const std::exception& e)
    {
    
        std::cerr <<"zmq启动转发代理失败:" << e.what() << '\n';
    }
}

原发布线程向"inproc://#0"push数据即可。

  1. 使用boost asio单线程处理数据

在有些情况下,加锁的多线程未必比无锁单线程更快。

对于多线程发布模型来说,可以把它们要发送的数据通过strand.post到io_service的单线程队列里,由工作线程异步处理。

这种方式简单方便,关于使用asio创建线程的使用可以参考:boost::asio::io_service创建线程池简单实例

保护发布者

前方讲述,当sub端处理较慢时,pub端在到达高水位线后会丢弃数据。对于重要的应用,由于不能对sub端的性能作出任何假设,所以需要一定的策略来保证。

ZMQ_HWM

ZMQ_SNDHWM:对向外发送的消息设置高水位(最大缓存量),ZMQ_RCVHWM:对进入socket的消息设置高水位。

ZMQ_SNDHWM属性将会设置socket参数指定的socket对外发送的消息的高水位。高水位是一个硬限制,它会限制每一个与此socket相连的在内存中排队的未处理的消息数目的最大值。

0值代表着没有限制,默认值为1000,就在bind/connect之前设置该属性。如果设置为无限,可能会导致发布者崩溃。

如果已经到达了规定的限制,socket就需要进入一种异常的状态,表现形式因socket类型而异。socket会进行适当的调节,比如阻塞或者丢弃已发送的消息。

总是给套接字设置一个基于期望的订阅方数量的最大值,你打算用于队列的内存的数量,和一个消息平均大小的高水位线。例如,如果你希望有5000个订阅方,有1G的内存可有,消息平均200字节,那么一个安全的高水位线应该是(1000000000/200/5000)=1000.


参考资料

?MQ - The Guide
Unable to receive messages when binding subscriber socket
What is a simple example of a working XSUB / XPUB proxy in zeromq
How to implement Pub-Sub Network with a Proxy by using XPUB and XSUB in ZeroMQ(C++)?
Proxy with inproc frontend
ZMQ模式详解——发布/订阅模式
ZeroMQ基础入门

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/guotianqing/article/details/101429319

智能推荐

分布式光纤传感器的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告_预计2026年中国分布式传感器市场规模有多大-程序员宅基地

文章浏览阅读3.2k次。本文研究全球与中国市场分布式光纤传感器的发展现状及未来发展趋势,分别从生产和消费的角度分析分布式光纤传感器的主要生产地区、主要消费地区以及主要的生产商。重点分析全球与中国市场的主要厂商产品特点、产品规格、不同规格产品的价格、产量、产值及全球和中国市场主要生产商的市场份额。主要生产商包括:FISO TechnologiesBrugg KabelSensor HighwayOmnisensAFL GlobalQinetiQ GroupLockheed MartinOSENSA Innovati_预计2026年中国分布式传感器市场规模有多大

07_08 常用组合逻辑电路结构——为IC设计的延时估计铺垫_基4布斯算法代码-程序员宅基地

文章浏览阅读1.1k次,点赞2次,收藏12次。常用组合逻辑电路结构——为IC设计的延时估计铺垫学习目的:估计模块间的delay,确保写的代码的timing 综合能给到多少HZ,以满足需求!_基4布斯算法代码

OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版-程序员宅基地

文章浏览阅读3.3k次,点赞3次,收藏5次。OpenAI Manager助手(基于SpringBoot和Vue)_chatgpt网页版

关于美国计算机奥赛USACO,你想知道的都在这_usaco可以多次提交吗-程序员宅基地

文章浏览阅读2.2k次。USACO自1992年举办,到目前为止已经举办了27届,目的是为了帮助美国信息学国家队选拔IOI的队员,目前逐渐发展为全球热门的线上赛事,成为美国大学申请条件下,含金量相当高的官方竞赛。USACO的比赛成绩可以助力计算机专业留学,越来越多的学生进入了康奈尔,麻省理工,普林斯顿,哈佛和耶鲁等大学,这些同学的共同点是他们都参加了美国计算机科学竞赛(USACO),并且取得过非常好的成绩。适合参赛人群USACO适合国内在读学生有意向申请美国大学的或者想锻炼自己编程能力的同学,高三学生也可以参加12月的第_usaco可以多次提交吗

MySQL存储过程和自定义函数_mysql自定义函数和存储过程-程序员宅基地

文章浏览阅读394次。1.1 存储程序1.2 创建存储过程1.3 创建自定义函数1.3.1 示例1.4 自定义函数和存储过程的区别1.5 变量的使用1.6 定义条件和处理程序1.6.1 定义条件1.6.1.1 示例1.6.2 定义处理程序1.6.2.1 示例1.7 光标的使用1.7.1 声明光标1.7.2 打开光标1.7.3 使用光标1.7.4 关闭光标1.8 流程控制的使用1.8.1 IF语句1.8.2 CASE语句1.8.3 LOOP语句1.8.4 LEAVE语句1.8.5 ITERATE语句1.8.6 REPEAT语句。_mysql自定义函数和存储过程

半导体基础知识与PN结_本征半导体电流为0-程序员宅基地

文章浏览阅读188次。半导体二极管——集成电路最小组成单元。_本征半导体电流为0

随便推点

【Unity3d Shader】水面和岩浆效果_unity 岩浆shader-程序员宅基地

文章浏览阅读2.8k次,点赞3次,收藏18次。游戏水面特效实现方式太多。咱们这边介绍的是一最简单的UV动画(无顶点位移),整个mesh由4个顶点构成。实现了水面效果(左图),不动代码稍微修改下参数和贴图可以实现岩浆效果(右图)。有要思路是1,uv按时间去做正弦波移动2,在1的基础上加个凹凸图混合uv3,在1、2的基础上加个水流方向4,加上对雾效的支持,如没必要请自行删除雾效代码(把包含fog的几行代码删除)S..._unity 岩浆shader

广义线性模型——Logistic回归模型(1)_广义线性回归模型-程序员宅基地

文章浏览阅读5k次。广义线性模型是线性模型的扩展,它通过连接函数建立响应变量的数学期望值与线性组合的预测变量之间的关系。广义线性模型拟合的形式为:其中g(μY)是条件均值的函数(称为连接函数)。另外,你可放松Y为正态分布的假设,改为Y 服从指数分布族中的一种分布即可。设定好连接函数和概率分布后,便可以通过最大似然估计的多次迭代推导出各参数值。在大部分情况下,线性模型就可以通过一系列连续型或类别型预测变量来预测正态分布的响应变量的工作。但是,有时候我们要进行非正态因变量的分析,例如:(1)类别型.._广义线性回归模型

HTML+CSS大作业 环境网页设计与实现(垃圾分类) web前端开发技术 web课程设计 网页规划与设计_垃圾分类网页设计目标怎么写-程序员宅基地

文章浏览阅读69次。环境保护、 保护地球、 校园环保、垃圾分类、绿色家园、等网站的设计与制作。 总结了一些学生网页制作的经验:一般的网页需要融入以下知识点:div+css布局、浮动、定位、高级css、表格、表单及验证、js轮播图、音频 视频 Flash的应用、ul li、下拉导航栏、鼠标划过效果等知识点,网页的风格主题也很全面:如爱好、风景、校园、美食、动漫、游戏、咖啡、音乐、家乡、电影、名人、商城以及个人主页等主题,学生、新手可参考下方页面的布局和设计和HTML源码(有用点赞△) 一套A+的网_垃圾分类网页设计目标怎么写

C# .Net 发布后,把dll全部放在一个文件夹中,让软件目录更整洁_.net dll 全局目录-程序员宅基地

文章浏览阅读614次,点赞7次,收藏11次。之前找到一个修改 exe 中 DLL地址 的方法, 不太好使,虽然能正确启动, 但无法改变 exe 的工作目录,这就影响了.Net 中很多获取 exe 执行目录来拼接的地址 ( 相对路径 ),比如 wwwroot 和 代码中相对目录还有一些复制到目录的普通文件 等等,它们的地址都会指向原来 exe 的目录, 而不是自定义的 “lib” 目录,根本原因就是没有修改 exe 的工作目录这次来搞一个启动程序,把 .net 的所有东西都放在一个文件夹,在文件夹同级的目录制作一个 exe._.net dll 全局目录

BRIEF特征点描述算法_breif description calculation 特征点-程序员宅基地

文章浏览阅读1.5k次。本文为转载,原博客地址:http://blog.csdn.net/hujingshuang/article/details/46910259简介 BRIEF是2010年的一篇名为《BRIEF:Binary Robust Independent Elementary Features》的文章中提出,BRIEF是对已检测到的特征点进行描述,它是一种二进制编码的描述子,摈弃了利用区域灰度..._breif description calculation 特征点

房屋租赁管理系统的设计和实现,SpringBoot计算机毕业设计论文_基于spring boot的房屋租赁系统论文-程序员宅基地

文章浏览阅读4.1k次,点赞21次,收藏79次。本文是《基于SpringBoot的房屋租赁管理系统》的配套原创说明文档,可以给应届毕业生提供格式撰写参考,也可以给开发类似系统的朋友们提供功能业务设计思路。_基于spring boot的房屋租赁系统论文