Introduction
Elastic-Job
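- Supports clustering and distributed execution (a job is split into multiple independent shard items, and each server in the cluster executes one or more of them); does not support adding jobs dynamically
- A distributed scheduling solution; the coordination service is shipped as a jar
- Depends on an external ZooKeeper (as the registry center) for clustering
- ZooKeeper visualization tool: zooinspector
- Uses the concept of sharding: a job is split into multiple independent shard items and each server receives one or more of them (for example, two servers traverse the same database table and each handles half of the work: server A takes the rows with odd ids, server B takes the rows with even ids)
- Three job types: Simple, Dataflow, Script
- Three integration styles: Java API, Spring, Spring Boot
- Advanced topics: custom sharding, job listeners, event tracing
- Practical applications: automatic order cancellation, third-party order fetching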
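The odd/even split described above can be sketched without the framework. This is a minimal illustration only: the class and method names are hypothetical, and the rule `id % shardingTotalCount == shardingItem` is the same one the job examples later in these notes apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Elastic-Job: picks the ids one shard item owns.
final class ModuloShardingSketch {

    // An id belongs to a shard item when id % shardingTotalCount == shardingItem.
    static List<Integer> idsForShard(List<Integer> ids, int shardingTotalCount, int shardingItem) {
        List<Integer> result = new ArrayList<>();
        for (int id : ids) {
            if (id % shardingTotalCount == shardingItem) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6);
        // With two shards, item 0 receives the even ids and item 1 the odd ids.
        System.out.println("shard item 0: " + idsForShard(ids, 2, 0));
        System.out.println("shard item 1: " + idsForShard(ids, 2, 1));
    }
}
```

Because every id maps to exactly one shard item, the two servers never process the same row, regardless of which server ends up holding which item.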
Quartz
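- Supports clustering (pseudo-distributed); supports adding jobs dynamically
- Basics: Job and JobDetail, Trigger, Cron expressions
- Three integration styles: Java API, Spring, Spring Boot
- Advanced topics: Job listeners, Trigger listeners, Scheduler listeners, Quartz clustering
- Practical application: hourly order statistics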
Elastic-Job
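Simple job
- The simplest form of scheduled job; only the execute method has to be implemented
- Provides elastic scaling and sharding
- Steps:
- Implement the SimpleJob interface and its execute method
- Define the job core configuration (job name, cron schedule, total shard count)
- Define the job type configuration (Simple, Dataflow, or Script, plus the fully qualified name of the implementation class)
- Define the Lite job root configuration (pay attention to the overwrite property: when true, the local configuration overwrites the one stored in ZooKeeper)
- Configure the ZooKeeper registry center (standalone or cluster), including ip, port, and namespace
- Write the main function and start the job
- Add a second run configuration, change the port, and start a second instance of the job
- Watch the shard items printed on the two consoles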
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
- job
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.time.LocalTime;
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
LocalTime time = LocalTime.now();
System.out.println(time+", shard item: "+shardingContext.getShardingItem()+
", total shards: "+shardingContext.getShardingTotalCount()+", taskId: "+
shardingContext.getTaskId());
}
}
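- Main function
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;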
public class App {
public static void main( String[] args ) {
System.out.println( "Hello World!" );
new JobScheduler(zkCenter(),configuration()).init();
}
/**
* ZooKeeper registry center
* @return the initialized registry center
*/
public static CoordinatorRegistryCenter zkCenter(){
ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
"java-simple-job");
ZookeeperRegistryCenter crc=new ZookeeperRegistryCenter(zc);
// Initialize the registry center
crc.init();
return crc;
}
/**
* Job configuration
* @return the Lite job root configuration
*/
public static LiteJobConfiguration configuration() {
// Job core configuration: run every 5 seconds, 2 shards
JobCoreConfiguration jcc = JobCoreConfiguration
.newBuilder("mySimpleJob","0/5 * * * * ?",2)
.build();
// Job type configuration: pass the fully qualified name of the job class
SimpleJobConfiguration jtc = new SimpleJobConfiguration(jcc,
MySimpleJob.class.getCanonicalName());
// Job root configuration (LiteJobConfiguration)
LiteJobConfiguration ljc = LiteJobConfiguration
.newBuilder(jtc)
.overwrite(true)
.build();
return ljc;
}
}
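- Start the main class once: the single console prints both "shard item 0, total shards 2" and "shard item 1, total shards 2"
- Start a second instance of the main class: the first console now prints only shard item 0 (of 2) and the second console prints only shard item 1 (of 2)
- This confirms that the shards are redistributed across instances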
Dataflow
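- The Dataflow type handles streaming jobs and is split into data fetching (fetchData) and data processing (processData)
- Use case: continuous data processing, such as fetching third-party orders
- Execution flow:
- The job is triggered according to its schedule
- Fetch data
- Process the data, then fetch again
- If data is returned, keep processing; if not, this run ends
- Wait for the next trigger
- Implement the DataflowJob interface; note the generic parameter of DataflowJob<T>, which determines the element type of the fetched data
- Example: process 100 orders with sharding, 10 orders per batch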
// Simulated order
public class Order {
private Integer orderId;
// 0: unprocessed; 1: processed
private Integer status;
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", status=" + status +
'}';
}
}
- Streaming job
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.model.Order;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import static java.util.stream.Collectors.toList;
public class MyDataflowJob implements DataflowJob<Order> {
private List<Order> orders = new ArrayList<>();
// Initialize 100 orders
{
for (int i=0;i<100;i++){
Order order = new Order();
order.setOrderId(i+1);
// Unprocessed
order.setStatus(0);
orders.add(order);
}
}
// Fetch data
@Override
public List<Order> fetchData(ShardingContext shardingContext) {
// order id % total shard count == current shard item
List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
.filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
.collect(toList());
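List<Order> subList = null;
if (!orderList.isEmpty()){
// If the list is not empty, fetch at most 10 orders per batch;
// Math.min avoids an IndexOutOfBoundsException when fewer than 10 remain
subList = orderList.subList(0, Math.min(10, orderList.size()));
}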
try {
// Simulate the time spent fetching
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time+", shard item: "+shardingContext.getShardingItem()+", fetched data: "+subList);
return subList;
}
// Process data
@Override
public void processData(ShardingContext shardingContext, List<Order> data) {
// Mark the orders as processed
data.forEach(o->o.setStatus(1));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LocalTime time = LocalTime.now();
System.out.println(time+", shard item: "+shardingContext.getShardingItem()+", processing data");
}
}
- Main class
public class App {
public static void main( String[] args ) {
System.out.println( "Hello World!" );
new JobScheduler(zkCenter(),configurationDataflow()).init();
}
/**
* ZooKeeper registry center
* @return the initialized registry center
*/
public static CoordinatorRegistryCenter zkCenter(){
ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
"java-simple-job");
ZookeeperRegistryCenter crc=new ZookeeperRegistryCenter(zc);
// Initialize the registry center
crc.init();
return crc;
}
public static LiteJobConfiguration configurationDataflow() {
// Job core configuration: run every 10 seconds, 2 shards
var jcc = JobCoreConfiguration
.newBuilder("myDataflowJob","0/10 * * * * ?",2)
.build();
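// Job type configuration; the last argument (true) enables streaming, so fetch/process repeats until no data is returned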
var jtc = new DataflowJobConfiguration(jcc,
MyDataflowJob.class.getCanonicalName(),true);
// Job root configuration (LiteJobConfiguration)
var ljc = LiteJobConfiguration
.newBuilder(jtc)
.overwrite(true)
.build();
return ljc;
}
}
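The execution flow listed for Dataflow jobs (fetch, process, fetch again until nothing is returned) can be simulated without the framework. The sketch below is a plain-Java stand-in under stated assumptions: the class `StreamingLoopSketch` and its `runOnce` method are hypothetical and only mimic what a single trigger does when streaming is enabled; they are not Elastic-Job APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of one streaming trigger; not an Elastic-Job class.
final class StreamingLoopSketch {
    private final List<Integer> pending = new ArrayList<>();
    private final int batchSize;
    int batchesProcessed = 0;

    StreamingLoopSketch(int itemCount, int batchSize) {
        this.batchSize = batchSize;
        for (int i = 1; i <= itemCount; i++) {
            pending.add(i);
        }
    }

    // Fetch step: returns at most batchSize pending items (empty when none are left).
    List<Integer> fetchData() {
        int end = Math.min(batchSize, pending.size());
        return new ArrayList<>(pending.subList(0, end));
    }

    // Process step: an item counts as done once it is removed from the pending list.
    void processData(List<Integer> data) {
        pending.removeAll(data);
        batchesProcessed++;
    }

    // One trigger: keep fetching and processing until a fetch returns nothing.
    void runOnce() {
        List<Integer> data = fetchData();
        while (data != null && !data.isEmpty()) {
            processData(data);
            data = fetchData();
        }
    }

    int remaining() {
        return pending.size();
    }

    public static void main(String[] args) {
        StreamingLoopSketch job = new StreamingLoopSketch(25, 10);
        job.runOnce();
        System.out.println("batches: " + job.batchesProcessed + ", remaining: " + job.remaining());
    }
}
```

The loop only terminates if processing changes what the next fetch sees, which is why the examples in these notes update the order status (or remove the item) inside processData.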
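Script job
- A script-type job
- Supports shell, Python, Perl, and so on
- No coding required: when configuring the job, provide the command line of the executable script; the job runtime information is appended automatically as the last argument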
public class App {
public static void main( String[] args ) {
System.out.println( "Hello World!" );
new JobScheduler(zkCenter(),configurationScript()).init();
}
public static LiteJobConfiguration configurationScript() {
// Job core configuration
var jcc = JobCoreConfiguration
.newBuilder("myScriptJob","0/10 * * * * ?",2)
.misfire(false)
.build();
// Job type configuration: the command line of the script to run
var jtc = new ScriptJobConfiguration(jcc,"d:/test.cmd");
// Job root configuration (LiteJobConfiguration)
var ljc = LiteJobConfiguration
.newBuilder(jtc)
.overwrite(true)
.build();
return ljc;
}
}
Spring integration
- Integrate the jobs through the Spring XML schema (custom namespace)
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring.version>5.1.5.RELEASE</spring.version>
<elasticjob.version>2.1.5</elasticjob.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>
- Configure a listener in web.xml to bootstrap the Spring container
<!DOCTYPE web-app PUBLIC
"-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
"http://java.sun.com/dtd/web-app_2_3.dtd" >
<web-app>
<display-name>spring-elasticjob</display-name>
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:spring-config.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
</web-app>
- Write the jobs
@Slf4j
public class MySimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("Shard item: "+shardingContext.getShardingItem());
}
}
public class MyDataflowJob implements DataflowJob<Integer> {
private List<Integer> list = new ArrayList<>();
{
list.add(0);
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
list.add(6);
list.add(7);
list.add(8);
list.add(9);
}
@Override
public List<Integer> fetchData(ShardingContext shardingContext) {
// number % total shard count == current shard item
List<Integer> rtnList = new ArrayList<>();
for (Integer index : list){
if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
rtnList.add(index);
break;
}
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Shard item: "+shardingContext.getShardingItem()+", fetched data: "+rtnList);
return rtnList;
}
@Override
public void processData(ShardingContext shardingContext, List<Integer> data) {
list.removeAll(data);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Shard item: "+shardingContext.getShardingItem()+", removed data: "+data);
}
}
- Configure the ZooKeeper registry center and the jobs in XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!-- Data source configuration -->
<bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource">
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="url" value="jdbc:mysql://localhost:3306/dataflow?serverTimezone=Asia/Shanghai&amp;useSSL=false"/>
</bean>
<!-- Registry center configuration -->
<reg:zookeeper server-lists="localhost:2181" namespace="spring-elasticjob" id="zkCenter"/>
<!-- Simple job configuration -->
<!--<job:simple id="mySimpleJob" registry-center-ref="zkCenter" cron="0/10 * * * * ?" sharding-total-count="2"-->
<!--class="com.example.job.MySimpleJob" overwrite="true"/>-->
<!-- Dataflow job -->
<!-- event-trace-rdb-data-source enables event tracing -->
<job:dataflow registry-center-ref="zkCenter" cron="0/10 * * * * ?" sharding-total-count="2" id="myDataflowJob"
class="com.example.job.MyDataflowJob" event-trace-rdb-data-source="dataSource" overwrite="true" streaming-process="true" >
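<!-- Optional listeners: fill in the class and timeout attributes before enabling -->
<!-- <job:listener class=""/> -->
<!-- <job:distributed-listener class="" started-timeout-milliseconds="" completed-timeout-milliseconds=""/> -->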
</job:dataflow>
</beans>
- Deploy to Tomcat to start the scheduled jobs
Spring Boot integration
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.7</version>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
- Directory structure
- com.example
- autoconfig
- springbootelasticjob
- resources
- META-INF
- com.example
- Write the properties class for the ZooKeeper registry center
package com.example.autoconfig;
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@Setter@Getter
public class ZookeeperProperties {
// ZooKeeper server list
private String serverList;
// ZooKeeper namespace
private String namespace;
}
elasticjob.zookeeper.server-list=localhost:2181
elasticjob.zookeeper.namespace=springboot-elasticjob
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.url=jdbc:mysql://localhost:3306/dataflow?serverTimezone=Asia/Shanghai&useSSL=false
mybatis.mapper-locations=/mybatis/*.xml
logging.pattern.dateformat=HH:mm:ss
- Auto-configuration for the ZooKeeper registry center
package com.example.autoconfig;
@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {
@Autowired
private ZookeeperProperties zookeeperProperties;
/**
* ZooKeeper registry center; Spring calls init() through initMethod
* @return the registry center bean
*/
@Bean(initMethod = "init")
public CoordinatorRegistryCenter zkCenter(){
String serverList = zookeeperProperties.getServerList();
String namespace = zookeeperProperties.getNamespace();
var zc = new ZookeeperConfiguration(serverList,namespace);
var crc=new ZookeeperRegistryCenter(zc);
return crc;
}
}
- Add the configuration processor dependency so that the properties file gets auto-completion
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
- Write the @ElasticSimpleJob and @ElasticDataflowJob annotations
package com.example.autoconfig;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {
// Job name
String jobName() default "";
// Cron expression
String cron() default "";
// Total shard count
int shardingTotalCount() default 1;
// Whether to overwrite the configuration stored in ZooKeeper
boolean overwrite() default false;
// Sharding strategy; defaults to average allocation
Class<? extends JobShardingStrategy> jobStrategy() default AverageAllocationJobShardingStrategy.class;
// Whether to enable event tracing
boolean jobEvent() default false;
// Job listeners
Class<? extends ElasticJobListener>[] jobListner() default {};
}
package com.example.autoconfig;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticDataflowJob {
// Job name
String jobName() default "";
// Cron expression
String cron() default "";
// Total shard count
int shardingTotalCount() default 1;
// Whether to overwrite the configuration stored in ZooKeeper
boolean overwrite() default false;
// Whether to enable streaming processing
boolean streamingProcess() default false;
// Sharding strategy; defaults to average allocation
Class<? extends JobShardingStrategy> jobStrategy() default AverageAllocationJobShardingStrategy.class;
// Whether to enable event tracing
boolean jobEvent() default false;
// Job listeners
Class<? extends ElasticJobListener>[] jobListner() default {};
}
- Register the jobs using Java reflection
package com.example.autoconfig;
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {
@Autowired
private CoordinatorRegistryCenter zkCenter;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initSimpleJob() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
// Collect every bean annotated with the job annotation
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
for (Map.Entry<String, Object> entry : beans.entrySet()){
Object instance = entry.getValue();
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> superInterface : interfaces){
// Register the bean only if its class implements SimpleJob
if (superInterface == SimpleJob.class){
// Read the settings from the job annotation
ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
boolean overwrite = annotation.overwrite();
Class<?> jobStrategy = annotation.jobStrategy();
// Whether event tracing is enabled
boolean isJobEvent = annotation.jobEvent();
// Job listeners
Class<? extends ElasticJobListener>[] listeners = annotation.jobListner();
ElasticJobListener[] listenerInstances = null;
// Instantiate the listeners if any were declared
if (listeners!=null && listeners.length>0){
listenerInstances = new ElasticJobListener[listeners.length];
int i = 0;
for (Class<? extends ElasticJobListener> listener : listeners){
ElasticJobListener listenerInstance = listener.getDeclaredConstructor().newInstance();
listenerInstances[i] = listenerInstance;
i++;
}
}else {
listenerInstances = new ElasticJobListener[0];
}
// Job core configuration
var jcc = JobCoreConfiguration
.newBuilder(jobName,cron,shardingTotalCount)
.build();
// Job type configuration
var jtc = new SimpleJobConfiguration(jcc,
instance.getClass().getCanonicalName());
// Job root configuration (LiteJobConfiguration)
var ljc = LiteJobConfiguration
.newBuilder(jtc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
// Pass the event configuration only when event tracing is enabled
if (isJobEvent){
JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,jec,listenerInstances).init();
}else {
new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,listenerInstances).init();
}
}
}
}
}
}
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class DataflowJobAutoConfig {
@Autowired
private CoordinatorRegistryCenter zkCenter;
@Autowired
private ApplicationContext applicationContext;
@Autowired
private DataSource dataSource;
@PostConstruct
public void initDataflowJob() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class);
for (Map.Entry<String, Object> entry : beans.entrySet()){
Object instance = entry.getValue();
Class<?>[] interfaces = instance.getClass().getInterfaces();
for (Class<?> superInterface : interfaces){
if (superInterface == DataflowJob.class){
ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class);
String jobName = annotation.jobName();
String cron = annotation.cron();
int shardingTotalCount = annotation.shardingTotalCount();
boolean overwrite = annotation.overwrite();
boolean streamingProcess = annotation.streamingProcess();
Class<?> jobStrategy = annotation.jobStrategy();
boolean isJobEvent = annotation.jobEvent();
Class<? extends ElasticJobListener>[] listeners = annotation.jobListner();
ElasticJobListener[] listenerInstances = null;
if (listeners!=null && listeners.length>0){
listenerInstances = new ElasticJobListener[listeners.length];
int i = 0;
for (Class<? extends ElasticJobListener> listener : listeners){
ElasticJobListener listenerInstance = listener.getDeclaredConstructor().newInstance();
listenerInstances[i] = listenerInstance;
i++;
}
}else {
listenerInstances = new ElasticJobListener[0];
}
// Job core configuration
var jcc = JobCoreConfiguration
.newBuilder(jobName,cron,shardingTotalCount)
.build();
// Job type configuration
var jtc = new DataflowJobConfiguration(jcc,
instance.getClass().getCanonicalName(),streamingProcess);
// Job root configuration (LiteJobConfiguration)
var ljc = LiteJobConfiguration
.newBuilder(jtc)
.jobShardingStrategyClass(jobStrategy.getCanonicalName())
.overwrite(overwrite)
.build();
if (isJobEvent){
JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,jec,listenerInstances).init();
}else {
new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,listenerInstances).init();
}
}
}
}
}
}
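The reflection step used by both auto-configuration classes (read a runtime annotation from a bean's class, then check which job interface the bean implements) can be tried in isolation. The sketch below is a framework-free illustration: `DemoJob`, `DemoJobImpl`, `Base`, and `Derived` are hypothetical names, and `Runnable` merely stands in for the job interface. It also shows one design point worth knowing: comparing the entries of `getInterfaces()` with `==` only sees interfaces declared directly on the class, whereas `instanceof` also covers interfaces inherited from a superclass.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical, framework-free sketch of annotation-driven job discovery.
final class AnnotationScanSketch {

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME to be visible via reflection
    @interface DemoJob {
        String jobName();
        String cron();
        int shardingTotalCount() default 1;
    }

    @DemoJob(jobName = "demoJob", cron = "0/5 * * * * ?", shardingTotalCount = 2)
    static final class DemoJobImpl implements Runnable {
        @Override
        public void run() { }
    }

    static class Base implements Runnable {
        @Override
        public void run() { }
    }

    // Inherits Runnable from Base instead of declaring it directly.
    @DemoJob(jobName = "derivedJob", cron = "0/5 * * * * ?")
    static final class Derived extends Base { }

    // Mirrors the getInterfaces() loop: true only for directly declared interfaces.
    static boolean declaredDirectly(Object bean, Class<?> iface) {
        for (Class<?> candidate : bean.getClass().getInterfaces()) {
            if (candidate == iface) {
                return true;
            }
        }
        return false;
    }

    // Returns "name|cron|shards" for annotated Runnable beans, or null otherwise.
    static String describe(Object bean) {
        DemoJob annotation = bean.getClass().getAnnotation(DemoJob.class);
        if (annotation == null || !(bean instanceof Runnable)) {
            return null;
        }
        return annotation.jobName() + "|" + annotation.cron() + "|" + annotation.shardingTotalCount();
    }

    public static void main(String[] args) {
        System.out.println(describe(new DemoJobImpl()));
        System.out.println(declaredDirectly(new Derived(), Runnable.class));
        System.out.println(new Derived() instanceof Runnable);
    }
}
```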
- Write the job implementation classes and annotate them with @ElasticSimpleJob (or @ElasticDataflowJob)
package com.example.springbootelasticjob.job;
@Slf4j
@ElasticSimpleJob(
jobName = "mySimpleJob" ,
cron = "0/5 * * * * ?",
shardingTotalCount = 1,
overwrite = true
)
public class MySimpleJob implements SimpleJob {
@Autowired
private OrderService orderService;
@Override
public void execute(ShardingContext shardingContext) {
LocalTime time = LocalTime.now();
System.out.println(time+", shard item: "+shardingContext.getShardingItem()+
", total shards: "+shardingContext.getShardingTotalCount()+", taskId: "+
shardingContext.getTaskId());
}
}
package com.example.springbootelasticjob.job;
@Slf4j
@ElasticDataflowJob(
jobName = "myDataflowJob",
cron = "0/10 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
streamingProcess = true
)
public class MyDataflowJob implements DataflowJob<Integer> {
private List<Integer> list = new ArrayList<>();
{
list.add(0);
list.add(1);
list.add(2);
list.add(3);
list.add(4);
list.add(5);
list.add(6);
list.add(7);
list.add(8);
list.add(9);
}
@Override
public List<Integer> fetchData(ShardingContext shardingContext) {
List<Integer> rtnList = new ArrayList<>();
// number % total shard count == current shard item
for (Integer index : list){
if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
rtnList.add(index);
break;
}
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Shard item: "+shardingContext.getShardingItem()+", fetched data: "+rtnList);
return rtnList;
}
@Override
public void processData(ShardingContext shardingContext, List<Integer> data) {
list.removeAll(data);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("Shard item: "+shardingContext.getShardingItem()+", removed data: "+data);
}
}
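- So that other projects can use our custom auto-configuration once this project is packaged as a jar, create a META-INF folder under the resources directory and add a spring.factories file to it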
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.autoconfig.ZookeeperAutoConfig,\
com.example.autoconfig.SimpleJobAutoConfig,\
com.example.autoconfig.DataflowJobAutoConfig
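- Start the project and test
Cancelling orders by scheduled polling
- Simulate automatic cancellation of orders that stay unpaid for 30 seconds, implemented with @ElasticSimpleJob
- Simulate order creation: write a create-order method and schedule a job that runs every 5 seconds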
public class Order {
private Integer id;
private BigDecimal amount;
private Integer status;
private String receiveName;
private String receiveAddress;
private String receiveMobile;
private String createUser;
private Date createTime;
private String updateUser;
private Date updateTime;
// Getters and setters omitted
}
@Slf4j
@ElasticSimpleJob(
jobName = "mySimpleJob" ,
cron = "0/5 * * * * ?",
shardingTotalCount = 1,
overwrite = true
)
public class MySimpleJob implements SimpleJob {
@Autowired
private OrderService orderService;
@Override
public void execute(ShardingContext shardingContext) {
// Insert 10 orders into the database every 5 seconds
for (int i=0;i<10;i++){
orderService.insertOrder();
}
}
}
public int insertOrder(){
Order order = new Order();
order.setAmount(BigDecimal.TEN);
order.setReceiveName("Green");
order.setReceiveAddress("Chaoyang District, Beijing, China xxx");
order.setReceiveMobile("13811112222");
order.setStatus(1);// unpaid
order.setCreateUser("Green");
order.setCreateTime(new Date());
order.setUpdateUser("Green");
order.setUpdateTime(new Date());
int i = orderMapper.insertSelective(order);
return i;
}
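- Write the job that handles timed-out orders: write the SQL that selects them, cancel them with multiple threads, and implement the cancellation with an optimistic lock (based on update_time) so that an order is not cancelled if the user pays while we are cancelling it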
@ElasticSimpleJob(
jobName = "orderCancelJob",
cron = "0/15 * * * * ?",
shardingTotalCount = 2,
overwrite = true
)
public class OrderCancelJob implements SimpleJob {
@Autowired
private OrderService orderService;
@Override
public void execute(ShardingContext shardingContext) {
// Compute the point in time 30 seconds ago
Calendar now = Calendar.getInstance();
now.add(Calendar.SECOND,-30);
// order id % total shard count == current shard item
List<Order> orders = orderService.getOrder(now,
shardingContext.getShardingTotalCount(),shardingContext.getShardingItem());
if (orders!=null&&orders.size()>0){
// Create a thread pool
ExecutorService es = Executors.newFixedThreadPool(4);
for(Order order : orders){
es.execute(()->{
// Update conditions (update_time acts as the optimistic-lock version)
Integer orderId = order.getId();
Date updateTime = order.getUpdateTime();
// New values
int status = 3;// cancelled
String updateUser = "system";
Date updateNow = new Date();
orderService.cancelOrder(orderId,updateTime,status,updateUser,updateNow);
});
}
es.shutdown();
}
}
}
<select id="getOrder" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from t_order
<where>
create_time &lt; #{param1}
and status = 1
and id % #{param2} = #{param3}
</where>
</select>
<update id="cancelOrder">
update t_order set
status = #{param3},
update_user = #{param4},
update_time = #{param5}
<where>
id = #{param1}
and update_time = #{param2}
</where>
</update>
- Test the cancel-order logic with JUnit
- Configure the total shard count and the schedule with @ElasticSimpleJob
- Start the job and verify that it behaves correctly
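The optimistic lock used by the cancelOrder statement (update only if update_time still has the value that was read earlier) can be illustrated with an in-memory stand-in for the table. This is a sketch under stated assumptions: `OptimisticCancelSketch`, its `Row` class, and the `PAID = 2` status code are hypothetical; only the values 1 (unpaid) and 3 (cancelled) come from the notes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of "UPDATE ... WHERE id = ? AND update_time = ?".
final class OptimisticCancelSketch {
    static final int UNPAID = 1;
    static final int PAID = 2;      // assumed status code, not from the notes
    static final int CANCELLED = 3;

    static final class Row {
        int status;
        long updateTime;

        Row(int status, long updateTime) {
            this.status = status;
            this.updateTime = updateTime;
        }
    }

    private final Map<Integer, Row> table = new HashMap<>();

    void insert(int id, int status, long updateTime) {
        table.put(id, new Row(status, updateTime));
    }

    // Returns the number of affected rows, like a JDBC update count:
    // 0 when the row changed since it was read, 1 when the update was applied.
    synchronized int update(int id, long expectedUpdateTime, int newStatus, long now) {
        Row row = table.get(id);
        if (row == null || row.updateTime != expectedUpdateTime) {
            return 0;
        }
        row.status = newStatus;
        row.updateTime = now;
        return 1;
    }

    int statusOf(int id) {
        return table.get(id).status;
    }

    public static void main(String[] args) {
        OptimisticCancelSketch db = new OptimisticCancelSketch();
        db.insert(1, UNPAID, 100L);
        long seenByJob = 100L;                                    // the cancel job reads update_time
        int paid = db.update(1, 100L, PAID, 150L);                // the user pays first
        int cancelled = db.update(1, seenByJob, CANCELLED, 200L); // stale version, rejected
        System.out.println("paid rows: " + paid + ", cancelled rows: " + cancelled
                + ", final status: " + db.statusOf(1));
    }
}
```

A return value of 0 tells the job that someone else modified the order in the meantime, so the payment is preserved instead of being overwritten.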
Third-party order import example
- Dataflow streaming job in practice (simulated third-party order import)
// Third-party JD order
public class JdOrder {
private Integer id;
private Integer status;
private BigDecimal amount;
private String createUser;
private Date createTime;
private String updateUser;
private Date updateTime;
}
// Third-party Tmall order
public class TmallOrder {
private Integer id;
private Integer orderStatus;
private BigDecimal money;
private String createUser;
private Date createTime;
private String updateUser;
private Date updateTime;
}
// Order in our own system
public class AllOrder {
private Integer id;
private Integer thirdOrderId;
private Integer type;
private BigDecimal totalAmount;
private String createUser;
private Date createTime;
private String updateUser;
private Date updateTime;
// Getters and setters omitted
}
- Write a job that adds orders, simulating the creation of third-party orders
@ElasticSimpleJob(
jobName = "thirdOrderProduceJob",
cron = "0/5 * * * * ?",
shardingTotalCount = 1,
overwrite = true
)
public class ThirdOrderProduceJob implements SimpleJob {
@Autowired
private OrderService orderService;
@Override
public void execute(ShardingContext shardingContext) {
orderService.produceThirdOrder();
}
}
public void produceThirdOrder() {
for (int i=0;i<5;i++){
Random random = new Random();
int randomInt = random.nextInt(2);
// JD order
if (randomInt ==0){
log.info("Inserting a JD order");
JdOrder jdOrder = new JdOrder();
jdOrder.setStatus(0);// not fetched yet
jdOrder.setAmount(BigDecimal.TEN);
jdOrder.setCreateUser("jdUser");
jdOrder.setCreateTime(new Date());
jdOrder.setUpdateUser("jdUser");
jdOrder.setUpdateTime(new Date());
jdOrderMapper.insertSelective(jdOrder);
}else {// Tmall order
log.info("Inserting a Tmall order");
TmallOrder tmallOrder = new TmallOrder();
tmallOrder.setOrderStatus(0);// not fetched yet
tmallOrder.setMoney(new BigDecimal(100));
tmallOrder.setCreateUser("tmallUser");
tmallOrder.setCreateTime(new Date());
tmallOrder.setUpdateUser("tmallUser");
tmallOrder.setUpdateTime(new Date());
tmallOrderMapper.insertSelective(tmallOrder);
}
}
}
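- Fetch the third-party orders with a Dataflow job: 2 shards in total, shard item 0 fetches JD orders and shard item 1 fetches Tmall orders; the job runs every 15 seconds and each shard fetches 5 orders per batch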
@ElasticDataflowJob(
jobName = "fetchThirdOrderJob",
cron = "0/15 * * * * ?",
shardingTotalCount = 2,
overwrite = true,
streamingProcess = true
)
public class FetchThirdOrderJob implements DataflowJob<Object> {
@Autowired
private OrderService orderService;
@Autowired
private JdOrderMapper jdOrderMapper;
@Autowired
private TmallOrderMapper tmallOrderMapper;
@Override
public List<Object> fetchData(ShardingContext shardingContext) {
// JD orders
if (shardingContext.getShardingItem() == 0){
// Fetch up to 5 orders that have not been fetched yet
List<JdOrder> jdOrders = jdOrderMapper.getNotFetchedOrder(5);
if (jdOrders!=null&&jdOrders.size()>0){
List<Object> jdOrderList = jdOrders.stream().map(jdOrder -> (Object) jdOrder).collect(toList());
return jdOrderList;
}
}else {// Tmall orders
// Fetch up to 5 orders that have not been fetched yet
List<TmallOrder> tmallOrders = tmallOrderMapper.getNotFetchedOrder(5);
if (tmallOrders!=null&&tmallOrders.size()>0){
List<Object> tmallOrderList = tmallOrders.stream().map(tmallOrder -> (Object)tmallOrder).collect(toList());
return tmallOrderList;
}
}
return null;
}
@Override
public void processData(ShardingContext shardingContext, List<Object> data) {
// JD orders
if (shardingContext.getShardingItem() ==0){
if (data!=null&&data.size()>0){
List<JdOrder> jdOrders = data.stream().map(d -> (JdOrder) d).collect(toList());
for (JdOrder jdOrder : jdOrders){
AllOrder allOrder = new AllOrder();
allOrder.setThirdOrderId(jdOrder.getId());
allOrder.setType(0);// JD order
allOrder.setTotalAmount(jdOrder.getAmount());
allOrder.setCreateUser("system");
allOrder.setCreateTime(new Date());
allOrder.setUpdateUser("system");
allOrder.setUpdateTime(new Date());
orderService.processJdOrder(allOrder);
}
}
}else {// Tmall orders
if (data!=null&&data.size()>0){
List<TmallOrder> tmallOrders = data.stream().map(d -> (TmallOrder) d).collect(toList());
for (TmallOrder tmallOrder : tmallOrders){
AllOrder allOrder = new AllOrder();
allOrder.setThirdOrderId(tmallOrder.getId());
allOrder.setType(1);// Tmall order
allOrder.setTotalAmount(tmallOrder.getMoney());
allOrder.setCreateUser("system");
allOrder.setCreateTime(new Date());
allOrder.setUpdateUser("system");
allOrder.setUpdateTime(new Date());
orderService.processTmallOrder(allOrder);
}
}
}
}
}
<select id="getNotFetchedOrder" resultType="com.example.springbootelasticjob.model.JdOrder">
SELECT
<include refid="Base_Column_List"/>
FROM
jd_order
WHERE
`status` = 0
LIMIT #{param1}
</select>
<select id="getNotFetchedOrder" resultType="com.example.springbootelasticjob.model.TmallOrder">
select <include refid="Base_Column_List"/>
from tmall_order
<where>
order_status =0
</where>
limit #{param1}
</select>
@Transactional
public void processJdOrder(AllOrder allOrder) {
// Store the order
allOrderMapper.insertSelective(allOrder);
// Update the JD order status
JdOrder jdOrder = new JdOrder();
jdOrder.setId(allOrder.getThirdOrderId());
jdOrder.setStatus(1);// fetched
jdOrder.setUpdateUser("system");
jdOrder.setUpdateTime(new Date());
jdOrderMapper.updateByPrimaryKeySelective(jdOrder);
}
@Transactional
public void processTmallOrder(AllOrder allOrder) {
// Store the order
allOrderMapper.insertSelective(allOrder);
// Update the Tmall order status
TmallOrder tmallOrder = new TmallOrder();
tmallOrder.setId(allOrder.getThirdOrderId());
tmallOrder.setOrderStatus(1);// fetched
tmallOrder.setUpdateUser("system");
tmallOrder.setUpdateTime(new Date());
tmallOrderMapper.updateByPrimaryKeySelective(tmallOrder);
}
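- Test
Custom sharding strategy
- Sharding strategy: assigns the shard items to the live server instances according to some rule
- Built-in sharding strategies:
- Average allocation (the default algorithm): for example, with 10 shards and 3 servers, 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]
- Ascending or descending IP order chosen by the parity of the job name's hash: for example, with 2 shards and 3 servers, when the hash of the job name is odd, 1=[0], 2=[1], 3=[]; when it is even, 3=[0], 2=[1], 1=[]
- Steps for a custom sharding strategy:
- Write a class that implements the JobShardingStrategy interface
- Implement the sharding method
- Parameters of sharding: the list of job instances, the job name, and the total shard count
- Return value: the shard items assigned to each job instance
- Configure the sharding strategy
- Set jobShardingStrategyClass to the fully qualified name of the strategy class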
package com.example.springbootelasticjob.sharding;
// Custom sharding strategy: round-robin assignment
public class MyShardingStrategy implements JobShardingStrategy {
@Override
public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances,
String jobName, int shardingTotalCount) {
Map<JobInstance, List<Integer>> rtnMap = new HashMap<>();
// Queue of shard items still to be assigned
ArrayDeque<Integer> queue = new ArrayDeque<>(shardingTotalCount);
for (int i=0;i<shardingTotalCount;i++){
queue.add(i);
}
// Hand out shard items to the instances in round-robin order
while (queue.size()>0){
for (JobInstance jobInstance : jobInstances){
if (queue.size()>0){
Integer shardingItem = queue.pop();
List<Integer> integers = rtnMap.get(jobInstance);
if (integers!=null&&integers.size()>0){
integers.add(shardingItem);
}else {
List<Integer> list = new ArrayList<>();
list.add(shardingItem);
rtnMap.put(jobInstance,list);
}
}
}
}
return rtnMap;
}
}
package com.example.springbootelasticjob.job;
@ElasticSimpleJob(
jobName = "myShardingJob",
cron = "0/10 * * * * ?",
overwrite = true,
shardingTotalCount = 10,
jobStrategy = MyShardingStrategy.class,
jobEvent = true,
jobListner = {MyNormalListener.class,MyNormalListener.class}
)
@Slf4j
public class MyShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext shardingContext) {
log.info("Shard item: "+shardingContext.getShardingItem());
}
}
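The two built-in strategies listed in this section can be reproduced in a few lines, which makes the worked numbers (10 shards over 3 servers, 2 shards over 3 servers) easy to check. This is a sketch under stated assumptions rather than the library's code: servers are plain strings instead of JobInstance objects, the list is assumed to be already sorted by IP in ascending order, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical re-implementation of the two allocation rules, for illustration only.
final class AverageAllocationSketch {

    // Each server gets total / n consecutive items; leftovers go one by one to the first servers.
    static Map<String, List<Integer>> average(List<String> servers, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        if (servers.isEmpty()) {
            return result;
        }
        int perServer = shardingTotalCount / servers.size();
        int index = 0;
        for (String server : servers) {
            List<Integer> items = new ArrayList<>();
            for (int i = index * perServer; i < (index + 1) * perServer; i++) {
                items.add(i);
            }
            result.put(server, items);
            index++;
        }
        int next = perServer * servers.size();
        for (String server : servers) {
            if (next >= shardingTotalCount) {
                break;
            }
            result.get(server).add(next++);
        }
        return result;
    }

    // Odd job-name hash: ascending server order; even hash: descending order.
    static Map<String, List<Integer>> byJobNameParity(List<String> serversAscending,
                                                      String jobName, int shardingTotalCount) {
        List<String> ordered = new ArrayList<>(serversAscending);
        if (jobName.hashCode() % 2 == 0) {
            Collections.reverse(ordered);
        }
        return average(ordered, shardingTotalCount);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("1", "2", "3");
        System.out.println(average(servers, 10));
        System.out.println(byJobNameParity(servers, "a", 2)); // "a".hashCode() is 97 (odd)
        System.out.println(byJobNameParity(servers, "b", 2)); // "b".hashCode() is 98 (even)
    }
}
```

With average allocation, a job with fewer shards than servers always lands on the first servers; the parity rule spreads different jobs over both ends of the server list.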
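Event tracing
- Records how jobs change over time; configure a data source and the tables are created automatically in the corresponding database
- For the Spring Boot integration, see SimpleJobAutoConfig and DataflowJobAutoConfig
- For the Spring integration, see spring-config.xml
Job listeners
- Listen before and after job execution
- There are two kinds of listener:
- Executed on every job node, with no need to consider the distributed case (recommended)
- In a distributed scenario, executed on a single node only, so the whole job has exactly one "before" and one "after" (use with caution)
- Implement the listener interface and its before/after methods
- To listen on every job node, implement ElasticJobListener
- To listen once in a distributed scenario, extend AbstractDistributeOnceElasticJobListener
- Finally, pass the listeners to JobScheduler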
package com.example.springbootelasticjob.listener;
public class MyNormalListener implements ElasticJobListener {
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
System.out.println("Job "+shardingContexts.getJobName()+": before execution");
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
System.out.println("Job "+shardingContexts.getJobName()+": after execution");
}
}
package com.example.springbootelasticjob.listener;
// Distributed listener; it has known shortcomings and is not recommended
public class MyDistributeListener extends AbstractDistributeOnceElasticJobListener {
public MyDistributeListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
}
@Override
public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
System.out.println("Distributed listener: before execution");
}
@Override
public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
System.out.println("Distributed listener: after execution");
}
}
- Update the annotations and the auto-configuration classes accordingly