MingJunDuan的博客
全站访问量

Elastic-Job 分布式定时任务深度剖析 —— 从概念到原理

定时任务是后端开发中最常见的需求之一。单机时代,一个 cron 表达式加上 @Scheduled 就能解决问题。但当服务集群部署后,问题立刻复杂起来:谁执行?执行几次?执行节点宕机了怎么办?

Elastic-Job 是当当网开源(现为 Apache ShardingSphere 生态)的分布式任务调度框架,Lite 模式以去中心化 + ZooKeeper 协调的设计在业界独树一帜。本文从概念到原理,深入剖析 Elastic-Job Lite 的架构设计、核心机制和源码实现。


一、为什么需要分布式定时任务

1.1 单机定时任务的三个瓶颈

┌──────────────────────────────────────────────────┐
│                  单机定时任务的困境                  │
├──────────────┬──────────────┬─────────────────────┤
│   单点故障    │   性能上限    │    运维碎片化        │
├──────────────┼──────────────┼─────────────────────┤
│ 机器宕机      │ 单机处理能力  │ 10 台机器各有自己的   │
│ 所有定时任务   │ 有限,数据量  │ @Scheduled,      │
│ 全部停摆      │ 增长后无法    │ 没有统一的管理视图    │
│              │ 按时完成      │                     │
└──────────────┴──────────────┴─────────────────────┘

单机定时任务的根本问题是:执行能力与可用性都绑定在一台机器上

1.2 分布式定时任务的核心挑战

当定时任务从单机走向分布式,需要回答三个问题:

问题 本质 Elastic-Job 的答案
谁执行? 多节点如何分工 分片机制:将任务拆成 N 个分片,分配给不同节点
执行几次? 如何避免重复执行 分片与实例一一对应,同一分片不会同时在两个节点执行
宕机怎么办? 节点故障后任务如何接管 失效转移 + 自诊断修复

1.3 业界方案对比

框架 架构 注册中心 分片 管理界面
Quartz 集群版 去中心化(数据库锁) 数据库 ❌ 不支持 ❌ 无
XXL-JOB 中心化(调度中心) 数据库(自研) ✅ 支持 ✅ 完善
Elastic-Job Lite 去中心化(ZK协调) ZooKeeper ✅ 核心特性 ❌ 无(需配合事件追踪)
SchedulerX 中心化(阿里云) 阿里云托管 ✅ 支持 ✅ 阿里云控制台

Elastic-Job 的独特之处在于:没有调度中心,所有节点对等,通过 ZooKeeper 协调。这意味着没有单点故障,任何节点宕机都不影响整体调度。


二、Elastic-Job 架构全景

2.1 Lite vs Cloud

Elastic-Job 有两个子项目:

Elastic-Job
├── Elastic-Job-Lite  ← 本文重点
│   - 轻量级,jar 包嵌入业务应用
│   - 无中心化,仅依赖 ZooKeeper
│   - 适合业务系统内嵌使用
│
└── Elastic-Job-Cloud
    - 基于 Mesos/Docker 的云原生方案
    - 有调度中心(Scheduler)
    - 资源弹性伸缩,适合大规模调度

Lite 模式是绝大多数团队的选择:引入一个 jar 包、配置一个 ZK 地址,就能拥有分布式定时任务能力。

2.2 去中心化设计哲学

                    ┌─────────────────────┐
                    │   ZooKeeper 集群     │
                    │  (唯一的协调中心)     │
                    └──┬──────┬──────┬────┘
                       │      │      │
              ┌────────┼──────┼──────┼────────┐
              │        │      │      │        │
              ▼        ▼      ▼      ▼        ▼
         ┌────────┐┌────────┐┌────────┐┌────────┐
         │ 节点 A  ││ 节点 B  ││ 节点 C  ││ 节点 D  │
         │ Quartz ││ Quartz ││ Quartz ││ Quartz │
         │ 分片 0  ││ 分片 1  ││ 分片 2  ││ 分片 3  │
         └────────┘└────────┘└────────┘└────────┘
         
核心特点:
- 每个节点都是"完整的":本地 Quartz 调度 + 任务执行逻辑
- 节点之间不知道彼此存在,只与 ZK 通信
- ZK 是协调中心,不是调度中心 —— 它不"发号施令",只提供状态存储 + 变更通知

这个设计的关键洞察是:调度逻辑下沉到每个节点,协调逻辑上浮到 ZK。每个节点本地 Quartz 到点就触发,但”该不该执行”由 ZK 中的分片信息决定。

2.3 整体启动流程

应用启动
    │
    ▼
注册中心连接(ZK)
    │
    ▼
作业注册:写入 config、servers、instances 节点
    │
    ▼
主节点选举(LeaderLatch)
    │
    ▼
主节点执行分片分配
    │
    ▼
开启本地 Quartz 调度
    │
    ▼
Cron 触发 → 查 ZK 分片信息 → 执行分配给自己的分片

三、ZooKeeper 注册中心 —— 分布式协调的基石

3.1 为什么选 ZooKeeper?

Elastic-Job 的协调需求正好命中 ZK 的三大核心能力:

ZK 特性 Elastic-Job 用途
临时节点 实例上线建临时节点,宕机自动删除 → 集群自动感知节点变化
Watcher 机制 监听分片变化、配置变更、实例上下线 → 事件驱动,无需轮询
分布式锁(Curator) 主节点选举(LeaderLatch)、失效转移竞争 → 互斥操作保证

结合前文 ZK Watcher 机制分析的知识:Elastic-Job 依赖 Curator 的 PathChildrenCacheNodeCache 自动处理 Watch 重注册,开发者无需关心 ZK 一次触发的问题。

3.2 命名空间与节点树全景

每个作业在 ZK 中的完整目录结构:

/${namespace}/${jobName}/
│
├── config                          ← 持久节点,JSON 格式
│   └── 存储:cron、分片总数、分片参数、作业类型、监听器配置等
│
├── servers/                        ← 持久节点
│   ├── 192.168.1.101              ← 持久节点,存服务器状态
│   └── 192.168.1.102
│
├── instances/                      ← 临时节点(核心!)
│   ├── 192.168.1.101@-@8372       ← 实例下线后自动删除
│   └── 192.168.1.102@-@9231       ← 格式:IP@进程ID
│
├── sharding/                       ← 分片信息
│   ├── 0/
│   │   ├── instance               ← 持久节点:该分片分配给哪个实例
│   │   ├── running                ← 临时节点:分片正在执行
│   │   ├── misfire                ← 持久节点:错过执行标记
│   │   └── disabled               ← 持久节点:禁用标记
│   ├── 1/
│   │   └── ...
│   └── 2/
│       └── ...
│
├── leader/                         ← 主节点相关
│   ├── election/
│   │   ├── latch/                 ← 选举用临时顺序节点(Curator 管理)
│   │   └── instance               ← 临时节点:当前主节点实例 ID
│   ├── sharding/
│   │   ├── necessary              ← 持久节点:标记需要重新分片
│   │   └── processing             ← 临时节点:正在执行分片(阻塞任务)
│   └── failover/
│       ├── latch/                 ← 失效转移竞争锁
│       └── items/                 ← 待转移的分片项
│           ├── 0
│           └── 2
│
└── guarantee/                      ← 分布式栅栏
    ├── started/                    ← 标记已启动的实例
    └── completed/                  ← 标记已完成的实例

3.3 节点类型与生命周期的对应关系

一个核心设计原则:

