rocketMq立即消费与延时消费_XStorms的博客-程序员宅基地_rocketmq消费时间

技术标签: spring  java  RocketMq  

pom依赖:

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.9.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>4.9.0</version>
            </dependency>

配置类:

/**
 * @author: datszhang
 * @Date: 2021/8/21 14:58
 * @Description:
 */
public class RocketMqConfig {

    /**
     * 支付成功订单主题
     */
    public static final String PAYED_ORDER_TOPIC = "TRADE_PAYED_ORDER_TOPIC";
    public static final String PAYED_ORDER_GROUP = "TRADE_PAYED_ORDER_GROUP";
}

发送类:

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqSender {

    @Autowired(required = false)
    private RocketMQTemplate rocketMqTemplate;

    private final static int DEFAULT_TIMEOUT = 3000;

    /**
     * 发送重试消息
     */
    public <T> void sendRetry(String topic, MqRetryMessage<T> mqRetryMessage){
        sendDelay(topic, mqRetryMessage, mqRetryMessage.getDelayLevel());
    }

    /**
     * 发送重试消息
     */
    public <T> void sendRetry(String topic, MqRetryMessage<T> mqRetryMessage, SendCallback sendCallback){
        sendDelay(topic, mqRetryMessage, sendCallback, mqRetryMessage.getDelayLevel());
    }

    /**
     * 延时消息
     */
    public <T> void sendDelay(String topic, T message, SendCallback sendCallback, MqDelayLevel delayLevel){
        rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, delayLevel.getLevel());
    }

    /**
     * 实时消息
     */
    public <T> void send(String topic, T message, SendCallback sendCallback){
        rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, 0);
    }


    /**
     * 延时消息
     */
    public <T> void sendDelay(String topic, T message, MqDelayLevel delayLevel){
        sendDelay(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送消息成功 {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("发送消息失败", throwable);
            }
        }, delayLevel);
    }



    /**
     * 实时消息
     */
    public <T> void send(String topic, T message){
        send(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送消息成功 {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("发送消息失败", throwable);
            }
        });
    }

    /**
     * 延时消息
     */
    public <T> void sendRocketDelay(String topic, T message,String from, MqDelayLevel delayLevel){
        sendDelay(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("{} 发送消息成功 {}",from, sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("{} 发送消息失败",from, throwable);
            }
        }, delayLevel);
    }



    /**
     * 实时消息
     */
    public <T> void sendRocket(String topic, T message,String from){
        send(topic, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("{} 发送消息成功 {}",from, sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("{} 发送消息失败",from, throwable);
            }
        });
    }


}

延时级别:同原有的配置,只是加了自己需要的时间

import lombok.AllArgsConstructor;
import lombok.Getter;

@AllArgsConstructor
@Getter
public enum MqDelayLevel {

    //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 1w 1m 1y
    ONE_SEC(1),
    FIVE_SEC(2),
    TEN_SEC(3),
    THIRTY_SEC(4),
    ONE_MIN(5),
    TWO_MIN(6),
    THREE_MIN(7),
    FOUR_MIN(8),
    FIVE_MIN(9),
    SIX_MIN(10),
    SEVEN_MIN(11),
    EIGHT_MIN(12),
    NINE_MIN(13),
    TEN_MIN(14),
    TWENTY_MIN(15),
    THIRTY_MIN(16),
    ONE_HOUR(17),
    TWO_HOUR(18),
    ONE_DAY(19),
    ONE_WEEK(20),
    ONE_MONTH(21),
    ONE_YEAR(22)
    ;

    private int level;
}

MQ消息封装类:

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;


@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqMessage <T> implements Message<T> {

    private T body;

    private MessageHeaders headers;

    @Override
    public T getPayload() {
        return body;
    }

    @Override
    public MessageHeaders getHeaders() {
        return headers;
    }
}

延时尝试类:

import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.weibu.cloud.common.utils.DateUtil;
import com.weibu.cloud.common.utils.uuid.UUID;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.time.LocalDateTime;


