RabbitMQ的通配符模式Demo_rabbitmq 通配符模式 唯一-程序员宅基地

技术标签: RabbitMQ  中间件  

案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种 通知类型都接收的则两种通知都有效。
生产者:

package com.xuecheng.rabbitmq.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: 星仔
 * @Date: 2018/12/24 21:31
 * @Description:
 */
public class ProducerTest04 {
    


    //声明队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    //声明交换机的名称
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    //声明RoutingKey
    private static final String ROUTINGKEY_SMS = "inform.#.sms.#";
    private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
    
        Connection connection = null;
        Channel channel = null;

        try {
    
            //创建连接工厂,建立连接
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            //创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
            factory.setVirtualHost("/");
            //创建于MQ服务的TCP连接
            connection = factory.newConnection();
            //创建于EXchange的通道,每一个通道都相当于一个会话事务
            channel = connection.createChannel();

            //声明交换机 String exchange,BuiltinExchangeType type
            /**
             * 参数明细:
             * exchange:交换机名称
             * type:交换机的类型
             *      fanout,topic,direct,headers
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //声明队列,如果RabbitMQ中没有该队列,则会创建
            /**
             *参数:String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> params
             *参数明细:
             * 1. queue:队列名称
             * 2. durable:是否持久化,如果持久化,将MQ重启之后队列还在
             * 3. exclusive: 是否独占连接,队列只允许在该队列中访问,一旦连接关闭,该队列将自动删除,如果将此参数设置为true,那么可用于临时队列的创建
             * 4. autoDelete:自动删除,队列不再使用时是否关闭,如果将此参数设置为true将exclusive设置为true,可用于创建临时队列
             * 5. params: 可以设置队列的一些扩展参数,比如设置存活时间等等
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //交换机和队列进行绑定String queue, String exchange, String routingKey
            /**
             * 参数明细:
             * queue:队列的名称
             * exchange:交换机的名称
             * routinfkey:路由key
             */
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            //发送消息
            /**
             * 参数:String exchange,String routingKey,String props,Byte[] body
             * 参数明细:
             * 1、exchange: 交换机,如果不使用,将使用MQ的默认交换机
             * 2、routingKey: 路由key,交换机根据路由key将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
             * 3、props:消息的属性
             * 4、body: 消息内容
             */
//            for (int i=0; i<5;i++){
    
//                String msg = "send message to sms"+i;
//                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,msg.getBytes());
//            }
//            for (int i=0; i<5;i++){
    
//                String msg = "send message to email"+i;
//                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,msg.getBytes());
//            }
            for (int i=0; i<5;i++){
    
                String msg = "send message to sms and email"+i;
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,msg.getBytes());
            }
        } catch (IOException e) {
    
            e.printStackTrace();
        } catch (Exception e) {
    
            e.printStackTrace();
        }finally{
    
            //关闭连接,先关闭通道,在关闭连接
            if(channel!=null){
    
                channel.close();
            }
            if(connection!=null){
    
                connection.close();
            }
        }
    }

}

消费者:

package com.xuecheng.rabbitmq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: 星仔
 * @Date: 2018/12/24 22:43
 * @Description:
 */
public class ConsumeTestSms {
    

    //声明队列名称
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    //声明交换机的名称
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    //声明RoutingKey
    private static final String ROUTINGKEY_SMS = "inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
    
        //创建连接工厂,建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        factory.setVirtualHost("/");
        //创建于MQ服务的TCP连接
        Connection connection = factory.newConnection();
        //创建于EXchange的通道,每一个通道都相当于一个会话事务
        Channel channel = connection.createChannel();
        /**
         * 参数明细:
         * exchange:交换机名称
         * type:交换机的类型
         *      fanout,topic,direct,headers
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
        //声明队列
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //声明消费消息的方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
    
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException{
    
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id,mq在channel中用来标识消息的id,用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
            }
        };
        /**
         * 监听队列
         * 参数:String queue, boolean autoAck,Consumer callback
         * 参数明细:
         * 1.queue: 队列名称
         * 2.autoAck: 自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
         * 3.callback: 消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
    }
}
package com.xuecheng.rabbitmq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Auther: 星仔
 * @Date: 2018/12/24 22:43
 * @Description:
 */
public class ConsumeTestEmail {
    