临时节点 (EPHEMERAL)
  ├── instances/*       → 实例存活 = ZK Session 存活
  ├── sharding/*/running → 任务正在执行 = Session 存活
  ├── leader/election/instance → 主节点存活 = Session 存活
  └── leader/sharding/processing → 正在分片 = Session 存活

持久节点 (PERSISTENT)
  ├── config            → 只要作业存在就存在
  ├── servers/*         → 服务器注册信息持久化
  ├── sharding/*/instance → 分片分配关系持久化(超出 Session 生命周期)
  └── leader/sharding/necessary → 重分片标记持久化

关键设计意图:临时节点绑定 Session 生命周期,宕机即消失 → 触发集群感知。持久节点记录分配关系,超出单次 Session → 用于一致性校验和恢复。

3.4 连接 ZK 的代码入口

// 注册中心配置
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
    new ZookeeperConfiguration("zk1:2181,zk2:2181,zk3:2181", "elasticjob-namespace")
);
regCenter.init();

// 作业配置
JobConfiguration jobConfig = JobConfiguration.newBuilder("myJob", 3)
    .cron("0/5 * * * * ?")
    .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
    .build();

// 启动
new ScheduleJobBootstrap(regCenter, new MySimpleJob(), jobConfig).schedule();

几行代码就完成了一个分布式定时任务的搭建。背后是 ZK 节点树的全自动管理。

3.5 单节点执行 vs 多节点执行 —— ZK 数据结构的差异

很多开发者以为”单节点执行”就是只有一个实例注册到 ZK。实际上并非如此——差异不在 instances/,而在 sharding/ 子树。

场景设定

3 个实例:节点 A (IP 101)、节点 B (IP 102)、节点 C (IP 103)
作业名:myJob,命名空间:elasticjob

单节点执行(shardingTotalCount = 1)

/${namespace}/myJob/
│
├── config
│   └── {"cron":"0/5 * * * * ?", "shardingTotalCount":1, ...}
│
├── servers/
│   ├── 192.168.1.101
│   ├── 192.168.1.102
│   └── 192.168.1.103
│
├── instances/                         ← 3 个实例都注册了!
│   ├── 192.168.1.101@-@8372
│   ├── 192.168.1.102@-@9231
│   └── 192.168.1.103@-@5721
│
├── sharding/
│   └── 0/                             ← 只有一个分片
│       ├── instance → "192.168.1.102@-@9231"  ← 只分给 B
│       ├── running  → "192.168.1.102@-@9231"  ← 只有 B 在执行
│       ├── misfire
│       └── disabled
│
├── leader/                            ← 与多节点完全相同
│   ├── election/...
│   ├── sharding/...
│   └── failover/...
│
└── guarantee/...

执行时刻:

10:00:00 Cron 触发
  节点 A → 查 /sharding/0/instance = "B@9231" ≠ 我  → skip
  节点 B → 查 /sharding/0/instance = "B@9231" = 我  → 执行!
  节点 C → 查 /sharding/0/instance = "B@9231" ≠ 我  → skip

关键:3 个实例都在 ZK 注册,3 个 Quartz 都触发,但只有 1 个真正执行。任意一个宕机,分片可以 failover 到另一个实例——这就是”弹性”。

多节点执行(shardingTotalCount = 3)

/${namespace}/myJob/
│
├── config
│   └── {"cron":"0/5 * * * * ?", "shardingTotalCount":3,
│         "shardingItemParameters":"0=Beijing,1=Shanghai,2=Guangzhou", ...}
│
├── instances/                         ← 完全相同!
│   ├── 192.168.1.101@-@8372
│   ├── 192.168.1.102@-@9231
│   └── 192.168.1.103@-@5721
│
├── sharding/                          ← 这里有 3 个分片子目录
│   ├── 0/
│   │   ├── instance → "192.168.1.101@-@8372"  ← 分给 A
│   │   ├── running  → "192.168.1.101@-@8372"  ← A 在执行
│   │   ├── misfire
│   │   └── disabled
│   ├── 1/
│   │   ├── instance → "192.168.1.102@-@9231"  ← 分给 B
│   │   ├── running  → "192.168.1.102@-@9231"  ← B 在执行
│   │   ├── misfire
│   │   └── disabled
│   └── 2/
│       ├── instance → "192.168.1.103@-@5721"  ← 分给 C
│       ├── running  → "192.168.1.103@-@5721"  ← C 在执行
│       ├── misfire
│       └── disabled
│
├── leader/                            ← 完全相同
│   ├── election/...
│   ├── sharding/...
│   └── failover/...
│
└── guarantee/                         ← 子节点数量不同
    ├── started/
    │   ├── 0 → 101
    │   ├── 1 → 102
    │   └── 2 → 103
    └── completed/
        ├── 0 → 101
        ├── 1 → 102
        └── 2 → 103

差异对照一览

