Linux下RocketMQ下载安装教程以及RocketMQ使用教程和代码_rocketmq linux4.3.2下载安装-程序员宅基地

技术标签: spring  JAVA基础和应用  java  队列  

一、安装及启动

1、将zip文件上传到linux的~下,然后解压:

执行:unzip rocketmq-all-4.4.0-bin-release.zip,若unzip命令不存在,执行:yum install -y unzip zip

2、移动并重命名

执行:mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq 

3、进入/usr/local/rocketmq/bin目录下

执行:cd /usr/local/rocketmq/bin

4、如果是在真实的linux服务器上安装rocketmq,则内存是足够的,如果是在虚拟机上安装rocketmq进行学习用,可能虚拟机的内存只分配了1G,那么是不够用的。启动rocketmq可能会启动不成功。此时就先修改一下启动所需的内存参数。

(1)修改runserver.sh 

执行:vi runserver.sh

将其内存修改为128m并保存,如下:

 

(2)修改runbroker.sh

同理将其修改为128m(与修改runserver.sh一样)

5、启动nameserver,在bin目录下执行:nohup sh mqnamesrv &,查看启动是否成功的日志命令:tail -f ~/logs/rocketmqlogs/namesrv.log ,打印如下表示启动成功:

 

6、启动broker,在bin目录下执行:nohup sh mqbroker -n localhost:9876 & ,也可以 nohup sh mqbroker -c ../conf/broker.conf  -n 192.168.153.128:9876 autoCreateTopicEnable=true &  
这样启动的服务器可以自动创建主题(客户端),不过生产一般不推荐.

查看日志命令:tail -f ~/logs/rocketmqlogs/broker.log,打印如下表示启动成功:

 

三、安装可视化插件

1、下载地址:https://pan.baidu.com/s/16ZoOkYEBtNq1sGp0cuk3cA  提取码:ms2g

2、上传到linux自己指定一个目录并使用unzip命令解压(步骤略,可参考上面rocketmq的上传解压步骤)

3、进入rocketmq-console/src/main/resources目录下,打开application.properties并修改保存如下:

 

4、进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成,该过程耗时较长,最终生成成功后如下:

 

5、便已完成,会生成target目录,进入target目录后执行:java -jar rocketmq-console-ng-1.0.1.jar,启动完成后,

在浏览器输入:http://192.168.153.128:9876,出现如下界面

 

完毕!

生产者代码

MyProducer.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class MyProducer {

private final Logger logger = LoggerFactory.getLogger(MyProducer.class);

private DefaultMQProducer defaultMQProducer;

private String producerGroup;

private String namesrvAddr;


/**

 * Spring bean init-method

 */
public void init() throws MQClientException {

// 参数信息

logger.info("DefaultMQProducer initialize!");

logger.info(producerGroup);

logger.info(namesrvAddr);

// 初始化

defaultMQProducer = new DefaultMQProducer(producerGroup);

defaultMQProducer.setNamesrvAddr(namesrvAddr);

defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));


defaultMQProducer.start();

System.out.println("DefaultMQProudcer start success!");

}

/**

 * Spring bean destroy-method

 */

public void destroy() {

defaultMQProducer.shutdown();

}

public DefaultMQProducer getDefaultMQProducer() {

return defaultMQProducer;

}

// ---------------setter -----------------

public void setProducerGroup(String producerGroup) {

this.producerGroup = producerGroup;

}

public void setNamesrvAddr(String namesrvAddr) {

this.namesrvAddr = namesrvAddr;

}

}

 

消费者代码

MyConsumer .java


import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
public class MyConsumer {

	private final Logger logger = LoggerFactory.getLogger(MyConsumer.class);
	    
	private DefaultMQPushConsumer defaultMQPushConsumer;

	private String namesrvAddr;
	    
	private String consumerGroup;

