使用Jedis操作Redis Cluster,我们需要创建JedisCluster对象,再通过JedisCluster对象实例操作数据,代码一般如下:
// 初始化所有节点(例如6个节点)
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>() {
{
add(new HostAndPort("127.0.0.1", 6379));
add(new HostAndPort("127.0.0.1", 6380));
add(new HostAndPort("127.0.0.1", 6381));
add(new HostAndPort("127.0.0.1", 6382));
add(new HostAndPort("127.0.0.1", 6383));
add(new HostAndPort("127.0.0.1", 6384));
}};
// 初始化commnon-pool连接池,并设置相关参数
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
// 初始化JedisCluster
JedisCluster jedisCluster = new JedisCluster(jedisClusterNode, 1000, 1000, 5, poolConfig);
下面我们详细了解下JedisCluster的创建逻辑,其构造函数核心功能为创建connectionHandler实例,JedisClusterConnectionHandler的构造函数如下:
public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
final GenericObjectPoolConfig poolConfig, int connectionTimeout,
int soTimeout, String password, String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, JedisClusterHostAndPortMap portMap) {
this.cache = new JedisClusterInfoCache(poolConfig, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, portMap);
initializeSlotsCache(nodes, connectionTimeout, soTimeout, password, clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
}
其中重点需要关注的是initializeSlotsCache方法,该方法主要初始化集群中槽和节点的映射关系,下面简单介绍下Redis Cluster的数据分区和路由方案。
Redis Cluster使用虚拟槽数据分区方案,每个节点维护部分槽和数据,当需要读取数据时,使用"CRC16(key) & 16383"哈希算法得到key所在的槽,Redis Cluster本身会维护节点和槽的信息。
当你使用Dummy型客户端(例如redis-cli)在某个不具备该槽的节点上执行读取指令时,Redis Cluster会返回"MOVED"重定向信息,告诉我们该槽所在节点的IP和端口信息,然后就可以到相应的节点上执行命令读取数据。该类型的客户端需要我们自行进行重定向操作,相对不方便。
而Jedis等Smart型客户端本身就维护了槽和节点的映射关系,通过"CRC16(key) & 16383"哈希算法得到key所在的槽后,就可以通过映射关系找到对应的节点,然后直接给该节点发送指令返回数据。
下面为initializeSlotsCache方法源码:
private void initializeSlotsCache(Set<HostAndPort> startNodes,
int connectionTimeout, int soTimeout, String password, String clientName,
boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier) {
for (HostAndPort hostAndPort : startNodes) {
Jedis jedis = null;
try {
jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);
if (password != null) {
jedis.auth(password);
}
if (clientName != null) {
jedis.clientSetname(clientName);
}
cache.discoverClusterNodesAndSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
}
需要注意的是,由于discoverClusterNodesAndSlots内使用了WriteLock写锁,因此保证了只有一个连接会初始化节点和槽的映射关系。
重点分析下discoverClusterNodesAndSlots方法:
public void discoverClusterNodesAndSlots(Jedis jedis) {
//写锁保证只有一个连接进行初始化
w.lock();
try {
//清空旧的节点和槽数据,并关闭节点连接
reset();
//使用"cluster slots"指令获取Cluster节点和槽的映射关系
/**
* 返回结构如下:
* 127.0.0.1:6379> cluster slots
* 1) 1) (integer) 5462 -- 起始槽
* 2) (integer) 10922 -- 终止槽
* 3) 1) "127.0.0.1" -- 主节点
* 2) (integer) 6380
* 3) "85371dd3c2c11dbb1cd506ed028de10bb7fa2816"
* 4) 1) "127.0.0.1" -- 从节点
* 2) (integer) 6383
* 3) "01740d2eb2c9f89014e2b8b673d444753d0685cd"
* 2) 1) (integer) 10923
* 2) (integer) 16383
* 3) 1) "127.0.0.1"
* 2) (integer) 6381
* 3) "aacad0a5a2b37d4e11f2253849263575adb78740"
* 4) 1) "127.0.0.1"
* 2) (integer) 6384
* 3) "5e099198bba08597d695f6e2e3db2d6ec1494534"
* 3) 1) (integer) 0
* 2) (integer) 5461
* 3) 1) "127.0.0.1"
* 2) (integer) 6379
* 3) "672cc8350bb1ac1d94e9c511ea234f6b7f86cab1"
* 4) 1) "127.0.0.1"
* 2) (integer) 6382
* 3) "6bab67c3619c4b9cbdfd247ff3e446308a0c6326"
*/
List<Object> slots = jedis.clusterSlots();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
//解析出所负责的的所有槽
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
/**
* 解析主节点和从节点信息,构建槽和节点的映射关系:
* 1. 添加nodes元素,key为"<ip>:<port>",value为该节点对应的JedisPool连接池
* 2. 添加slots元素,key为"slotNum",value为该槽对应节点的JedisPool连接池
*/
int size = slotInfo.size();
for (int i = MASTER_NODE_INDEX; i < size; i++) {
List<Object> hostInfos = (List<Object>) slotInfo.get(i);
if (hostInfos.size() <= 0) {
continue;
}
//获取节点IP端口信息
HostAndPort targetNode = generateHostAndPort(hostInfos);
//设置nodes,Map.Entry结构为<ip:port, JedisPool>
//从节点和主节点都会进行保存
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
//设置slots,Map.Entry结构为<slot, JedisPool>
//只需要设置槽和主节点的映射关系
assignSlotsToNode(slotNums, targetNode);
}
}
}
} finally {
w.unlock();
}
}
维护节点和槽的映射关系主要通过JedisClusterInfoCache里面的两个Map类型的变量:
private final Map nodes = new HashMap();
private final Map slots = new HashMap();
该方法的主要流程如下:
1. 清空nodes和slots数据,并关闭关联的JedisPool。
2. 使用"cluster slots"指令获取Cluster节点和槽的映射关系,其返回结构如下:
127.0.0.1:6379> cluster slots
1) 1) (integer) 5462 . -- 起始槽
2) (integer) 10922 . -- 终止槽
3) 1) "127.0.0.1" . -- 主节点IP
2) (integer) 6380 -- 主节点端口
3) "85371dd3c2c11dbb1cd506ed028de10bb7fa2816" -- 主节点runId
4) 1) "127.0.0.1" . -- 从节点IP
2) (integer) 6383 -- 从节点端口
3) "01740d2eb2c9f89014e2b8b673d444753d0685cd" -- 从节点runId
2) 1) (integer) 10923
2) (integer) 16383
3) 1) "127.0.0.1"
2) (integer) 6381
3) "aacad0a5a2b37d4e11f2253849263575adb78740"
4) 1) "127.0.0.1"
2) (integer) 6384
3) "5e099198bba08597d695f6e2e3db2d6ec1494534"
3) 1) (integer) 0
2) (integer) 5461
3) 1) "127.0.0.1"
2) (integer) 6379
3) "672cc8350bb1ac1d94e9c511ea234f6b7f86cab1"
4) 1) "127.0.0.1"
2) (integer) 6382
3) "6bab67c3619c4b9cbdfd247ff3e446308a0c6326"
3. 解析出所负责的的所有槽,代码如下:
private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
List<Integer> slotNums = new ArrayList<Integer>();
for (int slot = ((Long) slotInfo.get(0)).intValue(); slot <= ((Long) slotInfo.get(1))
.intValue(); slot++) {
slotNums.add(slot);
}
return slotNums;
}
slotNums的数量即为该主节点所负责的槽的数量。
4. 解析主节点和从节点信息,构建槽和节点的映射关系。
值得注意的是,nodes里面会保存主节点和从节点的信息,但是slots里面槽只对应主节点的连接池JedisPool,从节点并需要设置。
最后值得一提的是,Jedis连接Sentinel时,连接的是Sentinel集群,通过“masterName”参数获取Sentinel集群监控的主节点连接,后续的操作也是通过该主节点连接执行。但连接Cluster时,其实连接的是每个Redis实例,读取数据时会根据Jedis维护的槽和节点映射关系去对应的节点上读取。
JedisCluster关闭代码如下:
jedisCluster.close();
其关闭源码逻辑主要调用JedisClusterInfoCache类的reset方法,该方法关闭了所有集群节点的连接,并将nodes和slots数据清空。
//BinaryJedisCluster类
@Override
public void close() {
if (connectionHandler != null) {
connectionHandler.close();
}
}
//JedisClusterConnectionHandler类
@Override
public void close() {
cache.reset();
}
//JedisClusterInfoCache类
public void reset() {
w.lock();
try {
for (JedisPool pool : nodes.values()) {
try {
if (pool != null) {
pool.destroy();
}
} catch (Exception e) {
// pass
}
}
nodes.clear();
slots.clear();
} finally {
w.unlock();
}
}
MYSQL 数据备份背景:在本地开发时,经常需要将本地测试数据同步到服务器上,通过navicat 或者sqlyog 虽然可以将数据便捷的导入导出,但总是觉得比较麻烦于是想通过直接拷贝mysql数据文件的方式,进行数据的同步。操作步骤1、停止mysql服务可在windows服务里,对需要同步数据的机器的mysql服务,进行停止2、拷贝数据到服务器上注意:1、建议本地的mysql版本,和你需要同步数据的服务器的mysql版本保持一致2、mysql 的root 账号密码保持一致,否则数据_mysql数据同步
头文件是#include ,如果要从数组a复制k个元素到数组b,可以这样做memcpy(b,a,sizeof(int)*k);#include #include int main() { int a[5]={0,1,2,3,4}; int b[5]; memcpy(b,a,sizeof(int)*3); for(int i = 0; i < 3; i++) printf(_c语言数组复制到另一个数组
以下内容较多可配合Ctrl+F搜索快速定位,对代码三连击全段选中,复制后粘贴进游戏控制台按。单位类代码示例:admincheat SpawnDino "Bluep...BP'"斯坦比约恩 冰熊 世界BOSS 维京。斯科尔 双狼2 世界BOSS 维京。哈蒂 双狼1 世界BOSS 维京。贝拉 巨蜂 世界BOSS 维京。水晶飞龙女皇 BOSS 水晶。芬里尔巨狼 BOSS 维京。罗克韦尔 BOSS 畸变。君王泰坦 BOSS 灭绝。腐化主宰 BOSS 创世。莫德尔 BOSS 创世。狮蝎 BOSS 焦土。..._方舟原始恐惧mod生物代码
问题描述官方文档做的Demo发现遇到了错误提示如下:error: (-215:Assertion failed) !empty() in function ‘cv::CascadeClassifier::detectMultiScale’错误的原因:出现 error: (-215:Assertion failed) !empty() in function ‘cv::CascadeClassifier::detectMultiScale’ 的主要原因是我们的代码没能够正确找到正确的_error: (-215:assertion failed) !empty() in function 'cv::cascadeclassifier::
一.浮动布局浮动布局就是让网页上的标签“浮起来”,比如<div>是不可能两个同占一行,但是让它们浮起来后,就能使它们同占一行。浮动通过float来设定,左浮:float:left; 右浮:float:left; #div1{ width: 200px; height: 200px; border: 2px solid red..._方框的float
AboutThere is a vulnerability in the below program that allows arbitrary programs to be executed, can you find it?Source code#include #include #include #include
由于UITextSelectionView是系统的私有类,连头文件都没有。那么我们怎么hook它的方法呢。比如初始化方法initWithInteractionAssistant://步骤一 Method originEat = class_getInstanceMethod(NSClassFromString(@"UITextSelectionView"), ..._uitextselectionview
BF算法KMP算法_图解 bf算法
修改tomcat/conf/server.xml配置文件。 maxThreads="500" minSpareThreads="400" /> port="80" protocol="HTTP/1.1" connectionTimeout="20000" enableLookups="false" r
CCRenderTexture,它允许你来动态创建纹理,并且可以在游戏中重用这些纹理。使用 CCRenderTexture非常简单 – 你只需要做以下5步就行了:①.创建一个新的CCRenderTexture:指定所要创建纹理的宽度和高度。②.调用CCRenderTexture的begin方法:设置OpenGL以便之后的任何图形绘制都在CCRenderTexture上,而不
开源报修系统V2版本开源报修管理系统V2版本,后台采用flask框架+layui实现,前台采用原声微信小程序框架编写,V2版本在V1版本基础上扩展,功能更加丰富。文章目录开源报修系统V2版本1、修改微信小程序授权方式2、小程序端增加(新报修、维修中、已完成)三种报修状态。3、最新源码获取方式结束语1、修改微信小程序授权方式微信更新了最新的接口, 运用V1版本源码,你会遇到如下问题,V2版本已经修改为最新的授权方式。需要的同学或已经投入使用的同学,请及时更新源码。修改的代码块如下: getUs_报修维修管理开源
MATLAB读取grib2格式气象数据文章目录MATLAB读取grib2格式气象数据前言一、grib2数据二、使用步骤1.安装nctoolbox包2.数据读取3.数据可视化前言grib2是气象领域一种常用的数据格式,本文介绍用MATLAB读取和可视化grib2数据的方法。一、grib2数据详细介绍内容可以参考[https://www.nco.ncep.noaa.gov/pmb/docs/grib2/grib2_doc/]二、使用步骤1.安装nctoolbox包下载nctool_matlab 读取grib