@Getter
@ToString
@Slf4j
public class MqRetryMessage <T> implements Serializable {

    private static final long serialVersionUID = 408371795120089055L;
    /**
     * 默认延时级别
     */
    private final static MqDelayLevel[] DEFAULT_DELAY_LEVEL = {
            MqDelayLevel.ONE_MIN
            , MqDelayLevel.FIVE_MIN
            , MqDelayLevel.TEN_MIN
            , MqDelayLevel.THIRTY_MIN
            , MqDelayLevel.ONE_HOUR
            , MqDelayLevel.TWO_HOUR
    };

    /**
     * 重试ID
     */
    private final String retryId;

    /**
     * 当前重试次数
     */
    private final int retryTimes;
    /**
     * 重试开始时间
     */
    @JsonFormat(pattern = DateUtil.PATTERN)
    private final LocalDateTime startTime;
    /**
     * 最大重试次数
     */
    private final int maxAttempts;
    /**
     * 延时级别
     */
    private MqDelayLevel[] delayLevelList = DEFAULT_DELAY_LEVEL;
    /**
     * 消息体
     */
    private final T body;

    public MqRetryMessage(){
        this(null, 6);
    }

    /**
     * 构造函数
     */
    public MqRetryMessage(T body){
        this(body, 6);
    }

    public MqRetryMessage(T body, int maxAttempts){
        this(body, maxAttempts, 0, null, UUID.randomUUID().toString());
    }

    private MqRetryMessage(T body, int maxAttempts, int retryTimes, MqDelayLevel[] delayLevelList, String retryId){
        this.retryTimes = retryTimes;
        this.maxAttempts = maxAttempts;
        this.startTime = LocalDateTime.now();
        this.body = body;
        this.retryId = retryId;
        this.setDelayLevelList(delayLevelList);
    }

    /**
     * 是否可以重试
     */
    public boolean canRetry(){
        //maxAttempt 为0时无限制
        if(maxAttempts == 0){
            return true;
        }
        return retryTimes + 1 <= maxAttempts;
    }

    /**
     * 获取下一个重试消息
     */
    public MqRetryMessage <T> nextRetry(){
        if(!canRetry()){
            throw new IllegalStateException("重试次数已到达最大无法重试");
        }

        return new MqRetryMessage<>(body, maxAttempts, retryTimes + 1, delayLevelList, retryId);
    }

    /**
     * 获取当前延时级别
     */
    @JsonIgnore
    public MqDelayLevel getDelayLevel(){
        int index = retryTimes;
        if(retryTimes + 1 > delayLevelList.length){
            index = delayLevelList.length - 1;
        }

        return delayLevelList[index];
    }

    /**
     * 设置DelayLevel
     */
    public void setDelayLevelList(MqDelayLevel[] delayLevelList) {
        if(delayLevelList != null && delayLevelList.length > 0) {
            this.delayLevelList = delayLevelList;
        }
    }
}

MQ消息生产者:

import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.constant.OrderEnum;
import com.weibu.cloud.common.feign.user.UserApi;
import com.weibu.cloud.common.mq.MqSender;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.common.utils.DateUtil;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.dto.UserBrief;
import com.weibu.cloud.trade.entity.WbOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;


