SpringCloud Hoxton——Stream服务消息驱动_spring cloud stream支持activemq吗-程序员宅基地

技术标签: # 【SpringCloud】  微服务  java  spring cloud  分布式  

1.开篇

什么是SpringCloudStream?官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

我们都知道消息中间件有很多种,常用的有4种(ActiveMQ、RabbitMQ、RocketMQ、Kafka),但是Spring Cloud Stream目前仅支持RabbitMQ、Kafka。

标准的MQ一般都是下面这张图的形式。

Message消息:生产者/消费者之间靠消息媒介传递消息内容。

MessageChannel消息通道:消息必须走特定的通道。

SubscribableChannel:由MessageHandler消息处理器所订阅,谁来负责收发处理。

前面我们提到了目前存在并且常用的MQ有四种,那么就产生了一个问题:比方说我们的项目种用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。(通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。)

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

Binder:很方便的连接中间件,屏蔽各中间件之间的差异。

Channel:消息通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储个转发的媒介。

Source、Sink:其参照对象是SpringCloud Stream自身,从Stream中发布消息就是输出Source,接收消息就是输入Sink。

在这其中常用的API、注解如下:


2.项目源码

github源码地址:https://github.com/2656307671/SpringCloud-Hoxton-Stream

gitee源码地址:汉卿/SpringCloud Hoxton-Stream服务消息驱动

首先要确保你的RabbitMQ环境已经搭建成功,并且启动,通过浏览器可以访问它的web管理界面窗口(虚拟机IP:15672)。我是在linux上启动的RabbitMQ;如果你是在windows上配置的,那就是localhost:15672。

这里一共有三个微服务模块,7001注册中心、8801消息发送模块(消息驱动之生产者)、8802消息接收模块(消息驱动之消费者)。

测试依次启动7001、8801、8802。

测试url:localhost:8801/sendMessage(发送四次请求,也就是发送四条消息)

上面的测试只有一个消息接收模块,下面再添加一个8803,它也是消息接收模块(消息驱动之消费者)。

此时,我们测试,仍然是依次启动7001、8801、8802、8803。

测试url:localhost:8801/sendMessage(发送两次请求,也就是发送两条消息)

从上面的截图可以看到,虽然8802、8803这两个消息接收方都成功接收到了消息,但是它俩收到的消息是完全一样的,这就存在着重复消费的问题。

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。

这时我们就可以使用Stream中的消息分组来解决。注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

Stream消息分组就是说:微服务应用放置于同一个group中,就能保证消息只会被其中的一个应用消费一次,因为同一个组内会发生竞争关系,要么你消费A,要么我消费A,不会出现我俩都消费A的情况。但是不同的组是可以重复消费的。

下面,我将8802、8803放置在不同的组(参见yml配置文件),然后测试启动顺序、url和上面的一样,看看结果如何?

从RabbitMQ的web管理界面中看到,这里的两个Exchange交换机对应着8802、8803,它们在不同的两个组中(the first group、the second group),而我们下面发了两条消息(执行两次localhost:8801/sendMessage),但是8802、8803仍然接收到了相同的消息。可见:将它俩放置在不同的组中,无法解决重复消费的问题。

那么下面自然就是将它俩放置在相同的组中(参见yml配置文件),再次测试。

此时再看,8802、8803各自收到一条消息。因为它俩在同组,就会出现竞争,解决了重复消费的问题。

最后,再来说一下Stream中的持久化问题。

我们先停掉8802、8803这两个微服务,然后将8802的分组去掉(也就是将它yml配置文件中的group标签删掉),8803的保留。

之后执行四次localhost:8801/sendMessage,发送四条消息到RabbitMQ,此时重启8802、8803。

得到的结果就不再截图了,我在这里说一下:8802无法获取RabbitMQ中的四条消息,因为8802此时没有分组,所以它的后台无法读取消息数据;而8803保留了group分组属性,它可以获取到刚刚发送的四条消息。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_43823808/article/details/119417766

智能推荐

动态内存管理_C_不申请内存,堆空间在减少-程序员宅基地