    //声明队列名称
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    //声明交换机的名称
    private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
    //声明RoutingKey
    private static final String ROUTINGKEY_EMAIL = "inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
    
        //创建连接工厂,建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        //创建虚拟主机,rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        factory.setVirtualHost("/");
        //创建于MQ服务的TCP连接
        Connection connection = factory.newConnection();
        //创建于EXchange的通道,每一个通道都相当于一个会话事务
        Channel channel = connection.createChannel();
        /**
         * 参数明细:
         * exchange:交换机名称
         * type:交换机的类型
         *      fanout,topic,direct,headers
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //声明队列
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
        //声明消费消息的方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
    
            /**
             * 消费者接收消息调用此方法
             * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
             * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送)
             * @param properties 消息属性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException{
    
                //交换机
                String exchange = envelope.getExchange();
                //路由key
                String routingKey = envelope.getRoutingKey();
                //消息id,mq在channel中用来标识消息的id,用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String msg = new String(body,"utf-8"); System.out.println("receive message.." + msg);
            }
        };
        /**
         * 监听队列
         * 参数:String queue, boolean autoAck,Consumer callback
         * 参数明细:
         * 1.queue: 队列名称
         * 2.autoAck: 自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
         * 3.callback: 消费消息的方法,消费者接收到消息后调用此方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
    }
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_43794897/article/details/85332026

智能推荐

微信支付退款 java_Java后台实现微信支付和微信退款-程序员宅基地

文章浏览阅读600次。微信支付流程都是我自己工作中开发的,亲测可用,不喜勿喷。controller中我是这么写的,你们需要根据自己的业务需求改动。ResponseBean是我自己封装的,你们可以改成你们想要的形式。/*** 微信统一下单接口* @return*/@RequestMapping(value = "/doUnifiedOrder", method = RequestMethod.POST)public Re..._java+微信支付 dounifiedrefund

SSDB - 官方文档 - 中文_ssdb 中文文档-程序员宅基地

文章浏览阅读1.2k次。文档SSDB 是一个 C/C++ 语言开发的高性能 NoSQL 数据库, 支持 KV, list, map(hash), zset(sorted set) 等数据结构, 用来替代或者与 Redis 配合存储十亿级别列表的数据.SSDB 是稳定的, 生产环境使用的, 已经在许多互联网公司得到广泛使用, 如奇虎 360, TOPGAME.开始FAQ: FAQ - 常见问_ssdb 中文文档

python通过cx_Oracle连接oracle_测试cx_oracle的连接-程序员宅基地

文章浏览阅读332次。python连接数据需要做两步:1、安装cx_Oracle:可以通过pip install cx_Oracle来安装2、安装instantclient:下载instantclient,注意对应的版本。官方下载:https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html或者下面百度云instantclient_11_2版本,linux和windows都有链接:.._测试cx_oracle的连接

Python踩坑之路-Python-3.6 安装pycrypto 2.6.1各种疑难杂症及解决方案_pip install pycrypto==2.6.1-程序员宅基地

文章浏览阅读7.3w次,点赞34次,收藏77次。最近接触公司后台管理系统的开发,其中涉及到加密模块pycrypto。 重点来了!!!!敲黑板!!!! pycrypto在PyCharm中跟其他的模块不一样,pip install pycrypto安装的是1.4.1版本,然后虽然模块能够install成功,但实际上代码还是无法关联起来,运行会报异常。 百度解决方案:下载pycrypto源码安装 不过安装pycrypto源码可不是这么简单的事情_pip install pycrypto==2.6.1

linux内核printk用法以及相关调试_kernel printf 判断空-程序员宅基地

文章浏览阅读647次。1、内核中打印信息,只能使用printk,不能使用printf。函数原型:int printk(const char *fmt, ...);2、printk打印等级#define KERN_EMERG “&amp;amp;amp;lt;0&amp;amp;amp;gt;” /* system is unusable /#define KERN_ALERT “&amp;amp;amp;lt;1&amp;amp;amp_kernel printf 判断空

Docker容器获取外部文件参考_docker中jar包外部的资源文件-程序员宅基地

文章浏览阅读6k次。容器中获取文件参考如果项目中需要用到操作文件,文件又放在项目的resource下面,通过 Resource resource = new ClassPathResource("templates/template.xls"); File file = resource.getFile(); 这种方式在编辑器中能成功,但是打成jar包之后就报错了,是因为灵雀云上的容器内部是jar包运行,jar包没有绝对路径,所有报错,下面给出两种解决方案:方案1(推荐)将需要操作的文件添加到docke_docker中jar包外部的资源文件

随便推点

SQL注入_WITH ROLLUP绕过_04 sql注入_with rollup绕过-程序员宅基地

文章浏览阅读930次。题目地址http://ctf5.shiyanbar.com/web/pcat/index.php<?php error_reporting(0); if (!isset($_POST['uname']) || !isset($_POST['pwd'])) { echo '<form action="" method="post">'...._04 sql注入_with rollup绕过

linux内核获取系统调用表地址,多种方法获取sys_call_table(Linux系统调用表)的地址...-程序员宅基地

文章浏览阅读496次。一.方法一:常用方式我们首先需要找到call table-with-offset的特征,先看下面的代码syscall_call:call *sys_call_table(,%eax,4)假设我们没有vmlinux可供gdb反汇编,那也只有采用模拟的方式了,模拟出一个call *sys_call_table(,%eax,4),然后看其机器码,然后在system_call的附近基于这个特征进行寻找 :..._sys_call_table

osg 三维gis开发_osgEarth在三维GIS开发中的研究与应用.pdf-程序员宅基地

文章浏览阅读288次。osgEarth在三维GIS开发中的研究与应用.pdf2017年4月 现代防御技术 Apr.2017第45卷第2期 MoDERNDEFENCETECHNoLoGY V01.45N0.2旺于_-t-e.栏气习韩哲1,刘玉明1,管文艳1,王雷2(1.经济领域系统仿真技术应用..._osg三维开发pdf

tensorflow 利用expand_dims和squeeze扩展和压缩tensor维度_tensor squeeze 压缩成一个维度-程序员宅基地

文章浏览阅读8.9k次,点赞5次,收藏10次。tensorflow 利用expand_dims和squeeze扩展和压缩tensor维度在利用tensorflow进行文本挖掘工作的时候,经常涉及到维度扩展和压缩工作。比如对文本进行embedding操作完成之后,若要进行卷积操作,就需要对embedded的向量扩展维度,将[batch_size, embedding_dims]扩展成为[batch_size, embedding_dims,..._tensor squeeze 压缩成一个维度

Android源码解析之(十五)-->Activity销毁流程_安卓activity调用onstop源码-程序员宅基地

文章浏览阅读2w次,点赞5次,收藏18次。继续我们的源码解析,上一篇文章我们介绍了Activity的启动流程,一个典型的场景就是Activity a 启动了一个Activity b,他们的生命周期回调方法是: onPause(a) –> onCreate(b) –> onStart(b) –> onResume(b) –> onStop(a) 而我们根据源码也验证了这样的生命周期调用序列,那么Activity的销毁流程呢?它的生命周期_安卓activity调用onstop源码

如何在Android TV 桌面添加自定义频道/节目-程序员宅基地

文章浏览阅读586次。最近在做Android TV O的项目,需要在TV 桌面添加自定义频道/节目,节目的背景图片要显示为SD卡或者缓存目录里面的图片。添加自定义频道节目背景显示本地目录的图片一、添加频道1. 首先新建频道、节目实体类,属性如下。public class MediaChannel { private final String mName; private final Str..._tvcontractcompat