Elastic-Job-Lite使用介绍 概述 Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部依赖仅Zookeeper。
功能列表:
基本概念 分片概念 任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项
具体示例:
例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。
分片项与业务处理解耦 Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
个性化参数的适用场景 个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。
具体示例:
例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。
核心理念 分布式调度 Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。
作业高可用 Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。
一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。
最大限度利用资源 Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。
例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。
快速入门 引入maven依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 <dependency > <groupId > io.elasticjob</groupId > <artifactId > elastic-job-lite-core</artifactId > <version > ${latest.release.version}</version > </dependency > <dependency > <groupId > io.elasticjob</groupId > <artifactId > elastic-job-lite-spring</artifactId > <version > ${latest.release.version}</version > </dependency >
SimpleJob作业开发 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class MyElasticJob implements SimpleJob { @Override public void execute (ShardingContext context) { switch (context.getShardingItem()) { case 0 : break ; case 1 : break ; case 2 : break ; } } }
SimpleJob作业配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg ="http://www.dangdang.com/schema/ddframe/reg" xmlns:job ="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd " > <reg:zookeeper id ="regCenter" server-lists ="yourhost:2181" namespace ="dd-job" base-sleep-time-milliseconds ="1000" max-sleep-time-milliseconds ="3000" max-retries ="3" /> <job:simple id ="oneOffElasticJob" class ="xxx.MyElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" /> </beans >
开发指南 环境要求
名称
版本
Java
请使用JDK1.7及其以上版本
Zookeeper
请使用Zookeeper 3.4.6及其以上版本
Maven
请使用Maven 3.0.4及其以上版本
作业开发 Elastic-Job提供Simple、Dataflow和Script 3种作业类型。
方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。
Simple类型作业:
意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class MyElasticJob implements SimpleJob { @Override public void execute (ShardingContext context) { switch (context.getShardingItem()) { case 0 : break ; case 1 : break ; case 2 : break ; } } }
Dataflow类型作业:
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MyElasticJob implements DataflowJob <Foo > { @Override public List<Foo> fetchData (ShardingContext context) { switch (context.getShardingItem()) { case 0 : List<Foo> data = return data; case 1 : List<Foo> data = return data; case 2 : List<Foo> data = return data; } } @Override public void processData (ShardingContext shardingContext, List<Foo> data) { } }
可通过DataflowJobConfiguration配置是否流式处理。
流式处理数据只有fetchData方法的返回值为null或集合长度为空时,作业才停止抓取,否则作业将一直运行下去; 非流式处理数据则只会在每次作业执行过程中执行一次fetchData方法和processData方法,随即完成本次作业。
如果采用流式作业处理方式,建议processData处理数据后更新其状态,避免fetchData再次抓取到,从而使得作业永不停止。 流式数据处理参照TbSchedule设计,适用于不间歇的数据处理。
Script类型作业:
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
1 2 # !/bin/bash echo sharding execution context is $*
作业运行时输出
sharding execution context is
1 {"jobName" :"scriptElasticDemoJob" ,"shardingTotalCount" :10 ,"jobParameter" :"" ,"shardingItem" :0 ,"shardingParameter" :"A" }
作业配置 Elastic-Job配置分为3个层级,分别是Core, Type和Root。每个层级使用相似于装饰者模式的方式装配。
Core对应JobCoreConfiguration,用于提供作业核心配置信息,如:作业名称、分片总数、CRON表达式等。
Type对应JobTypeConfiguration,有3个子类分别对应SIMPLE, DATAFLOW和SCRIPT类型作业,提供3种作业需要的不同配置,如:DATAFLOW类型是否流式处理或SCRIPT类型的命令行等。
Root对应JobRootConfiguration,有2个子类分别对应Lite和Cloud部署类型,提供不同部署类型所需的配置,如:Lite类型的是否需要覆盖本地配置或Cloud占用CPU或Memory数量等。
使用Java代码配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob" , "0/15 * * * * ?" , 10 ).build(); SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class .getCanonicalName ()) ; JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build(); JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder("demoDataflowJob" , "0/30 * * * * ?" , 10 ).build(); DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, DataflowDemoJob.class .getCanonicalName (), true ) ; JobRootConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).build(); JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder("demoScriptJob" , "0/45 * * * * ?" , 10 ).build(); ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "test.sh" ); JobRootConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptCoreConfig).build();
Spring命名空间配置:
与Spring容器配合使用作业,可将作业Bean配置为Spring Bean,并在作业中通过依赖注入使用Spring容器管理的数据源等对象。可用placeholder占位符从属性文件中取值。Lite可考虑使用Spring命名空间方式简化配置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg ="http://www.dangdang.com/schema/ddframe/reg" xmlns:job ="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation ="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd " > <reg:zookeeper id ="regCenter" server-lists ="yourhost:2181" namespace ="dd-job" base-sleep-time-milliseconds ="1000" max-sleep-time-milliseconds ="3000" max-retries ="3" /> <job:simple id ="simpleElasticJob" class ="xxx.MySimpleElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" /> <bean id ="yourRefJobBeanId" class ="xxx.MySimpleRefElasticJob" > <property name ="fooService" ref ="xxx.FooService" /> </bean > <job:simple id ="simpleRefElasticJob" job-ref ="yourRefJobBeanId" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" /> <job:dataflow id ="throughputDataflow" class ="xxx.MyThroughputDataflowElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" /> <job:script id ="scriptElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" script-command-line ="/your/file/path/demo.sh" /> <job:simple id ="listenerElasticJob" class ="xxx.MySimpleListenerElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" > <job:listener class ="xx.MySimpleJobListener" /> <job:distributed-listener class ="xx.MyOnceSimpleJobListener" started-timeout-milliseconds ="1000" completed-timeout-milliseconds ="2000" /> </job:simple > <job:simple id ="eventTraceElasticJob" class ="xxx.MySimpleListenerElasticJob" registry-center-ref ="regCenter" cron ="0/10 * * * * ?" sharding-total-count ="3" sharding-item-parameters ="0=A,1=B,2=C" event-trace-rdb-data-source ="yourDataSource" > </job:simple > </beans >
作业启动 Java启动方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class JobDemo { public static void main (String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } private static CoordinatorRegistryCenter createRegistryCenter () { CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181" , "elastic-job-demo" )); regCenter.init(); return regCenter; } private static LiteJobConfiguration createJobConfiguration () { ... } }
Spring启动方式:
将配置Spring命名空间的xml通过Spring启动,作业将自动加载。
部署指南 应用部署
启动Elastic-Job-Lite指定注册中心的Zookeeper
运行包含Elastic-Job-Lite和业务代码的jar文件。不限与jar或war的启动方式。
运维平台和RESTFul API部署(可选)
解压缩elastic-job-lite-console-${version}.tar.gz并执行bin\start.sh
。
打开浏览器访问http://localhost:8899/ 即可访问控制台。8899为默认端口号,可通过启动脚本输入-p自定义端口号
访问RESTFul API方法同控制台。
elastic-job-lite-console-${version}.tar.gz可通过mvn install编译获取。
FAQ
为什么在代码或Spring配置文件中修改了作业配置,注册中心配置却没有更新?
Elastic-Job-Lite采用无中心化设计,若每个客户端的配置不一致,不做控制的话,最后一个启动的客户端配置将会成为注册中心的最终配置。
Elastic-Job-Lite提出了overwrite概念,可通过JobConfiguration或Spring命名空间配置。overwrite=true即允许客户端配置覆盖注册中心,反之则不允许。如果注册中心无相关作业的配置,则无论overwrite是否配置,客户端配置都将写入注册中心。
怀疑Elastic-Job-Lite在分布式环境中有问题,但无法重现又不能在线上环境调试,应该怎么做?
分布式问题非常难于调试和重现,为此Elastic-Job-Lite提供了dump命令。
如果您怀疑某些场景出现问题,可参照dump文档将作业运行时信息发给我们、提交issue或发至QQ群讨论。我们已将IP等敏感信息过滤,dump出来的信息可在公网安全传输。
Elastic-Job有何使用限制?
作业启动成功后修改作业名称视为新作业,原作业废弃。
同一台作业服务器可以运行多个相同的作业实例,但每个作业实例必须使用不同的JobInstanceId,因为作业运行时是按照IP和JobInstanceId注册和管理的。JobInstanceId可在作业配置中设置。
一旦有服务器波动,或者修改分片项,将会触发重新分片;触发重新分片将会导致运行中的流式处理的作业在执行完本次作业后不再继续执行,等待分片结束后再恢复正常。
开启monitorExecution才能实现分布式作业幂等性(即不会在多个作业服务器运行同一个分片)的功能,但monitorExecution对短时间内执行的作业(如每5秒一触发)性能影响较大,建议关闭并自行实现幂等性。
是否支持动态添加作业?
动态添加作业这个概念每个人理解不尽相同。
elastic-job-lite为jar包,由开发或运维人员负责启动。启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息。 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动。elastic-job-lite并不会包含ssh免密管理等功能。
综上所述,elastic-job已做了基本动态添加功能,但无法做到真正意义的完全自动化添加。
使用Spring版本有何限制?
Elastic-Job的Spring版本支持从3.1.0.RELEASE至4的任何版本。Spring 5由于仅支持JDK 8及其以上版本,因此目前并不支持。Spring 3.1.0之前的版本对占位符的使用与目前不同,因此不再支持。Elastic-Job并未包含Spring的maven依赖,请自行添加您需要的版本。
Zookeeper版本不是3.4.6会有什么问题?
根据测试,使用3.3.6版本的Zookeeper在使用Curator 2.10.0的CuratorTransactionFinal的commit时会导致死锁。
界面Console无法正常显示?
使用Web Console时应确保与Elastic-Job相关jar包版本保持一致,否则会导致不可用。
作业与注册中心无法通信会如何?
为了保证作业的在分布式场景下的一致性,一旦作业与注册中心无法通信,运行中的作业会立刻停止执行,但作业的进程不会退出,这样做的目的是为了防止作业重分片时,将与注册中心失去联系的节点执行的分片分配给另外节点,导致同一分片在两个节点中同时执行。 当作业节点恢复与注册中心联系时,将重新参与分片并恢复执行新的分配到的分片。
为什么界面Console中的作业状态是分片待调整 分片待调整表示作业已启动但尚未获得分片时的状态。