	    /**
		 * 
		 *      * Spring bean init-method
		 * 
		 *      
		 */
	public void init() throws InterruptedException, MQClientException {

        // 参数信息

        logger.debug("DefaultMQPushConsumer initialize!");

        logger.debug(consumerGroup);

        logger.debug(namesrvAddr);

        // 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>

        // 注意:ConsumerGroupName需要由应用来保证唯一

        defaultMQPushConsumer = new DefaultMQPushConsumer(consumerGroup);

        defaultMQPushConsumer.setNamesrvAddr(namesrvAddr);

        defaultMQPushConsumer.setInstanceName(String.valueOf(System.currentTimeMillis()));

        // 订阅指定MyTopic下tags等于MyTag

        defaultMQPushConsumer.subscribe("MyTopic", "MyTag");

        // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>

        // 如果非第一次启动,那么按照上次消费的位置继续消费

        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 设置为集群消费(区别于广播消费)

        defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);

        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {


            // 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息

            @Override

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {



                MessageExt msg = msgs.get(0);

                if (msg.getTopic().equals("MyTopic")) {

                    // TODO 执行Topic的消费逻辑

                 System.out.println("接收到了消息啊=========="+msg.toString());

                    if (msg.getTags() != null && msg.getTags().equals("MyTag")) {

                        // TODO 执行Tag的消费
                    }

                }
                // 如果没有return success ,consumer会重新消费该消息,直到return success

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

        });
        // Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>

        defaultMQPushConsumer.start();

        logger.debug("DefaultMQPushConsumer start success!");

    }   	   
	/**
	  *  Spring bean destroy-method
	  *      
	  */

	public void destroy() {

        defaultMQPushConsumer.shutdown();

    }
	    // ----------------- setter --------------------

	public void setNamesrvAddr(String namesrvAddr) {

        this.namesrvAddr = namesrvAddr;

    }

	public void setConsumerGroup(String consumerGroup) {

        this.consumerGroup = consumerGroup;

    }

}

生产者配置文件producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans

       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

       http://www.springframework.org/schema/jee

      http://www.springframework.org/schema/jee/spring-jee-3.0.xsd

      http://www.springframework.org/schema/context

       http://www.springframework.org/schema/context/spring-context-3.0.xsd">

 <bean id="myProducer" class="net.sinolbs.rocketmq.MyProducer" init-method="init"         
    destroy-method="destroy" scope="singleton">

    <property name="producerGroup" value="MyProducerGroup" />

    <property name="namesrvAddr" value="127.0.0.1:9876" />

 </bean>

</beans>

消费者配置文件:consumer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"                 
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans

       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

       http://www.springframework.org/schema/tx

       http://www.springframework.org/schema/tx/spring-tx-3.0.xsd

       http://www.springframework.org/schema/jee

      http://www.springframework.org/schema/jee/spring-jee-3.0.xsd

      http://www.springframework.org/schema/context

       http://www.springframework.org/schema/context/spring-context-3.0.xsd">

<bean class="net.sinolbs.test.rocketmq.MyConsumer" init-method="init"

destroy-method="destroy" scope="singleton">

<property name="consumerGroup" value="MyConsumerGroup" />

<property name="namesrvAddr" value="127.0.0.1:9876" />

</bean>

</beans>

然后将这两个xml导入到spring的配置文件里

<import resource="classpath:rocketmq/producer.xml" />
<import resource="classpath:rocketmq/consumer.xml" />

到此就可以使用了。

 

rocketMq发送端代码:

package com.wechat.tool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

public class RocketMqProducer {

    private final Logger logger = LoggerFactory.getLogger(RocketMqProducer.class);

    private DefaultMQProducer defaultMQProducer;
    private String producerGroup;
    private String namesrvAddr;

    /**
     * Spring bean init-method
     */
    public void init() throws MQClientException {
        // 参数信息
        logger.info("DefaultMQProducer initialize!");
        logger.info(producerGroup);
        logger.info(namesrvAddr);

        // 初始化
        defaultMQProducer = new DefaultMQProducer(producerGroup);
        defaultMQProducer.setNamesrvAddr(namesrvAddr);
        defaultMQProducer.setInstanceName(String.valueOf(System.currentTimeMillis()));
        
        defaultMQProducer.start();

        logger.info("DefaultMQProudcer start success!");

    }

    /**
     * Spring bean destroy-method
     */
    public void destroy() {
        defaultMQProducer.shutdown();
    }

    public DefaultMQProducer getDefaultMQProducer() {
        return defaultMQProducer;
    }

    // ---------------setter -----------------

