MingJunDuan的博客
全站访问量

ZooKeeper 3.4.10 集群深度剖析(中)—— 数据模型、Session 与 Watcher

上篇我们建立了 ZK 集群的全局视角,深入理解了 ZAB 协议这个”灵魂”。中篇聚焦 ZK 对开发者最直接的三块:数据模型(你存的是什么)、Session 机制(客户端与服务端的纽带)和 Watcher 机制(变更通知的核心)。


四、数据模型与 API

4.1 ZNode 全景

ZK 的数据模型是一个类似文件系统的层级命名空间。每个节点叫 ZNode,既是数据容器也是路径节点:

/ (根节点,ZNode)
├── config                        ← ZNode: 既是目录,也可存数据
│   ├── app                       ← ZNode
│   │   ├── db.url = "jdbc:..."   ← ZNode: 存配置值
│   │   └── db.pool = "20"       ← ZNode
│   └── mq
│       └── broker = "10.0.0.1:9876"
├── services                      ← ZNode: 服务注册根
│   ├── order-service             ← ZNode: 持久节点
│   │   └── node-001              ← ZNode: 临时节点
│   └── user-service
└── locks
    └── lock-0000000001           ← ZNode: 临时顺序节点

ZNode 四个维度分类

                   持久 (PERSISTENT)           临时 (EPHEMERAL)
               ┌─────────────────────┐  ┌─────────────────────┐
非顺序          │ PERSISTENT          │  │ EPHEMERAL           │
(Non-Sequential)│ - 显式删除才消失      │  │ - Session 结束自动删│
               │ - 用于配置/元数据     │  │ - 用于服务注册/锁   │
               ├─────────────────────┤  ├─────────────────────┤
顺序            │ PERSISTENT_SEQUENTIAL│  │ EPHEMERAL_SEQUENTIAL│
(Sequential)    │ - 名称后追加10位单调 │  │ - 临时 + 顺序       │
               │   递增序号            │  │ - 分布式锁/选举首选  │
               │ - 用于分布式队列       │  │                     │
               └─────────────────────┘  └─────────────────────┘

4.2 Stat 结构 —— 每个 ZNode 的元数据

每个 ZNode 都附带一份 Stat(状态信息),理解 Stat 对排查问题非常重要:

// ZooKeeper 3.4.10 源码: Stat.java (生成的 jute 代码)
public class Stat implements Record {
    private long czxid;      // 创建该节点的 zxid (Create ZXID)
    private long mzxid;      // 最后一次修改的 zxid (Modified ZXID)
    private long ctime;      // 创建时间 (epoch milliseconds)
    private long mtime;      // 最后修改时间
    private int version;     // 数据版本号 (每次 setData 自增)
    private int cversion;    // 子节点版本号 (子节点增删自增)
    private int aversion;    // ACL 版本号 (ACL 变化自增)
    private long ephemeralOwner; // 临时节点的 Session ID (持久节点为0)
    private int dataLength;  // 数据长度 (bytes)
    private int numChildren; // 子节点数量
    private long pzxid;      // 子节点列表最后变更的 zxid (Post ZXID)
}

Stat 字段的实战含义

场景:你怀疑一个配置节点的数据被谁改了?

$ echo "stat" | nc 127.0.0.1 2181 | grep -A1 /config/app
/config/app
cZxid = 0x100000001          ← 谁创建的 (zxid)
ctime = Mon Jun 25 10:00:00  ← 什么时候创建的
mZxid = 0x20000000a          ← 最后一次修改的 zxid (← 用这个定位!)
mtime = Mon Jun 25 15:30:00  ← 什么时候改的
pZxid = 0x100000001          ← 子节点最后一次变化 (≠ mZxid)
cversion = 3                 ← 子节点变更了3次
dataVersion = 1              ← 数据被改过1次 (创建=0)
aclVersion = 0               ← ACL 没变过
ephemeralOwner = 0           ← 0 = 持久节点
dataLength = 128
numChildren = 2