@Slf4j
@Component
public class PayedOrderStatisticalProducer {
    @Autowired
    private MqSender mqSender;
    @Autowired
    private UserApi userApi;

    
    public void send(OrderStatisticalItem item){
        mqSender.send(RocketMqConfig.ORDER_STATISTICAL_TOPIC, item, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("statistical send success.sendResult:{}", JSONObject.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable e) {
                log.error("statistical send failed . :{}",e);
            }
        });
    }

    @Async
    public void sendList(List<WbOrder> list){
        log.info("PayedOrderStatisticalProducer.sendList -> list:{}",list);
        if (CollectionUtils.isEmpty(list)){
            return;
        }
        list.forEach(wb -> {
            OrderStatisticalItem item = entityToItem(wb);
            this.send(item);
        });
    }

    public OrderStatisticalItem entityToItem(WbOrder entity){
        OrderStatisticalItem item = new OrderStatisticalItem();
        item.setOrderType(queryEnumType(entity.getOrderType()));
        item.setPayedAmount(entity.getAmountRealPay());
        item.setPayedDate(DateUtil.localDateTimeTransLocalDate(entity.getCreateTime()));
        item.setPayFlag(true);
        List<UserBrief> list = new ArrayList<>(2);
        list.add(UserBrief.build(entity.getAgentId(),2));
        list.add(UserBrief.build(entity.getMerchantId(),3));
        item.setUserIds(list);
        return item;
    }

    private OrderEnum.OrderType queryEnumType(Integer orderType){
        switch (orderType){
            case 0:
                return OrderEnum.OrderType.ALL;
            case 1:
                return OrderEnum.OrderType.AGENT;
            case 2:
                return OrderEnum.OrderType.BIZ;
            case 3:
                return OrderEnum.OrderType.MALL;
        }
        return OrderEnum.OrderType.BIZ;
    }
}

MQ消息消费者:

import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.service.impl.AsyncStatisticalService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqConfig.ORDER_STATISTICAL_TOPIC,
        consumerGroup = RocketMqConfig.ORDER_STATISTICAL_GROUP,
        consumeMode = ConsumeMode.ORDERLY, //并行处理,默认并行;ORDERLY 按顺序处理
        selectorExpression = "*")
public class PayedOrderStatisticalConsumer implements RocketMQListener<OrderStatisticalItem> {
    @Autowired
    private AsyncStatisticalService asyncStatisticalService;

    @Override
    public void onMessage(OrderStatisticalItem item) {
        log.info("PayedOrderStatisticalConsumer -> item:{}", JSONObject.toJSONString(item));
        statisticalOrder(item);
    }

    //重试机制
    
    //@Retryable(value = Throwable.class, backoff = @Backoff(value = 600000L, delay = 300000L, multiplier = 5, maxDelay = 3600000L))
    @Retryable(value = Throwable.class, backoff = @Backoff(delay = 60000L, multiplier = 2), recover = "liquidationNotifyFailed")
    public void statisticalOrder(OrderStatisticalItem item){
        log.info("statisticalOrder -> item:{}",JSONObject.toJSONString(item));
        asyncStatisticalService.updateUserOrderStatistical(item.getUserIds(),item.getPayedAmount(),item.getPayedDate(),item.getOrderType(),item.getPayFlag());
    }
}

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/datszhang/article/details/120696399

智能推荐

2018/7/10功能点解读_「已注销」的博客-程序员宅基地

2018/7/10功能点解读开盘和收盘 开盘:每天股市开始交易称为开盘 收盘:每天股市结束交易称为收盘 注: 沪深A股市场开盘时间是周一到周五,集合竞价时间9:15-9:25,连续竞价时间9:30-11:30,正常交易时间13:00-15:00,深市的14:57-15:00是集合竞价时间 港股证券市场开盘时间是周一至周五,开市前时段9:30-10:00,...

centos6.5中tomcat的安装_zhanghongmei0920的博客-程序员宅基地

一、1、创建tomcat用户和组,并加入组useradd  tomcatpasswd  tomcat (123)groupadd  tomcatusermod  -G tomcat   tomcat2、创建(wls…jar)目录:mkdir  -p /home/tomcat/app3、创建jdk目录:mkdir    /home/jdk          mkdir  /h

Java Annotations_LambdaCC的博客-程序员宅基地

本文译自Java AnnotationsJava Annotation Purposes一般来说,Java Annotations有以下三种用途:Compiler instructionsBuild-time instructionsRuntime instructions构建工具能够扫描Java代码中的annotations并基于这些annotations来生

java导出excel_weixin_37904139的博客-程序员宅基地

