180902-Elastic-Job-Lite开发入门

Elastic-Job-Lite开发介绍

概述

Elastic-Job是一个分布式调度解决方案,定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部依赖仅Zookeeper。

基本概念

分片概念

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项

具体示例:

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

个性化参数的适用场景

个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

具体示例:

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

Elastic-Job主要功能

  1. 定时任务
    基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
  2. 作业注册中心
    基于Zookeeper和其客户端Curator实现的全局作业注册控制中心。用于注册,控制和协调分布式作业执行。
  3. 作业分片
    将一个任务分片成为多个小任务项在多服务器上同时执行。
  4. 弹性扩容缩容
    运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行。
  5. 支持多种作业执行模式
  6. 失效转移
    运行中的作业服务器崩溃不会导致重新分片,只会在下次作业启动时分片。启用失效转移功能可以在本次作业执行过程中,监测其他作业服务器空闲,抓取未完成的孤儿分片项执行。
  7. 运行时状态收集
    监控作业运行时状态,统计最近一段时间处理的数据成功和失败数量,记录作业上次运行开始时间,结束时间和下次运行时间。
  8. 作业停止,恢复和禁用
    用于操作作业启停,并可以禁止某作业运行(上线时常用)。
  9. 被错过执行的作业重触发
    自动记录错过执行的作业,并在上次作业完成后自动触发。可参考Quartz的misfire。
  10. 幂等性
    重复作业任务项判定,不重复执行已运行的作业任务项。由于开启幂等性需要监听作业运行状态,对瞬时反复运行的作业对性能有较大影响。
  11. 容错处理
    作业服务器与Zookeeper服务器通信失败则立即停止作业运行,防止作业注册中心将失效的分片分项配给其他作业服务器,而当前作业服务器仍在执行任务,导致重复执行。
  12. 运维平台
    提供运维界面,可以管理作业和注册中心。

核心理念

分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

核心流程图

作业启动:
作业启动

作业执行:
作业执行

实现原理

弹性分布式实现

  1. 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
  2. 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
  3. 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
  4. 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
  5. 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
  6. 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
  7. 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。

数据分片

数据分片的目的在于把一个任务分散到不同的机器上运行,既可以解决单机计算能力上限的问题,也能降低部分任务失败对整体系统的影响。elastic-job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器(其实是Job实例,部署在一台机器上的多个Job实例也能分片),开发者需要自行处理分片项与真实数据的对应关系。框架也预置了一些分片策略:平均分配算法策略,作业名哈希值奇偶数算法策略,轮转分片策略。同时也提供了自定义分片策略的接口。

分片原理:

elastic-job的分片是通过zookeeper来实现的。分片的分片由主节点分配,如下三种情况都会触发主节点上的分片算法执行:

  1. 新的Job实例加入集群
  2. 现有的Job实例下线(如果下线的是leader节点,那么先选举然后触发分片算法的执行)
  3. 主节点选举

上述三种情况,会让zookeeper上leader节点的sharding节点上多出来一个necessary的临时节点,主节点每次执行Job前,都会去看一下这个节点,如果有则执行分片算法。

sharding-necessary

分片的执行结果会存储在zookeeper上,如下图,3个分片,每个分片应该由哪个Job实例来运行都已经分配好。分配的过程就是上面触发分片算法之后的操作。分配完成之后,各个Job实例就会在下次执行的时候使用上这个分配结果。

sharding-instance

每个job实例任务触发前都会获取本任务在本实例上的分片情况(直接和上图zookeeper上instance节点比对某一个分片是否该有这个Job实例执行),然后封装成shardingContext,传递给调用任务的实际执行方法:

1
2
3
4
5
@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName(), "SIMPLE"));
}

分片算法:

