Redis-集群槽位重分片Key迁移实现
Redis提供了辅助脚本redis-trib.rb,以”reshard”参数调用该脚本就可以实现重新分片的操作。但是本质上,该脚本就是通过向迁入节点和迁出节点发送一些命令实现的。
假设要从A节点迁移slot槽位到B节点
设置迁入节点
向迁入节点B发送” CLUSTER SETSLOT
- 对cluster setslot命令,首先从参数获取槽位号slot,如果解析错误则直接返回错误
- 如果当前节点已经是负责槽位slot[slot]的节点,则直接返回
- 先在B的nodes数组中查找迁入节点是否存在,不存在则直接返回;存在则将B的importing_slots_from[slot]设置为A
#cluster.c
void clusterCommand(client *c) {
...
} else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
/* SETSLOT 10 MIGRATING <node ID> */
/* SETSLOT 10 IMPORTING <node ID> */
/* SETSLOT 10 STABLE */
/* SETSLOT 10 NODE <node ID> */
int slot;
clusterNode *n;
if (nodeIsSlave(myself)) {
addReplyError(c,"Please use SETSLOT only with masters.");
return;
}
...
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
...
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
if (server.cluster->slots[slot] == myself) {
addReplyErrorFormat(c,
"I'm already the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
server.cluster->importing_slots_from[slot] = n;
设置迁出节点
向迁出节点A发送” CLUSTER SETSLOT
- 首先判断slots[slot]是否是等于myself,如果不是则说明槽位不由A节点负责,直接返回错误。
- 判断迁入节点B是否存在于当前节点的nodes数组中,如果不存在则说明当前节点不认识B节点,直接返回错误
- 最后设置A节点的migrating_slots_to[slot]=B,表示该槽位正在由A节点迁到B节点
void clusterCommand(client *c) {
...
} else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
...
if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
if (server.cluster->slots[slot] != myself) {
addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
return;
}
if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
addReplyErrorFormat(c,"I don't know about node %s",
(char*)c->argv[4]->ptr);
return;
}
server.cluster->migrating_slots_to[slot] = n;
} else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
...
server.cluster->importing_slots_from[slot] = n;
获取槽位对应的key
回到”CLUSTER GETKEYSINSLOT“命令,在函数clusterCommand中,通过getKeysInSlot()获取最多maxKeys个key
void clusterCommand(client *c) {
...
} else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
...
/* Avoid allocating more than needed in case of large COUNT argument
* and smaller actual number of keys. */
unsigned int keys_in_slot = countKeysInSlot(slot);
if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;
keys = zmalloc(sizeof(robj*)*maxkeys);
numkeys = getKeysInSlot(slot, keys, maxkeys);
addReplyMultiBulkLen(c,numkeys);
for (j = 0; j < numkeys; j++) {
addReplyBulk(c,keys[j]);
decrRefCount(keys[j]);
}
zfree(keys);
从rax树中获取槽位slot对应的key
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
raxIterator iter;
int j = 0;
unsigned char indexed[2];
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
raxStart(&iter,server.cluster->slots_to_keys);
raxSeek(&iter,">=",indexed,2);
while(count-- && raxNext(&iter)) {
if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
}
raxStop(&iter);
return j;
}
向迁出节点A发送MIGRATE命令
向迁出节点发送”MIGRATE
针对上面getkeysinslot命令获取的每个key,向迁出节点A发送”MIGRATE
MIGRATE命令的处理函数是migrateCommand,如下:
- 从当前连接的数据库中查找key,得到其value,并赋值给ov[oi]
- 对每个key,dump其对应的value,然后放入payload中,并封装为restore-asking命令
- 之后将包由节点A发送给节点B
void migrateCommand(client *c) {
...
for (j = 0; j < num_keys; j++) {
if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
kv[oi] = c->argv[first_key+j];
oi++;
}
}
...
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
...
createDumpPayload(&payload,ov[j],kv[j]);
...
}
B节点收到包后解析,读取key,并读取dump格式的value,将key和value保存到B节点中,如果有设置TTL则设置TTL,最后返回OK给A
void restoreCommand(client *c) {
...
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
...
/* Create the key and set the TTL if any */
dbAdd(c->db,key,obj);
if (ttl) {
setExpire(c,c->db,key,ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,key);
addReply(c,shared.ok);
server.dirty++;
}
向所有节点发送SETSLOT命令
向所有节点发送”CLUSTER SETSLOT
当槽位中的所有key都迁移完成后,需要向集群中所有节点,包括迁出和迁入的A、B节点,以便通知所有节点,更新槽位
- 首先根据参数
在字典server.cluster->nodes中查询新的负责该槽位的节点n,若找不到,则回复客户端错误信息后返回 - A收到该信息后,判断槽位slot中的key个数为0则key已经迁出完成,将migrating_slots_to[slot]设置为NULL
- B收到该信息后,且n是myself,则将importing_slots_from[slot]设置为NULL
- 调用clusterDelSlot清空该slot相关的信息,然后调用clusterAddSlot,将该槽位的负责人改为节点n;
void clusterCommand(client *c) {
...
} else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
...
} else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
/* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr);
if (!n) {
addReplyErrorFormat(c,"Unknown node %s",
(char*)c->argv[4]->ptr);
return;
}
/* If this hash slot was served by 'myself' before to switch
* make sure there are no longer local keys for this hash slot. */
if (server.cluster->slots[slot] == myself && n != myself) {
if (countKeysInSlot(slot) != 0) {
addReplyErrorFormat(c,
"Can't assign hashslot %d to a different node "
"while I still hold keys for this hash slot.", slot);
return;
}
}
/* If this slot is in migrating status but we have no keys
* for it assigning the slot to another node will clear
* the migratig status. */
if (countKeysInSlot(slot) == 0 &&
server.cluster->migrating_slots_to[slot])
server.cluster->migrating_slots_to[slot] = NULL;
/* If this node was importing this slot, assigning the slot to
* itself also clears the importing status. */
if (n == myself &&
server.cluster->importing_slots_from[slot])
{
/* This slot was manually migrated, set this node configEpoch
* to a new epoch so that the new version can be propagated
* by the cluster.
*
* Note that if this ever results in a collision with another
* node getting the same configEpoch, for example because a
* failover happens at the same time we close the slot, the
* configEpoch collision resolution will fix it assigning
* a different epoch to each node. */
if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
serverLog(LL_WARNING,
"configEpoch updated after importing slot %d", slot);
}
server.cluster->importing_slots_from[slot] = NULL;
}
clusterDelSlot(slot);
clusterAddSlot(n,slot);
以上为止,完成了一次槽位迁移(重新分片)
参考:
1.https://www.cnblogs.com/gqtcgq/p/7247043.html