ZK 路径                    shardingTotalCount=1    shardingTotalCount=3
─────────────────────────  ─────────────────────   ─────────────────────
config                     相同(仅 totalCount 不同)  相同
servers/*                  3 个持久节点             3 个持久节点(相同)
instances/*                3 个临时节点             3 个临时节点(相同)
leader/election/*          相同                    相同
leader/sharding/*          相同                    相同
leader/failover/*          相同                    相同

sharding/                  只有 0/                  有 0/、1/、2/
  {item}/instance          1 个                    3 个,各自指向不同实例
  {item}/running           最多 1 个同时存在         最多 3 个同时存在
  {item}/misfire           1 个                     3 个

guarantee/started/         最多 1 个子节点           最多 3 个子节点
guarantee/completed/       最多 1 个子节点           最多 3 个子节点

特殊情况:实例数 > 分片数

shardingTotalCount = 2,实例 = 3 (A, B, C)

平均分配结果:
  A: 分片 0
  B: 分片 1
  C: 无分片

ZK 上的 sharding/:
  sharding/0/instance → A@8372
  sharding/1/instance → B@9231

执行时:
  节点 A → 执行分片 0 ✓
  节点 B → 执行分片 1 ✓
  节点 C → instances 里有它,但 sharding/ 下没有它的分片 → skip

核心认知:

┌────────────────────────────────────────────────────────┐
│                                                       │
│  instances/ 反映"谁在线"                                │
│  sharding/  反映"谁执行什么"                             │
│                                                       │
│  两者是独立的。在线不一定有分片,有分片一定在线。          │
│                                                       │
│  单节点执行 = sharding/ 下只有 1 个子目录                 │
│  多节点执行 = sharding/ 下有 N 个子目录                   │
│                                                       │
│  其他 ZK 路径(instances、leader、servers)完全一样。      │
│  分片数 ≠ 实例数,两者设计上就解耦。                       │
└────────────────────────────────────────────────────────┘

四、主节点选举 —— 谁来做全局决策

4.1 为什么需要主节点?

去中心化架构中,大部分操作(触发调度、执行任务)各节点独立完成。但有一个操作需要唯一决策者:分片分配

如果每个节点都自己算分片,可能出现 A 算出来分片 0 给自己、B 也算出来分片 0 给自己 —— 造成重复执行。所以需要一个”临时协调者”来做分片计算并把结果写入 ZK。

主节点不是调度中心,它不做任务调度,只做分片分配这一件事。

4.2 LeaderLatch 选举原理

Elastic-Job 使用 Curator 的 LeaderLatch 实现选举,底层是 ZK 的临时顺序节点:

// 源码位置:LeaderService.electLeader()
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.ELECTION_LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

// LeaderElectionExecutionCallback.execute()
public void execute() {
    if (!hasLeadership()) {
        return;
    }
    // 在 /leader/election/instance 写入自己的实例 ID
    jobNodeStorage.fillEphemeralJobNode(LeaderNode.ELECTION_INSTANCE, 
        JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
选举过程:
时刻 T0: 3 个节点同时启动,各自在 /leader/election/latch/ 下创建临时顺序节点
         latch/_c_0000000001  ← 节点 A (最小,获得 Leader)
         latch/_c_0000000002  ← 节点 B
         latch/_c_0000000003  ← 节点 C
         
时刻 T1: 节点 A 发现自己序号最小 → 获取 Leadership
         └→ 在 /leader/election/instance 写入 "192.168.1.101@-@8372"

时刻 T2: 节点 A 宕机 → 临时节点 _c_0000000001 和 instance 自动删除
         └→ 节点 B 通过 Watcher 感知 → 成为新 Leader
         └→ 写入 /leader/election/instance = "192.168.1.102@-@9231"

4.3 以作业为维度的选举

不同作业的 Leader 可以是不同节点。例如:

集群 3 节点: A(101), B(102), C(103)

作业 "orderJob"  → Leader: A
作业 "userJob"   → Leader: B
作业 "reportJob"  → Leader: C

这避免了”所有作业的主节点都在一台机器上”的热点问题。每个作业独立选举,选举范围只在同一 /${namespace}/${jobName}/leader/election/ 路径下。

4.4 选举触发时机

触发场景 触发方式 说明
作业初始化 registerStartUpInfo() 中调用 leaderService.electLeader() 每个节点启动时都参与选举
主节点宕机 Watcher 感知 leader/election/instance 删除 剩余节点重新竞争
主节点放弃 主节点主动 close() 释放 LeaderLatch 正常下线

4.5 主节点的职责范围

主节点职责(仅两件事):
┌─────────────────────────────────────────────┐
│ 1. 分片分配                                  │
│    - 感知到 necessary 标记后执行分片分配       │
│    - 将分片结果写入 /sharding/{item}/instance │
│                                             │
│ 2. 失效转移标记                              │
│    - 感知到实例下线后标记失效分片              │
│    - 写入 /leader/failover/items/{item}      │
└─────────────────────────────────────────────┘

主节点 NOT 做的事:
✗ 不调度任务(各节点本地 Quartz 独立触发)
✗ 不执行失效转移(所有空闲节点竞争执行权)
✗ 不收集执行结果(各节点独立回写 ZK)

五、分片机制 —— 弹性扩容的核心

5.1 分片概念澄清

Elastic-Job 的分片不是数据分片。它分的是”执行权”(即谁执行哪部分逻辑)。数据怎么分,由开发者在 process() 方法中根据分片参数自行处理。

shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou

分片结果(平均分配,2 个实例):
  实例 A: 分片 0, 1  →  处理 Beijing + Shanghai 的数据
  实例 B: 分片 2      →  处理 Guangzhou 的数据

5.2 分片执行全流程

Step 1: 标记阶段
    ┌─ 触发条件发生后,主节点在 /leader/sharding/necessary 创建持久节点
    └─ 这是一个"信号量",所有节点监听这个节点

Step 2: 阻塞阶段
    ┌─ 主节点创建 /leader/sharding/processing 临时节点
    └─ 所有执行节点发现 processing 存在 → 阻塞等待(暂停任务执行)
    
Step 3: 计算阶段
    ┌─ 主节点读取 /instances 获取在线实例列表
    │─ 主节点读取 /config 获取分片总数
    │─ 主节点按策略计算分片分配方案
    └─ 实例列表按 IP 排序(保证多次计算结果一致)

Step 4: 写入阶段
    ┌─ 主节点事务写入 /sharding/0/instance, /sharding/1/instance ...
    └─ 各节点通过 Watcher 感知分片变更,读取自己负责的分片

Step 5: 解除阶段
    ┌─ 主节点删除 /leader/sharding/processing
    └─ 所有节点退出阻塞,按新分片方案执行任务

关键设计:任务执行过程中即使有节点上下线,也不会立即重新分片,只标记 necessary。等到下次任务触发前才执行分片。这保证了单次执行周期内的分片稳定性。

5.3 分片策略详解

// 策略接口
public interface JobShardingStrategy {
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, 
                                              String jobName, 
                                              int shardingTotalCount);
}

5.3.1 平均分配策略(默认)

分片数: 5, 实例: [A, B, C]

算法:
 每个实例分到: 5 / 3 = 1 个,余数 2
 前 2 个实例各多分 1 个

结果:
 A: [0, 1]
 B: [2, 3]
 C: [4]

源码逻辑(AverageAllocationJobShardingStrategy):
 int shardingCount = shardingTotalCount / instancesCount;   // 1
 int remain = shardingTotalCount % instancesCount;           // 2
 // 前 remain 个实例各多分配 1 片

5.3.2 轮询分配策略

作业名哈希 对实例列表轮转后平均分配。优点:同一实例不会在所有作业中都分配到相同分片号。

5.3.3 奇偶分配策略

根据实例数量的奇偶性决定 IP 排序方向(升序/降序),轮转后再平均分配。在多作业场景下,分片分布更均匀。

5.3.4 自定义策略

public class MyShardingStrategy implements JobShardingStrategy {
    @Override
    public Map<JobInstance, List<Integer>> sharding(
            List<JobInstance> instances, String jobName, int shardingTotalCount) {
        // 根据机房标签、机器负载等自定义分配逻辑
        Map<JobInstance, List<Integer>> result = new HashMap<>();
        // ... 自定义逻辑
        return result;
    }
}

5.4 触发重新分片的四个条件

条件 ZK 事件 触发者
实例上线 instances/ 下新增临时节点 新启动节点的 Watcher
实例下线 instances/ 下临时节点被删除 主节点的 Watcher
分片总数变更 config 节点数据变化 配置变更 Watcher
自诊断修复 ReconciliationService 检测到不一致 主节点

所有触发最终都执行同一个操作:/leader/sharding/necessary 创建标记节点

5.5 实例排序的稳定性保证

分片分配使用 JobInstance 列表,排序依据是 IP 地址

// 源码中所有策略都以相同的顺序排列实例
Collections.sort(jobInstances);  // JobInstance 实现了 Comparable,按 IP 排序

这保证了:只要在线实例集合相同,每次计算的结果完全一致。避免了”抖动”——分片不会在无变化的情况下重新分配。

5.6 源码关键路径

// ShardingService.shardingIfNecessary() —— 分片入口
public void shardingIfNecessary() {
    // 获取可用的分片执行器(需要获取分布式锁)
    Optional<LeaderElectionExecutionCallback> optional = 
        createShardingExecutionCallback();
    if (!optional.isPresent()) {
        return;  // 未获取到锁,其他主节点在处理
    }
    // 检查是否真的需要分片
    if (!isNeedSharding()) {
        return;
    }
    // 执行分片(在分布式锁保护下)
    optional.get().execute();
}

// ShardingExecuteCallback.execute() —— 核心分片逻辑
public void execute() {
    // 1. 获取在线实例
    List<String> instances = jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
    // 2. 获取分片策略并计算
    Map<JobInstance, List<Integer>> shardingResult = 
        jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount);
    // 3. 事务写入 ZK
    for (Map.Entry<JobInstance, List<Integer>> entry : shardingResult.entrySet()) {
        for (int shardItem : entry.getValue()) {
            jobNodeStorage.replaceJobNode(
                ShardingNode.getInstanceNode(shardItem), 
                entry.getKey().getJobInstanceId());
        }
    }
}

六、任务执行全流程 —— 从 Cron 触发到结果回写

6.1 整体时序

时间轴 →

T-0:      所有节点的本地 Quartz 同时触发 Cron 信号
              │
T-1:      每个节点检查 /leader/sharding/necessary 是否需要分片
              │   ├─ 需要 → 等待主节点完成分片
              │   └─ 不需要 → 继续
              │
T-2:      每个节点读取 /sharding/*/instance,获取自己负责的分片号
              │   ├─ "我有分片 [0, 1]" → 执行
              │   └─ "我没有分片"  → 跳过(这是正常情况!)
              │