所有的分片策略都继承JobShardingStrategy接口。根据当前注册到ZK的实例列表和在客户端配置的分片数量来进行数据分片。最终将每个Job实例应该获得的分片数字返回出去。 方法签名如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 作业分片策略.
*
* @author zhangliang
*/
public interface JobShardingStrategy {

/**
* 作业分片.
*
* @param jobInstances 所有参与分片的单元列表
* @param jobName 作业名称
* @param shardingTotalCount 分片总数
* @return 分片结果
*/
Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}

分片函数的触发,只会在leader选举的时候触发,也就是说只会在刚启动和leader节点离开的时候触发,并且是在leader节点上触发,而其他节点不会触发。

  1. 基于平均分配算法的分片策略
    基于平均分配算法的分片策略对应的类是:AverageAllocationJobShardingStrategy。它是默认的分片策略。它的分片效果如下:

    如果有3个Job实例, 分成9片, 则每个Job实例分到的分片是: 1=[0,1,2], 2=[3,4,5], 3=[6,7,8].
    如果有3个Job实例, 分成8片, 则每个Job实例分到的分片是: 1=[0,1,6], 2=[2,3,7], 3=[4,5].
    如果有3个Job实例, 分成10片, 则个Job实例分到的分片是: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8].

  2. 作业名的哈希值奇偶数决定IP升降序算法的分片策略
    这个策略的对应的类是:OdevitySortByNameJobShardingStrategy,它内部其实也是使用AverageAllocationJobShardingStrategy实现,只是在传入的节点实例顺序不一样,也就是上面接口参数的List<JobInstance>。AverageAllocationJobShardingStrategy的缺点是一旦分片数小于Job实例数,作业将永远分配至IP地址靠前的Job实例上,导致IP地址靠后的Job实例空闲。而OdevitySortByNameJobShardingStrategy则可以根据作业名称重新分配Job实例负载。如:

    如果有3个Job实例,分成2片,作业名称的哈希值为奇数,则每个Job实例分到的分片是:1=[0], 2=[1], 3=[]
    如果有3个Job实例,分成2片,作业名称的哈希值为偶数,则每个Job实例分到的分片是:3=[0], 2=[1], 1=[]
    实现比较简单:

    1
    2
    3
    4
    5
    long jobNameHash = jobName.hashCode();
    if (0 == jobNameHash % 2) {
    Collections.reverse(jobInstances);
    }
    return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
  3. 根据作业名的哈希值对Job实例列表进行轮转的分片策略
    这个策略的对应的类是:RotateServerByNameJobShardingStrategy,和上面介绍的策略一样,内部同样是用AverageAllocationJobShardingStrategy实现,也是在传入的List<JobInstance>列表顺序上做文章。

  4. 自定义分片策略
    除了可以使用上述分片策略之外,elastic-job还允许自定义分片策略。我们可以自己实现JobShardingStrategy接口,并且配置到分片方法上去。

注册中心数据结构

注册中心在定义的命名空间下,创建作业名称节点,用于区分不同作业,所以作业一旦创建则不能修改作业名称,如果修改名称将视为新的作业。作业名称节点下又包含4个数据子节点,分别是config, instances, sharding, servers和leader。

sharding-instance

config节点

作业配置信息,以JSON格式存储

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"jobName":"xin.laosan.job.SpringSimpleJob",
"jobClass":"xin.laosan.job.SpringSimpleJob",
"jobType":"SIMPLE",
"cron":"0/5 * * * * ?",
"shardingTotalCount":3,
"shardingItemParameters":"0=Beijing,1=Shanghai,2=Guangzhou",
"jobParameter":"",
"failover":false,
"misfire":true,
"description":"",
"jobProperties":{
"job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",
"executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler"
},
"monitorExecution":true,
"maxTimeDiffSeconds":-1,
"monitorPort":-1,
"jobShardingStrategyClass":"",
"reconcileIntervalMinutes":10,
"disabled":false,
"overwrite":true
}

instances节点

作业运行实例信息,子节点是当前作业运行实例的主键。作业运行实例主键由作业运行服务器的IP地址和PID构成。作业运行实例主键均为临时节点,当作业实例上线时注册,下线时自动清理。注册中心监控这些节点的变化来协调分布式作业的分片以及高可用。 可在作业运行实例节点写入TRIGGER表示该实例立即执行一次。

