pom依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.9.0</version>
</dependency>
配置类:
/**
* @author: datszhang
* @Date: 2021/8/21 14:58
* @Description:
*/
public class RocketMqConfig {
/**
* 支付成功订单主题
*/
public static final String PAYED_ORDER_TOPIC = "TRADE_PAYED_ORDER_TOPIC";
public static final String PAYED_ORDER_GROUP = "TRADE_PAYED_ORDER_GROUP";
}
发送类:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqSender {
@Autowired(required = false)
private RocketMQTemplate rocketMqTemplate;
private final static int DEFAULT_TIMEOUT = 3000;
/**
* 发送重试消息
*/
public <T> void sendRetry(String topic, MqRetryMessage<T> mqRetryMessage){
sendDelay(topic, mqRetryMessage, mqRetryMessage.getDelayLevel());
}
/**
* 发送重试消息
*/
public <T> void sendRetry(String topic, MqRetryMessage<T> mqRetryMessage, SendCallback sendCallback){
sendDelay(topic, mqRetryMessage, sendCallback, mqRetryMessage.getDelayLevel());
}
/**
* 延时消息
*/
public <T> void sendDelay(String topic, T message, SendCallback sendCallback, MqDelayLevel delayLevel){
rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, delayLevel.getLevel());
}
/**
* 实时消息
*/
public <T> void send(String topic, T message, SendCallback sendCallback){
rocketMqTemplate.asyncSend(topic, MqMessage.builder().body(message).build(), sendCallback, DEFAULT_TIMEOUT, 0);
}
/**
* 延时消息
*/
public <T> void sendDelay(String topic, T message, MqDelayLevel delayLevel){
sendDelay(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息成功 {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("发送消息失败", throwable);
}
}, delayLevel);
}
/**
* 实时消息
*/
public <T> void send(String topic, T message){
send(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息成功 {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("发送消息失败", throwable);
}
});
}
/**
* 延时消息
*/
public <T> void sendRocketDelay(String topic, T message,String from, MqDelayLevel delayLevel){
sendDelay(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("{} 发送消息成功 {}",from, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("{} 发送消息失败",from, throwable);
}
}, delayLevel);
}
/**
* 实时消息
*/
public <T> void sendRocket(String topic, T message,String from){
send(topic, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("{} 发送消息成功 {}",from, sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("{} 发送消息失败",from, throwable);
}
});
}
}
延时级别:同原有的配置,只是加了自己需要的时间
import lombok.AllArgsConstructor;
import lombok.Getter;
@AllArgsConstructor
@Getter
public enum MqDelayLevel {
//1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d 1w 1m 1y
ONE_SEC(1),
FIVE_SEC(2),
TEN_SEC(3),
THIRTY_SEC(4),
ONE_MIN(5),
TWO_MIN(6),
THREE_MIN(7),
FOUR_MIN(8),
FIVE_MIN(9),
SIX_MIN(10),
SEVEN_MIN(11),
EIGHT_MIN(12),
NINE_MIN(13),
TEN_MIN(14),
TWENTY_MIN(15),
THIRTY_MIN(16),
ONE_HOUR(17),
TWO_HOUR(18),
ONE_DAY(19),
ONE_WEEK(20),
ONE_MONTH(21),
ONE_YEAR(22)
;
private int level;
}
MQ消息封装类:
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.NoArgsConstructor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MqMessage <T> implements Message<T> {
private T body;
private MessageHeaders headers;
@Override
public T getPayload() {
return body;
}
@Override
public MessageHeaders getHeaders() {
return headers;
}
}
延时尝试类:
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.weibu.cloud.common.utils.DateUtil;
import com.weibu.cloud.common.utils.uuid.UUID;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.time.LocalDateTime;
@Getter
@ToString
@Slf4j
public class MqRetryMessage <T> implements Serializable {
private static final long serialVersionUID = 408371795120089055L;
/**
* 默认延时级别
*/
private final static MqDelayLevel[] DEFAULT_DELAY_LEVEL = {
MqDelayLevel.ONE_MIN
, MqDelayLevel.FIVE_MIN
, MqDelayLevel.TEN_MIN
, MqDelayLevel.THIRTY_MIN
, MqDelayLevel.ONE_HOUR
, MqDelayLevel.TWO_HOUR
};
/**
* 重试ID
*/
private final String retryId;
/**
* 当前重试次数
*/
private final int retryTimes;
/**
* 重试开始时间
*/
@JsonFormat(pattern = DateUtil.PATTERN)
private final LocalDateTime startTime;
/**
* 最大重试次数
*/
private final int maxAttempts;
/**
* 延时级别
*/
private MqDelayLevel[] delayLevelList = DEFAULT_DELAY_LEVEL;
/**
* 消息体
*/
private final T body;
public MqRetryMessage(){
this(null, 6);
}
/**
* 构造函数
*/
public MqRetryMessage(T body){
this(body, 6);
}
public MqRetryMessage(T body, int maxAttempts){
this(body, maxAttempts, 0, null, UUID.randomUUID().toString());
}
private MqRetryMessage(T body, int maxAttempts, int retryTimes, MqDelayLevel[] delayLevelList, String retryId){
this.retryTimes = retryTimes;
this.maxAttempts = maxAttempts;
this.startTime = LocalDateTime.now();
this.body = body;
this.retryId = retryId;
this.setDelayLevelList(delayLevelList);
}
/**
* 是否可以重试
*/
public boolean canRetry(){
//maxAttempt 为0时无限制
if(maxAttempts == 0){
return true;
}
return retryTimes + 1 <= maxAttempts;
}
/**
* 获取下一个重试消息
*/
public MqRetryMessage <T> nextRetry(){
if(!canRetry()){
throw new IllegalStateException("重试次数已到达最大无法重试");
}
return new MqRetryMessage<>(body, maxAttempts, retryTimes + 1, delayLevelList, retryId);
}
/**
* 获取当前延时级别
*/
@JsonIgnore
public MqDelayLevel getDelayLevel(){
int index = retryTimes;
if(retryTimes + 1 > delayLevelList.length){
index = delayLevelList.length - 1;
}
return delayLevelList[index];
}
/**
* 设置DelayLevel
*/
public void setDelayLevelList(MqDelayLevel[] delayLevelList) {
if(delayLevelList != null && delayLevelList.length > 0) {
this.delayLevelList = delayLevelList;
}
}
}
MQ消息生产者:
import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.constant.OrderEnum;
import com.weibu.cloud.common.feign.user.UserApi;
import com.weibu.cloud.common.mq.MqSender;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.common.utils.DateUtil;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.dto.UserBrief;
import com.weibu.cloud.trade.entity.WbOrder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Component
public class PayedOrderStatisticalProducer {
@Autowired
private MqSender mqSender;
@Autowired
private UserApi userApi;
public void send(OrderStatisticalItem item){
mqSender.send(RocketMqConfig.ORDER_STATISTICAL_TOPIC, item, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("statistical send success.sendResult:{}", JSONObject.toJSONString(sendResult));
}
@Override
public void onException(Throwable e) {
log.error("statistical send failed . :{}",e);
}
});
}
@Async
public void sendList(List<WbOrder> list){
log.info("PayedOrderStatisticalProducer.sendList -> list:{}",list);
if (CollectionUtils.isEmpty(list)){
return;
}
list.forEach(wb -> {
OrderStatisticalItem item = entityToItem(wb);
this.send(item);
});
}
public OrderStatisticalItem entityToItem(WbOrder entity){
OrderStatisticalItem item = new OrderStatisticalItem();
item.setOrderType(queryEnumType(entity.getOrderType()));
item.setPayedAmount(entity.getAmountRealPay());
item.setPayedDate(DateUtil.localDateTimeTransLocalDate(entity.getCreateTime()));
item.setPayFlag(true);
List<UserBrief> list = new ArrayList<>(2);
list.add(UserBrief.build(entity.getAgentId(),2));
list.add(UserBrief.build(entity.getMerchantId(),3));
item.setUserIds(list);
return item;
}
private OrderEnum.OrderType queryEnumType(Integer orderType){
switch (orderType){
case 0:
return OrderEnum.OrderType.ALL;
case 1:
return OrderEnum.OrderType.AGENT;
case 2:
return OrderEnum.OrderType.BIZ;
case 3:
return OrderEnum.OrderType.MALL;
}
return OrderEnum.OrderType.BIZ;
}
}
MQ消息消费者:
import com.alibaba.fastjson.JSONObject;
import com.weibu.cloud.common.mq.RocketMqConfig;
import com.weibu.cloud.trade.dto.OrderStatisticalItem;
import com.weibu.cloud.trade.service.impl.AsyncStatisticalService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqConfig.ORDER_STATISTICAL_TOPIC,
consumerGroup = RocketMqConfig.ORDER_STATISTICAL_GROUP,
consumeMode = ConsumeMode.ORDERLY, //并行处理,默认并行;ORDERLY 按顺序处理
selectorExpression = "*")
public class PayedOrderStatisticalConsumer implements RocketMQListener<OrderStatisticalItem> {
@Autowired
private AsyncStatisticalService asyncStatisticalService;
@Override
public void onMessage(OrderStatisticalItem item) {
log.info("PayedOrderStatisticalConsumer -> item:{}", JSONObject.toJSONString(item));
statisticalOrder(item);
}
//重试机制
//@Retryable(value = Throwable.class, backoff = @Backoff(value = 600000L, delay = 300000L, multiplier = 5, maxDelay = 3600000L))
@Retryable(value = Throwable.class, backoff = @Backoff(delay = 60000L, multiplier = 2), recover = "liquidationNotifyFailed")
public void statisticalOrder(OrderStatisticalItem item){
log.info("statisticalOrder -> item:{}",JSONObject.toJSONString(item));
asyncStatisticalService.updateUserOrderStatistical(item.getUserIds(),item.getPayedAmount(),item.getPayedDate(),item.getOrderType(),item.getPayFlag());
}
}
2018/7/10功能点解读开盘和收盘 开盘:每天股市开始交易称为开盘 收盘:每天股市结束交易称为收盘 注: 沪深A股市场开盘时间是周一到周五,集合竞价时间9:15-9:25,连续竞价时间9:30-11:30,正常交易时间13:00-15:00,深市的14:57-15:00是集合竞价时间 港股证券市场开盘时间是周一至周五,开市前时段9:30-10:00,...
一、1、创建tomcat用户和组,并加入组useradd tomcatpasswd tomcat (123)groupadd tomcatusermod -G tomcat tomcat2、创建(wls…jar)目录:mkdir -p /home/tomcat/app3、创建jdk目录:mkdir /home/jdk mkdir /h
本文译自Java AnnotationsJava Annotation Purposes一般来说,Java Annotations有以下三种用途:Compiler instructionsBuild-time instructionsRuntime instructions构建工具能够扫描Java代码中的annotations并基于这些annotations来生
一、需求背景 在项目开发中,经常会遇到导出Excel报表文件的情况,因为很多情况下,我们需要打印Excel报表,虽然在网页上也可以生成报表,但是打印网上里的报表是无法处理排版问题的,所以最好的方式,还是生成Excel文件。 PageOffice封装了一组用于动态输出数据到Excel文档的相关类,全部包含在com.zhuozhengsoft.pageoffice.excelwriter...
现在之所以有这么多的小伙伴热衷于爬虫技术,无外乎是因为爬虫可以帮我们做很多事情,比如搜索引擎、采集数据、广告过滤等,以Python为例,Python爬虫可以用于数据分析,在数据抓取方面发挥巨大的作用。但是这并不意味着单纯掌握一门Python语言,就对爬虫技术触类旁通,要学习的知识和规范还有喜很多,包括但不仅限于HTML 知识、HTTP/HTTPS 协议的基本知识、正则表达式、数据库知识,常用抓包工...
打开创建的本地头文件,可以看见要实现c++的方法:/* * Class: com_cn_TestJni * Method: getjni * Signature: ()V */JNIEXPORT void JNICALL Java_com_cn_TestJni_getjni (JNIEnv *, jobject);/* * Class: com_cn_
HTTP SSL TCP TLS 说的啥TCP 建立连接过程HTTPS 的连接建立过程HTTPS 是啥HTTP 是明文传输,意味着端到端之间的任意节点都知道内容是消息传输内容是啥,这些节点可以是 路由器,代理等。HTTPS 就是来解决这个问题的,以安全为目的的 HTTP 通道,全称是 Hyper Text Transfer ProtocolSSL TLS 是啥SSL (secure Sockets Layer 安全套接字)TLS(Transport Layer security,
对koa搭建后台管理系统做一些总结,使用的是腾讯云开发和对象存储作为数据库和存储地点。1.首先要安装node,让后npm init初始化,目的是生成package.json文件,package.json将显示安装了哪些模块。2.安装所需的模块,如下cnpm install koa --save //koa框架cnpm install koa-json --save//返回给前端的json对象cnpm install koa-bodyparser --save//post提交的对......
xampp运行MySQL数据时出现 Error: MySQL shutdown unexpectedly.解决方案故障描述:12:15:46 [mysql] Attempting to start MySQL app...12:15:47 [mysql] Status change detected: running12:15:47 [mysql] Status change detecte...
【转自:http://www.ibm.com/developerworks/cn/opensource/os-cn-hadoop-yarn/】简介: 本文介绍了 Hadoop 自 0.23.0 版本后新的 map-reduce 框架(Yarn) 原理,优势,运作机制和配置方法等;着重介绍新的 yarn 框架相对于原框架的差异及改进;并通过 Demo 示例详细描述了在新的 yarn 框架下搭建和...
1.比特平面分层比特平面。取代突出灰度级范围,突出特定比特来为整个图像外观做出贡献。(数字图像处理(中卫第3版))以下的样例是该书上使用的实例。这里在matlab中进行实现,帮助大家理解,同一时候请大家提出宝贵的改动意见。思想:使用书中的图片(到官网进行下载)。在8比特下,遍历整个图像,用像素值与各比特面的值(2^(n-1),n为比特面)进行位与操作,推断该像素值在该比特面是否...