T-3:      创建 /sharding/0/running 临时节点(执行锁)
              │
T-4:      执行 before 监听器(分布式栅栏:所有分片就绪才放行)
              │
T-5:      执行 process(ShardingContext) 业务逻辑
              │
T-6:      执行 after 监听器(分布式栅栏:所有分片完成才放行)
              │
T-7:      删除 /sharding/0/running 临时节点
              │
T-8:      等待下一个 Cron 触发

6.2 “所有节点同时触发,但只有部分执行”的设计

这是 Elastic-Job 最有意思的设计之一:

┌──────────────────────────────────────────────┐
│ 为什么不让主节点触发,然后分发给执行节点?        │
├──────────────────────────────────────────────┤
│ 1. 去中心化:主节点宕机不影响任务调度触发        │
│ 2. 降低延迟:不需要"调度→分发→确认"的链路       │
│ 3. 简化模型:每个节点的 Quartz 独立运行          │
│                                              │
│ 代价:节点时钟不一致可能导致触发偏差             │
│ 解决:依赖 ZK 的分片信息做"执行守卫"             │
└──────────────────────────────────────────────┘

每个节点触发后做的第一件事是查 ZK 上的分片分配:我是分片 0 的执行者吗?是就执行,不是就跳过。这保证了即使所有节点同时触发,每个分片只被执行一次。

6.3 分布式栅栏 —— 执行前/后监听器

场景:3 个分片分布在 2 台机器上
  机器 A: 分片 0, 1
  机器 B: 分片 2

before 监听器(分布式栅栏):
  机器 A 分片 0 done ─────┐
  机器 A 分片 1 done ─────┼──→ 全部就绪 → 放行
  机器 B 分片 2 done ─────┘

作用:确保所有分片的数据准备就绪后再开始执行业务逻辑

实现原理:

// GuaranteeService.registerStart() —— 分布式开始栅栏
public void registerStart(List<Integer> shardingItems) {
    for (int each : shardingItems) {
        // 在 /guarantee/started/{item} 创建临时节点
        jobNodeStorage.fillEphemeralJobNode(
            GuaranteeNode.getStartedNode(each), "");
    }
}

// 等待所有分片就绪的判断
public boolean isAllStarted() {
    return jobNodeStorage.getJobNodeChildrenKeys(GuaranteeNode.STARTED_ROOT)
        .size() >= configService.load(true).getTypeConfig().getCoreConfig()
            .getShardingTotalCount();
}

6.4 错过任务重执行(Misfire)

当任务执行时间超过 Cron 间隔时,Quartz 会标记 Misfire。Elastic-Job 的处理:

Cron: 每 5 秒执行,但上次执行花了 8 秒
         │
    0s   ├─ 触发#1 (执行 8s, 到 8s 结束)
    5s   ├─ 触发#2 应该发生但 #1 还在跑 → Misfire!
   10s   ├─ 触发#3 (正常)
    
Elastic-Job 处理:
    触发#2 被标记为 misfire → 写入 /sharding/{item}/misfire
    → 如果配置了 misfire=true,在下次触发时补执行
Misfire 配置:
  misfire: true   → 错过触发的任务会被补偿执行
  misfire: false  → 错过就错过了,等下一次正常触发

6.5 Cron 表达式深度解析 —— 四个常见追问

6.5.1 Cron 是谁控制的?每个节点各自执行吗?

答案:每个节点有自己的 Quartz 实例,各自独立触发,互不通信。

┌──────────────────────────────────────────────────┐
│          常见的误解:"主节点触发然后分发"            │
│                                                  │
│  ✗ 错误理解:                                     │
│    主节点 Quartz 触发 → 通知 A 执行分片 0           │
│                      → 通知 B 执行分片 1           │
│                                                  │
│  ✓ 实际机制:                                     │
│    节点 A Quartz 触发 → 查 ZK → "我的分片是 0" → 执行│
│    节点 B Quartz 触发 → 查 ZK → "我的分片是 1" → 执行│
│    节点 C Quartz 触发 → 查 ZK → "我没有分片"   → 跳过│
│                                                  │
│    三个 Quartz 同时响、各自判、互不依赖              │
└──────────────────────────────────────────────────┘

Cron 表达式的流转路径:

开发者配置 → 写入 ZK /config(持久节点)→ 每个节点启动时读 ZK → 设置本地 Quartz

所以 Cron 表达式的控制链是:你配置 → ZK 存储 → 节点自取 → Quartz 自调。没有”中心调度器”。

6.5.2 后启动的实例会晚于先启动的实例吗?

会,但仅限于第一个周期。之后完全同步。

场景:Cron = "0/5 * * * * ?"(在 :00, :05, :10, :15... 触发)

节点 A 启动于 10:00:00
  10:00:00 → 触发 ✓
  10:05:00 → 触发 ✓

节点 B 启动于 10:02:00
  10:00:00 → 已过时,不触发(或标记 misfire)
  10:05:00 → 触发 ✓(跟 A 同时)

关键认知:Quartz 的 Cron 是基于绝对壁钟时间,不是”启动后每 5 分钟”。0/5 * * * * ? 的含义是”在分钟数为 0、5、10、15… 的时间点触发”,不是”启动后经过 5 分钟触发”。只要两个节点的时钟同步(NTP),它们会在同一时刻触发。

6.5.3 Cron 是什么时候写入 ZK 的?每个实例都会写吗?

第一个实例写入,后续实例只读不写。

// 源码关键路径:SchedulerFacade.registerStartUpInfo()
// → ConfigService.setUpConfiguration()
public void setUpConfiguration(LiteJobConfiguration liteJobConfig) {
    if (jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT)) {
        // ZK 上已存在 → 仅校验,不覆盖
        LiteJobConfiguration existing = LiteJobConfigurationGsonFactory.fromJson(
            jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT));
        if (!existing.equals(liteJobConfig)) {
            log.warn("Job configuration on ZK is different from local, using ZK's.");
        }
        return;  // 直接返回,不写入
    }
    // ZK 上不存在 → 写入(通常是第一个实例执行)
    jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, 
        LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
}
时序:
  节点 A 启动 (10:00:00)
      ├─ 查 ZK /config → 不存在
      └─ 写入 /config = {"cron":"0/5 * * * * ?", ...}
      
  节点 B 启动 (10:02:00)
      ├─ 查 ZK /config → 已存在 ✓
      ├─ 比较本地 cron 与 ZK cron → 一致 → 跳过写入
      └─ 读 ZK /config → 设置本地 Quartz

  节点 C 启动 (10:03:00)
      └─ 同上,只读不写

关键结论:/config 是持久节点,写一次,永久存在。 如果启动时发现本地 cron 与 ZK 不一致,以 ZK 为准(并打印警告日志)。

6.5.4 后台修改 Cron 表达式,怎么生效的?

ZK Watcher 通知 → 每个实例本地 reschedule Quartz。

