ZooKeeper 3.4.10 集群深度剖析(中)—— 数据模型、Session 与 Watcher
上篇我们建立了 ZK 集群的全局视角,深入理解了 ZAB 协议这个”灵魂”。中篇聚焦 ZK 对开发者最直接的三块:数据模型(你存的是什么)、Session 机制(客户端与服务端的纽带)和 Watcher 机制(变更通知的核心)。
四、数据模型与 API
4.1 ZNode 全景
ZK 的数据模型是一个类似文件系统的层级命名空间。每个节点叫 ZNode,既是数据容器也是路径节点:
/ (根节点,ZNode)
├── config ← ZNode: 既是目录,也可存数据
│ ├── app ← ZNode
│ │ ├── db.url = "jdbc:..." ← ZNode: 存配置值
│ │ └── db.pool = "20" ← ZNode
│ └── mq
│ └── broker = "10.0.0.1:9876"
├── services ← ZNode: 服务注册根
│ ├── order-service ← ZNode: 持久节点
│ │ └── node-001 ← ZNode: 临时节点
│ └── user-service
└── locks
└── lock-0000000001 ← ZNode: 临时顺序节点
ZNode 四个维度分类:
持久 (PERSISTENT) 临时 (EPHEMERAL)
┌─────────────────────┐ ┌─────────────────────┐
非顺序 │ PERSISTENT │ │ EPHEMERAL │
(Non-Sequential)│ - 显式删除才消失 │ │ - Session 结束自动删│
│ - 用于配置/元数据 │ │ - 用于服务注册/锁 │
├─────────────────────┤ ├─────────────────────┤
顺序 │ PERSISTENT_SEQUENTIAL│ │ EPHEMERAL_SEQUENTIAL│
(Sequential) │ - 名称后追加10位单调 │ │ - 临时 + 顺序 │
│ 递增序号 │ │ - 分布式锁/选举首选 │
│ - 用于分布式队列 │ │ │
└─────────────────────┘ └─────────────────────┘
4.2 Stat 结构 —— 每个 ZNode 的元数据
每个 ZNode 都附带一份 Stat(状态信息),理解 Stat 对排查问题非常重要:
// ZooKeeper 3.4.10 源码: Stat.java (生成的 jute 代码)
public class Stat implements Record {
private long czxid; // 创建该节点的 zxid (Create ZXID)
private long mzxid; // 最后一次修改的 zxid (Modified ZXID)
private long ctime; // 创建时间 (epoch milliseconds)
private long mtime; // 最后修改时间
private int version; // 数据版本号 (每次 setData 自增)
private int cversion; // 子节点版本号 (子节点增删自增)
private int aversion; // ACL 版本号 (ACL 变化自增)
private long ephemeralOwner; // 临时节点的 Session ID (持久节点为0)
private int dataLength; // 数据长度 (bytes)
private int numChildren; // 子节点数量
private long pzxid; // 子节点列表最后变更的 zxid (Post ZXID)
}
Stat 字段的实战含义:
场景:你怀疑一个配置节点的数据被谁改了?
$ echo "stat" | nc 127.0.0.1 2181 | grep -A1 /config/app
/config/app
cZxid = 0x100000001 ← 谁创建的 (zxid)
ctime = Mon Jun 25 10:00:00 ← 什么时候创建的
mZxid = 0x20000000a ← 最后一次修改的 zxid (← 用这个定位!)
mtime = Mon Jun 25 15:30:00 ← 什么时候改的
pZxid = 0x100000001 ← 子节点最后一次变化 (≠ mZxid)
cversion = 3 ← 子节点变更了3次
dataVersion = 1 ← 数据被改过1次 (创建=0)
aclVersion = 0 ← ACL 没变过
ephemeralOwner = 0 ← 0 = 持久节点
dataLength = 128
numChildren = 2
pzxid vs mzxid 的区别(易混淆点):
mzxid: 该节点自己的 data 变化的 zxid (setData 操作)
pzxid: 该节点的直接子节点列表变化的 zxid (create/delete 子节点)
示例:
/create /config/app/db "" ← /config/app 的 pzxid 变化,mzxid 不变
create /config/app/db "mysql" ← setData 时调用 create 等价
/set /config/app "new data" ← /config/app 的 mzxid 变化
4.3 顺序节点的实现原理
EPHEMERAL_SEQUENTIAL 和 PERSISTENT_SEQUENTIAL 在创建时,ZK 会自动在节点名后追加一个 10 位单调递增序号:
// DataTree.java: createNode() 方法中的顺序节点处理
public String createNode(String path, byte data[], List<ACL> acl,
long ephemeralOwner, int parentCVersion, long zxid, long time) {
// 如果是 SEQUENTIAL 节点
if (isSequential(path)) {
// 1. 取父节点的 cversion 作为序号基准
int suffix = parentCVersion;
// 2. 生成路径: /path/prefix + 0000000000
path = makeSequentialNodeName(path, suffix);
// makeSequentialNodeName 的实现:
// path + String.format("%010d", suffix)
}
// 3. 创建 DataNode 并写入
DataNode child = new DataNode(data, acl, stat);
parent.addChild(childName);
dataTree.nodes.put(path, child);
}
注意:序号是基于父节点的 cversion,不是全局自增计数器。这避免了全局竞争,提高了并发度。
4.4 ACL 权限模型
// 权限位定义 (ZooDefs.Perms)
int READ = 1 << 0; // 可以读取节点数据
int WRITE = 1 << 1; // 可以写入节点数据
int CREATE = 1 << 2; // 可以创建子节点
int DELETE = 1 << 3; // 可以删除子节点
int ADMIN = 1 << 4; // 可以设置 ACL
int ALL = READ | WRITE | CREATE | DELETE | ADMIN;
三种 ACL Scheme:
world → world:anyone (任何人)
auth → auth:user:password (认证的用户)
digest → digest:user:base64(sha1(user:pass)) (用户名密码摘要)
ip → ip:192.168.1.0/24 (IP 白名单)
企业内部推荐实践:生产环境 ZK 一定要开 ACL,至少区分:
- 应用 A 的配置路径
/config/app-A/*只给app-Adigest 权限 - 运维管理路径给管理员,
ip白名单 world:anyone彻底踢掉
4.5 ZK 为什么不适合做大容量存储
这是一个面试高频题。根本原因:
1. 数据全量在内存
- DataTree 用 ConcurrentHashMap 存所有 ZNode
- 每个 ZNode 限制 1MB(jute buffer 硬编码)
- 100万个 1KB 的 ZNode ≈ 1GB 内存 + JVM 开销 ≈ 2-3GB
2. 写操作是全局串行的
- ZAB 保证所有写操作全局有序
- Leader 单点处理写请求
- 写 QPS 通常 1-3万/s(取决于数据大小和网络延迟)
3. 快照和事务日志随数据量线性膨胀
- 快照是全量 DataTree 序列化
- 启动时需要加载快照 + 重放事务日志
- 数据量大 = 启动慢、恢复慢
结论:ZK 是协调服务,配置、元数据、锁、选举信息——这类热数据、小数据适合存 ZK。业务数据请走数据库/消息队列。
五、Session 机制 —— 客户端连接的生命线
Session 是整个 ZK 客户端交互的核心抽象。客户端从连接到断开,所有操作都在一个 Session 的上下文中进行。理解 Session 机制是排查 ZK 连接问题的前提。
5.1 Session 是什么
┌──────────────────────────────────────────────────────────────┐
│ Session 全生命周期 │
│ │
│ 客户端 服务端 (Leader) │
│ ────── ──────────────── │
│ │
│ 1. new ZooKeeper(connectString, timeout, watcher) │
│ │ │
│ │ 发送 ConnectRequest │
│ ├────────────────────────────────────────►│ │
│ │ │ │
│ │ 2. Leader 创建 Session │
│ │ ┌───────────────────┐ │
│ │ │ sessionId = │ │
│ │ │ serverId(8bit) │ │
│ │ │ + 时间戳(40bit) │ │
│ │ │ + 计数器(16bit) │ │
│ │ │ │ │
│ │ │ timeout = │ │
│ │ │ min(clientTimeout, │ │
│ │ │ maxSessionTimeout) │
│ │ │ │ │
│ │ │ 加入 SessionTracker│ │
│ │ └───────────────────┘ │
│ │ │ │
│ │ 返回 ConnectResponse (sessionId, timeout) │
│ │◄────────────────────────────────────────┤ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────┐ │ │
│ │ 状态: CONNECTED │ │ │
│ │ 定时发 PING (心跳) │ │ │
│ └──────────────────────┘ │ │
│ │ │ │
│ │ ... 正常工作 ... │ │
│ │ │ │
│ │ 网络断开 │ │
│ ┌──┴───────────────────┐ │ │
│ │ 状态: DISCONNECTED │ │ │
│ │ 自动重连 │ │ │
│ │ ConnectString 下一个 │ │ │
│ │ 地址尝试... │ │ │
│ └──┬───────────────────┘ │ │
│ │ │ │
│ │ 重连成功 (sessionTimeout 时间内) │ │
│ ├────────────────────────────────────────►│ │
│ │ 验证 sessionId + password │ │
│ │◄────────────────────────────────────────┤ │
│ ▼ │ │
│ ┌──────────────────────┐ │ │
│ │ 状态: CONNECTED │ │ │
│ │ Session 恢复! │ │ │
│ └──────────────────────┘ │ │
│ │ │
│ ... 如果超过 sessionTimeout 没连回来 ... │ │
│ ▼ │
│ ┌────────────────────────────┐ │
│ │ Leader: Session 过期! │ │
│ │ 1. 关闭 Session │ │
│ │ 2. 删除该 Session 所有 │ │
│ │ 临时节点 │ │
│ │ 3. 触发 Watcher │ │
│ └────────────────────────────┘ │
│ │
│ 客户端终于连回来了... │
│ │ │
│ │ 服务端返回 SessionExpiredException! │
│ │◄──────────────────────────── │
│ ▼ │
│ ┌──────────────────────┐ │
│ │ 状态: EXPIRED │ │
│ │ 需要 new ZooKeeper() │ │
│ │ 重新建立 Session │ │
│ └──────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
5.2 sessionId 生成规则
// SessionTrackerImpl.java: initializeNextSession()
public static long initializeNextSession(long id) {
long nextSid;
// serverId: 取 server id 的低 8 位
// 左移 56 位,放在最高 8 位
nextSid = (System.currentTimeMillis() << 24) >>> 8;
// 中间 40 位: 当前时间戳
nextSid = nextSid | (id << 56); // id 就是 myid
return nextSid;
}
sessionId 结构 (64-bit):
┌──────────┬────────────────────────────┬──────────────────┐
│ 8 bit │ 40 bit │ 16 bit │
│ serverId │ 毫秒时间戳 │ 自增计数器 │
└──────────┴────────────────────────────┴──────────────────┘
为什么这样设计:
- serverId 在高位:同一个 server 重启后,新 sessionId > 旧 sessionId
- 时间戳在中间:保证跨 server 的唯一性
- 计数器在低位:同一毫秒内可以创建 65535 个 session
5.3 SessionTracker —— 分桶管理的艺术
SessionTrackerImpl 是管理所有 Session 的核心组件,采用分桶(Bucketing)策略来高效管理过期:
// SessionTrackerImpl.java
public class SessionTrackerImpl implements SessionTracker {
// 所有 session 的映射
HashMap<Long, SessionImpl> sessionsById = new HashMap<>();
// 分桶: key=过期时间点, value=该时间点过期的 session 集合
HashMap<Long, SessionSet> sessionSets = new HashMap<>();
// 以下三个参数控制分桶粒度
long nextExpirationTime; // 下次检查过期的时间点
int expirationInterval; // 检查间隔 = tickTime (通常 2000-3000ms)
long sessionTimeout; // session 超时时间
}
分桶机制图解:
假设 tickTime = 2000ms, sessionTimeout = 10000ms
时间线 (每个刻度 = 1 tick = 2000ms):
Tick: 0 1 2 3 4 5 6 7 8 ...
Session 分配规则:
- 当前 tick = 2 时创建的 session
→ 过期时间 = tick(2) × 2000 + 10000 = 14000ms = tick 7
→ 放入 Bucket[7]
Bucket 结构:
┌──────────┬──────────┬──────────┬──────────┬──────────┐
│ Bucket[3]│ Bucket[4]│ Bucket[5]│ Bucket[6]│ Bucket[7]│ ...
├──────────┼──────────┼──────────┼──────────┼──────────┤
│ session1 │ session4 │ session6 │ │ session2 │
│ session3 │ session5 │ │ │ session8 │
└──────────┴──────────┴──────────┴──────────┴──────────┘
▲
当前 tick=7 时,
这个桶里所有 session 过期!
关键代码:
while (true) {
// 当前 tick
long now = (System.currentTimeMillis() / expirationInterval);
// 检查是否有桶到期
if (now >= nextExpirationTime) {
SessionSet set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
// 检查心跳时间, 超出 timeout 则过期
if (s.tickTime + timeout < System.currentTimeMillis()) {
expire(s);
}
}
}
nextExpirationTime += expirationInterval;
}
Thread.sleep(expirationInterval / 2); // 半 tick 检查一次
}
分桶的核心优势:
- 不需要遍历所有 Session 来判断过期(O(n) → O(桶数))
- 只需检查当前到期的桶,时间复杂度接近 O(1)
- 桶的粒度由
expirationInterval(即 tickTime)决定
5.4 Session 激活与心跳
Session 的活性通过心跳(Ping)维持:
客户端视角:
┌─────────────────────────────────────────────┐
│ SendThread: 每 (sessionTimeout * 2/3) 发 Ping │
│ │
│ if (now - lastSend > readTimeout) { │
│ sendPing(); // 往服务端发 PING 请求 │
│ } │
│ │
│ readTimeout = sessionTimeout * 2/3 │
│ 例如: sessionTimeout = 10000ms │
│ readTimeout = 6666ms │
└─────────────────────────────────────────────┘
服务端视角:
┌──────────────────────────────────────────────────┐
│ 收到任何请求 (PING/GET/SET/...) 都视为心跳 │
│ → 更新 session.tickTime = now │
│ → 移动 session 到新的过期桶 │
│ │
│ Session 过期条件: │
│ tickTime + sessionTimeout < System.currentTimeMillis() │
└──────────────────────────────────────────────────┘
5.5 临时节点的级联删除
当 Session 过期后,Leader 负责清理该 Session 创建的所有临时节点。这个过程是一个事务:
// SessionTrackerImpl.java: expire()
private void expire(SessionImpl s) {
// 1. 收集该 session 的所有临时节点
Set<String> ephemerals = zk.getZKDatabase()
.getEphemerals(s.sessionId);
// 2. 构造一个"CloseSession"请求,提交给 ZAB
// 这是一个事务,保证原子性
Request request = new Request(null, s.sessionId, 0,
OpCode.closeSession, ...);
request.setTxn(new CloseSessionTxn(ephemeralPaths));
// 3. 通过正常的 ZAB Proposal → Commit 流程执行
// 保证集群内所有节点一致删除
zk.submitRequest(request);
// 4. 触发 Watcher
for (String path : ephemerals) {
// dataWatches 和 childWatches 都会触发
zk.getZKDatabase().getDataTree()
.triggerWatch(path, EventType.NodeDeleted);
}
}
这意味着:临时节点的删除操作与正常写操作共享一个 ZAB 队列,是严格有序的。如果你依赖某个临时节点做分布式锁,当持有者 Session 过期时,锁的释放是全局一致的——不会出现 A 节点认为已释放、B 节点认为还持有的一致性问题。
六、Watcher 机制 —— 变更通知的核心
6.1 设计哲学:一次性触发
ZK 的 Watcher 有一个核心设计:一次性触发(One-Time Trigger)。注册的 Watch 被触发一次后自动删除。
为什么是一次性的?
1. 简化服务端实现
- Watcher 只在服务端内存中,不持久化
- 服务端重启,所有 Watcher 丢失
- 如果 Watcher 是永久的,服务端重启后客户端期望的"持续 Watch"会静默丢失
- 一次性机制强制客户端在每次触发后重新注册 → 自然解决了服务端重启的问题
2. 语义清晰
- Watcher 告诉你"在某个时间点之后,数据发生了变化"
- 不保证你看到了每一次变化
- 客户端通过"触发→重新注册→获取最新数据"的模式来保证最终一致性
3. 避免惊群效应
- 永久 Watcher 会导致大量客户端同时收到通知
- 一次性 + 客户端主动重新注册,自然分散了时间窗口
6.2 服务端 WatcherManager
// WatchManager.java
public class WatchManager {
// watchTable: path → 对该 path 注册了 Watch 的 Watcher 集合
// key: watch path (String)
// value: 关注这个 path 的 Watcher 集合
private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<>();
// watch2Paths: Watcher → 该 Watcher 关注的 path 集合
// key: Watcher (通过 ServerCnxn 标识)
// value: 该 Watcher 注册的所有 path
private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<>();
// 注册 Watcher
public boolean addWatch(String path, Watcher watcher) {
// 双向关联
watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
return true;
}
// 触发 Watcher
public Set<Watcher> triggerWatch(String path, EventType type) {
// 1. 从 watchTable 中取出该 path 的所有 Watcher
Set<Watcher> watchers = watchTable.remove(path);
// 2. 从 watch2Paths 中移除对应的关联
for (Watcher w : watchers) {
Set<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
// 3. 返回 Watcher 集合,由上层发送通知给各个客户端
return watchers;
}
}
双 Map 结构的作用:
watchTable: data 变化时,快速找到该 path 有哪些 Watcher 需要通知
path → [Watcher1, Watcher2, ...]
↑ 这个方向的查询是 O(1)
watch2Paths: 客户端断开连接时,清理该客户端注册的所有 Watcher
Watcher → [path1, path2, ...]
↑ 这个方向的查询也是 O(1)
6.3 Watcher 触发时机
操作 触发的事件类型 触发路径
────────────────────────────────────────────────────
create /x NodeCreated /x (parent watch)
NodeChildrenChanged / (child watch on parent)
delete /x NodeDeleted /x (data watch)
NodeChildrenChanged / (child watch on parent)
setData /x NodeDataChanged /x (data watch)
setData /x/a NodeDataChanged /x/a (data watch)
不会触发 /x 的任何 watch
create /x/a NodeChildrenChanged /x (child watch)
不会触发 /x 的 data watch
总结:
- data watch: 被 setData 触发,被 delete 触发
- child watch: 被 create/delete 子节点触发
- exist watch: 被 create(→Created) / delete(→Deleted) / setData(→DataChanged) 触发
6.4 客户端 Watcher 处理流程
┌─────────────────────────────────────────────────────────────┐
│ 客户端 Watcher 处理 │
│ │
│ SendThread (网络线程) EventThread (事件回调线程) │
│ ────────────────────── ────────────────────────── │
│ │
│ 从 Socket 读到 WatchEvent │
│ │ │
│ ▼ │
│ ┌────────────────┐ │
│ │ 反序列化事件 │ │
│ │ 构造 WatchedEvent│ │
│ └───────┬────────┘ │
│ │ │
│ │ put(event) │
│ ├─────────────────────────────────────►│ │
│ │ │ │
│ │ ┌─────────────┴─────────┐ │
│ │ │ waitingEvents 队列 │ │
│ │ │ (LinkedBlockingQueue) │ │
│ │ └─────────────┬─────────┘ │
│ │ │ │
│ │ │ take() │
│ │ ▼ │
│ │ ┌───────────────────────┐ │
│ │ │ processEvent(event) │ │
│ │ │ → watcher.process(e) │ │
│ │ │ → 调用用户回调! │ │
│ │ └───────────────────────┘ │
│ │ │ │
│ │ │ │
│ SendThread EventThread │
│ 继续读网络数据 单线程顺序处理 │
│ 不阻塞! 保证回调顺序! │
└─────────────────────────────────────────────────────────────┘
客户端关键源码:
// ClientCnxn.java (ZooKeeper 3.4.10)
public class ClientCnxn {
// 发送线程: 负责网络 I/O
class SendThread extends ZooKeeperThread {
void readResponse(ByteBuffer bb) {
// 读到 WatchEvent
WatchedEvent event = ... // 反序列化
// 放入 EventThread 的队列,不阻塞 SendThread
eventThread.queueEvent(event);
}
}
// 事件线程: 负责回调
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<>();
public void queueEvent(WatchedEvent event) {
// 根据事件类型和路径,找到之前注册的 Watcher
switch (event.getType()) {
case None:
// 连接状态变化 (Disconnected/SyncConnected/Expired)
// 交给 defaultWatcher
break;
case NodeDataChanged:
case NodeDeleted:
// 从 dataWatches 中找到该 path 的 Watcher
Watcher w = dataWatches.remove(event.getPath());
if (w != null) {
waitingEvents.put(new WatcherSetEventPair(w, event));
}
break;
case NodeChildrenChanged:
// 从 childWatches 中找到该 path 的 Watcher
...
break;
}
}
public void run() {
while (true) {
Object event = waitingEvents.take();
if (event instanceof WatcherSetEventPair) {
// 调用用户的 Watcher.process()
((WatcherSetEventPair) event).watcher
.process(((WatcherSetEventPair) event).event);
}
}
}
}
}
双线程模型的设计意图:
为什么不让 SendThread 直接回调用户的 Watcher?
1. 用户 Watcher 可能阻塞
- 用户在 Watcher 里可能做网络调用、数据库查询等耗时操作
- 如果在 SendThread 里执行,会阻塞所有网络 I/O
- 导致心跳无法发送 → 服务端认为客户端挂了一> Session 过期
2. EventThread 单线程保证回调顺序
- 事件按到达顺序处理
- Watcher A 一定在 Watcher B 之前执行(如果 A 先到达)
- 虽然不能保证"绝对"的因果顺序,但至少保证了同一个连接的有序性
3. 隔离性
- 即使 EventThread 里的 Watcher 抛异常,只影响后续回调
- SendThread 不受影响,心跳和重连逻辑继续正常运转
6.5 实战场景:利用 Watcher 实现配置热更新
public class ConfigWatcher {
private ZooKeeper zk;
private String configValue;
private final String configPath = "/config/app/db.url";
public void start() throws Exception {
zk = new ZooKeeper("10.0.0.1:2181,10.0.0.2:2181", 10000, event -> {
if (event.getType() == Watcher.Event.EventType.None) {
// 连接状态事件,不是数据变化
return;
}
// 数据变化了!触发回调
if (event.getPath().equals(configPath)) {
try {
// 关键:重新注册 Watch (一次性机制!)
byte[] newData = zk.getData(configPath, this::watchCallback, null);
configValue = new String(newData);
System.out.println("配置更新: " + configValue);
// 这里可以触发 Spring @RefreshScope 等
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 初始加载 + 注册 Watch
byte[] data = zk.getData(configPath, event -> {
// 这里不能直接引用 this,用 lambda 包装
}, null);
// ... 但上面初始加载已经默认 Watcher 作为整体 Watcher 处理
// 更好的写法:
watchAndLoad();
}
private void watchAndLoad() throws Exception {
// exists + getData 的原子组合思想
// 先 register watch,再 getData
Stat stat = zk.exists(configPath, event -> {
if (event.getType() == Event.EventType.NodeDeleted) {
System.out.println("配置节点被删除了!");
} else {
watchAndLoad(); // 递归重新注册
}
});
if (stat != null) {
byte[] data = zk.getData(configPath, false, null); // 不加 watch
configValue = new String(data);
}
}
}
关键点:
- Watch 触发后必须重新注册,否则之后的变化不会收到通知
exists和getData之间有时间窗口,节点可能被删了——检查返回值- 回调方法不要做耗时操作,考虑线程池异步处理
中篇小结
中篇核心要点回顾:
1. 数据模型:
- ZNode 四种类型:PERSISTENT/EPHEMERAL/PERSISTENT_SEQUENTIAL/EPHEMERAL_SEQUENTIAL
- Stat 结构 12 个字段,每个都有确切的业务含义
- ZK 是大内存 + 单点写 + 全量快照 = 不适合做大容量存储
2. Session 机制:
- sessionId = serverId(8bit) + timestamp(40bit) + counter(16bit)
- 分桶管理过期:HashMap<过期时间, Session集合>
- Session 过期 → 临时节点级联删除(作为 ZAB 事务执行)
- 重连窗口 = sessionTimeout,超时则 Expired
3. Watcher 机制:
- 一次性触发:触发后自动删除,客户端必须重新注册
- 服务端双 Map 结构:watchTable + watch2Paths
- 客户端双线程模型:SendThread(网络) + EventThread(回调)
- 最佳实践:exists + getData 组合,回调中重新注册
下篇将展开 Leader 选举的完整算法、数据同步的三种策略、Java 客户端底层 NIO 通信,以及企业级运维最佳实践。