技术标签: java websocket spring cloud redis
问题思路
1.为什么采用Redis的ZSet实现延迟任务?
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
2.为什么任务需要存储在MySQL数据库中?
延迟任务是一个通用的服务,任何需要延迟得任务都可以调用该服务,需要考虑数据持久化的问题,存储数据库中是一种数据安全的考虑。
3.在添加zset数据的时候,为什么只存储未来5分钟内的任务?
任务模块是一个通用的模块,项目中任何需要延迟队列的地方,都可以调用这个接口,要考虑到数据量的问题,如果数据量特别大,为了防止阻塞,只需要把未来几分钟要执行的数据存入缓存即可。
Dto:Task对象用于存储每个任务的数据,可以在微服务之间传输任务数据时使用
package cn.lskiot.flow.compose.vo.android;
import lombok.Data;
@Data
public class Task{
/**
* 任务id
*/
private Long taskId;
/**
* 类型
*/
private Integer taskTopic;
/**
* 执行时间
*/
private long executeTime;
/**
* task参数
*/
private String parameters;
}
定义任务状态常量类:
package cn.lskiot.flow.model.constance;
/**
* task状态
*/
public class ScheduleConstants {
//初始化状态
public static final int SCHEDULED = 0;
//已执行状态
public static final int EXECUTED = 1;
//已取消状态
public static final int CANCELLED = 2;
//上牌
public static final int TASK_TOPIC_FLOW_UPPER = 3;
//下牌
public static final int TASK_TOPIC_FLOW_LOWER = 4;
//翻牌
public static final int TASK_TOPIC_FLOW_TURN = 5;
//结束迎宾
public static final int TASK_TOPIC_GREET_OVER = 6;
//跨排位翻牌
public static final int TASK_TOPIC_CROSS_TURN = 7;
}
定义Redis常量类
public abstract class RedisConstants {
public static String TASK_TOPIC_PREFIX = "task_topic_";
}
定义任务详情类
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo")
public class Taskinfo implements Serializable {
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
}
定义任务执行记录类
package cn.lskiot.flow.model.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
@Data
@TableName("taskinfo_logs")
public class TaskinfoLogs implements Serializable {
/**
* 任务id
*/
@TableId(type = IdType.ID_WORKER)
private Long taskId;
/**
* 执行时间
*/
@TableField("execute_time")
private Date executeTime;
/**
* 参数
*/
@TableField("parameters")
private String parameters;
/**
* 任务主题
*/
@TableField("task_topic")
private Integer taskTopic;
/**
* 版本号,用乐观锁
*/
@Version
private Integer version;
/**
* 状态 0=int 1=EXECUTED 2=CANCELLED
*/
@TableField("status")
private Integer status;
}
Taskinfo
package cn.lskiot.flow.application.mapper;
import cn.lskiot.flow.model.entity.Taskinfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskinfoMapper extends BaseMapper<Taskinfo> {
}
package cn.lskiot.flow.proxy.service;
import cn.lskiot.flow.model.entity.Taskinfo;
import com.jbangit.application.base.BaseService;
import java.util.Date;
import java.util.List;
public interface TaskinfoService extends BaseService<Taskinfo> {
List<Taskinfo> selectList(Date futureDate);
}
package cn.lskiot.flow.application.service.impl;
import cn.lskiot.flow.application.mapper.TaskinfoMapper;
import cn.lskiot.flow.model.entity.Taskinfo;
import cn.lskiot.flow.proxy.service.TaskinfoService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;
import java.util.Date;
import java.util.List;
@DubboService
public class TaskinfoServiceImpl extends ServiceImplPlus<TaskinfoMapper, Taskinfo> implements TaskinfoService {
@Override
public List<Taskinfo> selectList(Date futureDate) {
return super.lambdaQuery().le(Taskinfo::getExecuteTime, futureDate).list();
}
}
TaskinfoLogs
package cn.lskiot.flow.application.mapper;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskinfoLogsMapper extends BaseMapper<TaskinfoLogs> {
}
package cn.lskiot.flow.proxy.service;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import com.jbangit.application.base.BaseService;
public interface TaskinfoLogsService extends BaseService<TaskinfoLogs> {
}
package cn.lskiot.flow.application.service.impl;
import cn.lskiot.flow.application.mapper.TaskinfoLogsMapper;
import cn.lskiot.flow.model.entity.TaskinfoLogs;
import cn.lskiot.flow.proxy.service.TaskinfoLogsService;
import com.jbangit.application.base.ServiceImplPlus;
import org.apache.dubbo.config.annotation.DubboService;
@DubboService
public class TaskinfoLogsServiceImpl extends ServiceImplPlus<TaskinfoLogsMapper, TaskinfoLogs> implements TaskinfoLogsService {
}
/**
* 添加延迟任务
*
* @param task
* @return
*/
public Long addTask(@RequestBody Task task) {
//把任务添加到DB
addTaskToDb(task);
//把任务添加Redis
addTaskToCache(task);
return task.getTaskId();
}
/**
* 更新延迟任务
*
* @param task
* @return
*/
public Long updateTask(@RequestBody Task task) {
updateTaskToDb(task);
updateTaskToCache(task);
return task.getTaskId();
}
/**
* 删除延时任务
*
* @param task
* @return
*/
private Long removeTask(Task task) {
removeTaskToDb(task);
removeTaskToCache(task);
return task.getTaskId();
}
添加任务
/**
* 把任务添加Redis
*
* @param task
*/
private void addTaskToCache(Task task) {
//获取当前时间的未来5分钟的时间
long futureTime = DateTime.now().plusMinutes(5).getMillis();
//判断任务的执行时间是否在未来5分钟以内
if (task.getExecuteTime() <= futureTime) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
}
}
/**
* 把任务添加到DB
*
* @param task
*/
private void addTaskToDb(Task task) {
try {
//添加任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfo = taskinfoService.saveAndReturnT(taskinfo);
//把新产生的任务ID赋值给Task对象
task.setTaskId(taskinfo.getTaskId());
//添加任务日志表
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsService.save(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
更新任务
/**
* 更新Redis中的任务
*
* @param task
*/
private void updateTaskToCache(Task task) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().add(key, JsonUtil.toJson(task), task.getExecuteTime());
}
/**
* 更新DB中的任务
*
* @param task
*/
private void updateTaskToDb(Task task) {
try {
//添加任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoService.updateById(taskinfo);
//添加任务日志表
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
删除任务
/**
* 从缓存删除延时任务
*
* @param task
*/
private void removeTaskToCache(Task task) {
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + task.getTaskTopic();
redisTemplate.opsForZSet().remove(key, JsonUtil.toJson(task));
}
/**
* 从数据库删除延时任务
*
* @param task
*/
private void removeTaskToDb(Task task) {
try {
//删除任务表
taskinfoService.removeById(task.getTaskId());
//更新任务日志表状态
TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
//标记为已执行
taskinfoLogs.setStatus(ScheduleConstants.CANCELLED);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException();
}
}
一分钟同步一次数据
/**
* 定时从MySQL导入任务到缓存
*/
@Scheduled(cron = " 0/60 * * * * ? ")
public void importTaskToCache() {
log.debug("从MySQL导入任务到缓存...");
//从任务表中查询未来5分钟将要执行任务
Date futureDate = DateTime.now().plusMinutes(5).toDate();
List<Taskinfo> taskinfoList = taskinfoService.selectList(futureDate);
if (CollectionUtil.isEmpty(taskinfoList)) {
return;
}
for (Taskinfo taskinfo : taskinfoList) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
//添加到redis中
addTaskToCache(task);
}
}
/**
* 消费延迟任务
*/
public List<Task> pollTask(Integer taskTopic) {
//在Redis中查询符合执行条件的任务
String key = FlowRedisConstance.TASK_TOPIC_PREFIX + taskTopic;
Set<String> taskSet = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis());
List<Task> taskList = new ArrayList<>();
if (!CollectionUtils.isEmpty(taskSet)) {
for (String taskStr : taskSet) {
Task task = JSONUtil.toBean(taskStr, Task.class);
//清理Db的消费过的任务
clearTaskDb(task);
//清理缓存的消费过的任务
clearTaskCache(key, taskStr);
taskList.add(task);
}
}
return taskList;
}
清理缓存数据
/**
* 清理缓存中消费过的任务
*
* @param taskStr
*/
private void clearTaskCache(String key, String taskStr) {
redisTemplate.opsForZSet().remove(key, taskStr);
}
清理数据库数据
/**
* 清理缓存中消费过的任务
*
* @param task
*/
private void clearTaskDb(Task task) {
try {
//删除任务表
taskinfoService.removeById(task.getTaskId());
//更新任务日志表状态
TaskinfoLogs taskinfoLogs = taskinfoLogsService.getById(task.getTaskId());
//标记为已执行
taskinfoLogs.setStatus(ScheduleConstants.EXECUTED);
taskinfoLogsService.updateById(taskinfoLogs);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
例如在我的业务中指定员工休息, 超出时间会做矿工处理
/**
* 消费超时自动下牌任务
*/
@Scheduled(fixedRate = 1000)// 每隔1秒1次
public void publishAutoLowerTask() {
//查询Redis符合条件的任务
List<Task> taskList = pollTask(ScheduleConstants.TASK_TOPIC_FLOW_LOWER);
if (CollectionUtil.isNotEmpty(taskList) && taskList.size() != 0) {
for (Task task : taskList) {
//取出任务参数
String param = task.getParameters();
FlowRankIntervalItem flowRankIntervalItem = JSONUtil.toBean(param, FlowRankIntervalItem.class);
flowRankIntervalItem = flowRankIntervalItemService.getById(flowRankIntervalItem.getId());
Long rankIntervalId = flowRankIntervalItem.getRankIntervalId();
FlowRankInterval flowRankInterval = flowRankIntervalService.getById(rankIntervalId);
Response<Long> response = rankController.employeeLower(flowRankInterval.getRotationRecordId(), EnumIsDefault.yes.getCode());
ArrayList<Long> flowRankIds = new ArrayList<>();
flowRankIds.add(response.getData());
flowRankIds.add(flowRankService.getFlowRankByName(flowRankInterval.getStoreId(),EnumFlowRank.other.getMsg()).getId());
WebSocketMsg webSocketMsg = new WebSocketMsg()
.setSignal(EnumWebsocket.websocket_flush)
.setData(JSONUtil.toJsonStr(flowRankIds))
.setToUser(flowRankIntervalItem.getUserid());
flowProjectCommon.webSocketSend(webSocketMsg);
log.info("倒计时结束自动超时下牌");
}
}
}
这里出现了一个业务需求, 服务器要去自动发送刷新请求, 经过讨论使用的是websocket长连接
我参考了网上搭建websocket聊天室的教程, 搭建了一个websocket服务, 地址如下:
Java WebSocket实现网络聊天室(群聊+私聊)_几人憔悴几人泪的博客-程序员宅基地_java websocket 聊天室
贴一下我自己编写的代码
package cn.lskiot.flow.compose.webSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author hqr
*/
@Slf4j
@ServerEndpoint("/webSocket/{userId}")
@Component
public class WebSocketServer {
/**
* 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
*/
private static int onlineCount = 0;
/**
* webSocketMap,用来存放每个客户端对应的MyWebSocket对象。
*/
public static final ConcurrentHashMap<String, WebSocketServer> WEBSOCKET_MAP = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收userId
*/
private String userId = "";
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
session.setMaxIdleTimeout(60*1000);
this.session = session;
this.userId = userId;
if (WEBSOCKET_MAP.containsKey(userId)) {
WEBSOCKET_MAP.remove(userId);
WEBSOCKET_MAP.put(userId, this);
} else {
WEBSOCKET_MAP.put(userId, this);
addOnlineCount();
}
log.info("用户连接:" + userId + ",当前在线人数为:" + getOnlineCount());
try {
sendMessage("连接成功");
} catch (IOException e) {
log.error("用户:" + userId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
try {
if (WEBSOCKET_MAP.containsKey(userId)) {
WEBSOCKET_MAP.remove(userId);
subOnlineCount();
}
log.info("用户退出:" + userId + ",当前在线人数为:" + getOnlineCount());
} catch (Exception e) {
log.error("退出异常");
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
// log.info("用户:" + userId + ",报文:" + message);
}
/**
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.userId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
WebSocketServer.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketServer.onlineCount--;
}
}
定义消息实体类
package cn.lskiot.flow.compose.vo.android;
import enums.EnumWebsocket;
import lombok.Data;
import lombok.experimental.Accessors;
@Data
@Accessors(chain = true)
public class WebSocketMsg {
private EnumWebsocket signal;
private String toUser;
private String data;
}
发送消息公共方法
/**
* 使用webSocket发送消息
*/
public void webSocketSend(WebSocketMsg webSocketMsg) {
try {
String toUser = webSocketMsg.getToUser();
String key;
if (toUser.contains(":")) {
key = toUser.split(":")[0];
} else {
key = toUser.split("%3A")[0];
}
ConcurrentHashMap<String, WebSocketServer> websocketMap = WebSocketServer.WEBSOCKET_MAP;
websocketMap.forEach((k, webSocketServer) -> {
if (k.contains(":")) {
k = toUser.split(":")[0];
} else {
k = toUser.split("%3A")[0];
}
if (key.equals(k)) {
try {
webSocketServer.sendMessage(JSONUtils.toString(webSocketMsg));
} catch (IOException e) {
log.error("消息发送失败");
}
}
});
} catch (Exception e) {
e.printStackTrace();
throw new ServiceException("websocket异常");
}
}
利用websocket实现了扫码登录
@PostMapping("checkEmployeeId")
@ApiOperation("扫码校验")
public Response<String> checkEmployeeId(
@ApiParam("员工id") @RequestParam("employeeId") Long employeeId,
@ApiParam("水牌机用户id") @RequestParam("userId") String userId,
@ApiParam("上下牌判断") @RequestParam("flag") boolean flag) {
EnumWebsocket EnumWebsocket = flag ? enums.EnumWebsocket.websocket_upper : enums.EnumWebsocket.websocket_lower;
long temp = sessionHelper.getOrganizationEmployeeId();
if (Objects.equals(temp, employeeId)) {
HashMap<String, String> data = new HashMap<>();
data.put("employeeId", String.valueOf(employeeId));
data.put("userId", userId);
WebSocketMsg webSocketMsg = new WebSocketMsg()
.setData(JSONUtil.toJsonStr(data))
.setToUser(userId)
.setSignal(EnumWebsocket);
flowProjectCommon.webSocketSend(webSocketMsg);
return onSuccess();
} else {
return onFailure(ErrorCode.normal, "请扫描自己的二维码");
}
}
以上就是我在项目实战中遇到的一些难点以及解决方案, 希望对你有帮助, 码字不易, 请点赞支持一下哦!
由于最近在开发项目需要大量关于消息推送,IM的系统,所以最近在研究openfire spark smack 等开源项目。 最近也看了DDPUSH 任意门推送的源码,是基于udp 和tcp实现的,没有使用XMPP 、MQTT协议,测试发现还是有一些问题,也许正是因为UDP协议本质所决定的吧,发送包有一些没法接收到,最近做类似滴滴这样的项目如果用这样的推送可以无法保证消息的准确性和及
本课程将对Xilinx提供的一款IP核——AXI VDMA(Video Direct Memory Access) 进行详细讲解,为后续的学习和开发做好准备。内容安排如下:首先分析为什么要使用VDMA、VDMA的作用;然后详细介绍VDMA的特点、寄存器作空间; 最后阐述如何使用VDMA,包括IP核的配置方法、代码编写流程等。本章主要是理论学习,学习完本章,会对VDMA有全面的认识,有利于学
问题:C++友元函数的最简单案例本程序通过VC++ 6.0编译与测试,程序的目的是求两个点之间的中点坐标,具体代码如下://没有使用友元类,报错,错误分析见代码注释#include &lt;iostream&gt;using namespace std;class Point{public: Point(float a,float b):x(a),y(b){} void p...
java 按字节读写文件(Base64编码解码)最近在做项目时遇到这样一个需求:依次读取本地文件夹里所有文件的内容,转为JSON,发送到ActiveMQ的消息队列, 然后从MQ的消息队列上获取文件的信息,依次写到本地。常见的文件类型,比如.txt ,.png等文件的读写并不难。但是,我刚才所提到的需求,如果用常规的方法去读写,比如按字节读取文件内容,转为字符串,再转为JSON发送到MQ的队列,然后从
不少WIN10的小伙伴都已经升级了最新的edge浏览器,也都十分欢喜的试用了它的新的功能和界面,但也遇到过各种各样的小小问题不知如何解决。我遇到的就是edge无法储存百度账号的登录状态,导致每次重新打开edge浏览器时都需要重新重新登录百度账号。太讨厌了!造成这种情况的原因是,新版edge的隐私和服务功能太过强大,关闭浏览器会清除上次的网站跟踪,导致账号登录状态无法保存。经过不断摸索,通过以下步骤可以解决百度账号登录状态无法保存的问题。1. 打开浏览器设置①点击浏览器右上角”···“按钮;②点击
every blog every motto: you cannot change what you refuse to confront.0. 前言ENVI中修改头文件说明: 本文基于ENVI 5.31. 正文1.1 设置忽略值代开文件注: 这里数据已经好了,下方只是演示使用,主要针对图片周围是黑色的,改变成白色。Raster Management -> Edit ...
Qt 使用QAxObject保存excel出错,使用windows格式路径分隔符Qt 使用QAxObject读取excel和保存excel时,必须保证文件路径是绝对路径,而且需要使用\\分隔符,不能使用/分隔符;如果使用/分割符号,运行pWorkBook-&amp;gt;dynamicCall(&quot;SaveAs(const QString &amp;amp;)&quot;,&quot;F:/test.xlsx&quot;);会出现弹出保
void var_dump ( mixed $expression [, mixed $... ] )此函数显示关于一个或多个表达式的结构信息,包括表达式的类型与值。数组将递归展开值,通过缩进显示其结构。在PHP5里,所有的public、private和protected属性都会被返回输出。------------------------------------------------------...
一:ros的系统架构 ros系统架构主要分为三个部分:文件系统级计算图级开源社区级1. 文件系统级功能包(package):是ROS中软件组织的基本形式,一个功能包具有最小的结构和最少的内容,用于创建ROS程序;--指的是一种特定的文件结构和文件夹组合功能包清单:manifests.xml 通过这个文件实现对功能包的管理;功能包集(stack):将几
项目用的是[email protected], [email protected] ,[email protected],ui框架那些就不说了首先下载electron-builder,这里建议使用cnpm挂淘宝镜像下载npm install -g cnpm --registry=https://registry.npm.taobao.org,先下载之后统一将整个node_module文件夹删除,用np...
由于存储空间不足,下线的数据库需要把存储空间腾出来,关闭集群资源,主机工程师收回lun需要(包括ocr 和 voting data 磁盘组),新的应用需要上线需要新的数据库,新的hitach存储到位需要重新安装数据库,上次删除gi 和database软件都在,这次只需要重新配置即可。参考How to Deconfigure/Reconfigure(Rebuild OCR) or Deinstall...
先启动pyspark第一篇博客有from pyspark import SparkContext,SparkConffrom pyspark.sql import SparkSessionspark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()spark.read.text(“people.txt”)#读取文件创...