完整链路:

  [管理后台] 修改 cron: "0/5" → "0/10"
      │
      ▼
  [ZooKeeper] /config 节点数据变更
      │
      ├──── Watcher 触发(NodeCache 感知)────┐
      │                                       │
      ▼                                       ▼
  [节点 A]                               [节点 B]
      │                                       │
      ▼                                       ▼
  比较新旧 cron:                        比较新旧 cron:
  "0/5" ≠ "0/10"                       "0/5" ≠ "0/10"
      │                                       │
      ▼                                       ▼
  Quartz 重新调度:                      Quartz 重新调度:
  scheduler.rescheduleJob(             scheduler.rescheduleJob(
    oldTrigger, newTrigger)              oldTrigger, newTrigger)
      │                                       │
      ▼                                       ▼
  下次按新 cron 触发:                   下次按新 cron 触发:
  10:10 → 10:20 → 10:30              10:10 → 10:20 → 10:30

时效性分析:

假设当前时间 10:08,修改 cron:

  旧 cron: "0/5 * * * * ?"  → 下次触发 10:10
  新 cron: "0/10 * * * * ?" → 下次触发 10:10
  
  → cron 变化但下次触发恰好重合(10:10),延迟约 2 分钟

  旧 cron: "0/5 * * * * ?"  → 下次触发 10:10
  新 cron: "0/7 * * * * ?"  → 下次触发 10:14
  
  → 延迟约 6 分钟,期间不会执行

关键细节:
  - 正在执行中的任务不会被中断,新 Cron 只影响下一次触发
  - 如果分片总数也变了,会同时标记 leader/sharding/necessary

6.5.5 后台手动触发定时任务,是怎么实现的?

写一个触发标记到 ZK,每个实例 Watcher 感知后立即调用 triggerJob()

完整链路:

  [管理后台] 点击 "立即执行"
      │
      ▼
  [ZooKeeper] 写入临时节点: /namespace/myJob/trigger
      │
      ├──── Watcher 触发 ────┐
      │                      │
      ▼                      ▼
  [节点 A]              [节点 B]              [节点 C]
      │                      │                      │
      ▼                      ▼                      ▼
  查 ZK 分片表:          查 ZK 分片表:          查 ZK 分片表:
  我的分片 = [0]         我的分片 = [1]         我的分片 = [2]
      │                      │                      │
      ▼                      ▼                      ▼
  triggerJob()           triggerJob()           triggerJob()
  立即执行 分片 0!       立即执行 分片 1!       立即执行 分片 2!
  (不等 Cron)            (不等 Cron)            (不等 Cron)

跟 Cron 触发的区别只有一个:Cron 是 Quartz 定时器到点自动调用,手动触发是 ZK Watcher 收到信号后手动调用 triggerJob()执行逻辑完全一样(都要查分片表,都只执行自己的分片)。


6.6 缓存机制与 ZK 压力分析 —— Cron 触发会”打爆”ZK 吗?

如果每次 Cron 触发都要去 ZK 读一堆数据,高频任务确实能把 ZK 打爆。Elastic-Job 的设计对此有充分考虑:用多层本地缓存 + Curator 缓存机制,将 ZK 读取降到零,只保留必不可少的写入。

6.6.1 每次 Cron 触发,到底读了什么?

答案:几乎全是本地内存读取,不访问 ZK。

Cron 触发一次,ZK 操作分解:

  ┌────────────────────────────────────────────────────────┐
  │ 操作                  是否访问 ZK        频率/分片       │
  ├────────────────────────────────────────────────────────┤
  │ 读取分片分配表          否 (TreeCache 缓存)   0          │
  │ 读取 config            否 (NodeCache 缓存)    0          │
  │ 检查 necessary 标记    否 (NodeCache 缓存)    0          │
  │ 创建 running 临时节点   是 (必须! 分布式执行锁) 1 次写    │
  │ 删除 running 临时节点   是 (必须! 执行完成)    1 次删    │
  └────────────────────────────────────────────────────────┘

只有 running 节点的创建和删除需要访问 ZK,这是分布式执行锁,不能用任何缓存替代。 其他所有读操作都被 Curator 的 TreeCache / NodeCache 拦截,走本地内存。

6.6.2 缓存体系全景

                     ┌──────────────────────────┐
                     │     ZooKeeper 集群        │
                     └────┬──────┬──────┬───────┘
                          │      │      │
              ┌───────────┼──────┼──────┼───────────┐
              │           │      │      │           │
              ▼           ▼      ▼      ▼           ▼
       ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
       │ 节点 A    │ │ 节点 B    │ │ 节点 C    │ │ 节点 D    │
       ├──────────┤ ├──────────┤ ├──────────┤ ├──────────┤
       │ TreeCache│ │ TreeCache│ │ TreeCache│ │ TreeCache│
       │ /sharding│ │ /sharding│ │ /sharding│ │ /sharding│
       │ 全量缓存  │ │ 全量缓存  │ │ 全量缓存  │ │ 全量缓存  │
       ├──────────┤ ├──────────┤ ├──────────┤ ├──────────┤
       │NodeCache │ │NodeCache │ │NodeCache │ │NodeCache │
       │ /config  │ │ /config  │ │ /config  │ │ /config  │
       ├──────────┤ ├──────────┤ ├──────────┤ ├──────────┤
       │本地变量   │ │本地变量   │ │本地变量   │ │本地变量   │
       │ myItems  │ │ myItems  │ │ myItems  │ │ myItems  │
       │ [0,1]    │ │ [2,3]    │ │ [4,5]    │ │ [6,7]    │
       └──────────┘ └──────────┘ └──────────┘ └──────────┘
       
数据流:
  读分片分配   → TreeCache (内存) → 不访问 ZK
  读配置       → NodeCache (内存) → 不访问 ZK
  创建 running → 必须访问 ZK (执行锁,不能缓存)
  
数据变更时:
  ZK /config 变化  → Watcher 推送 → NodeCache 更新 → 本地重新调度 Quartz
  ZK /sharding 变化 → Watcher 推送 → TreeCache 更新 → myItems 重新计算

6.6.3 TreeCache 原理 —— 关键优化

// 节点启动时,TreeCache 把 /sharding/ 子树全量加载到本地内存
TreeCache treeCache = new TreeCache(client, "/namespace/myJob/sharding");
treeCache.start();

// Watcher 持续监听 ZK 变更,自动增量更新本地缓存
// 当 Cron 触发时:
// getLocalShardingItems() → 直接读 TreeCache → 纯内存操作,零网络调用
List<Integer> myItems = cachedShardingItems.getInstanceItems(instanceId);

这就是 Curator TreeCache 的价值:启动时一次全量拉取 + Watcher 持续增量同步。之后的所有读取都是内存操作。它解决了前文 ZK Watcher 分析中”每次触发需要重新注册 Watcher”的问题——Curator 在底层自动处理了 Watcher 的重新注册。

6.6.4 running 节点会给 ZK 造成压力吗?

算一笔账:

假设场景:10 个作业,每个 3 个分片,Cron = 每 5 秒

每秒 ZK 写入量:
  每次触发:3 个 running 创建 + 3 个 running 删除 = 6 次写
  每分钟:6 × (60/5) = 72 次写
  每秒:72 / 60 = 1.2 次写

ZK 的写能力:
  单节点 ZK:~10,000-20,000 TPS(顺序写入 ZAB 日志)
  3 节点 ZK 集群:~5,000-10,000 TPS(受限于 ZAB 协议广播到 Follower)

结论:1.2 TPS vs 5,000+ TPS → 不到 0.03%,完全可以忽略

极端场景推演

