golang nats[3] queue模式-程序员宅基地

技术标签: golang  

队列订阅模式

此模式中,订阅者要指定两个属性,主题和队列(queue,其实就是队列名称)

注意:下面所有前提=必须订阅同一个主题

发布消息后,N个具有同样的主题和queue的订阅者,只有一个会收到消息。(random算法)

说明:queue=工作组,工作组中有N个worker,发布消息后,同一个工作组中,仅有一个worker会收到消息。

相同主题,不同queue的订阅者之间,不符合上面的描述。这种情况下,可以把同一个queue的订阅者们,当成一个订阅者来处理,这样就和普通的发布订阅模式一样了。

主题subj1,queue=q1的订阅者有sub1-q1,sub2-q1,sub3-q1
主题subj1,queue=q2的订阅者有sub1-q2,sub2-q2,sub3-q2
一个主题,两组订阅者,每组订阅者中各有3个订阅者。
对sub1发布消息,q1,q2两个组都会收到消息(发布订阅模式),q1,q2每个组中,分别仅有一个订阅者会收到消息(queue模式)

server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag" ) const ( //url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) var ( nc *nats.Conn err error ) func init() { if nc, err = nats.Connect(url); checkErr(err) { // } } func main() { var ( servername = flag.String("servername", "y", "name for server") queueGroup = flag.String("group", "", "group name for Subscribe") subj = flag.String("subj", "", "subject name") ) flag.Parse() log.Println(*servername, *queueGroup, *subj) startService(*subj, *servername+" worker1", *queueGroup) startService(*subj, *servername+" worker2", *queueGroup) startService(*subj, *servername+" worker3", *queueGroup) select {} } //receive message func startService(subj, name, queue string) { go async(nc, subj, name, queue) } func async(nc *nats.Conn, subj, name, queue string) { nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) { log.Println(name, "Received a message From Async : ", string(msg.Data)) }) } func checkErr(err error) bool { if err != nil { log.Println(err) return false } return true } 