文章浏览阅读324次。一、函数介绍 以下四个函数都包含在头文件< stdlib.h >1.malloc 函数原型:void *malloc(size_t size); 作用:从堆空间申请内存 函数参数:需要申请的空间大小(字节数) 返回值:申请成功则返回一个指向申请到的内存的指针,失败则返回NULL2.calloc 函数原型:void *calloc(size_t num,siz..._不申请内存,堆空间在减少

ORACLE之rman备份:ORA-19809和ORA-19804的解决方法_cannot reclaim 67108864 bytes disk space from 1073-程序员宅基地

文章浏览阅读1.8k次。操作环境:redhat 6.4 oracle 12crman备份出错。之前出现过,好象重新backup database就可以了,今天又出现,纪录下RMAN-03009: failure of Contrl file and SPFILE Autobackup command on ORA_DISK_1 channel at 06/06/2018 13:43:50ORA-19809: limit..._cannot reclaim 67108864 bytes disk space from 10737418240 bytes limit

2022年雷达领域学术会议时间节点_iet radar conference 2023-程序员宅基地

文章浏览阅读5k次,点赞5次,收藏9次。IET Radar 2022radar2022October 2022 | Scotland, UKFull paper submission deadline 18 April 2022Author notification 27 June 2022Final revised paper deadline 4 July 2022Author registration deadline 18 July 2022Theme C - Remote Sensing from Airborne_iet radar conference 2023

Clion变量无法跳转到声明处_clion 找不到要转到的声明-程序员宅基地

文章浏览阅读8.1k次。问题:使用Clion查看代码时,对某变量ctrl+鼠标左键能跳转到该变量声明处,但有时会失败。解决方法:重启Clion,再次打开该工程,会自动重新加载符号,然后就能跳转成功。..._clion 找不到要转到的声明

分享一个It学习的好网站itsoku-程序员宅基地

文章浏览阅读264次。最近去学习一个新东西的时候,无意间发现一个比较不错的学习网站推荐给大家,www.itsoku.com 里面的学习教程和博文都不错。_itsoku

利用福禄克光纤测试仪了解综合布线_综合布线福禄克测试能测带宽-程序员宅基地

文章浏览阅读176次。对于布线基础设施,并非所有介质和应用程序都是相同的。因此,对于每个链接,从媒体类型和应用程序的角度了解您正在测试的内容都很重要。媒体混合泳通常,客户会确定他们只需要5e类或6类的某些应用程序,而需要6A类的其他应用程序,或者他们可能意识到需要升级某些但不是所有链接。以802.11ac Wi-Fi为例。随着新的Wave 2无线接入点(WAP)进入市场,能够提供高达7千兆的数据,一些人可能担心较长的5e或6类电缆敷设将导致吞吐量下降,因此决定为这些链路部署6A类。再以以太网供电为例。对于需要_综合布线福禄克测试能测带宽

随便推点

floor() 函数_floor()函数-程序员宅基地

文章浏览阅读4.9k次,点赞6次,收藏13次。查看更多https://www.yuque.com/docs/share/2a8366d1-bdc5-4faf-95ed-655beea8d8f9_floor()函数

74cms CSRF漏洞可以诱骗添加管理员-程序员宅基地

文章浏览阅读2.3k次。74cms CSRF漏洞可以诱骗添加管理员

少年郎,我这里有一份nginx配置,你拿走吧-程序员宅基地

文章浏览阅读5.7k次。首先,直接上干货:user root;worker_processes 4;error_log /var/log/nginx/error.log;#pid logs/nginx.pid;events { use epoll; worker_connections 65536; accept_mutex on; multi_acc..._large_client_header_buffers 4 128k; underscores_in_headers on; client_header

「小程序」信息无障碍初体验_微信小程序无障碍版本-程序员宅基地

文章浏览阅读1k次。是的,千呼万唤,微信小程序始出来。微信小程序正式发布后,中国信息无障碍产品联盟(CAPA)的信息无障碍工程师们在第一时间对其无障碍情况进行了体验。共选取了滴滴出行DiDi、饿了么外卖服务、今日头条lite三款小程序,在Android及iOS平台上分别通过读屏软件进行操作使用,并记录下其对应的表现。 体验所使用的机型及相关环境如下: 华为 P9 + Android 7.0 + EMui 5..._微信小程序无障碍版本

FPGA的基本组成结构_fpga组成-程序员宅基地

文章浏览阅读1.8w次,点赞11次,收藏146次。目前主流的FPGA芯片仍是基于查找表。FPGA芯片主要由以下6部分组成:(1)可编程输入输出单元(IOB)(2)基本可编程逻辑单元(CLB)(3)完整的时钟管理模块(4)丰富的布线资源(5)嵌入式块RAM(6)内嵌的底层功能单元和嵌入式专用硬核通过配置以上6个不同的部分,基本可以让FPGA实现任何你想要实现的功能。一、FPGA的结构解析对于一款芯片,我们肉眼看到..._fpga组成

java客户端go服务端_grpc(3):使用 golang 开发 grpc 服务端和客户端-程序员宅基地

文章浏览阅读207次。1,关于grpc-gogolang 可以可以做grpc的服务端和客户端。官网的文档:http://www.grpc.io/docs/quickstart/go.htmlhttps://github.com/grpc/grpc-go和之前写的java的grpc客户端调用相同。也需要使用protobuf的配置文件。但是golang下面的类库非常的简单,而且golang的性能也很强悍呢。有些简单的业务逻..._java proto3 客户端