Redis Cluster: Failure Detection and New-Master Election
How does a Redis Cluster detect node failures? Once a failure is detected, how is a new master elected, and what happens after the election?
As shown below, currentEpoch, configEpoch, and fail_reports in ClusterState and ClusterNode are key fields: they drive failure detection and the master election that follows a failure.
#cluster.h
/* This structure represents elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
    struct clusterNode *node; /* Node reporting the failure condition. */
    mstime_t time;            /* Time of the last report from this node. */
} clusterNodeFailReport;
typedef struct clusterNode {
    mstime_t ctime; /* Node object creation time. */
    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
    int flags;      /* CLUSTER_NODE_... */
    uint64_t configEpoch; /* Last configEpoch observed for this node */
    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
    int numslots;   /* Number of slots handled by this node */
    int numslaves;  /* Number of slave nodes, if this is a master */
    struct clusterNode **slaves; /* pointers to slave nodes */
    struct clusterNode *slaveof; /* pointer to the master node. Note that it
                                    may be NULL even if the node is a slave
                                    if we don't have the master node in our
                                    tables. */
    mstime_t ping_sent;      /* Unix time we sent latest ping */
    mstime_t pong_received;  /* Unix time we received the pong */
    mstime_t data_received;  /* Unix time we received any data */
    mstime_t fail_time;      /* Unix time when FAIL flag was set */
    mstime_t voted_time;     /* Last time we voted for a slave of this master */
    mstime_t repl_offset_time; /* Unix time we received offset for this node */
    mstime_t orphaned_time;  /* Starting time of orphaned master condition */
    long long repl_offset;   /* Last known repl offset for this node. */
    char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
    int port;                /* Latest known clients port of this node */
    int cport;               /* Latest known cluster port of this node. */
    clusterLink *link;       /* TCP/IP link with this node */
    list *fail_reports;      /* List of nodes signaling this as failing */
} clusterNode;
typedef struct clusterState {
    clusterNode *myself;  /* This node */
    uint64_t currentEpoch;
    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */
    int size;             /* Num of master nodes with at least one slot */
    dict *nodes;          /* Hash table of name -> clusterNode structures */
    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
    clusterNode *migrating_slots_to[CLUSTER_SLOTS];
    clusterNode *importing_slots_from[CLUSTER_SLOTS];
    clusterNode *slots[CLUSTER_SLOTS];
    /* Used during slot migration: when migrating slot 100, index 100 of this
     * array holds the number of Redis keys currently stored in slot 100. */
    uint64_t slots_keys_count[CLUSTER_SLOTS];
    /* Used during slot migration: records which keys belong to each slot. */
    rax *slots_to_keys;
    /* The following fields are used to take the slave state on elections. */
    mstime_t failover_auth_time; /* Time of previous or next election. */
    int failover_auth_count;     /* Number of votes received so far. */
    int failover_auth_sent;      /* True if we already asked for votes. */
    int failover_auth_rank;      /* This slave rank for current auth request. */
    uint64_t failover_auth_epoch; /* Epoch of the current election. */
    int cant_failover_reason;    /* Why a slave is currently not able to
                                    failover. See the CANT_FAILOVER_* macros. */
    /* Manual failover state in common. */
    mstime_t mf_end;             /* Manual failover time limit (ms unixtime).
                                    It is zero if there is no MF in progress. */
    /* Manual failover state of master. */
    clusterNode *mf_slave;       /* Slave performing the manual failover. */
    /* Manual failover state of slave. */
    long long mf_master_offset;  /* Master offset the slave needs to start MF
                                    or zero if still not received. */
    int mf_can_start;            /* If non-zero signal that the manual failover
                                    can start requesting masters vote. */
    /* The following fields are used by masters to take state on elections. */
    uint64_t lastVoteEpoch;      /* Epoch of the last vote granted. */
    int todo_before_sleep;       /* Things to do in clusterBeforeSleep(). */
    /* Messages received and sent by type. */
    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
    long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
                                    excluding nodes without address. */
} clusterState;
Failure Detection
Suppose the cluster has three nodes A, B, and C. Node A suspects that C may have failed and flags it PFAIL (possibly failed); A then spreads this suspicion to B through its PING heartbeat packets.
Reference on node failure detection in Redis Cluster: https://www.cnblogs.com/gqtcgq/p/7247044.html
Every node in a Redis Cluster runs the periodic task clusterCron(), shown below. If node A has sent a PING to node C and has waited longer than cluster_node_timeout without receiving the PONG, A marks C as PFAIL:
void clusterCron(void) {
    ...
    delay = now - node->ping_sent;
    if (delay > server.cluster_node_timeout) {
        /* Timeout reached. Set the node as possibly failing if it is
         * not already in this state. */
        if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
            serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
                node->name);
            node->flags |= CLUSTER_NODE_PFAIL;
            update_state = 1;
        }
    }
After A marks C as PFAIL, the heartbeat packets it builds carry, besides the usual randomly picked gossip entries, every node in A's nodes list currently flagged PFAIL, appended at the end of the PING packet. This is how the PFAIL state propagates to the other nodes. As shown below, nodes whose state is not PFAIL are skipped:
/* Send a PING or PONG packet to the specified node, making sure to add enough
 * gossip informations. */
void clusterSendPing(clusterLink *link, int type) {
    ...
    /* If there are PFAIL nodes, add them at the end. */
    if (pfail_wanted) {
        dictIterator *di;
        dictEntry *de;

        di = dictGetSafeIterator(server.cluster->nodes);
        while((de = dictNext(di)) != NULL && pfail_wanted > 0) {
            clusterNode *node = dictGetVal(de);
            if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
            if (node->flags & CLUSTER_NODE_NOADDR) continue;
            if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
            clusterSetGossipEntry(hdr,gossipcount,node);
            freshnodes--;
            gossipcount++;
            /* We take the count of the slots we allocated, since the
             * PFAIL stats may not match perfectly with the current number
             * of PFAIL nodes. */
            pfail_wanted--;
        }
    }
How does B handle the heartbeat from A that carries C's PFAIL state? After reading the message from the TCP cluster bus, B processes the gossip section in clusterProcessGossipSection(), shown below. There, clusterNodeAddFailureReport() appends the report to nodes[C]->fail_reports, a list recording which other nodes consider C to be failing:
void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
    uint16_t count = ntohs(hdr->count);
    clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);

    while(count--) {
        uint16_t flags = ntohs(g->flags);
        clusterNode *node;
        sds ci;

        if (server.verbosity == LL_DEBUG) {
            ci = representClusterNodeFlags(sdsempty(), flags);
            serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
                g->nodename,
                g->ip,
                ntohs(g->port),
                ntohs(g->cport),
                ci);
            sdsfree(ci);
        }

        /* Update our state accordingly to the gossip sections */
        node = clusterLookupNode(g->nodename);
        if (node) {
            /* We already know this node.
               Handle failure reports, only when the sender is a master. */
            if (sender && nodeIsMaster(sender) && node != myself) {
                if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
                    if (clusterNodeAddFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s as not reachable.",
                            sender->name, node->name);
                    }
                    markNodeAsFailingIfNeeded(node);
                } else {
                    if (clusterNodeDelFailureReport(node,sender)) {
                        serverLog(LL_VERBOSE,
                            "Node %.40s reported node %.40s is back online.",
                            sender->name, node->name);
                    }
                }
            }
Next, in markNodeAsFailingIfNeeded() on node B, if the number of masters reporting C as failing reaches a quorum of more than half the cluster size, B switches nodes[C] from PFAIL to FAIL. B then broadcasts the FAIL state of C to every node, rather than gossiping it to only a subset as heartbeats do:
void markNodeAsFailingIfNeeded(clusterNode *node) {
    int failures;
    int needed_quorum = (server.cluster->size / 2) + 1;

    if (!nodeTimedOut(node)) return; /* We can reach it. */
    if (nodeFailed(node)) return; /* Already FAILing. */

    failures = clusterNodeFailureReportsCount(node);
    /* Also count myself as a voter if I'm a master. */
    if (nodeIsMaster(myself)) failures++;
    if (failures < needed_quorum) return; /* No weak agreement from masters. */

    serverLog(LL_NOTICE,
        "Marking node %.40s as failing (quorum reached).", node->name);

    /* Mark the node as failing. */
    node->flags &= ~CLUSTER_NODE_PFAIL;
    node->flags |= CLUSTER_NODE_FAIL;
    node->fail_time = mstime();

    /* Broadcast the failing node name to everybody, forcing all the other
     * reachable nodes to flag the node as FAIL. */
    if (nodeIsMaster(myself)) clusterSendFail(node->name);
    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
}
#cluster.c
void clusterSendFail(char *nodename) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
    memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
    clusterBroadcastMessage(buf,ntohl(hdr->totlen));
}
#cluster.c
void clusterBroadcastMessage(void *buf, size_t len) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);

        if (!node->link) continue;
        if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
            continue;
        clusterSendMessage(node->link,buf,len);
    }
    dictReleaseIterator(di);
}
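To make the quorum rule concrete, here is a minimal standalone sketch, not Redis source, with all names hypothetical. It models how failure reports are pruned by the validity window (cluster_node_timeout times CLUSTER_FAIL_REPORT_VALIDITY_MULT, which is 2 in cluster.h) and then compared against the majority quorum:
#include <stdio.h>

#define CLUSTER_SIZE 3        /* masters serving at least one slot */
#define NODE_TIMEOUT_MS 15000
/* Reports older than node_timeout * 2 are considered stale, mirroring
 * CLUSTER_FAIL_REPORT_VALIDITY_MULT in cluster.h. */
#define REPORT_VALIDITY_MS (NODE_TIMEOUT_MS * 2)

typedef struct { long long time_ms; } fail_report;

/* Count only the failure reports still inside the validity window. */
static int count_valid_reports(const fail_report *reports, int n,
                               long long now_ms) {
    int valid = 0;
    for (int i = 0; i < n; i++)
        if (now_ms - reports[i].time_ms <= REPORT_VALIDITY_MS) valid++;
    return valid;
}

int main(void) {
    long long now = 1000000;
    /* Two recent reports from other masters, plus one stale report. */
    fail_report reports[] = {{now - 1000}, {now - 5000}, {now - 40000}};
    int failures = count_valid_reports(reports, 3, now);
    failures++; /* a master counts itself, as in markNodeAsFailingIfNeeded */
    int needed_quorum = (CLUSTER_SIZE / 2) + 1;
    printf("failures=%d quorum=%d -> %s\n", failures, needed_quorum,
           failures >= needed_quorum ? "mark FAIL" : "keep PFAIL");
    return 0;
}
With three masters, two valid reports plus the local master itself reach the quorum of two, so the node would be flagged FAIL; the stale report is ignored.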
Master Election
- How is the failover from the failed master to one of its slaves performed?
- How is an election started?
- How do masters vote (currentEpoch)?
- After the slave becomes master, how is the change broadcast so that the other nodes update their local cluster view, such as slot ownership (configEpoch)?
After failure detection, node A broadcasts the FAIL state of nodes[C] to B and D. Suppose D is a slave whose master is C. In D's clusterCron(), D sees that it is a slave and calls clusterHandleSlaveFailover():
void clusterCron(void) {
    ...
    if (nodeIsSlave(myself)) {
        clusterHandleManualFailover();
        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
            clusterHandleSlaveFailover();
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }

    if (update_state || server.cluster->state == CLUSTER_FAIL)
        clusterUpdateState();
}
clusterHandleSlaveFailover() first filters out nodes that must not start an election:
- if the current node is a master, it returns immediately;
- if the current node is a slave but its replication data is too stale (roughly, the link to the master has been down longer than cluster_node_timeout * cluster_slave_validity_factor plus the replication ping period, as computed below), it does not qualify to ask for votes and returns.
void clusterHandleSlaveFailover(void) {
    ...
    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) We don't have the no failover configuration set, and this is
     *    not a manual failover.
     * 4) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        (server.cluster_slave_no_failover && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the number of seconds we are disconnected from
     * the master. */
    if (server.repl_state == REPL_STATE_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }
    ...
    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        /* Bump currentEpoch before asking for votes. */
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }
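One detail elided by the "..." above: the slave does not ask for votes immediately. It schedules failover_auth_time with a delay derived from its replication rank (failover_auth_rank), so slaves with fresher data get a smaller rank and tend to start their election first. Below is a minimal standalone sketch of that idea, not Redis source and with hypothetical names, mirroring (to my recollection) the formula failover_auth_time = mstime() + 500 + random()%500 + rank*1000:
#include <stdio.h>
#include <stdlib.h>

/* Rank = number of sibling slaves with a more up-to-date repl offset. */
static int slave_rank(long long my_offset,
                      const long long *sibling_offsets, int n) {
    int rank = 0;
    for (int i = 0; i < n; i++)
        if (sibling_offsets[i] > my_offset) rank++;
    return rank;
}

int main(void) {
    long long my_offset = 1000, siblings[] = {900, 1200};
    int rank = slave_rank(my_offset, siblings, 2); /* one sibling is ahead */
    /* Fixed 500 ms lets the FAIL message propagate; the random part
     * desynchronizes the slaves; each rank step adds one second. */
    long long delay_ms = 500 + rand() % 500 + (long long)rank * 1000;
    printf("rank=%d, election starts in ~%lld ms\n", rank, delay_ms);
    return 0;
}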
In clusterRequestFailoverAuth(), node D broadcasts a CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST message to every node in the cluster, asking for their vote:
/* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
 * see if there is the quorum for this slave instance to failover its failing
 * master.
 *
 * Note that we send the failover request to everybody, master and slave nodes,
 * but only the masters are supposed to reply to our query. */
void clusterRequestFailoverAuth(void) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
    /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
     * in the header to communicate the nodes receiving the message that
     * they should authorize the failover even if the master is working. */
    if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterBroadcastMessage(buf,totlen);
}
When node B receives D's request to take over as master, it decides in clusterSendFailoverAuthIfNeeded() whether to grant D its vote:
int clusterProcessPacket(clusterLink *link) {
    ...
    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        ...
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
        if (!sender) return 1;  /* We don't know that node. */
        clusterSendFailoverAuthIfNeeded(sender,hdr);
    ...
/* Vote for the node asking for our vote if there are the conditions. */
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    ...
    /* If D's currentEpoch is smaller than B's currentEpoch, B refuses to
     * vote for D and returns right away. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* If B has already voted in this currentEpoch, it will not vote for D. */
    /* I already voted for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        serverLog(LL_WARNING,
            "Failover auth denied to %.40s: already voted for epoch %llu",
            node->name,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }
    ...
    /* B grants its vote to D. */
    /* We can vote for this slave. */
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
    clusterSendFailoverAuth(node);
    serverLog(LL_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}
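The "..." above elides a few more checks (recalled from cluster.c, so treat this summary as approximate): only masters serving at least one slot may vote; the requester must be a slave whose master is flagged FAIL (unless FORCEACK is set for a manual failover); B refuses if it recently voted for a slave of the same master (within 2 * cluster_node_timeout, via voted_time); and the request's configEpoch must not be older than what B records for the slots the failed master claims. A minimal standalone sketch of this decision, with hypothetical names:
#include <stdbool.h>
#include <stdio.h>

typedef struct {
    bool i_am_master_with_slots;  /* only masters serving slots may vote */
    bool requester_master_failed; /* requester's master is flagged FAIL */
    long long ms_since_vote_for_that_master; /* voted_time cooldown */
    long long node_timeout_ms;
} vote_ctx;

static bool may_grant_vote(const vote_ctx *c) {
    if (!c->i_am_master_with_slots) return false;
    if (!c->requester_master_failed) return false; /* unless FORCEACK */
    /* Don't vote again for slaves of the same master too soon. */
    if (c->ms_since_vote_for_that_master < c->node_timeout_ms * 2)
        return false;
    return true;
}

int main(void) {
    vote_ctx c = {true, true, 40000, 15000};
    printf("grant vote: %s\n", may_grant_vote(&c) ? "yes" : "no");
    return 0;
}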
clusterSendFailoverAuth() then sends a CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK back to D, meaning that B has voted for D:
/* Send a FAILOVER_AUTH_ACK message to the specified node. */
void clusterSendFailoverAuth(clusterNode *node) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;
    uint32_t totlen;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
    totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
    hdr->totlen = htonl(totlen);
    clusterSendMessage(node->link,buf,totlen);
}
When D receives B's CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK, it increments failover_auth_count:
int clusterProcessPacket(clusterLink *link) {
    ...
    } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
Promoting the Slave to Master
On node D, the periodic clusterCron() task shown earlier fires again and calls clusterHandleSlaveFailover().
Because D has already asked for votes, server.cluster->failover_auth_sent is non-zero this time. If server.cluster->failover_auth_count has reached the quorum of more than half the cluster size, the election is won and clusterFailoverReplaceYourMaster() is called:
void clusterHandleSlaveFailover(void) {
    int needed_quorum = (server.cluster->size / 2) + 1;
    ...
    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        serverLog(LL_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */
        serverLog(LL_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            serverLog(LL_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsibility for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
Node D then runs clusterFailoverReplaceYourMaster(), shown below:
- clusterSetNodeAsMaster() clears D's slaveof pointer, so D no longer acts as a slave;
- replicationUnsetMaster() tears down the replication link to the old master;
- every slot previously owned by the old master is reassigned to D itself;
- clusterBroadcastPong() broadcasts D's new state to all nodes, announcing that D has been promoted to master and now serves the old master's slots.
void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}
Updating the Configuration
After winning the votes and promoting itself, D broadcasts a PONG packet to all nodes. When node B receives it:
- B checks whether the slots that sender D claims to serve differ from B's own records; if they differ, B sets dirty_slots to 1. Since D was just promoted from slave, B still records the old master as the owner of those slots.
- In clusterUpdateSlotsConfigWith(), if the configEpoch B records for a slot's owner is smaller than D's configEpoch, B rebinds the slot to D.
- There is one exception: if the configEpoch B records for a slot's owner is greater than the configEpoch in D's PONG packet, it is D whose information is stale, so B sends an UPDATE packet back to D to make it refresh its configuration. This happens when a former master comes back online after some time away: in the eyes of the other nodes, its slots have already been taken over by another node and the cluster's configEpoch has moved on.
int clusterProcessPacket(clusterLink *link) {
    ...
    /* PING, PONG, MEET: process config information. */
    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        ...
        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that CLUSTER_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computational expansive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         *    This is useful because sometimes after a partition heals, a
         *    reappearing master may be the last one to claim a given set of
         *    hash slots, but with a configuration that other instances know
         *    to be deprecated. Example:
         *
         *    A and B are master and slave for slots 1,2,3.
         *    A is partitioned away, B gets promoted.
         *    B is partitioned away, and A returns available.
         *
         *    Usually B would PING A publishing its set of served slots and
         *    its configEpoch, but because of the partition B can't inform A
         *    of the new configuration, so other nodes that have an updated
         *    table must do it. In this way A will stop to act as a master
         *    (or can try to failover if there are the conditions to win
         *    the election). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        serverLog(LL_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                            sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new
                         * owner of sender's slots. */
                        break;
                    }
                }
            }
        }
    ...
#clusterUpdateSlotsConfigWith()
void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    ...
    for (j = 0; j < CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-trib (example: a resharding is in progress
             * and the migrating side slot was already closed and is
             * advertising a new config. We still want the slot to be closed
             * manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and still contains keys? Mark it as
                 * a dirty slot. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                if (server.cluster->slots[j] == curmaster)
                    newmaster = sender;
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }
To recap currentEpoch vs configEpoch: currentEpoch is the cluster-wide epoch used for master elections, while configEpoch decides, when nodes disagree about the configuration (such as slot ownership), whose configuration wins: the higher configEpoch prevails.
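The following minimal standalone sketch (not Redis source; names are hypothetical) illustrates this conflict-resolution rule: a slot claim is accepted only with a strictly greater configEpoch, otherwise the claimer is the stale side and would receive an UPDATE:
#include <stdio.h>
#include <stdint.h>

typedef struct { int owner_id; uint64_t config_epoch; } slot_view;

/* Returns 1 if the local view rebinds the slot to the claimer,
 * 0 if the claim is stale (the claimer should receive an UPDATE). */
static int accept_slot_claim(slot_view *local, int claimer_id,
                             uint64_t claimer_epoch) {
    if (local->owner_id == -1 || local->config_epoch < claimer_epoch) {
        local->owner_id = claimer_id;
        local->config_epoch = claimer_epoch;
        return 1;
    }
    return 0;
}

int main(void) {
    slot_view slot100 = { .owner_id = 1, .config_epoch = 6 }; /* old master C */
    /* D was promoted in epoch 7 and claims slot 100: accepted. */
    printf("claim accepted: %d\n", accept_slot_claim(&slot100, 2, 7));
    /* A stale claim from the old master with epoch 6 is rejected. */
    printf("stale claim accepted: %d\n", accept_slot_claim(&slot100, 1, 6));
    return 0;
}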
References:
- https://www.cnblogs.com/gqtcgq/p/7247042.html