client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "strconv" "github.com/pborman/uuid" "flag" "time" ) const ( //url = "nats://192.168.3.125:4222" url = nats.DefaultURL ) var ( nc *nats.Conn err error ) func init() { if nc, err = nats.Connect(url); checkErr(err) { // } } func main() { var ( subj = flag.String("subj", "", "subject name") ) flag.Parse() log.Println(*subj) startClient(*subj) time.Sleep(time.Second) } //send message to server func startClient(subj string) { for i := 0; i < 1; i++ { id := uuid.New() log.Println(id) nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i))) nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i))) nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i))) nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i))) } } func checkErr(err error) bool { if err != nil { log.Println(err) return false } return true } 

启动server A queue=g1,订阅主题=weather

./main -servername=A -group=g1 -subj=weather
2018/08/18 11:32:16 A g1 weather

启动server B queue=g1,订阅主题=weather

./main -servername=B -group=g1 -subj=weather
2018/08/18 11:32:21 B g1 weather

发送消息

./main -subj=weather
2018/08/18 11:32:24 weather
2018/08/18 11:32:24 3005ae7c-85ab-42d3-ad09-d44688d129ad

结果 server A收到消息

2018/08/18 11:32:24 A worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Rain 0
2018/08/18 11:32:24 A worker2 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Sun 0

结果 server B收到消息

2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Fog 0
2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Cloudy 0

主题相同,queue不同

启动server c queue=test,订阅主题=weather

> ./main -servername=C -group=test -subj=weather
2018/08/18 11:37:43 C test weather

发消息

./main -subj=weather
2018/08/18 11:37:47 weather
2018/08/18 11:37:47 b4e201dd-ea4a-4ec3-aa45-99489695f0c2

Server c 收到了全部消息

2018/08/18 11:37:47 C worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server A 收到3条消息

2018/08/18 11:37:47 A worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server B 收到1条消息

2018/08/18 11:37:47 B worker2 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0

总结:queue模式,在分发消息时,进行负载均衡,随机发送给同一组中的任意一个订阅者,可以随时增加删除订阅者,配合响应的监控数据和统计数据,对下游的业务进行自动伸缩。

提高系统的可用性,避免业务在单点处理导致系统瓶颈。

栗子:
比如用户登录,对login主题发送消息,积分系统订阅了login主题,收到login的消息后,对用户的积分进行处理。为了保证积分处理的高可用,可以使用相同的queue=score,启动多个积分处理服务。
监控积分业务的处理时间,如果某个积分处理服务,业务执行时间过长(比如由于某些/某类用户的特殊情况,积分算法不同等),造成了消息积压,不能及时处理。

在积分系统的下游仍有处理能力的时候(比如依赖下游的某个接口,此接口的处理能力依然是正常的),可以自动启动多个积分处理服务,订阅主题login,queue=score,分散计算压力。
如果是下游的处理能力受限,则可能要进行限流处理,不但不能启动多个积分处理服务,还要限制积分业务的处理速度。



作者:luckyase
链接:https://www.jianshu.com/p/5cc237d98416
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

转载于:https://www.cnblogs.com/gao88/p/10007687.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/ahyz9638/article/details/101561566

智能推荐

微信小程序怎么动态的添加css,微信小程序点击view动态添加样式过程解析-程序员宅基地

文章浏览阅读3.2k次。这篇文章主要介绍了微信小程序点击view动态添加样式过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下基本逻辑:1.给每个view自定义dataIndex属性,从0开始2.自定义一个名为selected的class,作为被选中后的样式3.在wx.js中给viewId属性赋为0,用于默认显示。4.给每个view添加一个点击事件select,在点..._小程序 引入的 webview页面,可以在这个页面 增加样式吗

C语言--栈与队列1_#defineelementtype int是啥意思-程序员宅基地

文章浏览阅读378次。C语言顺序栈的基本操作---创建,入栈,出栈,显示栈内容,计算栈当前容量,判断是否为空,清空,销毁_#defineelementtype int是啥意思

H264—MP4格式及在MP4文件中提取H264的SPS、PPS及码流_android 判断mp4文件是 h264-程序员宅基地

文章浏览阅读2.3k次。一、MP4格式基本概念MP4格式对应标准MPEG-4标准(ISO/IEC14496)二、MP4封装格式核心概念1 MP4封装格式对应标准为 ISO/IEC 14496-12(信息技术 视听对象编码的第12部分: ISO 基本媒体文件格式/Information technology Coding of audio-visual objects Part 12: _android 判断mp4文件是 h264

java 下载文件的几种方法_java 文件下载-程序员宅基地

文章浏览阅读747次。1.以流的方式下载public HttpServletResponse download(String path, HttpServletResponse response) { try { // path是指欲下载的文件的路径。 File file = new File(path); // 取得文件名。..._java 文件下载

人月神话读后感_软件危机银弹读后感-程序员宅基地

文章浏览阅读925次。“人月神话”作为程序员一本不可多得的好书从不同方面讲述了软件工程中遇到的问题。直到细读之后发现,美国30年前软件项目面临的问题到现在依然没有很好的解决,大家依旧在焦油坑里挣扎,而且看上去没有办法。1 焦油坑-人们在苦苦挣扎但却无从逃脱的窘境 大型软件系统开发的问题由无数个小问题的纠缠造成了整个系统的问题,同时也引出了软件构件和大型系统的区别以及程序员的在创造程序中获得的乐趣,以_软件危机银弹读后感

iOS开发-调试网络限速(真机和模拟器)_ios 模拟器如何调整网络速度设置-程序员宅基地

文章浏览阅读7.2k次。作为开发者,在开发项目过程中,我们需要进行对于网络环境的调试。Mac环境下模拟慢速网络可以使用苹果官方提供的工具:Network Link Conditioner。_ios 模拟器如何调整网络速度设置

随便推点

Android 系统之 AudioTrack 回顾小结_audiotrack.write_non_blocking-程序员宅基地

文章浏览阅读5.2k次。AudioTrack1. MODE_STATIC 和 MODE_STREAM2. audio buffer3. 应用层AudioTrack的使用关于write()关于StreamTypegetMinBufferSize()4. Framework native层AudioTrack的创建5. AudioTrack的处理几个音频概念transfer_typeAudioT..._audiotrack.write_non_blocking

邻水职中计算机公开课教案,计算机基础公开课教案.doc-程序员宅基地

文章浏览阅读178次。PAGEPAGE 4计算机应用基础公开课教案授课人:袁涛 授课对象:机电工程系2011级学生时间:2011年12月8日 星期四 上午一、二节课题:excel中数据的基本处理一、教学目标:知识与技能1、掌握一些常见函数的使用方法2、会对一组数据排序、筛选(二)过程与方法1、锻炼学生恰当、自如地使用函数的能力;2、培养学生收集、分析、处理数据的能力;3、培养自主探索,合作交流能力..._计算机应用基础公开课教案

vue日期范围选择器_Vue.js日期范围选择器,具有多个范围和预设-程序员宅基地

文章浏览阅读3.9k次。vue日期范围选择器 vue-mj-daterangepicker (vue-mj-daterangepicker)Vue.js date range picker with multiples ranges and presets (vue 2.x) . Vue.js日期范围选择器,具有多个范围和预设(vue 2.x)。 View demo查看演示 Download Source 下载..._mj-daterange-picker

适用于Spring Boot的数据字典翻译Starter_springboot 自动翻译数据字段数据-程序员宅基地

文章浏览阅读2.1k次,点赞6次,收藏14次。适用于Spring Boot的字典翻译扩展在常见的web应用中,有很多数据库字段会使用字典值,但是在数据查询时,我们需要将存储的字典值转换成对应的字典标签(value>>name),用于展示给用户。常见的转换方式为从数据库查询、逻辑包装等,这样的字段一旦有很多的话,就非常的不方便,所以我就做了这个扩展项目。总述我做的是一个基于Spring Boot的扩展starter,项目代码已经上传到github:dict-traslate-starter ,这篇文档先说一下用法然后讲一下设计思路与_springboot 自动翻译数据字段数据

MonkeyTalk Java API 使用教程_monkeytalk使用教程-程序员宅基地

文章浏览阅读1k次。MonkeyTalk简介简单来说,MonkeyTalk是一个移动端的自动化测试框架,支持Android,iOS,Webapp的功能自动化测试。用户既可以使用MonkeyTalk集成开发环境进行测试步骤的录制回放,也可以使用Java API编写测试脚本进行自动化测试。详细介绍可以参考访问MonkeyTalk的官方网站。这里,我们主要介绍如何使用MonkeyTalk的Java API进行自_monkeytalk使用教程

linux中的系统函数库,linux 系统库函数-程序员宅基地

文章浏览阅读387次。linux系统库函数(2010-08-12 17:36:04)标签:杂谈第1章 字符测试函数isalnum(测试字符是否为英文字母或数字)isalpha(测试字符是否为英文字母)isascii(测试字符是否为ASCII码字符)isblank(测试字符是否为空格字符)iscntrl(测试字符是否为ASCII码的控制字符)isdigit(测试字符是否为阿拉伯数字)isgraph(测试字符是否为可打印..._linux 求浮点数的绝对值用什么函数

推荐文章

热门文章

相关标签