    public void setProducerGroup(String producerGroup) {
        this.producerGroup = producerGroup;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

}

 

该对象的实例初始化与销毁都是由spring来控制,RocketMqProducer需要保持单例,主要是由spring配置文件控制,下面是RocketMqProducer相关配置

<bean id="rocketMqProducer" class="com.wechat.tool.RocketMqProducer"
     init-method="init"   destroy-method="destroy" scope="singleton">
    <property name="producerGroup" value="PushConsumer" />
    <property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>

springmvc控制层这块主要调用RocketMqProducer单例对象,对RocketMq进行消息队列的发送,下列只列出部分控制层代码

@Autowired
private RocketMqProducer rocketMqProducer;


@RequestMapping(value = "/coupon/couponsend.htm")
@ResponseBody()
public Map<String,String> couponsend(){
String sendStatus ="";
String msgId ="";
try{

String msgtext="优惠券领取"+DTUtil.getTodayString4();
Message msg = new Message("PushTopic",   
"push",   
"coupon"+DTUtil.getNowLongTime(),   
msgtext.getBytes());  
          
SendResult result = rocketMqProducer.getDefaultMQProducer().send(msg);
System.out.println("id:" + result.getMsgId() +  
                    " result:" + result.getSendStatus());
msgId = result.getMsgId();
sendStatus=result.getSendStatus()+"";
}catch (Exception e) {
e.printStackTrace();
}
Map<String,String> m=new HashMap<String, String>();
m.put("sendStatus", sendStatus);
m.put("msgId", msgId);
return m;
}

rocketMq消费者端:

package com.wechat.test;

import java.util.List;  
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;  
import com.alibaba.rocketmq.common.message.Message;  
import com.alibaba.rocketmq.common.message.MessageExt;

public class PushConsumer {
public static void main(String[] args){  
        DefaultMQPushConsumer consumer =   
                new DefaultMQPushConsumer("PushConsumer");  
        consumer.setNamesrvAddr("127.0.0.1:9876");   
        try {  
            //订阅PushTopic下Tag为push的消息  
            consumer.subscribe("PushTopic", "push");  
            //程序第一次启动从消息队列头取数据  
            consumer.setConsumeFromWhere(  
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.registerMessageListener(  
                new MessageListenerConcurrently() {  
                    public ConsumeConcurrentlyStatus consumeMessage(  
                            List<MessageExt> list,  
                            ConsumeConcurrentlyContext Context) {  
                        Message msg = list.get(0);  
                        System.out.println(msg.getKeys());  
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
                    }  
                }  
            );  
            consumer.start();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }
}

消费者开启后就不用关闭

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhangbinch/article/details/105491606

智能推荐

从零开始搭建Hadoop_创建一个hadoop项目-程序员宅基地

文章浏览阅读331次。第一部分:准备工作1 安装虚拟机2 安装centos73 安装JDK以上三步是准备工作,至此已经完成一台已安装JDK的主机第二部分:准备3台虚拟机以下所有工作最好都在root权限下操作1 克隆上面已经有一台虚拟机了,现在对master进行克隆,克隆出另外2台子机;1.1 进行克隆21.2 下一步1.3 下一步1.4 下一步1.5 根据子机需要,命名和安装路径1.6 ..._创建一个hadoop项目

心脏滴血漏洞HeartBleed CVE-2014-0160深入代码层面的分析_heartbleed代码分析-程序员宅基地

文章浏览阅读1.7k次。心脏滴血漏洞HeartBleed CVE-2014-0160 是由heartbeat功能引入的,本文从深入码层面的分析该漏洞产生的原因_heartbleed代码分析

java读取ofd文档内容_ofd电子文档内容分析工具(分析文档、签章和证书)-程序员宅基地

文章浏览阅读1.4k次。前言ofd是国家文档标准,其对标的文档格式是pdf。ofd文档是容器格式文件,ofd其实就是压缩包。将ofd文件后缀改为.zip,解压后可看到文件包含的内容。ofd文件分析工具下载:点我下载。ofd文件解压后,可以看到如下内容: 对于xml文件,可以用文本工具查看。但是对于印章文件(Seal.esl)、签名文件(SignedValue.dat)就无法查看其内容了。本人开发一款ofd内容查看器,..._signedvalue.dat

基于FPGA的数据采集系统(一)_基于fpga的信息采集-程序员宅基地

文章浏览阅读1.8w次,点赞29次,收藏313次。整体系统设计本设计主要是对ADC和DAC的使用,主要实现功能流程为:首先通过串口向FPGA发送控制信号,控制DAC芯片tlv5618进行DA装换,转换的数据存在ROM中,转换开始时读取ROM中数据进行读取转换。其次用按键控制adc128s052进行模数转换100次,模数转换数据存储到FIFO中,再从FIFO中读取数据通过串口输出显示在pc上。其整体系统框图如下:图1:FPGA数据采集系统框图从图中可以看出,该系统主要包括9个模块:串口接收模块、按键消抖模块、按键控制模块、ROM模块、D.._基于fpga的信息采集

微服务 spring cloud zuul com.netflix.zuul.exception.ZuulException GENERAL-程序员宅基地

文章浏览阅读2.5w次。1.背景错误信息:-- [http-nio-9904-exec-5] o.s.c.n.z.filters.post.SendErrorFilter : Error during filteringcom.netflix.zuul.exception.ZuulException: Forwarding error at org.springframework.cloud..._com.netflix.zuul.exception.zuulexception

邻接矩阵-建立图-程序员宅基地

文章浏览阅读358次。1.介绍图的相关概念  图是由顶点的有穷非空集和一个描述顶点之间关系-边(或者弧)的集合组成。通常,图中的数据元素被称为顶点,顶点间的关系用边表示,图通常用字母G表示,图的顶点通常用字母V表示,所以图可以定义为:  G=(V,E)其中,V(G)是图中顶点的有穷非空集合,E(G)是V(G)中顶点的边的有穷集合1.1 无向图:图中任意两个顶点构成的边是没有方向的1.2 有向图:图中..._给定一个邻接矩阵未必能够造出一个图

随便推点

MDT2012部署系列之11 WDS安装与配置-程序员宅基地

文章浏览阅读321次。(十二)、WDS服务器安装通过前面的测试我们会发现,每次安装的时候需要加域光盘映像,这是一个比较麻烦的事情,试想一个上万个的公司,你天天带着一个光盘与光驱去给别人装系统,这将是一个多么痛苦的事情啊,有什么方法可以解决这个问题了?答案是肯定的,下面我们就来简单说一下。WDS服务器,它是Windows自带的一个免费的基于系统本身角色的一个功能,它主要提供一种简单、安全的通过网络快速、远程将Window..._doc server2012上通过wds+mdt无人值守部署win11系统.doc

python--xlrd/xlwt/xlutils_xlutils模块可以读xlsx吗-程序员宅基地

文章浏览阅读219次。python–xlrd/xlwt/xlutilsxlrd只能读取,不能改,支持 xlsx和xls 格式xlwt只能改,不能读xlwt只能保存为.xls格式xlutils能将xlrd.Book转为xlwt.Workbook,从而得以在现有xls的基础上修改数据,并创建一个新的xls,实现修改xlrd打开文件import xlrdexcel=xlrd.open_workbook('E:/test.xlsx') 返回值为xlrd.book.Book对象,不能修改获取sheett_xlutils模块可以读xlsx吗

关于新版本selenium定位元素报错:‘WebDriver‘ object has no attribute ‘find_element_by_id‘等问题_unresolved attribute reference 'find_element_by_id-程序员宅基地

文章浏览阅读8.2w次,点赞267次,收藏656次。运行Selenium出现'WebDriver' object has no attribute 'find_element_by_id'或AttributeError: 'WebDriver' object has no attribute 'find_element_by_xpath'等定位元素代码错误,是因为selenium更新到了新的版本,以前的一些语法经过改动。..............._unresolved attribute reference 'find_element_by_id' for class 'webdriver

DOM对象转换成jQuery对象转换与子页面获取父页面DOM对象-程序员宅基地

文章浏览阅读198次。一:模态窗口//父页面JSwindow.showModalDialog(ifrmehref, window, 'dialogWidth:550px;dialogHeight:150px;help:no;resizable:no;status:no');//子页面获取父页面DOM对象//window.showModalDialog的DOM对象var v=parentWin..._jquery获取父window下的dom对象

什么是算法?-程序员宅基地

文章浏览阅读1.7w次,点赞15次,收藏129次。算法(algorithm)是解决一系列问题的清晰指令,也就是,能对一定规范的输入,在有限的时间内获得所要求的输出。 简单来说,算法就是解决一个问题的具体方法和步骤。算法是程序的灵 魂。二、算法的特征1.可行性 算法中执行的任何计算步骤都可以分解为基本可执行的操作步,即每个计算步都可以在有限时间里完成(也称之为有效性) 算法的每一步都要有确切的意义,不能有二义性。例如“增加x的值”,并没有说增加多少,计算机就无法执行明确的运算。 _算法

【网络安全】网络安全的标准和规范_网络安全标准规范-程序员宅基地

文章浏览阅读1.5k次,点赞18次,收藏26次。网络安全的标准和规范是网络安全领域的重要组成部分。它们为网络安全提供了技术依据,规定了网络安全的技术要求和操作方式,帮助我们构建安全的网络环境。下面,我们将详细介绍一些主要的网络安全标准和规范,以及它们在实际操作中的应用。_网络安全标准规范