场景:100 个作业,每个 10 个分片,Cron = 每秒 1 次(这本身就不合理)

每秒 running 写 = 100 × 10 × 2 = 2,000 次/秒
2,000 / 5,000 = 40% 的 ZK 写容量 → 开始有压力

但现实是:
  - 不会有 100 个作业都每秒执行(Cron 精度通常在分钟级)
  - running 节点分散在不同路径 /{namespace}/{jobName}/sharding/{item}/running
    → 不是热点写入,分散在不同 ZNode 上

6.6.5 真正的 ZK 压力来自哪里?

不是 Cron 触发,而是实例频繁上下线和 Session 超时重建。

┌─────────────────────────────────────────────────────────┐
│ 高压力操作              压力来源              真实风险    │
├─────────────────────────────────────────────────────────┤
│ Cron 触发 running 写    每次任务执行时         低         │
│ Watcher 注册/触发       实例上下线/配置变更    低-中      │
│ 选举 (LeaderLatch)      实例上下线时           低         │
│ 分片写入                实例上下线/配置变更    低         │
├─────────────────────────────────────────────────────────┤
│ ⚠️ 大量实例频繁上下线   每次上下线触发全流程    中         │
│ ⚠️ 大量作业同时启动     瞬时大量节点写入        中-高      │
│ ⚠️ ZK Session 超时      所有临时节点全删+重建  高         │
└─────────────────────────────────────────────────────────┘

最需要警惕的场景:ZK Session 过期后所有实例同时重连——几百个临时节点瞬间全部重建 + 所有 Watcher 同时触发 + 所有作业同时选举 + 所有作业同时分片。这是前文分析 ZK Session 机制时提到的经典高负载场景。

设置建议:

sessionTimeoutMilliseconds: 60000-120000
  - 太短:轻微网络抖动就 Session 超时 → 大规模重建 → ZK 压力瞬间飙升
  - 太长:节点真实宕机后要等太久 → 失效转移延迟大

经验值:60s 适合大多数场景,网络不稳定可调到 120s

七、失效转移 —— 节点宕机后谁接手

7.1 触发条件

失效转移不是默认开启的,需要同时满足:

failover = true           ← 作业级别配置
monitorExecution = true   ← 作业级别配置(默认 true)
+ 某个节点在执行任务期间宕机
+ 该节点的 running 临时节点因 Session 过期被 ZK 删除

为什么需要 monitorExecution=true 因为它决定了是否创建 running 临时节点来监控执行状态。没有 running 节点,就无法区分”节点在执行中宕机”和”节点根本没执行”。

7.2 完整六阶段流程

时刻 T0: 节点 A(分片0) + 节点 B(分片1) 正常执行中
         /sharding/0/running → "A@8372" (临时节点)
         /sharding/1/running → "B@9231" (临时节点)

时刻 T1: 节点 A 宕机,进程崩溃
         └→ ZK Session 超时 → /sharding/0/running 自动删除
         
阶段 1 - 故障检测:
         主节点(节点B) Watcher 感知到 /sharding/0/running 被删除
         └→ 检查 /instances 中节点 A 的临时节点也已消失
         └→ 确认节点 A 宕机(不是任务正常完成)
         
阶段 2 - 标记待转移:
         主节点写入 /leader/failover/items/0 (持久节点)
         └→ 含义:"分片 0 需要被接管"

阶段 3 - 选举执行者:
         所有空闲节点竞争 /leader/failover/latch 分布式锁
         └→ 节点 C 抢到锁(节点 B 正在执行分片 1,不参与竞争)

阶段 4 - 转移执行:
         节点 C:
         ① 创建 /sharding/0/failover 标记节点
         ② 创建 temporary /sharding/0/running → "C@5721"
         ③ 删除 /leader/failover/items/0
         ④ 立即触发分片 0 的任务执行(不等待 Cron)

阶段 5 - 清理:
         执行完成后删除 /sharding/0/running、/sharding/0/failover

阶段 6 - 分片恢复:
         下次 Cron 触发前,主节点检测到实例变更
         └→ 重新分片,分片 0 正式分配给节点 C

7.3 失效转移 vs 重新分片 —— 本质区别

失效转移 (Failover)                  重新分片 (Resharding)
─────────────────────              ─────────────────────
触发时机:任务执行中,节点宕机        触发时机:下次任务触发前
响应速度:秒级(立即接管)            响应速度:分钟级(等下次 Cron)
作用范围:仅故障分片                  作用范围:全部分片重新分配
实现方式:分布式锁竞争                 实现方式:主节点统一计算
配置要求:failover=true            自动触发(无需额外配置)
             + monitorExecution=true

失效转移是”急救”,重新分片是”康复”。两者互补:失效转移保证当前任务不中断,重新分片保证长期运行均衡。

7.4 关键源码

// FailoverService.failoverIfNecessary() —— 每个节点定时检查
public void failoverIfNecessary() {
    if (!needFailover()) {
        return;
    }
    // 竞争失效转移执行权
    jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
}

// FailoverLeaderExecutionCallback.execute() —— 获取锁后执行
public void execute() {
    // 获取一个待转移的分片
    List<String> items = jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT);
    if (items.isEmpty()) return;
    int crashedItem = Integer.parseInt(items.get(0));
    
    // 标记当前实例为执行者
    jobNodeStorage.fillEphemeralJobNode(
        FailoverNode.getExecutionFailoverNode(crashedItem), 
        JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    
    // 移除失效转移标记
    jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
    
    // 立即触发执行(不等 Cron)
    jobScheduleController.triggerJob();
}

7.5 双节点级联故障 —— 已知的边界问题

场景:仅 2 个节点,节点 B 宕机后又恢复,恢复期间节点 A 也宕机

问题:
  1. 节点 B 异常终止时,/sharding/1/running 残留(未正常清理)
  2. 节点 A 等待 B 的 running 节点被删除…但它永远不会自动删除
     因为 B 恢复后会创建新的 Session,旧的 running 节点已"无主"
  3. LegacyCrashedRunningItemListener 仅在单实例场景做清理

影响:分片 1 的 running 节点一直存在 → 调度永久卡住

应急修复:
  zkCli.sh -server localhost:2181
  delete /namespace/jobName/sharding/1/running

这个问题的本质是:临时节点的 Session 已经过期,但 ZK 客户端重连时没有清理”上一个 Session 创建的临时节点”。这也是我们前文分析 ZK Session 机制时提到的经典场景。


八、自诊断修复 —— 最终一致性的兜底

8.1 为什么需要自诊断?

失效转移是事件驱动的(Watcher 触发),但分布式环境下有些场景 Watcher 可能漏掉:

场景 1: 网络抖动
  节点 A 短暂断连 → ZK 临时节点删除 → 分片重新分配给 B
  节点 A 恢复 → 但分片分配已变,A 不知道自己"丢"了分片
  (Watcher 在断连期间丢失,前文 ZK 系列详细分析过)

场景 2: ZK 数据残留
  节点直接 kill -9,running 节点没来得及清理
  Session 过期后 ZK 删除临时节点,但 sharding/{item}/instance 是持久节点
  导致分片分配给了"已不在线"的实例

场景 3: 时间窗口竞态
  分片过程中发生网络分区
  分片写入与实例上下线穿插发生

失效转移处理单点故障的即时接管,自诊断处理跨周期的状态不一致。两者互补。

8.2 ReconcileService —— 定时巡检

// ReconcileService 继承 Guava AbstractScheduledService
@Override
protected Scheduler scheduler() {
    // 每分钟检查一次
    return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MINUTES);
}

