Redis-集群槽位重分片Key迁移实现

​ ​ Redis提供了辅助脚本redis-trib.rb,以”reshard”参数调用该脚本就可以实现重新分片的操作。但是本质上,该脚本就是通过向迁入节点和迁出节点发送一些命令实现的。

​ 假设要从A节点迁移slot槽位到B节点

设置迁入节点

​ 向迁入节点B发送” CLUSTER SETSLOT IMPORTING ”命令,其中是要迁入的槽位号,是当前负责该槽位的节点。在函数clusterCommand中,处理该命令的代码如下。

  1. 对cluster setslot命令,首先从参数获取槽位号slot,如果解析错误则直接返回错误
  2. 如果当前节点已经是负责槽位slot[slot]的节点,则直接返回
  3. 先在B的nodes数组中查找迁入节点是否存在,不存在则直接返回;存在则将B的importing_slots_from[slot]设置为A

​ #cluster.c

void clusterCommand(client *c) {
		...
		
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        /* SETSLOT 10 MIGRATING <node ID> */
        /* SETSLOT 10 IMPORTING <node ID> */
        /* SETSLOT 10 STABLE */
        /* SETSLOT 10 NODE <node ID> */
        int slot;
        clusterNode *n;

        if (nodeIsSlave(myself)) {
            addReplyError(c,"Please use SETSLOT only with masters.");
            return;
        }
				...
        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            ...
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            if (server.cluster->slots[slot] == myself) {
                addReplyErrorFormat(c,
                    "I'm already the owner of hash slot %u",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->importing_slots_from[slot] = n;

设置迁出节点

​ 向迁出节点A发送” CLUSTER SETSLOT MIGRATING ”命令

  1. 首先判断slots[slot]是否是等于myself,如果不是则说明槽位不由A节点负责,直接返回错误。
  2. 判断迁入节点B是否存在于当前节点的nodes数组中,如果不存在则说明当前节点不认识B节点,直接返回错误
  3. 最后设置A节点的migrating_slots_to[slot]=B,表示该槽位正在由A节点迁到B节点
void clusterCommand(client *c) {
		...
		
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
				...
				
        if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
            if (server.cluster->slots[slot] != myself) {
                addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
                return;
            }
            if ((n = clusterLookupNode(c->argv[4]->ptr)) == NULL) {
                addReplyErrorFormat(c,"I don't know about node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            server.cluster->migrating_slots_to[slot] = n;
        } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
            ...
            server.cluster->importing_slots_from[slot] = n;

获取槽位对应的key

​ 回到”CLUSTER GETKEYSINSLOT“命令,在函数clusterCommand中,通过getKeysInSlot()获取最多maxKeys个key

void clusterCommand(client *c) {
				...
				
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
 				...
        /* Avoid allocating more than needed in case of large COUNT argument
         * and smaller actual number of keys. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        if (maxkeys > keys_in_slot) maxkeys = keys_in_slot;

        keys = zmalloc(sizeof(robj*)*maxkeys);
        numkeys = getKeysInSlot(slot, keys, maxkeys);
        addReplyMultiBulkLen(c,numkeys);
        for (j = 0; j < numkeys; j++) {
            addReplyBulk(c,keys[j]);
            decrRefCount(keys[j]);
        }
        zfree(keys);

​ 从rax树中获取槽位slot对应的key

unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
    raxIterator iter;
    int j = 0;
    unsigned char indexed[2];

    indexed[0] = (hashslot >> 8) & 0xff;
    indexed[1] = hashslot & 0xff;
    raxStart(&iter,server.cluster->slots_to_keys);
    raxSeek(&iter,">=",indexed,2);
    while(count-- && raxNext(&iter)) {
        if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
        keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
    }
    raxStop(&iter);
    return j;
}

向迁出节点A发送MIGRATE命令

​ 向迁出节点发送”MIGRATE [COPY |REPLACE]”命令

​ 针对上面getkeysinslot命令获取的每个key,向迁出节点A发送”MIGRATE [COPY |REPLACE]"命令,将key从A迁移到B节点。如果最后一个参数是REPLACE,则发送成功之后,还要在当前实例中删除该key;如果是COPY,则无需删除key;默认参数就是REPLACE。

​ MIGRATE命令的处理函数是migrateCommand,如下:

  1. 从当前连接的数据库中查找key,得到其value,并赋值给ov[oi]
  2. 对每个key,dump其对应的value,然后放入payload中,并封装为restore-asking命令
  3. 之后将包由节点A发送给节点B
void migrateCommand(client *c) {
		...

    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j];
            oi++;
        }
    }
    ...

    /* Create RESTORE payload and generate the protocol to call the command. */
    for (j = 0; j < num_keys; j++) {
        ...
						
        createDumpPayload(&payload,ov[j],kv[j]);
        		...
    }

​ B节点收到包后解析,读取key,并读取dump格式的value,将key和value保存到B节点中,如果有设置TTL则设置TTL,最后返回OK给A

void restoreCommand(client *c) {
		...

    rioInitWithBuffer(&payload,c->argv[3]->ptr);
    if (((type = rdbLoadObjectType(&payload)) == -1) ||
        ((obj = rdbLoadObject(type,&payload,key)) == NULL))
    {
        addReplyError(c,"Bad data format");
        return;
    }
		...

    /* Create the key and set the TTL if any */
    dbAdd(c->db,key,obj);
    if (ttl) {
        setExpire(c,c->db,key,ttl);
    }
    objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
    signalModifiedKey(c->db,key);
    addReply(c,shared.ok);
    server.dirty++;
}

向所有节点发送SETSLOT命令

​ 向所有节点发送”CLUSTER SETSLOT NODE ”命令

​ 当槽位中的所有key都迁移完成后,需要向集群中所有节点,包括迁出和迁入的A、B节点,以便通知所有节点,更新槽位 新的负责节点为

  1. 首先根据参数在字典server.cluster->nodes中查询新的负责该槽位的节点n,若找不到,则回复客户端错误信息后返回
  2. A收到该信息后,判断槽位slot中的key个数为0则key已经迁出完成,将migrating_slots_to[slot]设置为NULL
  3. B收到该信息后,且n是myself,则将importing_slots_from[slot]设置为NULL
  4. 调用clusterDelSlot清空该slot相关的信息,然后调用clusterAddSlot,将该槽位的负责人改为节点n;
void clusterCommand(client *c) {
		...
    } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
        ...
        } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
            /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
            clusterNode *n = clusterLookupNode(c->argv[4]->ptr);

            if (!n) {
                addReplyErrorFormat(c,"Unknown node %s",
                    (char*)c->argv[4]->ptr);
                return;
            }
            /* If this hash slot was served by 'myself' before to switch
             * make sure there are no longer local keys for this hash slot. */
            if (server.cluster->slots[slot] == myself && n != myself) {
                if (countKeysInSlot(slot) != 0) {
                    addReplyErrorFormat(c,
                        "Can't assign hashslot %d to a different node "
                        "while I still hold keys for this hash slot.", slot);
                    return;
                }
            }
            /* If this slot is in migrating status but we have no keys
             * for it assigning the slot to another node will clear
             * the migratig status. */
            if (countKeysInSlot(slot) == 0 &&
                server.cluster->migrating_slots_to[slot])
                server.cluster->migrating_slots_to[slot] = NULL;

            /* If this node was importing this slot, assigning the slot to
             * itself also clears the importing status. */
            if (n == myself &&
                server.cluster->importing_slots_from[slot])
            {
                /* This slot was manually migrated, set this node configEpoch
                 * to a new epoch so that the new version can be propagated
                 * by the cluster.
                 *
                 * Note that if this ever results in a collision with another
                 * node getting the same configEpoch, for example because a
                 * failover happens at the same time we close the slot, the
                 * configEpoch collision resolution will fix it assigning
                 * a different epoch to each node. */
                if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
                    serverLog(LL_WARNING,
                        "configEpoch updated after importing slot %d", slot);
                }
                server.cluster->importing_slots_from[slot] = NULL;
            }
            clusterDelSlot(slot);
            clusterAddSlot(n,slot);

​ 以上为止,完成了一次槽位迁移(重新分片)

参考:

1.https://www.cnblogs.com/gqtcgq/p/7247043.html