sharding节点

作业分片信息,子节点是分片项序号,从零开始,至分片总数减一。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。节点详细信息说明:

子节点名 临时节点 描述
instance 执行该分片项的作业运行实例主键
running 分片项正在运行的状态,仅配置monitorExecution时有效
failover 如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器IP
misfire 是否开启错过任务重新执行
disabled 是否禁用此分片项

servers节点

作业服务器信息,子节点是作业服务器的IP地址。可在IP地址节点写入DISABLED表示该服务器禁用。 在新的cloud native架构下,servers节点大幅弱化,仅包含控制服务器是否可以禁用这一功能。为了更加纯粹的实现job核心,servers功能未来可能删除,控制服务器是否禁用的能力应该下放至自动化部署系统。

leader节点

作业服务器主节点信息,分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。

leader节点是内部使用的节点,如果对作业框架原理不感兴趣,可不关注此节点。

子节点名 临时节点 描述
election\instance 主节点服务器IP地址
一旦该节点被删除将会触发重新选举
重新选举的过程中一切主节点相关的操作都将阻塞
election\latch 主节点选举的分布式锁
为curator的分布式锁使用
sharding\necessary 是否需要重新分片的标记
如果分片总数变化,或作业服务器节点上下线或启用/禁用,以及主节点选举,会触发设置重分片标记
作业在下次执行时使用主节点重新分片,且中间不会被打断
作业执行时不会触发分片
sharding\processing 主节点在分片时持有的节点
如果有此节点,所有的作业执行都将阻塞,直至分片结束
主节点分片结束或主节点崩溃会删除此临时节点
failover\items\分片项 一旦有作业崩溃,则会向此节点记录
当有空闲作业服务器时,会从此节点抓取需失效转移的作业项
failover\items\latch 分配失效转移分片项时占用的分布式锁
为curator的分布式锁使用

快速入门

引入maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- 引入elastic-job-lite核心模块 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>

<!-- 使用springframework自定义命名空间时引入 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${latest.release.version}</version>
</dependency>

SimpleJob作业开发

1
2
3
4
5
6
7
8
public class SpringSimpleJob implements SimpleJob {

@Override
public void execute(ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getName(), "SIMPLE"));
}
}