@Override
protected void runOneIteration() throws Exception {
    LiteJobConfiguration config = configService.load(true);
    int reconcileIntervalMinutes = null == config ? -1 : config.getReconcileIntervalMinutes();
    
    // 到达配置的修复间隔才执行(不是每分钟都修)
    if (reconcileIntervalMinutes > 0 
        && (System.currentTimeMillis() - lastReconcileTime 
            >= reconcileIntervalMinutes * 60 * 1000)) {
        
        lastReconcileTime = System.currentTimeMillis();
        
        // 三个条件必须同时满足
        if (leaderService.isLeaderUntilBlock()           // ① 当前是主节点
            && !shardingService.isNeedSharding()          // ② 没有正在进行的分片
            && shardingService.hasShardingInfoInOfflineServers()) { // ③ 存在不一致
            log.warn("Elastic Job: job status node has inconsistent value, "
                + "start reconciling...");
            shardingService.setReshardingFlag();
        }
    }
}

8.3 不一致检测的核心逻辑

// ShardingService.hasShardingInfoInOfflineServers()
public boolean hasShardingInfoInOfflineServers() {
    // 从 ZK 获取在线实例列表(临时节点)
    List<String> onlineInstances = 
        jobNodeStorage.getJobNodeChildrenKeys(InstanceNode.ROOT);
    
    // 遍历所有分片,检查分片分配的实例是否在线
    int shardingTotalCount = configService.load(true)
        .getTypeConfig().getCoreConfig().getShardingTotalCount();
    
    for (int i = 0; i < shardingTotalCount; i++) {
        String assignedInstance = 
            jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i));
        if (!onlineInstances.contains(assignedInstance)) {
            return true;  // 发现"幽灵分片":分配给已下线实例
        }
    }
    return false;
}
关键对比:
  /instances/                   ← 临时节点,反映"当前谁在线"
  /sharding/{item}/instance     ← 持久节点,记录"分片分配给谁"
  
  不一致 = 持久分片记录指向了不在临时实例列表中的实例
       → 说明该实例已下线,但分片未被重新分配
       → 设置 necessary 标记 → 触发重新分片

8.4 自诊断 vs 失效转移 vs 事件驱动—— 三层保障

              ┌─────────────────────────────┐
              │    事件驱动 (Watcher)         │  ← 第一层:实时感知
              │    实例变化 → 立即标记重分片    │
              ├─────────────────────────────┤
              │    失效转移 (Failover)        │  ← 第二层:即时接管
              │    运行中宕机 → 秒级接管        │
              ├─────────────────────────────┤
              │    自诊断 (Reconciliation)    │  ← 第三层:周期兜底
              │    定时巡检 → 修复遗漏         │
              └─────────────────────────────┘

九、作业类型生态

Elastic-Job 3.x 基于 ShardingSphere 可插拔架构,将作业解耦为作业接口执行器接口,内置四种作业类型并支持 SPI 扩展。

9.1 Simple 作业 —— 最简单模式

public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        // shardingContext 提供:
        //   getShardingItem()       → 当前分片号
        //   getShardingTotalCount() → 分片总数
        //   getShardingParameter()  → 分片参数(如 "Beijing")
        
        int shardNo = shardingContext.getShardingItem();
        String city = shardingContext.getShardingParameter();
        log.info("分片 {} 处理城市 {}", shardNo, city);
        // 执行实际业务逻辑
    }
}

适用:普通定时任务,不需要流式数据处理的场景。

9.2 Dataflow 作业 —— 流式处理

Dataflow 作业是处理持续流入数据的最佳选择:

public class MyDataflowJob implements DataflowJob<Order> {
    
    @Override
    public List<Order> fetchData(ShardingContext context) {
        // 拉取待处理数据
        // context 告诉你是哪个分片,你据此决定拉取哪部分数据
        int shardNo = context.getShardingItem();
        int totalShard = context.getShardingTotalCount();
        return orderService.fetchPending(shardNo, totalShard, 100);
    }
    
    @Override
    public void processData(ShardingContext context, List<Order> data) {
        // 处理拉取到的数据
        for (Order order : data) {
            orderService.process(order);
        }
    }
}

核心配置 streamingProcess

streamingProcess = true:
  Cron 触发
    → fetchData() → 返回数据?
    ├→ 有数据 → processData() → fetchData() → ...循环
    └→ 无数据 → 本次触发结束,等下次 Cron
  
streamingProcess = false (默认):
  Cron 触发
    → fetchData() → processData() → 结束

适用场景:订单超时关闭、消息推送、数据同步等需要”持续消费”的场景。streamingProcess=true 等价于”只要还有数据就干活,干完为止”。

9.3 Script 作业 —— 零 Java 代码接入

elasticjob:
  jobs:
    cleanLogJob:
      job-type: SCRIPT
      cron: "0 0 2 * * ?"
      sharding-total-count: 1
      props:
        script.command.line: "/opt/scripts/clean_logs.sh"

支持 Shell、Python、Perl 等,脚本中通过环境变量获取分片上下文:

#!/bin/bash
echo "分片号: ${SHARDING_ITEM}, 总分片: ${SHARDING_TOTAL_COUNT}"
# 清理对应分片的日志

9.4 HTTP 作业(3.x 新增)

无需编写 Java 代码,通过配置调用 HTTP 接口即可:

elasticjob:
  jobs:
    httpJob:
      job-type: HTTP
      cron: "0 */10 * * * ?"
      sharding-total-count: 1
      props:
        http.url: "http://internal-api/task/execute"
        http.method: "POST"
        http.data: '{"type":"report","ts":"${EXECUTION_TIME}"}'

9.5 SPI 扩展 —— 自定义作业类型

实现步骤:
  1. 实现 Job 接口(或 ElasticJob 接口)
  2. 实现 JobExecutor 接口
  3. 在 META-INF/services/ 下添加 SPI 声明
  4. 配置中指定 job-type 名称即可

十、连接状态与容灾

10.1 ZK 连接状态机

                  ┌──────────────┐
          ┌──────►│  CONNECTED   │◄─────────┐
          │       │  (正常状态)    │          │
          │       └──────┬───────┘          │
          │              │ 网络闪断           │
          │              ▼                  │
          │       ┌──────────────┐          │
          │       │  SUSPENDED   │          │
          │       │  (连接挂起)   │          │
          │       └──────┬───────┘          │
          │              │                  │
          │     ┌────────┴────────┐         │
          │     ▼                 ▼         │
          │  Session 未过期    Session 过期   │
          │     │                 │         │
          │     ▼                 ▼         │
          │  ┌──────────┐  ┌──────────┐    │
          │  │RECONNECTED│  │   LOST   │    │
          │  │  (重连成功) │  │(彻底断开) │────┘
          │  └──────────┘  └──────────┘   (重新注册)
          │       │
          └───────┘

10.2 各状态下的作业行为

连接状态 作业行为 原因
CONNECTED 正常运行 ZK 通信正常,分片信息准确
SUSPENDED 暂停调度 可能发生脑裂,暂停执行避免重复
LOST 暂停调度 + 清理本地状态 彻底断连,清空运行信息
RECONNECTED 重新注册 + 恢复调度 重新写入实例节点、分片状态,恢复执行
// AbstractScheduleTracker —— 连接状态处理
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    switch (newState) {
        case SUSPENDED:
        case LOST:
            // 暂停作业调度
            jobScheduleController.pauseJob();
            break;
        case RECONNECTED:
            // 重新持久化上线信息
            serverService.persistOnline(true);
            // 清理上次的 running 标记
            executionService.clearRunningInfo(shardingItems);
            // 恢复作业调度
            jobScheduleController.resumeJob();
            break;
    }
}