一、需求背景  在项目开发中,经常会遇到导出Excel报表文件的情况,因为很多情况下,我们需要打印Excel报表,虽然在网页上也可以生成报表,但是打印网上里的报表是无法处理排版问题的,所以最好的方式,还是生成Excel文件。       PageOffice封装了一组用于动态输出数据到Excel文档的相关类,全部包含在com.zhuozhengsoft.pageoffice.excelwriter...

python学爬虫要多久,python 爬虫自学要多久_weixin_39540834的博客-程序员宅基地

现在之所以有这么多的小伙伴热衷于爬虫技术,无外乎是因为爬虫可以帮我们做很多事情,比如搜索引擎、采集数据、广告过滤等,以Python为例,Python爬虫可以用于数据分析,在数据抓取方面发挥巨大的作用。但是这并不意味着单纯掌握一门Python语言,就对爬虫技术触类旁通,要学习的知识和规范还有喜很多,包括但不仅限于HTML 知识、HTTP/HTTPS 协议的基本知识、正则表达式、数据库知识,常用抓包工...

简介jni(二)_蒲锦_up的博客-程序员宅基地

打开创建的本地头文件,可以看见要实现c++的方法:/* * Class: com_cn_TestJni * Method: getjni * Signature: ()V */JNIEXPORT void JNICALL Java_com_cn_TestJni_getjni (JNIEnv *, jobject);/* * Class: com_cn_

随便推点

HTTP SSL TCP TLS 说的啥_wangxiaoming的博客-程序员宅基地

HTTP SSL TCP TLS 说的啥TCP 建立连接过程HTTPS 的连接建立过程HTTPS 是啥HTTP 是明文传输,意味着端到端之间的任意节点都知道内容是消息传输内容是啥,这些节点可以是 路由器,代理等。HTTPS 就是来解决这个问题的,以安全为目的的 HTTP 通道,全称是 Hyper Text Transfer ProtocolSSL TLS 是啥SSL (secure Sockets Layer 安全套接字)TLS(Transport Layer security,

koa简单用法总结_我爱的昵称为什么都存在的博客-程序员宅基地_koa使用

对koa搭建后台管理系统做一些总结,使用的是腾讯云开发和对象存储作为数据库和存储地点。1.首先要安装node,让后npm init初始化,目的是生成package.json文件,package.json将显示安装了哪些模块。2.安装所需的模块,如下cnpm install koa --save //koa框架cnpm install koa-json --save//返回给前端的json对象cnpm install koa-bodyparser --save//post提交的对......

xampp mysql关机意外_xampp运行MySQL shutdown unexpectedly解决方法_程紫颜的博客-程序员宅基地

xampp运行MySQL数据时出现 Error: MySQL shutdown unexpectedly.解决方案故障描述:12:15:46 [mysql] Attempting to start MySQL app...12:15:47 [mysql] Status change detected: running12:15:47 [mysql] Status change detecte...

Hadoop 新 MapReduce 框架 Yarn 详解【转】_weixin_33809981的博客-程序员宅基地

【转自:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/】简介: 本文介绍了 Hadoop 自 0.23.0 版本后新的 map-reduce 框架(Yarn) 原理,优势,运作机制和配置方法等;着重介绍新的 yarn 框架相对于原框架的差异及改进;并通过 Demo 示例详细描述了在新的 yarn 框架下搭建和...

分段线性变换函数——比特平面分层_weixin_30312557的博客-程序员宅基地

1.比特平面分层比特平面。取代突出灰度级范围,突出特定比特来为整个图像外观做出贡献。(数字图像处理(中卫第3版))以下的样例是该书上使用的实例。这里在matlab中进行实现,帮助大家理解,同一时候请大家提出宝贵的改动意见。思想:使用书中的图片(到官网进行下载)。在8比特下,遍历整个图像,用像素值与各比特面的值(2^(n-1),n为比特面)进行位与操作,推断该像素值在该比特面是否...

推荐文章

热门文章

相关标签