pzxid vs mzxid 的区别(易混淆点)

mzxid: 该节点自己的 data 变化的 zxid (setData 操作)
pzxid: 该节点的直接子节点列表变化的 zxid (create/delete 子节点)

示例:
/create /config/app/db ""      ← /config/app 的 pzxid 变化,mzxid 不变
create /config/app/db "mysql"  ← setData 时调用 create 等价
/set /config/app "new data"    ← /config/app 的 mzxid 变化

4.3 顺序节点的实现原理

EPHEMERAL_SEQUENTIAL 和 PERSISTENT_SEQUENTIAL 在创建时,ZK 会自动在节点名后追加一个 10 位单调递增序号

// DataTree.java: createNode() 方法中的顺序节点处理
public String createNode(String path, byte data[], List<ACL> acl,
        long ephemeralOwner, int parentCVersion, long zxid, long time) {

    // 如果是 SEQUENTIAL 节点
    if (isSequential(path)) {
        // 1. 取父节点的 cversion 作为序号基准
        int suffix = parentCVersion;

        // 2. 生成路径: /path/prefix + 0000000000
        path = makeSequentialNodeName(path, suffix);
        // makeSequentialNodeName 的实现:
        // path + String.format("%010d", suffix)
    }

    // 3. 创建 DataNode 并写入
    DataNode child = new DataNode(data, acl, stat);
    parent.addChild(childName);
    dataTree.nodes.put(path, child);
}

注意:序号是基于父节点的 cversion,不是全局自增计数器。这避免了全局竞争,提高了并发度。

4.4 ACL 权限模型

// 权限位定义 (ZooDefs.Perms)
int READ   = 1 << 0;  // 可以读取节点数据
int WRITE  = 1 << 1;  // 可以写入节点数据
int CREATE = 1 << 2;  // 可以创建子节点
int DELETE = 1 << 3;  // 可以删除子节点
int ADMIN  = 1 << 4;  // 可以设置 ACL
int ALL    = READ | WRITE | CREATE | DELETE | ADMIN;

三种 ACL Scheme:

world   → world:anyone         (任何人)
auth    → auth:user:password   (认证的用户)
digest  → digest:user:base64(sha1(user:pass))  (用户名密码摘要)
ip      → ip:192.168.1.0/24   (IP 白名单)

