Redis-Jedis的MOVED_ASK实现
Jedis-2.9.0版本,最近在看Redis Cluster的槽位(Slot)迁移,其中涉及MOVED、ASK命令
- MOVED:key已被迁移到目标节点node上(已迁移完成),则返回MOVED命令并携带目标节点的node,刷新客户端的槽缓存,Redis客户端收到后再次请求新目标节点
- ASK:为什么需要ASK命令,为什么不能仅用MOVED解决?MOVED命令表示后续该key都由新节点处理,ASK命令仅仅表示单个这次的key处理由新节点尝试解决,后面的该key还是继续会发到之前的节点
Jedis客户端实现如下,
- 通过CRC16计算出槽位对应的目标节点,之后请求目标节点
- 如果请求出错,则Jedis会重试maxAttemps次,重试完抛出Too many Cluster redirections
- 如果Redis Cluster返回MOVED、ASK命令,底层是抛出异常,而后捕获到,如果是MOVED命令则刷新本地的槽位信息(发送slots命令给Redis节点),这时槽位对应的node可能变了,再次从本地缓存中槽位对应的node进行请求。
- 如果是ASK命令,则请求ASK命令携带的目标节点B,使用的是ASKING命令发送到B,之后再次将业务命令发送到B节点
#JedisClusterCommand.java
...
public T run(String key) {
if (key == null) {
throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
}
//1.maxAttempts表示尝试次数
return runWithRetries(SafeEncoder.encode(key), this.maxAttempts, false, false);
}
...
private T runWithRetries(byte[] key, int attempts, boolean tryRandomNode, boolean asking) {
//2.尝试完attempts则抛出异常
if (attempts <= 0) {
throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
}
Jedis connection = null;
try {
if (asking) {
// TODO: Pipeline asking with the original command to make it
// faster....
connection = askConnection.get();
//7.先发送ASKING命令,之后发送Redis的命令(如Get/Set)
connection.asking();
// if asking success, reset asking flag
asking = false;
} else {
if (tryRandomNode) {
connection = connectionHandler.getConnection();
} else {
//3.通过CRC16计算出槽位对应的节点
connection = connectionHandler.getConnectionFromSlot(JedisClusterCRC16.getSlot(key));
}
}
//请求对应的Redis目标节点
return execute(connection);
} catch (JedisNoReachableClusterNodeException jnrcne) {
throw jnrcne;
} catch (JedisConnectionException jce) {
// release current connection before recursion
releaseConnection(connection);
connection = null;
if (attempts <= 1) {
//We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
//or we can stuck with cluster state without one node in opposite case.
//But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
//if there were no successful responses from this node last few seconds
this.connectionHandler.renewSlotCache();
//no more redirections left, throw original exception, not JedisClusterMaxRedirectionsException, because it's not MOVED situation
throw jce;
}
return runWithRetries(key, attempts - 1, tryRandomNode, asking);
} catch (JedisRedirectionException jre) {
// if MOVED redirection occurred,
//4.如果Redis服务端返回MOVED命令,则刷新本地的槽位Slot信息
if (jre instanceof JedisMovedDataException) {
// it rebuilds cluster's slot cache
// recommended by Redis cluster specification
//发送slots命令给Redis节点获取槽位信息
this.connectionHandler.renewSlotCache(connection);
}
// release current connection before recursion or renewing
releaseConnection(connection);
connection = null;
//5.如果Redis服务端返回ASK命令
if (jre instanceof JedisAskDataException) {
asking = true;
//6.请求ASK命令携带的目标节点
askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
} else if (jre instanceof JedisMovedDataException) {
} else {
throw new JedisClusterException(jre);
}
return runWithRetries(key, attempts - 1, false, asking);
} finally {
releaseConnection(connection);
}
}
...
Jedis解析Redis的Resp协议响应时,头字符是 - 则表示异常,可参考Redis的Resp协议
#Protocol.java
private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
if (b == PLUS_BYTE) {
return processStatusCodeReply(is);
} else if (b == DOLLAR_BYTE) {
return processBulkReply(is);
} else if (b == ASTERISK_BYTE) {
return processMultiBulkReply(is);
} else if (b == COLON_BYTE) {
return processInteger(is);
} else if (b == MINUS_BYTE) {
//响应命令以-开头则表示错误
processError(is);
return null;
} else {
throw new JedisConnectionException("Unknown reply: " + (char) b);
}
}
...
private static void processError(final RedisInputStream is) {
String message = is.readLine();
//是MOVED命令,则抛出JedisMovedDataException命令
if (message.startsWith(MOVED_RESPONSE)) {
String[] movedInfo = parseTargetHostAndSlot(message);
throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1],
Integer.valueOf(movedInfo[2])), Integer.valueOf(movedInfo[0]));
//是ASK命令,则抛出JedisAskDataException命令
} else if (message.startsWith(ASK_RESPONSE)) {
String[] askInfo = parseTargetHostAndSlot(message);
throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
Integer.valueOf(askInfo[2])), Integer.valueOf(askInfo[0]));
} else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
throw new JedisClusterException(message);
} else if (message.startsWith(BUSY_RESPONSE)) {
throw new JedisBusyException(message);
} else if (message.startsWith(NOSCRIPT_RESPONSE) ) {
throw new JedisNoScriptException(message);
}
throw new JedisDataException(message);
}
Jedis收到MOVED响应后,刷新本地槽位信息缓存的实现如下,获取当前池Pool中的所有Redis节点,之后给Redis节点发送Slots命令,获取集群的状态信息,刷新本地的槽位-节点映射关系
#JedisClusterInfoCache.java
public void renewClusterSlots(Jedis jedis) {
//If rediscovering is already in process - no need to start one more same rediscovering, just return
if (!rediscovering) {
try {
w.lock();
rediscovering = true;
if (jedis != null) {
try {
discoverClusterSlots(jedis);
return;
} catch (JedisException e) {
//try nodes from all pools
}
}
//1.getShuffledNodesPool()获取当前持有的Pool中的Redis节点
for (JedisPool jp : getShuffledNodesPool()) {
try {
jedis = jp.getResource();
//2.获取池Pool对应的Redis节点信息
discoverClusterSlots(jedis);
return;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
} finally {
rediscovering = false;
w.unlock();
}
}
}
private void discoverClusterSlots(Jedis jedis) {
//3.发送SLOTS命令给节点,获取当前集群的状态信息,里面含槽位信息和对应的节点
List<Object> slots = jedis.clusterSlots();
this.slots.clear();
for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;
if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}
List<Integer> slotNums = getAssignedSlotArray(slotInfo);
// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
if (hostInfos.isEmpty()) {
continue;
}
// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
//4.将槽位slot-节点映射写入Map中
assignSlotsToNode(slotNums, targetNode);
}
}
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
} finally {
w.unlock();
}
}
如果当前节点正在迁出槽位,并且命令中的key有的已经不再当前节点中了,则设置错误码为REDIS_CLUSTER_REDIR_ASK,并返回该槽位所迁出的目的地节点;
如果当前节点正在迁入槽位,并且客户端具有ASKING标记(客户端之前发来过”ASKING”命令)或者该命令本身就具有ASKING标记(”RESTORE-ASKING”命令),则只有在涉及多个key,并且有的key不在当前节点中的情况下,才设置错误码为REDIS_CLUSTER_REDIR_UNSTABLE,并返回NULL;否则,返回当前节点;
参考:
- Redis Cluster Spec:https://redis.io/docs/reference/cluster-spec/#ask-redirection