ZooKeeper 3.4.10 集群深度剖析(下)—— Leader 选举、数据同步、Java 客户端与运维实践
上篇讲透了 ZAB 协议,中篇深入了数据模型、Session 和 Watcher。下篇聚焦四个团队高频话题:Leader 选举到底怎么选、数据同步的三种策略、Java 客户端底层 NIO 通信的每一步、以及生产环境的运维最佳实践。
七、Leader 选举 —— 谁当老大
7.1 选举算法演进
ZooKeeper 历史上经历了三代选举算法:
ZooKeeper 3.3.x 及之前:
LeaderElection (最原始,类似 UDP 广播)
AuthFastLeaderElection (基于授权验证的快速选举)
FastLeaderElection (FLE,TCP 版快速选举)
ZooKeeper 3.4.0+:
只剩 FastLeaderElection,其他都被移除了
FastLeaderElection(FLE) 是 ZK 3.4.10 的唯一选举算法。核心设计:
- 每个节点有一个
myid(手动配置的 1-255 整数) - 每次选举有一个
epoch(logical clock,逻辑时钟) - 投票传输的是三元组:
(epoch, zxid, myid)
7.2 选举投票结构
// FastLeaderElection.java: Vote 内部类
static public class Vote {
private final long id; // 投票给谁的 myid
private final long zxid; // 被投票节点的最大 zxid
private final long epoch; // 被投票节点的逻辑时钟(当前选举轮次)
// 比较逻辑: PK 谁更适合当 Leader
public boolean isBetterThan(Vote other) {
// 优先级: epoch > zxid > myid
if (this.epoch > other.epoch) return true;
if (this.epoch < other.epoch) return false;
if (this.zxid > other.zxid) return true;
if (this.zxid < other.zxid) return false;
return this.id > other.id; // myid 大的优先(确定性)
}
}
优先级规则 epoch > zxid > myid 的含义:
epoch 最大 = 该节点经历过最多的 Leader 任期
= 它看到过最多的集群状态变化
= 最"资深"
zxid 最大 = 该节点的数据最新
= 它执行过最多的事务
= 最"完整"
myid 最大 = epoch 和 zxid 都相同时的 tie-breaker
= 确定性裁决(人为配置,不会变)
7.3 FLE 选举过程 —— 完整状态机
┌─────────────────────────────────────────────────────────────┐
│ FastLeaderElection 选举过程 │
│ │
│ 5 节点集群: myid=1,2,3,4,5 │
│ 假设 myid=1 是旧 Leader,刚刚挂了,myid=2,3,4,5 发起选举 │
│ │
│ Step 1: 初始化投票(每个节点先投自己) │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Node-2: vote={id:2, zxid:0x100000010, epoch:2} │ │
│ │ Node-3: vote={id:3, zxid:0x10000000f, epoch:2} │ │
│ │ Node-4: vote={id:4, zxid:0x100000010, epoch:2} │ │
│ │ Node-5: vote={id:5, zxid:0x100000008, epoch:2} │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 2: 广播自己的投票 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ 每个节点把自己的 Vote 发给集群中所有其他节点 │ │
│ │ Notification n = {vote, state:LOOKING, sender=myid} │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 3: 接收其他节点的投票,PK 后更新 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Node-2 收到 Node-3 的 Vote: │ │
│ │ my vote: {epoch:2, zxid:0x100000010, id:2} │ │
│ │ his vote: {epoch:2, zxid:0x10000000f, id:3} │ │
│ │ PK: epoch 相同 → 比 zxid → 0x100000010 > 0x10000000f│ │
│ │ 我的 zxid 更大 → 不改票,继续投自己 │ │
│ │ │ │
│ │ Node-2 收到 Node-4 的 Vote: │ │
│ │ my vote: {epoch:2, zxid:0x100000010, id:2} │ │
│ │ his vote: {epoch:2, zxid:0x100000010, id:4} │ │
│ │ PK: epoch 相同 → zxid 相同 → 比 myid → 4 > 2 │ │
│ │ 他更适合 → 改票投 Node-4! │ │
│ └──────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Step 4: 收到外票后改票,重新广播 │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ Node-2 改投 Node-4 → 广播新 Vote │ │
│ │ Node-3 收到后 PK → epoch/zxid相同,myid 4>3 → 也投Node-4│ │
│ │ Node-5 亦然 │ │
│ │ │ │
│ │ 最终统计: │ │
│ │ Node-2 投 Node-4 │ │
│ │ Node-3 投 Node-4 │ │
│ │ Node-4 投 Node-4 (自己) │ │
│ │ Node-5 投 Node-4 │ │
│ │ │ │
│ │ Node-4 获得 4 票 ≥ 3 (半数) │ │
│ │ → Node-4 成为新 Leader! │ │
│ └──────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
7.4 FLE 核心源码走读
// FastLeaderElection.java: lookForLeader() - 选举主循环
public Vote lookForLeader() throws InterruptedException {
// 1. 更新逻辑时钟 (electrical epoch)
logicalclock.incrementAndGet();
// 2. 初始投票:投自己
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
// 3. 广播自己的投票
sendNotifications();
// 4. 主循环:收票、PK、改票、重新广播,直到选出 Leader
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// 4a. 从 recvqueue 中取一个投票通知
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
if (n == null) {
// 超时,重新广播自己的投票(可能是网络丢包了)
sendNotifications();
continue;
}
// 4b. 判断通知来源的状态
switch (n.state) {
case LOOKING:
// 对方也在选举中,PK!
if (n.electionEpoch > logicalclock.get()) {
// 对方逻辑时钟更大,说明我们落后了
// 更新自己的逻辑时钟,清空已收集的票
logicalclock.set(n.electionEpoch);
recvset.clear();
// PK 后更新投票
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(proposedLeader, proposedZxid, proposedEpoch);
}
sendNotifications();
} else if (n.electionEpoch == logicalclock.get()) {
// 同一轮选举,PK
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
}
// 对方 epoch 更小 → 忽略
// 4c. 关键:计票
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.peerEpoch));
// 检查是否过半
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid, proposedEpoch))) {
// 过半!但还要做最后的确认
while ((n = recvqueue.poll(finalizeWait, ...)) != null) {
if (totalOrderPredicate(...)) {
// 有新票比我更适合,放弃当选
recvqueue.put(n); // 放回去
break;
}
}
if (n == null) {
// 最终确认:我当选了!
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING : ServerState.FOLLOWING);
// 更新 epoch
setCurrentVote(makeVote(proposedLeader, ...));
// 退出循环
return makeVote(proposedLeader, proposedZxid, proposedEpoch);
}
}
break;
case LEADING:
case FOLLOWING:
// 对方已经确定了 Leader
// 判断对方的信息是否更新
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.peerEpoch));
// 直接检查:对方宣称的 Leader 是否已经过半
if (termPredicate(recvset,
new Vote(n.leader, n.zxid, n.peerEpoch, n.state))
&& checkLeader()) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING : ServerState.FOLLOWING);
return makeVote(n.leader, n.zxid, n.peerEpoch);
}
}
break;
}
}
return null;
}
totalOrderPredicate —— PK 比较的核心:
// FastLeaderElection.java
protected boolean totalOrderPredicate(long newId, long newZxid,
long newEpoch, long curId, long curZxid, long curEpoch) {
// 返回值 true = newVote 比 curVote 更适合当 Leader
if (newEpoch > curEpoch) return true;
if (newEpoch < curEpoch) return false;
if (newZxid > curZxid) return true;
if (newZxid < curZxid) return false;
return newId > curId; // epoch 和 zxid 相同,myid 大的胜出
}
termPredicate —— 过半判断:
// 判断某个候选者是否获得了过半投票
protected boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
int count = 0;
for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
if (entry.getValue().id == vote.id) {
count++;
}
}
// 需要严格过半: count > n/2
// 5 节点需要 > 2.5 → 至少 3 票
return count > (self.getVotingView().size() / 2);
}
7.5 FLE 的网络架构 —— 为什么快
// FastLeaderElection 的内部线程结构
// 消息的收发是完全异步的,不阻塞选举主循环
┌─────────────────────────────────────────────────────┐
│ FastLeaderElection │
│ │
│ ┌──────────────┐ ┌────────────────┐ │
│ │ WorkerSender │────►│ QuorumCnxManager│──► 网络 │
│ │ (发送线程) │ │ (连接管理器) │ 发送队列 │
│ └──────┬───────┘ └────────────────┘ │
│ │ 从 queueSendMap 取通知 │
│ │ │
│ ┌──────┴───────┐ ┌────────────────┐ │
│ │WorkerReceiver│◄────│ QuorumCnxManager│◄── 网络 │
│ │ (接收线程) │ │ (连接管理器) │ 接收 │
│ └──────┬───────┘ └────────────────┘ │
│ │ 放入 recvqueue │
│ ▼ │
│ ┌──────────────┐ │
│ │ lookForLeader │ ← 主循环从 recvqueue poll │
│ │ (主线程) │ 不阻塞网络 I/O! │
│ └──────────────┘ │
└─────────────────────────────────────────────────────┘
关键设计:
- 两个 Worker 线程完全独立于选举逻辑
- 主循环只做"收票 → PK → 改票 → 入队"的决策逻辑
- 网络 I/O 和选举逻辑完全解耦
- 通过 sid 的哈希来决定 myid 小的主动连接 myid 大的
→ 避免两两互相连接,减少 TCP 连接数
7.6 选举触发场景
场景 1: 集群启动
所有节点 state = LOOKING
→ 同时进入 lookForLeader()
→ 收敛最快(通常 < 200ms)
场景 2: Leader 宕机
Follower 发现与 Leader 连接断开
→ syncLimit 超时后 state = LOOKING
→ 发起选举
场景 3: Follower 宕机后恢复
如果 Leader 还在,不需要选举
→ Follower 恢复后 state = FOLLOWING
→ 直接与 Leader 同步
场景 4: 网络分区
- 少数派 (2台): 无法形成过半 → 保持 LOOKING → 拒绝服务
- 多数派 (3台): 选举新 Leader → 继续服务
→ 分区恢复后,少数派发现 epoch 落后 → 自动同步
7.7 为什么推荐奇数台
┌──────────────────────────────────────────┐
│ 集群规模 vs 容错能力 │
│ │
│ 节点数 │ 过半 │ 容忍故障 │ 利用率 │
│ ────────┼──────┼─────────┼────────── │
│ 3 │ 2 │ 1 │ 33% (1/3) │
│ 4 │ 3 │ 1 │ 25% (1/4) │
│ 5 │ 3 │ 2 │ 40% (2/5) │
│ 6 │ 4 │ 2 │ 33% (2/6) │
│ 7 │ 4 │ 3 │ 43% (3/7) │
│ │
│ 结论: 4 台和 3 台的容忍度相同(都是1台) │
│ 6 台和 5 台的容忍度相同(都是2台) │
│ → 奇数台性价比最高 │
│ → 5 台是生产环境最常见的规模 │
└──────────────────────────────────────────┘
八、数据同步 —— 只要不是 Leader,就得追数据
8.1 事务日志与快照
ZK 的数据持久化靠两个东西:事务日志(Transaction Log)和快照(Snapshot)。
┌─────────────────────────────────────────────────────────┐
│ ZK 数据持久化架构 │
│ │
│ 写请求 → DataTree (内存) │
│ │ │
│ ├──► Transaction Log (事务日志, 实时追加) │
│ │ dataDir/version-2/log.xxx │
│ │ 格式: log.<first_zxid> │
│ │ 每条记录: {checksum, txnHeader, Record} │
│ │ │
│ └──► Snapshot (快照, 定期生成) │
│ dataDir/version-2/snapshot.<last_zxid> │
│ 格式: │
│ FileHeader (magic + version + dbId) │
│ + sessions (所有活跃 session) │
│ + DataTree (完整序列化) │
│ + checksum │
└─────────────────────────────────────────────────────────┘
事务日志的 WAL(Write-Ahead Log)机制:
// FileTxnLog.java: append()
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException {
// 1. 先写日志到磁盘(force 参数控制是否 fsync)
// force = true → 每次写都 fsync (强一致, 慢)
// force = false → 批量 fsync (最终一致, 快)
OutputStream oStream = getOutputStream(hdr.getZxid());
BinaryOutputArchive oa = BinaryOutputArchive.getArchive(oStream);
oa.writeLong(checksum, "checksum");
serialize(hdr, txn, oa);
oStream.flush();
// 2. 日志提交后才更新内存 DataTree
// → 崩溃恢复时可以从日志重放
}
// FileTxnSnapLog.java: 快照触发逻辑
public void save(DataTree dataTree, ConcurrentHashMap<Long, Integer> sessionsWithTimeouts) {
long lastZxid = dataTree.lastProcessedZxid;
// 条件: 最新 zxid > 上次快照 zxid + snapCount
if (lastZxid - lastSnapshotZxid >= snapCount) {
File snapshotFile = new File(snapDir, "snapshot." + lastZxid);
// 序列化 DataTree 到文件
SerializeUtils.serialize(dataTree, sessionsWithTimeouts, snapshotFile);
lastSnapshotZxid = lastZxid;
}
}
8.2 三种同步策略
当 Follower 与 Leader 建立连接后,首先要完成数据同步,然后才能加入广播。同步策略根据 Follower 的状态分三种:
Follower 发 lastZxid 给 Leader
│
▼
┌─────────────────────────┐
│ Leader 比较: │
│ follower.lastZxid │
│ vs │
│ leader.minCommittedLog │
│ vs │
│ leader.maxCommittedLog │
└────────────┬────────────┘
│
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ SNAP │ │ DIFF │ │ TRUNC │
│ (全量同步) │ │ (增量同步) │ │ (截断同步) │
└─────────────┘ └─────────────┘ └─────────────┘
触发条件: 触发条件: 触发条件:
follower.zxid < follower.zxid >= follower.zxid >
minCommittedLog minCommittedLog maxCommittedLog
差距太大, 差距在 Leader Follower 比 Leader
Leader 缓存 缓存范围内, 还新!上轮 Commit
已经清了 只发差异 Proposal 没完成,要回退
SNAP 同步过程:
┌─────────────────────────────────────────────────────────┐
│ Leader Follower │
│ ────── ──────── │
│ │
│ 1. Leader 发现 Follower zxid 太小 │
│ → 发 SNAP 指令 │
│ ├─────────────────────────────►│ │
│ │ │
│ 2. Leader 生成当前快照 │
│ 并发送给 Follower │ │
│ ├──────── snapshot ──────────►│ │
│ │ │
│ │ 3. Follower │
│ │ 清空本地数据 │
│ │ 加载快照 │
│ │ 重置 zxid │
│ │ │
│ 4. 快照之后可能又有新事务 │ │
│ → 补充发送 DIFF │ │
│ ├── DIFF (快照后的差异) ─────►│ │
│ │ │
│ 5. 同步完成, 发 NEWLEADER │ │
│ 并带上新的 epoch │ │
│ ├──────── NEWLEADER ────────►│ │
│ │ │
│ 6. Follower ACK NEWLEADER │
│ │◄──────── ACK ───────────────┤ │
│ │ │
│ 7. 确认过半 Follower 已 ACK │
│ → 发 UPTODATE 指令, Follower 加入广播 │
│ ├──────── UPTODATE ─────────►│ │
│ │ │
│ │ Follower 可以开始 │
│ │ 接收 Proposal 了! │
└─────────────────────────────────────────────────────────┘
DIFF 增量同步的关键约束:
// Leader.java: getProposalsFromLog()
// 增量同步时 Leader 从其 proposals 队列中取差异
ConcurrentLinkedQueue<Proposal> outstandingProposals;
// 这个队列缓存的 Proposal 数量是有限的
// 限制: 不能超过 commitLogCount (默认 500)
// + 快照间隔 (snapCount, 默认 100000)
// 如果 Follower 的 lastZxid < minCommittedLog
// → Leader 已经清了缓存 → 无法 DIFF → 只能 SNAP
8.3 TRUNC 截断 —— 一个容易忽略的场景
TRUNC 发生在 Follower 的数据比 Leader 更新时。什么时候 Follower 会比 Leader 更”新”?
场景: Leader 在发出 Commit 消息之前宕机
时间线:
─────────────────────────────────────────────────►
Leader: Proposal#100 ── 收到过半 ACK ── [挂!]
(zxid=0x100000064) │
│
Follower-A: 写日志 ✓ ── ACK ✓ ── 还没收到 Commit
Follower-B: 写日志 ✓ ── ACK ✓ ── 还没收到 Commit
Follower-C: 写日志 ✓ ── ACK ✓ ── 还没收到 Commit
现在 Follower-A/B/C 的 lastZxid = 0x100000064
但 Leader 挂之前可能没来得及更新自己的 lastZxid
新 Leader (假设是 A) 当选后:
→ A 的 lastZxid = 0x100000064
→ 但 A 发现 Proposal#100 没有 Commit
→ A 必须告诉 B、C:Proposal#100 作废(TRUNC 到 0x100000063)
TRUNC 流程:
Leader → Follower: TRUNC 指令 (截断到 zxid = 0x100000063)
Follower 收到后:
1. 从事务日志中删除 zxid > 0x100000063 的记录
2. 从内存 DataTree 中回退对应的数据
3. 发送 ACK
九、Java 客户端底层原理 —— 每一字节怎么走的
9.1 连接建立全流程
// 用户代码
ZooKeeper zk = new ZooKeeper(
"10.0.0.1:2181,10.0.0.2:2181,10.0.0.3:2181,10.0.0.4:2181,10.0.0.5:2181",
10000, // sessionTimeout
watchedEvent -> System.out.println("状态变化: " + watchedEvent.getState())
);
这个构造函数背后发生了什么?
Step 1: ConnectStringParser 解析连接串
┌─────────────────────────────────────────────────────────┐
│ ConnectStringParser parser = new ConnectStringParser( │
│ "10.0.0.1:2181,10.0.0.2:2181,..." │
│ ); │
│ │
│ 解析结果: │
│ serverAddresses = [ │
│ InetSocketAddress(10.0.0.1, 2181), │
│ InetSocketAddress(10.0.0.2, 2181), │
│ InetSocketAddress(10.0.0.3, 2181), │
│ InetSocketAddress(10.0.0.4, 2181), │
│ InetSocketAddress(10.0.0.5, 2181) │
│ ] │
│ chrootPath = null (没有 /chroot 路径) │
└─────────────────────────────────────────────────────────┘
Step 2: StaticHostProvider 打散地址
┌─────────────────────────────────────────────────────────┐
│ // 创建时会把地址列表 shuffle(打散) │
│ // 避免所有客户端同时连第一台,造成负载不均 │
│ Collections.shuffle(serverAddresses); │
│ │
│ 打散后: │
│ [10.0.0.3, 10.0.0.1, 10.0.0.5, 10.0.0.2, 10.0.0.4] │
│ │
│ 连接时轮询: 从 index=0 开始,连接失败移到下一个 │
└─────────────────────────────────────────────────────────┘
Step 3: ClientCnxn 创建双线程
┌─────────────────────────────────────────────────────────┐
│ ClientCnxn cnxn = new ClientCnxn( │
│ connectString, sessionTimeout, watcher, │
│ hostProvider │
│ ); │
│ │
│ cnxn.start() 启动两个线程: │
│ - SendThread: 负责网络 I/O、心跳、重连 │
│ - EventThread: 负责回调用户的 Watcher │
└─────────────────────────────────────────────────────────┘
9.2 ClientCnxnSocketNIO —— NIO 通信底层
// ClientCnxnSocketNIO.java: 核心 I/O 方法
class ClientCnxnSocketNIO extends ClientCnxnSocket {
private SocketChannel sock; // NIO SocketChannel
private Selector selector; // NIO Selector(3.4.10 没有用,用阻塞模式)
private ByteBuffer lenBuffer; // 4 字节:消息长度
private ByteBuffer incomingBuffer; // 消息体
// 注:ZooKeeper 3.4.10 的客户端实际使用阻塞 I/O
// 真正的 NIO 客户端在 3.5+ 才引入(Netty)
// 但服务端 NIOServerCnxnFactory 使用 NIO
// 客户端使用 ClientCnxnSocketNIO 名字带 NIO
// 实际是基于 SocketChannel 的阻塞模式
}
客户端实际发包格式:
每个请求/响应都遵循这个格式:
┌────────────┬────────────────┬─────────────────────┐
│ 4 bytes │ variable │ variable │
│ 长度 (len) │ RequestHeader │ Request Body │
│ │ (序列化) │ (序列化) │
└────────────┴────────────────┴─────────────────────┘
具体字节流:
┌──────────────────────────────────────────────┐
│ [0x00 0x00 0x00 0x2C] [header bytes] [body bytes]
│ ↑ 长度=44
│ ↑ jute 序列化的 RequestHeader
│ - xid: int (4 bytes)
│ - type: int (4 bytes) ← 操作码
│ - ...
└──────────────────────────────────────────────┘
ConnectRequest 的字节流示例:
┌────────────────────────────────────────────────┐
│ 长度 │ xid │ type │ version │ lastZxid │ timeout │ sessionId │ passwd │
│ 4B │ 4B │ 4B │ 4B │ 8B │ 4B │ 8B │ 16B │
└────────────────────────────────────────────────┘
↑ xid=0 表示是"设置阶段"的请求
9.3 SendThread 主循环
这是客户端最核心的运行逻辑:
// ClientCnxn.java: SendThread.run()
class SendThread extends ZooKeeperThread {
void run() {
while (state.isAlive()) {
// 1. 判断连接状态
if (!clientCnxnSocket.isConnected()) {
// 未连接 → 开始连接
startConnect(hostProvider.next(0)); // 轮询地址
// 发送 ConnectRequest
primeConnection();
// 等待 ConnectResponse
readConnectResult();
// 如果成功 → state = CONNECTED
}
// 2. 心跳检测
if (state.isConnected()) {
long idleTime = System.currentTimeMillis() - lastSend;
int readTimeout = (int)(sessionTimeout * 2.0 / 3.0);
// readTimeout = sessionTimeout * 2/3
if (idleTime > readTimeout) {
sendPing(); // 发心跳
lastSend = System.currentTimeMillis();
}
// 如果 idleTime > sessionTimeout
// → 可能已经断开(等不到 Pong)
// → state = DISCONNECTED
}
// 3. 发送 outgingQueue 中的请求
clientCnxnSocket.doTransport(
waitForServerDown, // 等待超时
pendingQueue, // 已发送待响应的队列
outgoingQueue, // 待发送的队列
cnxn // 连接
);
// doTransport 内部:
// a. 从 outgoingQueue 取 Packet
// b. 序列化 Packet → ByteBuffer
// c. sock.write(buffer) → 发送
// d. sock.read(lenBuffer) → 读响应长度
// e. sock.read(incomingBuffer) → 读响应体
// f. 反序列化 → 更新 pendingQueue
}
}
}
9.4 Packet 结构 —— 客户端请求的载体
// ClientCnxn.java: Packet 内部类
static class Packet {
RequestHeader requestHeader; // xid, 操作类型 (OpCode)
ReplyHeader replyHeader; // 响应头 (服务端填充)
Record request; // 请求体 (如 CreateRequest, SetDataRequest)
Record response; // 响应体 (如 CreateResponse)
ByteBuffer bb; // 序列化后的字节缓冲
String clientPath; // 操作的路径
String serverPath; // 服务端返回的路径
boolean finished; // 是否已完成
AsyncCallback cb; // 异步回调
Object ctx; // 回调上下文
WatchRegistration watchRegistration; // Watch 注册信息
}
// 请求的生命周期:
// 1. 创建 Packet, 放入 outgoingQueue
// 2. SendThread 从 outgoingQueue 取出, serialize
// 3. Socket.write(bb) 发送
// 4. Packet 移入 pendingQueue (等待响应)
// 5. Socket.read() 收到响应
// 6. 反序列化 replyHeader + response
// 7. Packet.finished = true
// 8. 从 pendingQueue 中移除
// 9. 如果是 watch 请求 → EventThread 注册 watch
// 10. 触发 callback (如果有)
outgoingQueue → pendingQueue 的流转:
┌─────────────────────────────────────────────────────┐
│ │
│ 用户线程 SendThread │
│ ──────── ────────── │
│ │
│ 调用 zk.create() │
│ │ │
│ ▼ │
│ ┌──────────┐ │
│ │ Packet p │──► outgoingQueue ──► serialize ──► send() │
│ │ (request)│ │ │
│ └──────────┘ │ │
│ │ 移入 pendingQueue │
│ ▼ │
│ ┌──────────────┐ │
│ │ pendingQueue │ ← 等响应! │
│ │ [p, p2, ...]│ │
│ └──────┬───────┘ │
│ │ │
│ │ 收到响应 │
│ ▼ │
│ p.finished = true │
│ pendingQueue.remove(p) │
│ callback.processResult(...) │
│ │
└─────────────────────────────────────────────────────┘
关键: pendingQueue 中的 Packet 按 xid 匹配响应
xid 是客户端分配的递增整数,用于请求-响应关联
9.5 心跳与超时的精确计算
// 客户端视角:
sessionTimeout_negotiated = min(clientPassedTimeout, serverMaxSessionTimeout)
// 举例:
// 客户端传 timeout=30000, 服务端 maxSessionTimeout=20000
// → sessionTimeout = 20000
// 心跳间隔:
// pingInterval = sessionTimeout / 3 (近似)
// readTimeout = sessionTimeout * 2 / 3 (服务端不活动判定)
// 具体时间线:
// sessionTimeout = 12000ms
//
// t=0: 最后一次收到服务端响应
// t=4000: 客户端发 PING (sessionTimeout/3)
// t=8000: 客户端发 PING (readTimeout = 8000ms)
// t=8000+: 客户端认为可能断连 → 准备重连
// t=12000: 服务端判定 session 过期(收到心跳超时)
//
// 这个 (2/3) 的计算很关键:
// - 给客户端留了 sessionTimeout * 1/3 的窗口用于重连
// - 不会等满 sessionTimeout 才重连(浪费 1/3 的时间)
9.6 实战场景 —— 客户端所连节点宕机,全过程剖析
这是生产环境中最常见的故障场景之一:客户端连接着 ZooKeeper 集群中的某一台节点,该节点突然宕机(进程崩溃 / 机器断电 / 网络不通)。此时 ZK 集群和客户端各自会发生什么?Session 和临时节点的命运如何?
9.6.1 场景设定
初始状态:
ZK 集群: 5节点 (myid=1 Leader, myid=2~4 Follower, myid=5 Observer)
客户端: 连接到 Follower-2 (10.0.0.2:2181)
Session: sessionId=0x2000000000000001, sessionTimeout=12000ms
临时节点: /locks/mylock (ephemeralOwner=0x2000000000000001)
Watcher: /config/app 注册了 data watch
┌─────────┐
│ Leader │ (myid=1)
│ 正常 │
└────┬────┘
│
┌────────────┼────────────┐
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Follower │ │Follower │ │Follower │
│ myid=2 │ │ myid=3 │ │ myid=4 │
│ ✕ 宕机!│ │ 正常 │ │ 正常 │
└────┬────┘ └─────────┘ └─────────┘
│
│ 原连接断开!
│
┌────┴────┐
│ Client │
│ Session │
│ 0x200...│
└─────────┘
9.6.2 完整时间线
时间线 (假设 sessionTimeout=12000ms)
══════════════════════════════════════════════════════════════════════►
T=0ms T=100ms T=4000ms T=8000ms T=12000ms
│ │ │ │ │
│ │ │ │ │
▼ ▼ ▼ ▼ ▼
┌──────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│Follower│ │客户端 │ │客户端 │ │客户端 │ │Leader │
│myid=2 │ │SendThread│ │发PING │ │判定: │ │判定: │
│宕机! │ │Socket读 │ │无响应 │ │readTimeout│ │Session │
│ │ │抛异常 │ │ │ │= 8000ms │ │过期! │
│进程挂 │ │(EOF/ │ │ │ │ │ │ │
│ │ │IOException│ │ │ │开始重连: │ │清理: │
└──────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │临时节点 │
│ │ │ │触发Watcher│
▼ │ │ └──────────┘
┌──────────────────┐ │ │
│ 客户端状态变化: │ │ ▼
│ state = │ │ ┌──────────────────────────┐
│ Disconnected │ │ │ SendThread 轮询地址列表: │
│ │ │ │ hostProvider.next() │
│ EventThread: │ │ │ │
│ watcher.process │ │ │ 原连 10.0.0.2 失败 │
│ (Disconnected) │ │ │ → 试 10.0.0.3:2181 │
└──────────────────┘ │ │ → 连接成功! │
│ │ → 发 ConnectRequest │
│ │ (带 sessionId+password)│
│ └────────────┬─────────────┘
│ │
│ ▼
│ ┌──────────────────────┐
│ │ Follower-3 收到重连: │
│ │ 1. 校验 sessionId │
│ │ 2. 校验 password │
│ │ 3. 转交 Leader 处理 │
│ │ 4. Leader 确认 │
│ │ Session 未过期 │
│ │ 5. 返回 ConnectResp │
│ │ → 重连成功! │
│ └──────────┬───────────┘
│ │
▼ ▼
┌─────────────────────────────────────┐
│ 重连成功! │
│ state = SyncConnected │
│ │
│ 临时节点 /locks/mylock 还在! │
│ (因为 Session 未过期) │
│ │
│ 但 Watcher 丢了! │
│ (原 Follower-2 上的 watchTable 消失)│
│ → EventThread 收到 None-SyncConnected│
│ → 用户需在回调中重新注册 Watcher │
└─────────────────────────────────────┘
===== 但如果 T=12000ms 还没重连成功 (sessionTimeout 耗尽) 的分支 =====
T=0ms ... T=8000ms T=12000ms T=12001ms+
│ │ │ │
▼ ▼ ▼ ▼
┌──────┐ ┌──────────┐ ┌────────────┐ ┌──────────┐
│myid=2│ │客户端仍在 │ │Leader 判断 │ │客户端终于 │
│宕机 │ │重连中... │ │Session过期! │ │连回来了 │
└──────┘ │(网络问题) │ │ │ │ │
└──────────┘ │1.关闭Session│ │发ConnectR│
│2.删/locks/ │ │equest │
│ mylock │ │ │
│3.触发Watcher│ │Leader: │
│ 给其它客户端│ │Session已 │
│ │ │过期! │
└────────────┘ │ │
│返回 │
│SessionExp │
│ired! │
│ │
│客户端收到:│
│EventThread│
│回调 │
│watcher. │
│process │
│(Expired) │
│ │
│用户必须: │
│new ZK() │
│重建所有 │
│临时节点+ │
│Watcher │
└──────────┘
9.6.3 客户端视角的状态机变化
// 客户端 Watcher 接收到的回调序列:
// === 情况1: sessionTimeout 内重连成功 ===
// 回调1: Disconnected
watcher.process(new WatchedEvent(
EventType.None, // 非节点事件,是连接事件
KeeperState.Disconnected, // 断连了
null // 无路径
));
// = 此时客户端进入 RECONNECTING 状态 =
// SendThread 正在尝试重连...
// 回调2: SyncConnected (重连成功)
watcher.process(new WatchedEvent(
EventType.None,
KeeperState.SyncConnected, // 重连成功!
null
));
// = Session 恢复, 临时节点还在, 但所有 Watch 丢失! =
// === 情况2: sessionTimeout 耗尽, Session 过期 ===
// 回调1: Disconnected
watcher.process(new WatchedEvent(
EventType.None,
KeeperState.Disconnected,
null
));
// ... 重连中 ... 超过了 sessionTimeout ...
// 回调2: Expired (Session 过期!)
watcher.process(new WatchedEvent(
EventType.None,
KeeperState.Expired, // ← 这个状态意味着一切归零
null
));
// = 此时当前 ZooKeeper 实例已经不可用 =
// = 所有临时节点已被 Leader 清理 =
// = 必须 new ZooKeeper() 重建 =
9.6.4 ZK 集群视角 —— 宕机节点的角色决定了后果
情况A:宕机的是 Follower
Leader 视角:
1. Leader 与 Follower-2 的 peer 连接断开
2. Leader 检查 Follower-2 上的 Session 归属
- Follower-2 只是转发请求,Session 由 Leader 统一管理
- Leader 的 SessionTracker 中这些 Session 仍然存活
3. 集群可用性不变 (4/5 节点存活,过半 = 3)
4. 不触发 Leader 选举 (Leader 还在)
情况B:宕机的是 Leader
集群视角:
1. Follower 发现与 Leader 的连接断开
2. syncLimit * tickTime (如 5*2000=10000ms) 超时后
3. Follower 状态变为 LOOKING,发起选举
4. 新 Leader 当选,epoch +1
5. 新 Leader 的 SessionTracker 从快照中恢复 Session
- 那些连接在旧 Leader 上的客户端 Session 仍然在!
- 只要客户端在 sessionTimeout 内连到任意节点,Session 可恢复
6. 新 Leader 发起数据同步 (DIFF/TRUNC)
7. 广播模式恢复
关键: 即使 Leader 宕机,客户端 Session 不丢失!
因为每次快照时会持久化所有活跃 Session
新 Leader 加载快照时恢复 SessionTracker
9.6.5 Session 重连的 secret —— password
客户端第一次连接成功后,服务端返回的 ConnectResponse 中包含一个 password(实际上是 sessionId 的哈希签名):
// 服务端: SessionTrackerImpl.java
// 创建 Session 时生成 password
long sessionId = initializeNextSession(serverId);
// password = Random.nextLong() (随机生成)
// 或者 3.4.10 的具体实现中,password 是 Random.nextLong()
// 客户端: ClientCnxn.java
// 保存 ConnectResponse 中的 sessionId 和 password
this.sessionId = response.getSessionId();
this.sessionPasswd = response.getPasswd();
// 重连时带上这两个值
ConnectRequest req = new ConnectRequest();
req.setSessionId(sessionId); // "我是谁"
req.setPasswd(sessionPasswd); // "这是我的凭证"
req.setTimeOut(sessionTimeout);
req.setLastZxidSeen(lastZxid); // "我最后看到的事务ID"
// 服务端验证:
// SessionTrackerImpl.java: checkSession()
synchronized public boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
// Session 不存在 → 可能是过期被清理了
return false;
}
if (s.isClosing()) {
// Session 正在关闭中
return false;
}
// 验证通过 → 刷新过期时间
s.tickTime = System.currentTimeMillis();
return true;
}
password 的作用:防止客户端伪造 sessionId 劫持他人的 Session。没有 password,即使你知道别人的 sessionId,也无法恢复他的 Session(和临时节点)。
9.6.6 临时节点的命运 —— 最关键的总结
┌──────────────────────────────────────────────────────────────┐
│ 临时节点何时删除?只有一种情况:Session 过期! │
│ │
│ 客户端调 close() ──────────────────────► 临时节点立即删除 │
│ 客户端进程 crash ──────► sessionTimeout 后 ──► 临时节点删除 │
│ 客户端网络断连 ────────► sessionTimeout 后 ──► 临时节点删除 │
│ 客户端所连节点宕机 ────► sessionTimeout 后 ──► 临时节点删除 │
│ Leader 宕机 ──────────► 新 Leader 恢复 Session ──► 不删除! │
│ │
│ 核心: 临时节点的生命周期绑定 Session, 不绑定 TCP 连接! │
│ 只要 Session 未过期, 无论连到哪台节点, 临时节点都在 │
└──────────────────────────────────────────────────────────────┘
9.6.7 开发人员应该怎么做
public class RobustZooKeeperClient {
private ZooKeeper zk;
private final CountDownLatch connected = new CountDownLatch(1);
private final Set<String> ephemeralNodes = new HashSet<>();
private final Map<String, Watcher> watchRegistry = new HashMap<>();
public void start() throws Exception {
connect(); // 首次连接
}
private void connect() throws Exception {
this.zk = new ZooKeeper(connectString, sessionTimeout, event -> {
switch (event.getState()) {
case SyncConnected:
// 首次连接成功或重连成功
connected.countDown();
if (event.getType() == EventType.None) {
// 重连成功后, 重新注册所有 Watcher!
reRegisterAllWatches();
}
break;
case Disconnected:
// 断连了, 但不慌
// 临时节点还在 (Session 还没过期)
// SendThread 正在自动重连
log.warn("ZK disconnected, reconnecting...");
break;
case Expired:
// Session 过期了! 一切都丢了!
log.error("ZK session expired! Rebuilding...");
// 1. 关闭旧的 ZooKeeper 实例
zk.close();
// 2. 重新连接 (获得新的 Session)
connect();
// 3. 重建所有临时节点
recreateAllEphemeralNodes();
// 4. 重新注册所有 Watcher
reRegisterAllWatches();
break;
}
});
// 等待首次连接完成
if (!connected.await(10, TimeUnit.SECONDS)) {
throw new RuntimeException("ZK connection timeout");
}
}
private void reRegisterAllWatches() {
for (Map.Entry<String, Watcher> entry : watchRegistry.entrySet()) {
try {
zk.exists(entry.getKey(), entry.getValue());
} catch (Exception e) {
log.error("Failed to re-register watch: {}", entry.getKey(), e);
}
}
}
private void recreateAllEphemeralNodes() {
for (String path : ephemeralNodes) {
try {
zk.create(path, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
log.error("Failed to recreate ephemeral: {}", path, e);
}
}
}
}
最佳实践总结:
| 回调状态 | 含义 | 你要做什么 |
|---|---|---|
SyncConnected |
首次连接成功 / 重连成功 | 首次: 创建临时节点+注册Watch;重连: 重新注册 Watch |
Disconnected |
TCP 断连,正在重连 | 记录日志,等待自动恢复,不要 panic |
Expired |
Session 过期,一切归零 | 关闭旧实例,new ZooKeeper(),重建所有临时节点和 Watcher |
AuthFailed |
认证失败 | 检查 ACL 配置 |
十、运维与监控
10.1 关键配置项(zoo.cfg)
# === 基础配置 ===
tickTime=2000 # 基本时间单位 (ms),心跳/超时都基于此
initLimit=10 # Follower 启动同步超时 = 10 * tickTime = 20s
syncLimit=5 # Follower 同步超时 = 5 * tickTime = 10s
# === 数据路径 ===
dataDir=/data/zookeeper/data # 快照存储
dataLogDir=/data/zookeeper/logs # 事务日志存储(建议单独磁盘,SSD优先)
# === 网络 ===
clientPort=2181 # 客户端连接端口
# server.X = hostname:peerPort:leaderElectionPort
server.1=10.0.0.1:2888:3888 # 2888 = peer 通信端口, 3888 = 选举端口
server.2=10.0.0.2:2888:3888
server.3=10.0.0.3:2888:3888
server.4=10.0.0.4:2888:3888
server.5=10.0.0.5:2888:3888
# === 高级参数 ===
maxClientCnxns=60 # 单 IP 最大连接数(防恶意连接)
autopurge.snapRetainCount=3 # 保留最近 3 个快照文件
autopurge.purgeInterval=1 # 每小时清理一次旧日志/快照
snapCount=100000 # 10万次事务触发一次快照
maxSessionTimeout=40000 # 服务端允许的最大 session 超时 (ms)
10.2 四字命令
# 最常用的几个监控命令
echo stat | nc 127.0.0.1 2181 # 服务状态 (连接数/延迟/znode数)
# 输出示例:
# Zookeeper version: 3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f
# Latency min/avg/max: 0/1/25
# Received: 847321
# Sent: 847320
# Connections: 15
# Outstanding: 0
# Zxid: 0x20000034e
# Mode: leader
# Node count: 1234
echo ruok | nc 127.0.0.1 2181 # 是否正常运行 → "imok"
echo mntr | nc 127.0.0.1 2181 # ⭐ 最全的监控指标 (3.4.0+)
# 输出示例:
# zk_version 3.4.10
# zk_avg_latency 1
# zk_max_latency 25
# zk_min_latency 0
# zk_packets_received 847322
# zk_packets_sent 847321
# zk_num_alive_connections 15
# zk_outstanding_requests 0
# zk_server_state leader
# zk_znode_count 1234
# zk_watch_count 89
# zk_ephemerals_count 42
# zk_approximate_data_size 1048576
# zk_open_file_descriptor_count 156
# zk_max_file_descriptor_count 65536
echo cons | nc 127.0.0.1 2181 # 所有客户端连接详情
echo dump | nc 127.0.0.1 2181 # 临时节点列表
echo srvr | nc 127.0.0.1 2181 # 服务详情 (同 stat, 格式略不同)
# 注意: 3.4.10 需要配置白名单才能使用四字命令
# zoo.cfg 中添加:
# 4lw.commands.whitelist=stat,ruok,mntr,cons,dump,srvr
10.3 JMX 监控集成
// ZK 3.4.10 默认开启 JMX
// 启动参数中指定 JMX 端口:
// -Dcom.sun.management.jmxremote.port=9995
// -Dcom.sun.management.jmxremote.ssl=false
// -Dcom.sun.management.jmxremote.authenticate=false
// 关键 MBean:
// org.apache.ZooKeeperService:name0=StandaloneServer_port2181,name1=InMemoryDataTree
// → NodeCount, WatchCount, EphemeralsCount
//
// org.apache.ZooKeeperService:name0=StandaloneServer_port2181,name1=Connection
// → AvgLatency, MaxLatency, OutStandingRequests[待处理], PacketsReceived, PacketsSent
//
// org.apache.ZooKeeperService:name0=StandaloneServer_port2181,name1=LeaderElection
// → 选举相关指标
10.4 告警阈值建议
指标 告警阈值 说明
───────────────────────────────────────────────────────────
zk_avg_latency > 50ms 写入延迟升高
zk_outstanding_requests > 100 排队请求过多 → Leader 可能过载
zk_num_alive_connections > 5000 连接数过高
zk_watch_count > 100000 Watcher 数量过大
zk_znode_count > 100000 ZNode 过多
zk_approximate_data_size > 500MB DataTree 内存占用过大
zk_open_file_descriptor_count > 80% 上限 可能接近 FD 限制
Mode != leader/follower 节点异常
十一、企业级最佳实践
11.1 连接串设计
方案 A: 所有地址都写(推荐)
connectString = "10.0.0.1:2181,10.0.0.2:2181,...,10.0.0.5:2181"
优点: 客户端启动时打散地址,天然负载均衡
缺点: 扩容时需要更新所有客户端配置
方案 B: 域名 + 负载均衡
connectString = "zk-cluster.internal:2181"
优点: 扩容不改变配置
缺点: 引入额外依赖(DNS/LB),增加故障点
推荐: 生产环境用方案A (ZK 客户端自带故障转移)
搭配配置中心动态刷新地址列表(如 Apollo/Nacos)
11.2 sessionTimeout 选择
sessionTimeout 设置多少合适?
太短 (如 3s):
- 网络抖动 → Session 过期 → 临节点删除 → 分布式锁释放
- GC 暂停 (Full GC 可能 2-3s) → Session 过期
- Watcher 丢失频繁 → 客户端不断重建连接
太长 (如 60s):
- 服务真的挂了 → 临时节点 60s 后才删除
- 分布式锁持有者挂了 → 60s 锁才释放 → 业务阻塞
- 故障检测慢
推荐: 10s - 20s (生产环境经验值)
- 需要结合应用 GC 暂停时间调优
- 如果 GC 暂停 > 5s, 先优化 GC, 再加长 sessionTimeout
11.3 客户端版本选择
ZooKeeper 原生客户端:
- 优点: 无额外依赖、轻量
- 缺点: Watcher 一次性触发需手动维护、没有重试/重连策略封装
Apache Curator (Netflix 开源):
- 优点: 自动重连、Watcher 自动重新注册、分布式锁/选举开箱即用
- 缺点: 额外依赖、学习曲线
企业内部推荐:
- 简单场景 (服务注册/配置读取) → 原生客户端足够
- 复杂场景 (分布式锁/选举/屏障) → Curator
- 统一封装: 在 Curator 基础上再封装一层公司内部 SDK
11.4 数据治理
ZK 不该存什么:
✗ 大文件 ( >> 1MB)
✗ 高频变化的数据 (如实时指标)
✗ 日志/流水数据
✗ 缓存数据 (ZK 是协调服务,不是缓存)
ZK 适合存什么:
✓ 配置项 ( < 10KB)
✓ 服务注册信息 (IP:Port, 元数据)
✓ 分布式锁节点 (临时顺序节点)
✓ Leader 选举标记
✓ 集群拓扑信息
数据组织规范:
/{namespace}/{env}/{service}/{config-key}
示例:
/prod/order-service/db/master.url
/prod/order-service/mq/rocketmq.namesrv
/prod/payment-service/db/master.url
好处:
- 按环境隔离 (prod/staging/dev)
- 按服务隔离 (ACL 粒度)
- 一目了然
11.5 滚动升级注意事项
ZooKeeper 3.4.10 滚动升级步骤:
1. 升级前检查:
- 确认集群状态正常 (echo stat | nc ... | grep Mode)
- 确认有 Leader 且过半节点在线
- 备份 dataDir 和 dataLogDir
2. 逐个升级 Follower:
a. 停一台 Follower
b. 替换 jar 包
c. 启动 → 自动同步数据 → 重新加入集群
d. 等待 'mntr' 显示 zk_server_state = follower
e. 检查 zk_outstanding_requests = 0
f. 下一台
3. 最后升级 Leader:
a. 停 Leader → 自动触发选举
b. 等待新 Leader 产生
c. 升级原 Leader → 作为 Follower 启动
d. 升级原新 Leader
4. 验证:
- 所有节点 Mode 正常
- mntr 各项指标正常
- 客户端读写正常
全文总结
三篇联动回顾:
上篇 —— 架构与协议 (是什么、怎么组织的)
1. ZK 定位 CP 系统, 5 节点过半机制
2. DataTree + DataNode 内存模型
3. ZAB 协议: Proposal → ACK(过半) → Commit
流水线化优化、zxid 设计、ACK 前写日志
中篇 —— 开发者接口 (怎么用、为什么这样设计)
4. ZNode 四种类型 + Stat 12 个字段
5. Session 分桶管理 + 临时节点级联删除
6. Watcher 一次性触发 + 双线程回调
下篇 —— 核心技术 + 运维 (怎么选、怎么管)
7. FastLeaderElection: epoch > zxid > myid
8. 同步策略: SNAP / DIFF / TRUNC
9. Java 客户端: SendThread + EventThread + NIO
10. 四字命令 + JMX 监控 + 告警阈值
11. sessionTimeout 选择 + 数据治理 + 滚动升级
关键源码文件索引(ZooKeeper 3.4.10):
| 文件 | 内容 |
|---|---|
QuorumPeer.java |
节点主循环,角色状态机 |
FastLeaderElection.java |
FLE 选举算法核心 |
Leader.java |
Leader 角色:Proposal/Commit/同步 |
Follower.java |
Follower 角色:转发/ACK |
DataTree.java |
内存数据模型 |
SessionTrackerImpl.java |
Session 分桶管理 |
WatchManager.java |
Watcher 注册/触发 |
FileTxnLog.java |
事务日志 WAL |
FileTxnSnapLog.java |
快照生成 |
ClientCnxn.java |
Java 客户端双线程模型 |
NIOServerCnxnFactory.java |
服务端 NIO 网络层 |
如果本文对你有帮助,欢迎在评论区讨论。对 ZooKeeper 3.5+ 的新特性(如动态重配置、TTL 节点)感兴趣的也可以留言,后续可以补充一篇版本演进对比。