结合前文 ZK Session 分析:SUSPENDED 期间是”黄金重连窗口”,只要在 sessionTimeout 内重连成功,临时节点不会消失,集群不会感知到实例变化。


十一、与 XXL-JOB 的对比

11.1 架构对比

Elastic-Job (去中心化)              XXL-JOB (中心化)
─────────────────────              ─────────────────
                                   
  [节点A](Quartz+执行)              [调度中心] (单点/集群)
  [节点B](Quartz+执行)               │    │    │
  [节点C](Quartz+执行)               ▼    ▼    ▼
       │    │    │               [执行器A][执行器B][执行器C]
       └────┼────┘
            ▼
      [ZooKeeper 集群]
维度 Elastic-Job Lite XXL-JOB
架构 去中心化,节点对等 中心化,调度中心 + 执行器
单点故障 无(ZK 集群保证高可用) 调度中心故障则全部停摆(可集群化但仍有状态同步问题)
注册中心 ZooKeeper 数据库(自研)
分片策略 平均、轮询、奇偶、自定义(4 种内置) 分片广播、固定分片
失效转移 ✅ 自动接管(秒级) ✅ 故障转移(依赖调度中心检测)
作业类型 Simple/Dataflow/Script/HTTP + SPI 内建 10+ 种(GLUE、Shell、Python、Node.js 等)
管理界面 ❌ 无内置(需事件追踪系统) ✅ 完善 Web 控制台
依赖 ZK(必须) 数据库(必须)
运维复杂度 低(只需 ZK) 中(数据库 + 调度中心部署)
社区 Apache ShardingSphere 子项目 大众点评开源,社区活跃

11.2 选型建议

选 Elastic-Job,如果:
  ✅ 团队已有 ZK 集群(不想引入新依赖)
  ✅ 不需要管理界面(或已有自己的监控平台)
  ✅ 看重去中心化的高可用设计
  ✅ 需要灵活的分片策略(平均分配)
  ✅ 系统对延迟敏感(去中心化调度延迟更低)

选 XXL-JOB,如果:
  ✅ 团队没有 ZK,但有数据库
  ✅ 需要管理界面(任务管理、日志查看、手动触发)
  ✅ 需要 GLUE 模式(在线编辑代码)
  ✅ 需要子任务依赖编排
  ✅ 中小团队,运维要简单

十二、实战最佳实践

12.1 分片数与实例数的关系

原则:分片数 > 实例数(预留扩容空间)

  反例:3 个实例,分片数 = 3
    扩容到 4 个实例时,无法继续拆分 → 需要修改配置 → 可能影响运行中任务
  
  正例:3 个实例,分片数 = 6
    扩容到 4 个实例:每个实例 1~2 个分片 → 自动重新分配
    扩容到 6 个实例:每个实例恰好 1 个分片 → 最大化并行度
    缩容到 2 个实例:每个实例 3 个分片 → 自动合并

建议:分片数 = 期望最大实例数 × 2

12.2 failover 与 monitorExecution 的配置策略

长任务(执行时间 > 30s):
  failover: true   ← 开启,避免长任务中断
  monitorExecution: true  ← 必须开启

短任务(执行时间 < 5s):
  failover: false  ← 关闭,等下次调度更简单
  monitorExecution: true  ← 仍建议开启,用于自诊断

高频任务(Cron < 10s):
  failover: false  ← 频繁转移的开销 > 收益
  misfire: false   ← 错过一次马上就有下次,不补偿

12.3 reconcileIntervalMinutes 设置

默认:-1(不自动修复,仅靠事件驱动)
建议:10 分钟(生产环境默认足够)
若网络不稳定:5 分钟(更快发现不一致)
若 Job 数量巨大(> 100):15~30 分钟(减少 ZK 读取压力)

12.4 ZK 连接参数调优

new ZookeeperConfiguration("zk1:2181,zk2:2181,zk3:2181", "elasticjob-namespace")
    .setSessionTimeoutMilliseconds(60000)    // 默认 60s,网络不稳可调大
    .setConnectionTimeoutMilliseconds(15000) // 默认 15s
    .setMaxRetries(3)                        // 重试次数
    .setBaseSleepTimeMilliseconds(1000)      // 重试间隔基数
    .setDigest("user:password");             // ZK ACL 认证

// Session 超时建议:60s ~ 120s
// 太短:轻微网络抖动就触发重新分片 → 集群不稳定
// 太长:节点真实宕机后要等太久才发现 → 失效转移延迟大

12.5 监控要点

关键 ZK 节点路径监控:
  /namespace/jobName/leader/election/instance  ← 主节点是否正常
  /namespace/jobName/instances/                ← 在线实例数是否符合预期?
  /namespace/jobName/leader/sharding/necessary ← 长期存在 = 分片卡住
  /namespace/jobName/leader/failover/items/    ← 长期存在 = 失效转移卡住

应用层监控:
  - 每个分片的执行耗时 (process 方法耗时分布)
  - 每次调度的触发延迟 (Cron 触发时间 vs 实际开始执行时间)
  - 分片分配变化次数 (频繁变化 = 集群不稳定)
  - ZK 连接状态变化次数 (频繁变化 = 网络问题)

十三、总结

核心设计理念回顾

Elastic-Job 的核心设计决策:

1. 去中心化 + ZK 协调
   "没有调度中心,就没有调度中心的单点故障"
   ZK 只做状态存储和变更通知,不做调度

2. 分片 = 执行权的分布式
   "分片不是数据分片,是谁执行哪部分的分工"
   通过分片将任务拆解为 N 个独立执行单元
   分片数 ≠ 实例数,两者设计上充分解耦

3. 本地 Quartz + ZK 守卫
   "所有节点同步触发,ZK 分片信息决定谁执行"
   不依赖中心触发,靠 ZK 状态做执行守卫
   Cron 是绝对壁钟时间,不是相对启动时间

4. 多层缓存,ZooKeeper 无压力
   TreeCache 缓存 /sharding 子树的全部数据
   NodeCache 缓存 /config 节点
   Cron 触发时只有 running 节点的一次创建和一次删除访问 ZK
   其余操作全部走本地内存

5. 三层一致性保障
   事件驱动(Watcher) → 失效转移(Failover) → 自诊断(Reconciliation)
   实时 → 秒级 → 分钟级,层层兜底

适用场景总结

┌─────────────────────────────────────────────┐
│ Elastic-Job 最适合:                          │
│ • 数据分片处理(按城市/用户ID区间/时间分段)    │
│ • 已有 ZK 集群的团队(零额外依赖)             │
│ • 对高可用要求高的核心业务(去中心化无单点)     │
│ • 需要弹性扩容的定时任务(实例增减自动重分片)   │
├─────────────────────────────────────────────┤
│ 不太适合:                                   │
│ • 需要 Web 管理界面(没有开箱即用的控制台)     │
│ • 小团队快速上手(需要先理解 ZK)             │
│ • 需要任务编排/依赖(不支持 DAG 工作流)       │
└─────────────────────────────────────────────┘

Elastic-Job 的设计哲学是”简单即鲁棒“:没有调度中心就没有中心化故障,依赖 ZK 的三大特性(临时节点、Watcher、分布式锁)就解决了分布式协调。它对开发者的要求是理解分片概念和 ZK 节点树——一旦掌握,你就能搭建一个高可用的分布式定时任务系统。

本文阅读量