企业内部推荐实践:生产环境 ZK 一定要开 ACL,至少区分:

  • 应用 A 的配置路径 /config/app-A/* 只给 app-A digest 权限
  • 运维管理路径给管理员,ip 白名单
  • world:anyone 彻底踢掉

4.5 ZK 为什么不适合做大容量存储

这是一个面试高频题。根本原因:

1. 数据全量在内存
   - DataTree 用 ConcurrentHashMap 存所有 ZNode
   - 每个 ZNode 限制 1MB(jute buffer 硬编码)
   - 100万个 1KB 的 ZNode ≈ 1GB 内存 + JVM 开销 ≈ 2-3GB

2. 写操作是全局串行的
   - ZAB 保证所有写操作全局有序
   - Leader 单点处理写请求
   - 写 QPS 通常 1-3万/s(取决于数据大小和网络延迟)

3. 快照和事务日志随数据量线性膨胀
   - 快照是全量 DataTree 序列化
   - 启动时需要加载快照 + 重放事务日志
   - 数据量大 = 启动慢、恢复慢

结论:ZK 是协调服务,配置、元数据、锁、选举信息——这类热数据、小数据适合存 ZK。业务数据请走数据库/消息队列。


五、Session 机制 —— 客户端连接的生命线

Session 是整个 ZK 客户端交互的核心抽象。客户端从连接到断开,所有操作都在一个 Session 的上下文中进行。理解 Session 机制是排查 ZK 连接问题的前提。

5.1 Session 是什么

┌──────────────────────────────────────────────────────────────┐
│                     Session 全生命周期                         │
│                                                              │
│  客户端                      服务端 (Leader)                   │
│  ──────                      ────────────────                 │
│                                                              │
│  1. new ZooKeeper(connectString, timeout, watcher)           │
│     │                                                        │
│     │  发送 ConnectRequest                                    │
│     ├────────────────────────────────────────►│               │
│     │                                         │               │
│     │                    2. Leader 创建 Session              │
│     │                    ┌───────────────────┐               │
│     │                    │ sessionId =        │               │
│     │                    │   serverId(8bit)  │               │
│     │                    │ + 时间戳(40bit)    │               │
│     │                    │ + 计数器(16bit)    │               │
│     │                    │                   │               │
│     │                    │ timeout =          │               │
│     │                    │ min(clientTimeout, │               │
│     │                    │     maxSessionTimeout)            │
│     │                    │                   │               │
│     │                    │ 加入 SessionTracker│               │
│     │                    └───────────────────┘               │
│     │                                         │               │
│     │  返回 ConnectResponse (sessionId, timeout)             │
│     │◄────────────────────────────────────────┤               │
│     │                                         │               │
│     ▼                                         │               │
│  ┌──────────────────────┐                     │               │
│  │ 状态: CONNECTED       │                     │               │
│  │ 定时发 PING (心跳)     │                     │               │
│  └──────────────────────┘                     │               │
│     │                                         │               │
│     │  ... 正常工作 ...                       │               │
│     │                                         │               │
│     │  网络断开                                │               │
│  ┌──┴───────────────────┐                    │               │
│  │ 状态: DISCONNECTED    │                    │               │
│  │ 自动重连           │                       │               │
│  │ ConnectString 下一个  │                     │               │
│  │ 地址尝试...          │                      │               │
│  └──┬───────────────────┘                    │               │
│     │                                         │               │
│     │  重连成功 (sessionTimeout 时间内)       │               │
│     ├────────────────────────────────────────►│               │
│     │  验证 sessionId + password              │               │
│     │◄────────────────────────────────────────┤               │
│     ▼                                         │               │
│  ┌──────────────────────┐                     │               │
│  │ 状态: CONNECTED       │                     │               │
│  │ Session 恢复!         │                     │               │
│  └──────────────────────┘                     │               │
│                                                │               │
│  ... 如果超过 sessionTimeout 没连回来 ...      │               │
│                                                ▼               │
│                              ┌────────────────────────────┐   │
│                              │ Leader: Session 过期!       │   │
│                              │ 1. 关闭 Session             │   │
│                              │ 2. 删除该 Session 所有      │   │
│                              │    临时节点                  │   │
│                              │ 3. 触发 Watcher             │   │
│                              └────────────────────────────┘   │
│                                                              │
│  客户端终于连回来了...                                         │
│     │                                                        │
│     │  服务端返回 SessionExpiredException!                     │
│     │◄────────────────────────────                           │
│     ▼                                                        │
│  ┌──────────────────────┐                                    │
│  │ 状态: EXPIRED         │                                    │
│  │ 需要 new ZooKeeper()  │                                    │
│  │ 重新建立 Session       │                                    │
│  └──────────────────────┘                                    │
└──────────────────────────────────────────────────────────────┘

5.2 sessionId 生成规则

// SessionTrackerImpl.java: initializeNextSession()
public static long initializeNextSession(long id) {
    long nextSid;
    // serverId: 取 server id 的低 8 位
    // 左移 56 位,放在最高 8 位
    nextSid = (System.currentTimeMillis() << 24) >>> 8;
    // 中间 40 位: 当前时间戳
    nextSid = nextSid | (id << 56); // id 就是 myid
    return nextSid;
}
sessionId 结构 (64-bit):

┌──────────┬────────────────────────────┬──────────────────┐
│  8 bit   │         40 bit             │     16 bit       │
│ serverId │      毫秒时间戳             │   自增计数器       │
└──────────┴────────────────────────────┴──────────────────┘

为什么这样设计:
- serverId 在高位:同一个 server 重启后,新 sessionId > 旧 sessionId
- 时间戳在中间:保证跨 server 的唯一性
- 计数器在低位:同一毫秒内可以创建 65535 个 session

5.3 SessionTracker —— 分桶管理的艺术

SessionTrackerImpl 是管理所有 Session 的核心组件,采用分桶(Bucketing)策略来高效管理过期:

// SessionTrackerImpl.java
public class SessionTrackerImpl implements SessionTracker {
    // 所有 session 的映射
    HashMap<Long, SessionImpl> sessionsById = new HashMap<>();

    // 分桶: key=过期时间点, value=该时间点过期的 session 集合
    HashMap<Long, SessionSet> sessionSets = new HashMap<>();

    // 以下三个参数控制分桶粒度
    long nextExpirationTime;  // 下次检查过期的时间点
    int expirationInterval;   // 检查间隔 = tickTime (通常 2000-3000ms)
    long sessionTimeout;      // session 超时时间
}

分桶机制图解

假设 tickTime = 2000ms, sessionTimeout = 10000ms

时间线 (每个刻度 = 1 tick = 2000ms):
Tick:  0     1     2     3     4     5     6     7     8     ...

Session 分配规则:
- 当前 tick = 2 时创建的 session
  → 过期时间 = tick(2) × 2000 + 10000 = 14000ms = tick 7
  → 放入 Bucket[7]

Bucket 结构:
┌──────────┬──────────┬──────────┬──────────┬──────────┐
│ Bucket[3]│ Bucket[4]│ Bucket[5]│ Bucket[6]│ Bucket[7]│ ...
├──────────┼──────────┼──────────┼──────────┼──────────┤
│ session1 │ session4 │ session6 │          │ session2 │
│ session3 │ session5 │          │          │ session8 │
└──────────┴──────────┴──────────┴──────────┴──────────┘
                                                  ▲
                                          当前 tick=7 时,
                                          这个桶里所有 session 过期!

关键代码:
while (true) {
    // 当前 tick
    long now = (System.currentTimeMillis() / expirationInterval);

    // 检查是否有桶到期
    if (now >= nextExpirationTime) {
        SessionSet set = sessionSets.remove(nextExpirationTime);
        if (set != null) {
            for (SessionImpl s : set.sessions) {
                // 检查心跳时间, 超出 timeout 则过期
                if (s.tickTime + timeout < System.currentTimeMillis()) {
                    expire(s);
                }
            }
        }
        nextExpirationTime += expirationInterval;
    }
    Thread.sleep(expirationInterval / 2); // 半 tick 检查一次
}

分桶的核心优势

  • 不需要遍历所有 Session 来判断过期(O(n) → O(桶数))
  • 只需检查当前到期的桶,时间复杂度接近 O(1)
  • 桶的粒度由 expirationInterval(即 tickTime)决定

5.4 Session 激活与心跳

Session 的活性通过心跳(Ping)维持:

客户端视角:
┌─────────────────────────────────────────────┐
│ SendThread: 每 (sessionTimeout * 2/3) 发 Ping │
│                                             │
│ if (now - lastSend > readTimeout) {          │
│     sendPing();  // 往服务端发 PING 请求      │
│ }                                            │
│                                             │
│ readTimeout = sessionTimeout * 2/3          │
│ 例如: sessionTimeout = 10000ms              │
│       readTimeout = 6666ms                  │
└─────────────────────────────────────────────┘

服务端视角:
┌──────────────────────────────────────────────────┐
│ 收到任何请求 (PING/GET/SET/...) 都视为心跳         │
│ → 更新 session.tickTime = now                    │
│ → 移动 session 到新的过期桶                       │
│                                                  │
│ Session 过期条件:                                  │
│   tickTime + sessionTimeout < System.currentTimeMillis() │
└──────────────────────────────────────────────────┘

5.5 临时节点的级联删除

当 Session 过期后,Leader 负责清理该 Session 创建的所有临时节点。这个过程是一个事务

// SessionTrackerImpl.java: expire()
private void expire(SessionImpl s) {
    // 1. 收集该 session 的所有临时节点
    Set<String> ephemerals = zk.getZKDatabase()
        .getEphemerals(s.sessionId);

    // 2. 构造一个"CloseSession"请求,提交给 ZAB
    //    这是一个事务,保证原子性
    Request request = new Request(null, s.sessionId, 0,
        OpCode.closeSession, ...);
    request.setTxn(new CloseSessionTxn(ephemeralPaths));

    // 3. 通过正常的 ZAB Proposal → Commit 流程执行
    //    保证集群内所有节点一致删除
    zk.submitRequest(request);

    // 4. 触发 Watcher
    for (String path : ephemerals) {
        // dataWatches 和 childWatches 都会触发
        zk.getZKDatabase().getDataTree()
            .triggerWatch(path, EventType.NodeDeleted);
    }
}

这意味着:临时节点的删除操作与正常写操作共享一个 ZAB 队列,是严格有序的。如果你依赖某个临时节点做分布式锁,当持有者 Session 过期时,锁的释放是全局一致的——不会出现 A 节点认为已释放、B 节点认为还持有的一致性问题。


六、Watcher 机制 —— 变更通知的核心

6.1 设计哲学:一次性触发

ZK 的 Watcher 有一个核心设计:一次性触发(One-Time Trigger)。注册的 Watch 被触发一次后自动删除。

为什么是一次性的?

1. 简化服务端实现
   - Watcher 只在服务端内存中,不持久化
   - 服务端重启,所有 Watcher 丢失
   - 如果 Watcher 是永久的,服务端重启后客户端期望的"持续 Watch"会静默丢失
   - 一次性机制强制客户端在每次触发后重新注册 → 自然解决了服务端重启的问题

2. 语义清晰
   - Watcher 告诉你"在某个时间点之后,数据发生了变化"
   - 不保证你看到了每一次变化
   - 客户端通过"触发→重新注册→获取最新数据"的模式来保证最终一致性

3. 避免惊群效应
   - 永久 Watcher 会导致大量客户端同时收到通知
   - 一次性 + 客户端主动重新注册,自然分散了时间窗口

6.2 服务端 WatcherManager

// WatchManager.java
public class WatchManager {
    // watchTable: path → 对该 path 注册了 Watch 的 Watcher 集合
    // key: watch path (String)
    // value: 关注这个 path 的 Watcher 集合
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<>();

    // watch2Paths: Watcher → 该 Watcher 关注的 path 集合
    // key: Watcher (通过 ServerCnxn 标识)
    // value: 该 Watcher 注册的所有 path
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<>();

    // 注册 Watcher
    public boolean addWatch(String path, Watcher watcher) {
        // 双向关联
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
        return true;
    }

    // 触发 Watcher
    public Set<Watcher> triggerWatch(String path, EventType type) {
        // 1. 从 watchTable 中取出该 path 的所有 Watcher
        Set<Watcher> watchers = watchTable.remove(path);

        // 2. 从 watch2Paths 中移除对应的关联
        for (Watcher w : watchers) {
            Set<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }

        // 3. 返回 Watcher 集合,由上层发送通知给各个客户端
        return watchers;
    }
}

双 Map 结构的作用

watchTable:  data 变化时,快速找到该 path 有哪些 Watcher 需要通知
             path → [Watcher1, Watcher2, ...]
             ↑ 这个方向的查询是 O(1)

watch2Paths: 客户端断开连接时,清理该客户端注册的所有 Watcher
             Watcher → [path1, path2, ...]
             ↑ 这个方向的查询也是 O(1)

6.3 Watcher 触发时机

操作             触发的事件类型         触发路径
────────────────────────────────────────────────────
create /x        NodeCreated          /x (parent watch)
                 NodeChildrenChanged  / (child watch on parent)

delete /x        NodeDeleted          /x (data watch)
                 NodeChildrenChanged  / (child watch on parent)

setData /x       NodeDataChanged      /x (data watch)

setData /x/a     NodeDataChanged      /x/a (data watch)
                 不会触发 /x 的任何 watch

create /x/a      NodeChildrenChanged  /x (child watch)
                 不会触发 /x 的 data watch

总结:
- data watch:    被 setData 触发,被 delete 触发
- child watch:   被 create/delete 子节点触发
- exist watch:   被 create(→Created) / delete(→Deleted) / setData(→DataChanged) 触发

6.4 客户端 Watcher 处理流程

┌─────────────────────────────────────────────────────────────┐
│                  客户端 Watcher 处理                         │
│                                                             │
│  SendThread (网络线程)          EventThread (事件回调线程)    │
│  ──────────────────────          ────────────────────────── │
│                                                             │
│  从 Socket 读到 WatchEvent                                  │
│       │                                                     │
│       ▼                                                     │
│  ┌────────────────┐                                         │
│  │ 反序列化事件     │                                        │
│  │ 构造 WatchedEvent│                                       │
│  └───────┬────────┘                                         │
│          │                                                  │
│          │  put(event)                                      │
│          ├─────────────────────────────────────►│            │
│          │                                      │            │
│          │                        ┌─────────────┴─────────┐ │
│          │                        │ waitingEvents 队列     │ │
│          │                        │ (LinkedBlockingQueue) │ │
│          │                        └─────────────┬─────────┘ │
│          │                                      │           │
│          │                                      │ take()    │
│          │                                      ▼           │
│          │                        ┌───────────────────────┐ │
│          │                        │ processEvent(event)   │ │
│          │                        │ → watcher.process(e)  │ │
│          │                        │ → 调用用户回调!        │ │
│          │                        └───────────────────────┘ │
│          │                                      │           │
│          │                                      │           │
│   SendThread                               EventThread     │
│   继续读网络数据                           单线程顺序处理    │
│   不阻塞!                                 保证回调顺序!     │
└─────────────────────────────────────────────────────────────┘

客户端关键源码

// ClientCnxn.java (ZooKeeper 3.4.10)
public class ClientCnxn {
    // 发送线程: 负责网络 I/O
    class SendThread extends ZooKeeperThread {
        void readResponse(ByteBuffer bb) {
            // 读到 WatchEvent
            WatchedEvent event = ... // 反序列化
            // 放入 EventThread 的队列,不阻塞 SendThread
            eventThread.queueEvent(event);
        }
    }

    // 事件线程: 负责回调
    class EventThread extends ZooKeeperThread {
        private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<>();

        public void queueEvent(WatchedEvent event) {
            // 根据事件类型和路径,找到之前注册的 Watcher
            switch (event.getType()) {
            case None:
                // 连接状态变化 (Disconnected/SyncConnected/Expired)
                // 交给 defaultWatcher
                break;
            case NodeDataChanged:
            case NodeDeleted:
                // 从 dataWatches 中找到该 path 的 Watcher
                Watcher w = dataWatches.remove(event.getPath());
                if (w != null) {
                    waitingEvents.put(new WatcherSetEventPair(w, event));
                }
                break;
            case NodeChildrenChanged:
                // 从 childWatches 中找到该 path 的 Watcher
                ...
                break;
            }
        }

        public void run() {
            while (true) {
                Object event = waitingEvents.take();
                if (event instanceof WatcherSetEventPair) {
                    // 调用用户的 Watcher.process()
                    ((WatcherSetEventPair) event).watcher
                        .process(((WatcherSetEventPair) event).event);
                }
            }
        }
    }
}

双线程模型的设计意图

为什么不让 SendThread 直接回调用户的 Watcher?

1. 用户 Watcher 可能阻塞
   - 用户在 Watcher 里可能做网络调用、数据库查询等耗时操作
   - 如果在 SendThread 里执行,会阻塞所有网络 I/O
   - 导致心跳无法发送 → 服务端认为客户端挂了一> Session 过期

2. EventThread 单线程保证回调顺序
   - 事件按到达顺序处理
   - Watcher A 一定在 Watcher B 之前执行(如果 A 先到达)
   - 虽然不能保证"绝对"的因果顺序,但至少保证了同一个连接的有序性

3. 隔离性
   - 即使 EventThread 里的 Watcher 抛异常,只影响后续回调
   - SendThread 不受影响,心跳和重连逻辑继续正常运转

6.5 实战场景:利用 Watcher 实现配置热更新

public class ConfigWatcher {
    private ZooKeeper zk;
    private String configValue;
    private final String configPath = "/config/app/db.url";

    public void start() throws Exception {
        zk = new ZooKeeper("10.0.0.1:2181,10.0.0.2:2181", 10000, event -> {
            if (event.getType() == Watcher.Event.EventType.None) {
                // 连接状态事件,不是数据变化
                return;
            }

            // 数据变化了!触发回调
            if (event.getPath().equals(configPath)) {
                try {
                    // 关键:重新注册 Watch (一次性机制!)
                    byte[] newData = zk.getData(configPath, this::watchCallback, null);
                    configValue = new String(newData);
                    System.out.println("配置更新: " + configValue);
                    // 这里可以触发 Spring @RefreshScope 等
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 初始加载 + 注册 Watch
        byte[] data = zk.getData(configPath, event -> {
            // 这里不能直接引用 this,用 lambda 包装
        }, null);

        // ... 但上面初始加载已经默认 Watcher 作为整体 Watcher 处理
        // 更好的写法:
        watchAndLoad();
    }

    private void watchAndLoad() throws Exception {
        // exists + getData 的原子组合思想
        // 先 register watch,再 getData
        Stat stat = zk.exists(configPath, event -> {
            if (event.getType() == Event.EventType.NodeDeleted) {
                System.out.println("配置节点被删除了!");
            } else {
                watchAndLoad(); // 递归重新注册
            }
        });

        if (stat != null) {
            byte[] data = zk.getData(configPath, false, null); // 不加 watch
            configValue = new String(data);
        }
    }
}

关键点

  • Watch 触发后必须重新注册,否则之后的变化不会收到通知
  • existsgetData 之间有时间窗口,节点可能被删了——检查返回值
  • 回调方法不要做耗时操作,考虑线程池异步处理

中篇小结

中篇核心要点回顾:

1. 数据模型:
   - ZNode 四种类型:PERSISTENT/EPHEMERAL/PERSISTENT_SEQUENTIAL/EPHEMERAL_SEQUENTIAL
   - Stat 结构 12 个字段,每个都有确切的业务含义
   - ZK 是大内存 + 单点写 + 全量快照 = 不适合做大容量存储

2. Session 机制:
   - sessionId = serverId(8bit) + timestamp(40bit) + counter(16bit)
   - 分桶管理过期:HashMap<过期时间, Session集合>
   - Session 过期 → 临时节点级联删除(作为 ZAB 事务执行)
   - 重连窗口 = sessionTimeout,超时则 Expired

3. Watcher 机制:
   - 一次性触发:触发后自动删除,客户端必须重新注册
   - 服务端双 Map 结构:watchTable + watch2Paths
   - 客户端双线程模型:SendThread(网络) + EventThread(回调)
   - 最佳实践:exists + getData 组合,回调中重新注册

下篇将展开 Leader 选举的完整算法、数据同步的三种策略、Java 客户端底层 NIO 通信,以及企业级运维最佳实践。

本文阅读量