技术标签: 微服务 spring cloud 分布式 rabbitmq
MQ,中文就是消息队列(MessageQueue),字面意思就是存放消息的队列,也就是事件驱动架构中的broker
比较常见的MQ实现:
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
单机部署
下载镜像:
docker pull rabbitmq:3.8-management
安装MQ:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:
实现publisher
建立连接
//建立连接
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
Connection connection = factory.newConnection();
创建Channel
//创建通道Channel
Channel channel = connection.createChannel();
声明队列
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
发送消息
//发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
关闭连接和channel
//关闭通道和连接
channel.close();
connection.close();
实现consumer
建立连接
// 建立连接
ConnectionFactory factory = new ConnectionFactory();
//设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
Connection connection = factory.newConnection();
创建Channel
//创建通道Channel
Channel channel = connection.createChannel();
声明队列
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
订阅消息
//订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
//处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
总结:
基本消息队列的消息发送流程:
建立connection
创建channel
利用channel声明队列
利用channel向队列发送消息
基本消息队列的消息接收流程:
建立connection
创建channel
利用channel声明队列
定义consumer的消费行为handleDelivery()
利用channel将消费者与队列绑定
仅仅一个简单队列模式,写着就如此复杂,所以我们可以使用SpringAMOP来进行
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123456 # 密码
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
spring:
rabbitmq:
host: 192.168.150.101 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: root # 用户名
password: 123456 # 密码
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
可以看到,在订阅模型中,多了一个exchange,过程也有略微变化。
注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的作用是什么?
声明队列、交换机、绑定关系的Bean是什么?
在广播模式下,消息发送流程是这样的:
在广播模式下,交换机会将消息转发给所有绑定的队列
在消费者创建一个config类,用来声明交换机和队列
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
//消费者1
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {
"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
//消费者2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {
"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
//在发送时,需要指定一个key
话题的key可以由多个单词组成,只有一个key,用.分割
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
//消费者1
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
//消费者2
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
Spring会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:
在publisher和consumer两个服务中都引入依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
在启动类中添加一个Bean即可:
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
文章浏览阅读451次。dev/mem: 物理内存的全镜像。可以用来访问物理内存。/dev/kmem: kernel看到的虚拟内存的全镜像。可以用来访问kernel的内容。调试嵌入式Linux内核时,可能需要查看某个内核变量的值。/dev/kmem正好提供了访问内核虚拟内存的途径。现在的内核大都默认禁用了/dev/kmem,打开的方法是在 make menuconfig中选中 device drivers --> ..._dev/mem 源码实现
文章浏览阅读7.1k次,点赞2次,收藏19次。vxe-table,一个小众但功能齐全并支持excel操作的vue表格组件_vxe-table
文章浏览阅读62次。参考:http://www.ruanyifeng.com/blog/2016/01/babel.htmlBabelBabel是一个广泛使用的转码器,可以将ES6代码转为ES5代码,从而在现有环境执行// 转码前input.map(item => item + 1);// 转码后input.map(function (item) { return item..._让开发环境支持bable
文章浏览阅读2.8k次,点赞6次,收藏29次。摘要:FPGA视频处理FIFO的典型应用,视频输入FIFO的作用,视频输出FIFO的作用,视频数据跨时钟域FIFO,视频缩放FIFO的作用_fpga 频分复用 视频
文章浏览阅读575次。【代码】R语言:设置工作路径为当前文件存储路径。_r语言设置工作目录到目标文件夹
文章浏览阅读452次。格式:background: linear-gradient(direction, color-stop1, color-stop2, ...);<linear-gradient> = linear-gradient([ [ <angle> | to <side-or-corner>] ,]? &l..._background线性渐变
文章浏览阅读1k次,点赞26次,收藏8次。第十三届蓝桥杯青少年组python编程省赛真题一、题目要求(注:input()输入函数的括号中不允许添加任何信息)1、编程实现给定一个正整数N,输出正整数N中各数位最大的那个数字。例如:N=132,则输出3。2、输入输出输入描述:只有一行,输入一个正整数N输出描述:只有一行,输出正整数N中各数位最大的那个数字输入样例:
文章浏览阅读2.2k次。一个网络协议主要由以下三个要素组成:1.语法数据与控制信息的结构或格式,包括数据的组织方式、编码方式、信号电平的表示方式等。2.语义即需要发出何种控制信息,完成何种动作,以及做出何种应答,以实现数据交换的协调和差错处理。3.时序即事件实现顺序的详细说明,以实现速率匹配和排序。不完整理解:语法表示长什么样,语义表示能干什么,时序表示排序。转载于:https://blog.51cto.com/98..._网络协议三要素csdn
文章浏览阅读153次。主要的思想,将所有的系统都可以看作两部分,真正的数据log系统和各种各样的query engine所有的一致性由log系统来保证,其他各种query engine不需要考虑一致性,安全性,只需要不停的从log系统来同步数据,如果数据丢失或crash可以从log系统replay来恢复可以看出kafka系统在linkedin中的重要地位,不光是d..._the log: what every software engineer should know about real-time data's uni
文章浏览阅读746次。伟大是熬出来的 目录 前言 引言 时间熬成伟大:领导者要像狼一样坚忍 第一章 内圣外王——领导者的心态修炼 1. 天纵英才的自信心 2. 上天揽月的企图心 3. 誓不回头的决心 4. 宠辱不惊的平常心 5. 换位思考的同理心 6. 激情四射的热心 第二章 日清日高——领导者的高效能修炼 7. 积极主动,想到做到 8. 合理掌控自己的时间和生命 9. 制定目标,马..._当狼拖着受伤的右腿逃生时,右腿会成为前进的阻碍,它会毫不犹豫撕咬断自己的腿, 以
文章浏览阅读285次。在当今的大数据时代,人们对高速度和高带宽的需求越来越大,迫切希望有一种新型产品来作为高性能计算和数据中心的主要传输媒质,所以有源光缆(AOC)在这种环境下诞生了。有源光缆究竟是什么呢?应用在哪些领域,有什么优势呢?易天将为您解答!有源光缆(Active Optical Cables,简称AOC)是两端装有光收发器件的光纤线缆,主要构成部件分为光路和电路两部分。作为一种高性能计..._aoc 光缆
文章浏览阅读2.2k次。在“桌面”上按快捷键“Ctrl+R”,调出“运行”窗口。接着,在“打开”后的输入框中输入“Gpedit.msc”。并按“确定”按钮。如下图 找到“用户配置”下的“Windows设置”下的“Internet Explorer 维护”的“连接”,双击选择“自动浏览器配置”。如下图 选择“自动启动配置”,并在下面的“自动代理URL”中填写相应的PAC文件地址。如下..._設置proxy腳本