SimpleJob作业配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Configuration
public class SimpleJobConfig {

@Resource
private ZookeeperRegistryCenter regCenter;

@Resource
private JobEventConfiguration jobEventConfiguration;

@Bean
public SpringSimpleJobFirst simpleJobFirst() {
return new SpringSimpleJobFirst();
}

@Bean(initMethod = "init")
public JobScheduler simpleJobFirstScheduler(final SpringSimpleJobFirst simpleJobFirst, @Value("${simpleJobFirst.cron}") final String cron, @Value("${simpleJobFirst.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJobFirst.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJobFirst, regCenter, getLiteJobConfiguration(simpleJobFirst.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

@Bean
public SpringSimpleJobSecond simpleJobSecond() {
return new SpringSimpleJobSecond();
}

@Bean(initMethod = "init")
public JobScheduler simpleJobSecondScheduler(final SpringSimpleJobSecond simpleJobSecond, @Value("${simpleJobSecond.cron}") final String cron, @Value("${simpleJobSecond.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJobSecond.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJobSecond, regCenter, getLiteJobConfiguration(simpleJobSecond.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new SimpleJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName())).overwrite(true).build();
}
}

如果是新增SimpleJob类型作业,只需要做如下配置即可:

1
2
3
4
5
6
7
8
9
10
@Bean
public SpringSimpleJobSecond simpleJobSecond() {
return new SpringSimpleJobSecond();
}

@Bean(initMethod = "init")
public JobScheduler simpleJobSecondScheduler(final SpringSimpleJobSecond simpleJobSecond, @Value("${simpleJobSecond.cron}") final String cron, @Value("${simpleJobSecond.shardingTotalCount}") final int shardingTotalCount,
@Value("${simpleJobSecond.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(simpleJobSecond, regCenter, getLiteJobConfiguration(simpleJobSecond.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

开发指南

环境要求

名称 版本
Java 请使用JDK1.7及其以上版本
Zookeeper 请使用Zookeeper 3.4.6及其以上版本
Maven 请使用Maven 3.0.4及其以上版本

作业开发

Elastic-Job提供Simple、Dataflow和Script 3种作业类型。

方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。

Simple类型作业:

请参考 SimpleJob作业配置

Dataflow类型作业:

Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Configuration
public class DataflowJobConfig {

@Resource
private ZookeeperRegistryCenter regCenter;

@Resource
private JobEventConfiguration jobEventConfiguration;

@Bean
public DataflowJob dataflowJob() {
return new SpringDataflowJob();
}

@Bean(initMethod = "init")
public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}

private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends DataflowJob> jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters) {
return LiteJobConfiguration.newBuilder(new DataflowJobConfiguration(JobCoreConfiguration.newBuilder(
jobClass.getName(), cron, shardingTotalCount).shardingItemParameters(shardingItemParameters).build(), jobClass.getCanonicalName(), true)).overwrite(true).build();
}
}

如果是新增Dataflow类型作业,只需要做如下配置即可:

1
2
3
4
5
6
7
8
9
10
@Bean
public DataflowJob dataflowJob() {
return new SpringDataflowJob();
}

@Bean(initMethod = "init")
public JobScheduler dataflowJobScheduler(final DataflowJob dataflowJob, @Value("${dataflowJob.cron}") final String cron, @Value("${dataflowJob.shardingTotalCount}") final int shardingTotalCount,
@Value("${dataflowJob.shardingItemParameters}") final String shardingItemParameters) {
return new SpringJobScheduler(dataflowJob, regCenter, getLiteJobConfiguration(dataflowJob.getClass(), cron, shardingTotalCount, shardingItemParameters), jobEventConfiguration);
}
  1. 可通过DataflowJobConfiguration配置是否流式处理。
  2. 流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
  3. 如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。

Script类型作业:

项目汇总暂无开发案例,待需要时添加

部署指南

数据库

  1. 执行如下建表语句即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CREATE TABLE JOB_EXECUTION_LOG (
id varchar(40) NOT NULL,
job_name varchar(100) NOT NULL,
task_id varchar(255) NOT NULL,
hostname varchar(255) NOT NULL,
ip varchar(50) NOT NULL,
sharding_item int(11) NOT NULL,
execution_source varchar(20) NOT NULL,
failure_cause varchar(4000) DEFAULT NULL,
is_success int(11) NOT NULL,
start_time timestamp NULL DEFAULT NULL,
complete_time timestamp NULL DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE JOB_STATUS_TRACE_LOG (
id varchar(40) NOT NULL,
job_name varchar(100) NOT NULL,
original_task_id varchar(255) NOT NULL,
task_id varchar(100) NOT NULL,
slave_id varchar(50) NOT NULL,
source varchar(50) NOT NULL,
execution_type varchar(20) NOT NULL,
sharding_item varchar(100) NOT NULL,
state varchar(20) NOT NULL,
message varchar(4000) DEFAULT NULL,
creation_time timestamp NULL DEFAULT NULL,
PRIMARY KEY (id),
KEY TASK_ID_STATE_INDEX (task_id,state)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

应用部署

  1. 启动Elastic-Job-Lite指定注册中心的Zookeeper
  2. 运行包含Elastic-Job-Lite和业务代码的jar文件。不限与jar或war的启动方式。

运维平台和RESTFul API部署(可选)

  1. 解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh
  2. 打开浏览器访问http://localhost:8899/即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号
  3. 访问RESTFul API方法同控制台。
  4. elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。
Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×