百味皆苦 — Java Backend Engineer

Scheduled-Task Frameworks: Elastic-Job and Quartz

2019-12-16
Overview

Project source code

Elastic-Job

  • Supports clustering and true distribution: a job is split into independent sharding items, and the distributed servers each execute one or more of them. Adding jobs dynamically is not supported
  • A distributed scheduling solution delivered as a jar, relying on an external coordination service
  • Depends on ZooKeeper (as the registry center) for clustering
  • ZooKeeper GUI tool: ZooInspector
  • Uses the sharding concept: one job is split into multiple independent sharding items, and each server instance is assigned one or more of them. For example, two servers scanning the same database table can each do half the work: server A processes the rows with odd ids, server B the rows with even ids
  • Three job types: Simple, Dataflow, and Script
  • Three integration options: plain Java API, Spring, and Spring Boot
  • Advanced topics: custom sharding strategies, job listeners, event tracing
  • Practical applications: automatic order cancellation, third-party order import
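The odd/even split above boils down to id % shardingTotalCount == shardingItem. A minimal JDK-only sketch of that routing rule (illustrative names, no Elastic-Job classes involved):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ShardingDemo {
    // Return the ids that a given sharding item is responsible for.
    static List<Integer> idsForShard(List<Integer> ids, int shardingTotalCount, int shardingItem) {
        return ids.stream()
                .filter(id -> id % shardingTotalCount == shardingItem)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        // With 2 shards, shard 1 gets the odd ids and shard 0 the even ids.
        System.out.println(idsForShard(ids, 2, 1)); // [1, 3, 5, 7, 9]
        System.out.println(idsForShard(ids, 2, 0)); // [2, 4, 6, 8, 10]
    }
}
```

Because every server applies the same rule to its own shard items, no two servers ever process the same row.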

Quartz

  • Supports clustering; pseudo-distributed (every node runs the whole job); supports adding jobs dynamically

  • Basics: Job and JobDetail, Trigger, Cron expressions
  • Three integration options: plain Java API, Spring, and Spring Boot
  • Advanced topics: Job listeners, Trigger listeners, Scheduler listeners, Quartz clustering
  • Practical application: hourly order statistics
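All the examples below use Quartz-style cron expressions such as 0/5 * * * * ? ("starting at second 0, fire every 5 seconds"). As a rough JDK-only illustration of how a step field like 0/5 matches a value (this is not the real Quartz parser):

```java
public class CronStepDemo {
    // Check whether a value matches a single cron field of the form
    // "*", a plain number, or "start/step" (e.g. "0/5").
    static boolean matches(String field, int value) {
        if (field.equals("*")) return true;
        if (field.contains("/")) {
            String[] parts = field.split("/");
            int start = Integer.parseInt(parts[0]);
            int step = Integer.parseInt(parts[1]);
            return value >= start && (value - start) % step == 0;
        }
        return Integer.parseInt(field) == value;
    }

    public static void main(String[] args) {
        // The seconds field "0/5" fires at 0, 5, 10, ... 55.
        System.out.println(matches("0/5", 10)); // true
        System.out.println(matches("0/5", 12)); // false
    }
}
```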

Elastic-Job

Simple jobs

  • The simplest job type: just implement the execute method
  • Provides elastic scaling and sharding
  • Steps:
    • Implement the SimpleJob interface and its execute method
    • Define the core job configuration (job name, cron schedule, total shard count)
    • Define the job type configuration (Simple, Dataflow, or Script, plus the fully qualified name of the implementation class)
    • Define the Lite root configuration (note the importance of the overwrite property)
    • Configure the ZooKeeper registry center (standalone or cluster): ip, port, and namespace
    • Write a main method that starts the job
    • Add a second run configuration, change the port, and start a second instance
    • Watch the sharding items printed in the two consoles
<dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-core</artifactId>
      <version>2.1.5</version>
</dependency>
  • Job class
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

import java.time.LocalTime;

public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        LocalTime time = LocalTime.now();
        System.out.println(time + ", sharding item: " + shardingContext.getShardingItem() +
                ", total shards: " + shardingContext.getShardingTotalCount() + ", taskId: " +
                shardingContext.getTaskId());
    }
}
  • Main class
import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class App {
    public static void main(String[] args) {
        new JobScheduler(zkCenter(), configuration()).init();
    }

    /**
     * ZooKeeper registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
                "java-simple-job");

        ZookeeperRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        // initialize the registry center
        crc.init();
        return crc;
    }

    /**
     * job configuration
     */
    public static LiteJobConfiguration configuration() {
        // core job configuration: run every 5 seconds, 2 shards
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder("mySimpleJob", "0/5 * * * * ?", 2)
                .build();
        // job type configuration: pass the fully qualified job class name
        SimpleJobConfiguration jtc = new SimpleJobConfiguration(jcc,
                MySimpleJob.class.getCanonicalName());

        // root configuration (LiteJobConfiguration)
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();

        return ljc;
    }
}
  • Start the main class once: the console prints "sharding item: 0, total shards: 2" and "sharding item: 1, total shards: 2"
  • Start a second instance: the first console now prints only sharding item 0, and the second console prints sharding item 1
  • This confirms that the work is split across shards

Dataflow

  • The Dataflow type is for streaming jobs and is split into two phases: fetching data (fetchData) and processing data (processData)
  • Use case: continuous data processing, such as importing third-party orders
  • Execution flow:
    • The job fires according to its schedule
    • Fetch data
    • Process the data, then fetch again
    • If more data exists, keep processing; if not, this run ends
    • Wait for the next scheduled trigger
  • Implement the DataflowJob interface; the generic parameter in DataflowJob<T> is the element type returned by fetchData
  • Example: process 100 orders, sharded, 10 orders per fetch
// a mock order
public class Order {
    private Integer orderId;
    // 0 = unprocessed, 1 = processed
    private Integer status;

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "Order{" +
                "orderId=" + orderId +
                ", status=" + status +
                '}';
    }
}
  • Dataflow job
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.model.Order;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

import static java.util.stream.Collectors.toList;

public class MyDataflowJob implements DataflowJob<Order> {

    private List<Order> orders = new ArrayList<>();

    // initialize 100 orders
    {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            // unprocessed
            order.setStatus(0);
            orders.add(order);
        }
    }

    // fetch data
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // orderId % total shard count == current sharding item
        List<Order> orderList = orders.stream().filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem())
                .collect(toList());

        List<Order> subList = null;
        if (orderList != null && orderList.size() > 0) {
            // fetch at most 10 orders per call (guard against fewer than 10 remaining)
            subList = orderList.subList(0, Math.min(10, orderList.size()));
        }

        try {
            // simulate the time taken to fetch
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        LocalTime time = LocalTime.now();

        System.out.println(time + ", sharding item: " + shardingContext.getShardingItem() + ", fetched: " + subList);

        return subList;
    }

    // process data
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        // mark the orders as processed
        data.forEach(o -> o.setStatus(1));
        try {
            // simulate the time taken to process
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + ", sharding item: " + shardingContext.getShardingItem() + ", processing data!");
    }
}
  • Main class
public class App {
    public static void main(String[] args) {
        new JobScheduler(zkCenter(), configurationDataflow()).init();
    }

    /**
     * ZooKeeper registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
                "java-simple-job");

        ZookeeperRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        // initialize the registry center
        crc.init();
        return crc;
    }

    public static LiteJobConfiguration configurationDataflow() {
        // core job configuration
        var jcc = JobCoreConfiguration
                .newBuilder("myDataflowJob", "0/10 * * * * ?", 2)
                .build();

        // job type configuration; true enables streaming (repeat until no data)
        var jtc = new DataflowJobConfiguration(jcc,
                MyDataflowJob.class.getCanonicalName(), true);

        // root configuration (LiteJobConfiguration)
        var ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();

        return ljc;
    }
}
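The streaming loop described at the start of this section (fetch, process, fetch again until nothing is left) can be simulated with plain JDK code; the names below are illustrative, not Elastic-Job APIs — the real driver lives inside the framework:

```java
import java.util.ArrayList;
import java.util.List;

public class StreamingLoopDemo {
    static List<Integer> pending = new ArrayList<>(List.of(1, 2, 3, 4, 5));

    // Fetch at most `batch` items that still need processing.
    static List<Integer> fetchData(int batch) {
        return new ArrayList<>(pending.subList(0, Math.min(batch, pending.size())));
    }

    // Process (here: remove) the fetched items.
    static void processData(List<Integer> data) {
        pending.removeAll(data);
    }

    public static void main(String[] args) {
        // streaming-process=true: keep going until fetchData comes back empty
        List<Integer> batch;
        while (!(batch = fetchData(2)).isEmpty()) {
            System.out.println("fetched " + batch);
            processData(batch);
        }
        System.out.println("run finished, pending=" + pending);
    }
}
```

With streaming disabled, the framework would instead call fetchData/processData exactly once per trigger.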

Script jobs

  • Script-type jobs
  • Support shell, Python, Perl, and similar scripts
  • No Java code needed: configure the command of an executable script, and the job info (the sharding context, as a JSON string) is automatically appended as the last argument
public class App {
    public static void main(String[] args) {
        new JobScheduler(zkCenter(), configurationScript()).init();
    }

    public static LiteJobConfiguration configurationScript() {
        // core job configuration
        var jcc = JobCoreConfiguration
                .newBuilder("myScriptJob", "0/10 * * * * ?", 2)
                .misfire(false)
                .build();

        // job type configuration: path of the executable script
        var jtc = new ScriptJobConfiguration(jcc, "d:/test.cmd");

        // root configuration (LiteJobConfiguration)
        var ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();

        return ljc;
    }
}

Spring integration

  • Integrate the jobs using Spring's XML schema support
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <spring.version>5.1.5.RELEASE</spring.version>
    <elasticjob.version>2.1.5</elasticjob.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-web</artifactId>
      <version>${spring.version}</version>
    </dependency>
    <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-core</artifactId>
      <version>${elasticjob.version}</version>
    </dependency>
    <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-spring</artifactId>
      <version>${elasticjob.version}</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.6</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-dbcp2</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.quartz-scheduler</groupId>
      <artifactId>quartz</artifactId>
      <version>2.3.1</version>
    </dependency>
  </dependencies>
  • Configure a listener in web.xml to bootstrap the Spring container
<!DOCTYPE web-app PUBLIC
 "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
 "http://java.sun.com/dtd/web-app_2_3.dtd" >

<web-app>
  <display-name>spring-elasticjob</display-name>
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath*:spring-config.xml</param-value>
  </context-param>

  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>

</web-app>

  • Write the jobs
@Slf4j
public class MySimpleJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("sharding item: " + shardingContext.getShardingItem());
    }
}
public class MyDataflowJob implements DataflowJob<Integer> {

    private List<Integer> list = new ArrayList<>();

    {
        // seed the list with 0..9
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }
    }

    @Override
    public List<Integer> fetchData(ShardingContext shardingContext) {
        // number % total shard count == current sharding item
        List<Integer> rtnList = new ArrayList<>();
        for (Integer index : list) {
            if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
                rtnList.add(index);
                break;
            }
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("sharding item: " + shardingContext.getShardingItem() + ", fetched: " + rtnList);

        return rtnList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Integer> data) {
        list.removeAll(data);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("sharding item: " + shardingContext.getShardingItem() + ", removed: " + data);
    }
}
  • Configure the ZooKeeper registry center and the jobs in XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.dangdang.com/schema/ddframe/reg
            http://www.dangdang.com/schema/ddframe/reg/reg.xsd
            http://www.dangdang.com/schema/ddframe/job
            http://www.dangdang.com/schema/ddframe/job/job.xsd
">
    <!-- data source -->
    <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource">
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="url" value="jdbc:mysql://localhost:3306/dataflow?serverTimezone=Asia/Shanghai&amp;useSSL=false"/>
    </bean>

    <!-- registry center -->
    <reg:zookeeper server-lists="localhost:2181" namespace="spring-elasticjob" id="zkCenter"/>

    <!-- Simple job -->
    <!--<job:simple id="mySimpleJob" registry-center-ref="zkCenter" cron="0/10 * * * * ?" sharding-total-count="2"-->
                <!--class="com.example.job.MySimpleJob" overwrite="true"/>-->

    <!-- Dataflow job; event-trace-rdb-data-source enables event tracing -->
    <job:dataflow registry-center-ref="zkCenter" cron="0/10 * * * * ?" sharding-total-count="2" id="myDataflowJob"
                  class="com.example.job.MyDataflowJob" event-trace-rdb-data-source="dataSource" overwrite="true" streaming-process="true">
        <!-- optional listeners; fill in real listener classes before enabling -->
        <!--<job:listener class="..."/>-->
        <!--<job:distributed-listener class="..." started-timeout-milliseconds="..." completed-timeout-milliseconds="..."/>-->
    </job:dataflow>
</beans>
  • Deploy to Tomcat to start the scheduled jobs

Spring Boot integration

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-core</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>com.dangdang</groupId>
            <artifactId>elastic-job-lite-spring</artifactId>
            <version>2.1.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.15</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.mybatis.generator</groupId>
                <artifactId>mybatis-generator-maven-plugin</artifactId>
                <version>1.3.7</version>
                <dependencies>
                    <dependency>
                        <groupId>mysql</groupId>
                        <artifactId>mysql-connector-java</artifactId>
                        <version>8.0.15</version>
                    </dependency>
                </dependencies>
            </plugin>
        </plugins>
    </build>
  • Directory layout
    • com.example
      • autoconfig
      • springbootelasticjob
    • resources
      • META-INF
  • Write a properties class for the ZooKeeper registry center
package com.example.autoconfig;
@ConfigurationProperties(prefix = "elasticjob.zookeeper")
@Setter@Getter
public class ZookeeperProperties {
    // ZooKeeper server list
    private String serverList;
    // ZooKeeper namespace
    private String namespace;
}
elasticjob.zookeeper.server-list=localhost:2181
elasticjob.zookeeper.namespace=springboot-elasticjob

spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.url=jdbc:mysql://localhost:3306/dataflow?serverTimezone=Asia/Shanghai&useSSL=false

mybatis.mapper-locations=/mybatis/*.xml
logging.pattern.dateformat=HH:mm:ss
  • ZooKeeper auto-configuration
package com.example.autoconfig;
@Configuration
@ConditionalOnProperty("elasticjob.zookeeper.server-list")
@EnableConfigurationProperties(ZookeeperProperties.class)
public class ZookeeperAutoConfig {

    @Autowired
    private ZookeeperProperties zookeeperProperties;

    /**
     * ZooKeeper registry center; initMethod calls init() after construction
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter zkCenter(){
        String serverList = zookeeperProperties.getServerList();
        String namespace = zookeeperProperties.getNamespace();
        var zc = new ZookeeperConfiguration(serverList, namespace);

        var crc = new ZookeeperRegistryCenter(zc);

        return crc;
    }

}
  • Add the configuration-processor dependency so the IDE can auto-complete the property file
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
  • Write the @ElasticSimpleJob annotation
package com.example.autoconfig;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticSimpleJob {
    // job name
    String jobName() default "";
    // cron expression
    String cron() default "";
    // total shard count
    int shardingTotalCount() default 1;
    // whether to overwrite the configuration stored in ZooKeeper
    boolean overwrite() default false;
    // sharding strategy, defaults to average allocation
    Class<? extends JobShardingStrategy> jobStrategy() default AverageAllocationJobShardingStrategy.class;
    // whether to enable event tracing
    boolean jobEvent() default false;
    // job listeners
    Class<? extends ElasticJobListener>[] jobListner() default {};
}
package com.example.autoconfig;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface ElasticDataflowJob {

    // job name
    String jobName() default "";
    // cron expression
    String cron() default "";
    // total shard count
    int shardingTotalCount() default 1;
    // whether to overwrite the configuration stored in ZooKeeper
    boolean overwrite() default false;
    // whether to enable streaming processing
    boolean streamingProcess() default false;
    // sharding strategy, defaults to average allocation
    Class<? extends JobShardingStrategy> jobStrategy() default AverageAllocationJobShardingStrategy.class;
    // whether to enable event tracing
    boolean jobEvent() default false;
    // job listeners
    Class<? extends ElasticJobListener>[] jobListner() default {};
}
  • Register the jobs through Java reflection
package com.example.autoconfig;
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class SimpleJobAutoConfig {

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private DataSource dataSource;

    @PostConstruct
    public void initSimpleJob() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
        // find every bean annotated with @ElasticSimpleJob
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticSimpleJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()){
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces){
                // register only if the bean implements SimpleJob
                if (superInterface == SimpleJob.class){
                    // read the annotation attributes
                    ElasticSimpleJob annotation = instance.getClass().getAnnotation(ElasticSimpleJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    Class<?> jobStrategy = annotation.jobStrategy();
                    // whether event tracing is enabled
                    boolean isJobEvent = annotation.jobEvent();
                    // job listeners
                    Class<? extends ElasticJobListener>[] listeners = annotation.jobListner();
                    ElasticJobListener[] listenerInstances = null;
                    // instantiate any configured listeners
                    if (listeners != null && listeners.length > 0){
                        listenerInstances = new ElasticJobListener[listeners.length];
                        int i = 0;
                        for (Class<? extends ElasticJobListener> listener : listeners){
                            ElasticJobListener listenerInstance = listener.getDeclaredConstructor().newInstance();
                            listenerInstances[i] = listenerInstance;
                            i++;
                        }
                    }else {
                        listenerInstances = new ElasticJobListener[0];
                    }


                    // core job configuration
                    var jcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount)
                            .build();
                    // job type configuration
                    var jtc = new SimpleJobConfiguration(jcc,
                            instance.getClass().getCanonicalName());

                    // root configuration (LiteJobConfiguration)
                    var ljc = LiteJobConfiguration
                            .newBuilder(jtc)
                            .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                            .overwrite(overwrite)
                            .build();
                    // if event tracing is enabled, pass an RDB event configuration
                    if (isJobEvent){
                        JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
                        new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,jec,listenerInstances).init();
                    }else {
                        new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,listenerInstances).init();
                    }
                }
            }
        }
    }

}
@Configuration
@ConditionalOnBean(CoordinatorRegistryCenter.class)
@AutoConfigureAfter(ZookeeperAutoConfig.class)
public class DataflowJobAutoConfig {

    @Autowired
    private CoordinatorRegistryCenter zkCenter;

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private DataSource dataSource;

    @PostConstruct
    public void initDataflowJob() throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(ElasticDataflowJob.class);
        for (Map.Entry<String, Object> entry : beans.entrySet()){
            Object instance = entry.getValue();
            Class<?>[] interfaces = instance.getClass().getInterfaces();
            for (Class<?> superInterface : interfaces){
                if (superInterface == DataflowJob.class){
                    ElasticDataflowJob annotation = instance.getClass().getAnnotation(ElasticDataflowJob.class);
                    String jobName = annotation.jobName();
                    String cron = annotation.cron();
                    int shardingTotalCount = annotation.shardingTotalCount();
                    boolean overwrite = annotation.overwrite();
                    boolean streamingProcess = annotation.streamingProcess();
                    Class<?> jobStrategy = annotation.jobStrategy();
                    boolean isJobEvent = annotation.jobEvent();
                    Class<? extends ElasticJobListener>[] listeners = annotation.jobListner();
                    ElasticJobListener[] listenerInstances = null;
                    if (listeners!=null && listeners.length>0){
                        listenerInstances = new ElasticJobListener[listeners.length];
                        int i = 0;
                        for (Class<? extends ElasticJobListener> listener : listeners){
                            ElasticJobListener listenerInstance = listener.getDeclaredConstructor().newInstance();
                            listenerInstances[i] = listenerInstance;
                            i++;
                        }
                    }else {
                        listenerInstances = new ElasticJobListener[0];
                    }
                    // core job configuration
                    var jcc = JobCoreConfiguration
                            .newBuilder(jobName, cron, shardingTotalCount)
                            .build();

                    // job type configuration
                    var jtc = new DataflowJobConfiguration(jcc,
                            instance.getClass().getCanonicalName(), streamingProcess);

                    // root configuration (LiteJobConfiguration)
                    var ljc = LiteJobConfiguration
                            .newBuilder(jtc)
                            .jobShardingStrategyClass(jobStrategy.getCanonicalName())
                            .overwrite(overwrite)
                            .build();

                    if (isJobEvent){
                        JobEventConfiguration jec = new JobEventRdbConfiguration(dataSource);
                        new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,jec,listenerInstances).init();
                    }else {
                        new SpringJobScheduler((ElasticJob) instance,zkCenter,ljc,listenerInstances).init();
                    }
                }
            }
        }
    }
}
  • Write a job class and annotate it with @ElasticSimpleJob
package com.example.springbootelasticjob.job;
@Slf4j
@ElasticSimpleJob(
        jobName = "mySimpleJob" ,
        cron = "0/5 * * * * ?",
        shardingTotalCount = 1,
        overwrite = true
)
public class MySimpleJob implements SimpleJob {
    @Autowired
    private OrderService orderService;

    @Override
    public void execute(ShardingContext shardingContext) {

        LocalTime time = LocalTime.now();
        System.out.println(time + ", sharding item: " + shardingContext.getShardingItem() +
                ", total shards: " + shardingContext.getShardingTotalCount() + ", taskId: " +
                shardingContext.getTaskId());

    }
}
package com.example.springbootelasticjob.job;
@Slf4j
@ElasticDataflowJob(
        jobName = "myDataflowJob",
        cron = "0/10 * * * * ?",
        shardingTotalCount = 2,
        overwrite = true,
        streamingProcess = true
)
public class MyDataflowJob implements DataflowJob<Integer> {

    private List<Integer> list = new ArrayList<>();

    {
        // seed the list with 0..9
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }
    }

    @Override
    public List<Integer> fetchData(ShardingContext shardingContext) {
        List<Integer> rtnList = new ArrayList<>();
        // number % total shard count == current sharding item
        for (Integer index : list){
            if (index % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()){
                rtnList.add(index);
                break;
            }
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        log.info("sharding item: " + shardingContext.getShardingItem() + ", fetched: " + rtnList);

        return rtnList;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Integer> data) {
        list.removeAll(data);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("sharding item: " + shardingContext.getShardingItem() + ", removed: " + data);
    }
}
  • So that other projects can pick up the auto-configuration after this project is packaged as a jar, create a META-INF directory under resources with a spring.factories file
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.example.autoconfig.ZookeeperAutoConfig,\
com.example.autoconfig.SimpleJobAutoConfig,\
com.example.autoconfig.DataflowJobAutoConfig
  • Start the project and test

Cancelling timed-out orders by polling

  • Simulate auto-cancelling orders that stay unpaid for 30 seconds, implemented with an @ElasticSimpleJob
  • Simulate order creation: write an insert-order method and schedule it to run every 5 seconds
public class Order {

    private Integer id;

    private BigDecimal amount;

    private Integer status;

    private String receiveName;

    private String receiveAddress;

    private String receiveMobile;

    private String createUser;

    private Date createTime;
    
    private String updateUser;

    private Date updateTime;

    // getters and setters omitted
}
@Slf4j
@ElasticSimpleJob(
        jobName = "mySimpleJob" ,
        cron = "0/5 * * * * ?",
        shardingTotalCount = 1,
        overwrite = true
)
public class MySimpleJob implements SimpleJob {
    @Autowired
    private OrderService orderService;

    @Override
    public void execute(ShardingContext shardingContext) {
        // insert 10 orders into the database every 5 seconds
        for (int i = 0; i < 10; i++){
            orderService.insertOrder();
        }
    }
}
public int insertOrder(){
        Order order = new Order();
        order.setAmount(BigDecimal.TEN);
        order.setReceiveName("Green");
        order.setReceiveAddress("Chaoyang District, Beijing, xxx");
        order.setReceiveMobile("13811112222");
        order.setStatus(1); // 1 = unpaid
        order.setCreateUser("Green");
        order.setCreateTime(new Date());
        order.setUpdateUser("Green");
        order.setUpdateTime(new Date());
        return orderMapper.insertSelective(order);
    }
  • Write the job that handles timed-out orders: write the timeout-query SQL, cancel orders on a thread pool, and use optimistic locking (compare on update_time) so that an order the user pays at the last moment is not cancelled
@ElasticSimpleJob(
        jobName = "orderCancelJob",
        cron = "0/15 * * * * ?",
        shardingTotalCount = 2,
        overwrite = true
)
public class OrderCancelJob implements SimpleJob {

    @Autowired
    private OrderService orderService;

    @Override
    public void execute(ShardingContext shardingContext) {
        // the point in time 30 seconds ago
        Calendar now = Calendar.getInstance();
        now.add(Calendar.SECOND, -30);
        // order id % total shard count == current sharding item
        List<Order> orders = orderService.getOrder(now,
                shardingContext.getShardingTotalCount(), shardingContext.getShardingItem());

        if (orders != null && orders.size() > 0){
            // cancel the orders on a small thread pool
            ExecutorService es = Executors.newFixedThreadPool(4);
            for (Order order : orders){
                es.execute(() -> {
                    // optimistic-lock condition
                    Integer orderId = order.getId();
                    Date updateTime = order.getUpdateTime();
                    // values to set
                    int status = 3; // 3 = cancelled
                    String updateUser = "system";
                    Date updateNow = new Date();

                    orderService.cancelOrder(orderId,updateTime,status,updateUser,updateNow);
                });
            }
            es.shutdown();
        }
    }
}
<select id="getOrder" resultMap="BaseResultMap">
      select
        <include refid="Base_Column_List"/>
      from t_order
      <where>
        create_time &lt; #{param1}
        and status = 1
        and id % #{param2} = #{param3}
      </where>
</select>

<update id="cancelOrder">
      update t_order set
      status = #{param3},
      update_user = #{param4},
      update_time = #{param5}
      <where>
        id = #{param1}
        and update_time = #{param2}
      </where>
</update>
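The update above is the optimistic lock: it only succeeds while update_time still matches the value the job read, so a payment that sneaks in first bumps update_time and the cancel statement affects zero rows. A JDK-only simulation of that check (hypothetical names, no database involved):

```java
import java.util.HashMap;
import java.util.Map;

public class OptimisticLockDemo {
    // order id -> {status, version}; version stands in for update_time
    static Map<Integer, int[]> orders = new HashMap<>();

    // Like: UPDATE t_order SET status=3 WHERE id=? AND update_time=?  -> rows affected
    static int cancelOrder(int id, int expectedVersion) {
        int[] row = orders.get(id);
        if (row == null || row[1] != expectedVersion) {
            return 0; // someone else (e.g. a payment) changed the row first
        }
        row[0] = 3;  // cancelled
        row[1]++;    // bump the version, like touching update_time
        return 1;
    }

    public static void main(String[] args) {
        orders.put(42, new int[]{1, 7}); // unpaid, version 7

        // the job read version 7, but the user pays first (status 2, version 8)
        orders.put(42, new int[]{2, 8});

        System.out.println(cancelOrder(42, 7)); // 0 rows: the cancel is safely skipped
        System.out.println(orders.get(42)[0]);  // 2: the order stays paid
    }
}
```

The job should treat "0 rows affected" as success: the order no longer needs cancelling.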
  • Verify the cancel-order logic with a JUnit test
  • Configure the total shard count and schedule via @ElasticSimpleJob
  • Start the job and verify it behaves correctly

Third-party order import

  • A Dataflow streaming job in practice: importing third-party orders
// a third-party JD order
public class JdOrder {

    private Integer id;

    private Integer status;

    private BigDecimal amount;

    private String createUser;

    private Date createTime;

    private String updateUser;

    private Date updateTime;
}
// a third-party Tmall order
public class TmallOrder {

    private Integer id;

    private Integer orderStatus;

    private BigDecimal money;

    private String createUser;

    private Date createTime;

    private String updateUser;

    private Date updateTime;
}
// order in our own system
public class AllOrder {

    private Integer id;

    private Integer thirdOrderId;

    private Integer type;

    private BigDecimal totalAmount;

    private String createUser;

    private Date createTime;

    private String updateUser;

    private Date updateTime;
}
  • Write an order-producing job to simulate third-party orders being created
@ElasticSimpleJob(
        jobName = "thirdOrderProduceJob",
        cron = "0/5 * * * * ?",
        shardingTotalCount = 1,
        overwrite = true
)
public class ThirdOrderProduceJob implements SimpleJob {
    @Autowired
    private OrderService orderService;
    @Override
    public void execute(ShardingContext shardingContext) {
        orderService.produceThirdOrder();
    }
}
public void produceThirdOrder() {
        Random random = new Random();
        for (int i = 0; i < 5; i++){
            int randomInt = random.nextInt(2);
            // JD order
            if (randomInt == 0){
                log.info("inserting JD order");
                JdOrder jdOrder = new JdOrder();
                jdOrder.setStatus(0); // not yet fetched
                jdOrder.setAmount(BigDecimal.TEN);
                jdOrder.setCreateUser("jdUser");
                jdOrder.setCreateTime(new Date());
                jdOrder.setUpdateUser("jdUser");
                jdOrder.setUpdateTime(new Date());
                jdOrderMapper.insertSelective(jdOrder);
            } else { // Tmall order
                log.info("inserting Tmall order");
                TmallOrder tmallOrder = new TmallOrder();
                tmallOrder.setOrderStatus(0); // not yet fetched
                tmallOrder.setMoney(new BigDecimal(100));
                tmallOrder.setCreateUser("tmallUser");
                tmallOrder.setCreateTime(new Date());
                tmallOrder.setUpdateUser("tmallUser");
                tmallOrder.setUpdateTime(new Date());
                tmallOrderMapper.insertSelective(tmallOrder);
            }
        }
    }
  • Use a Dataflow job to fetch the third-party orders: 2 shards in total, shard 0 fetches JD orders and shard 1 fetches Tmall orders; the job runs every 15 seconds and each shard fetches 5 orders per round
@ElasticDataflowJob(
        jobName = "fetchThirdOrderJob",
        cron = "0/15 * * * * ?",
        shardingTotalCount = 2,
        overwrite = true,
        streamingProcess = true
)
public class FetchThirdOrderJob implements DataflowJob<Object> {
    @Autowired
    private OrderService orderService;
    @Autowired
    private JdOrderMapper jdOrderMapper;
    @Autowired
    private TmallOrderMapper tmallOrderMapper;

    @Override
    public List<Object> fetchData(ShardingContext shardingContext) {
        // JD orders
        if (shardingContext.getShardingItem() == 0){
            // fetch 5 orders that have not been pulled yet
            List<JdOrder> jdOrders = jdOrderMapper.getNotFetchedOrder(5);
            if (jdOrders != null && !jdOrders.isEmpty()){
                return jdOrders.stream().map(jdOrder -> (Object) jdOrder).collect(toList());
            }
        } else { // Tmall orders
            // fetch 5 orders that have not been pulled yet
            List<TmallOrder> tmallOrders = tmallOrderMapper.getNotFetchedOrder(5);
            if (tmallOrders != null && !tmallOrders.isEmpty()){
                return tmallOrders.stream().map(tmallOrder -> (Object) tmallOrder).collect(toList());
            }
        }
        // returning null (or an empty list) ends the current streaming round
        return null;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<Object> data) {
        // JD orders
        if (shardingContext.getShardingItem() == 0){
            if (data != null && !data.isEmpty()){
                List<JdOrder> jdOrders = data.stream().map(d -> (JdOrder) d).collect(toList());
                for (JdOrder jdOrder : jdOrders){
                    AllOrder allOrder = new AllOrder();
                    allOrder.setThirdOrderId(jdOrder.getId());
                    allOrder.setType(0); // JD order
                    allOrder.setTotalAmount(jdOrder.getAmount());
                    allOrder.setCreateUser("system");
                    allOrder.setCreateTime(new Date());
                    allOrder.setUpdateUser("system");
                    allOrder.setUpdateTime(new Date());
                    orderService.processJdOrder(allOrder);
                }
            }
        } else { // Tmall orders
            if (data != null && !data.isEmpty()){
                List<TmallOrder> tmallOrders = data.stream().map(d -> (TmallOrder) d).collect(toList());
                for (TmallOrder tmallOrder : tmallOrders){
                    AllOrder allOrder = new AllOrder();
                    allOrder.setThirdOrderId(tmallOrder.getId());
                    allOrder.setType(1); // Tmall order
                    allOrder.setTotalAmount(tmallOrder.getMoney());
                    allOrder.setCreateUser("system");
                    allOrder.setCreateTime(new Date());
                    allOrder.setUpdateUser("system");
                    allOrder.setUpdateTime(new Date());
                    orderService.processTmallOrder(allOrder);
                }
            }
        }
    }
}
<!-- in the JdOrder mapper XML (ids may repeat across mapper files) -->
<select id="getNotFetchedOrder" resultType="com.example.springbootelasticjob.model.JdOrder">
      SELECT
          <include refid="Base_Column_List"/>
      FROM
          jd_order
      WHERE
          `status` = 0
      LIMIT #{param1}
</select>

<!-- in the TmallOrder mapper XML -->
<select id="getNotFetchedOrder" resultType="com.example.springbootelasticjob.model.TmallOrder">
      select <include refid="Base_Column_List"/>
      from tmall_order
      <where>
        order_status =0
      </where>
      limit #{param1}
</select>
@Transactional
public void processJdOrder(AllOrder allOrder) {
    // persist the order into our own system
    allOrderMapper.insertSelective(allOrder);
    // mark the JD order as fetched
    JdOrder jdOrder = new JdOrder();
    jdOrder.setId(allOrder.getThirdOrderId());
    jdOrder.setStatus(1); // fetched
    jdOrder.setUpdateUser("system");
    jdOrder.setUpdateTime(new Date());
    jdOrderMapper.updateByPrimaryKeySelective(jdOrder);
}

@Transactional
public void processTmallOrder(AllOrder allOrder) {
    // persist the order into our own system
    allOrderMapper.insertSelective(allOrder);
    // mark the Tmall order as fetched
    TmallOrder tmallOrder = new TmallOrder();
    tmallOrder.setId(allOrder.getThirdOrderId());
    tmallOrder.setOrderStatus(1); // fetched
    tmallOrder.setUpdateUser("system");
    tmallOrder.setUpdateTime(new Date());
    tmallOrderMapper.updateByPrimaryKeySelective(tmallOrder);
}
  • Test

Custom sharding strategy

  • A sharding strategy is the rule that assigns sharding items to the currently active servers

  • Built-in strategies:

    • Average allocation (the default). For example, with 10 shards and 3 servers: 1=[0,1,2,9], 2=[3,4,5], 3=[6,7,8]

    • Odd/even parity of the job name's hash decides whether servers are sorted by IP ascending or descending. For example, with 2 shards and 3 servers: when the hash is odd, 1=[0], 2=[1], 3=[]; when it is even, 3=[0], 2=[1], 1=[]

  • Steps to write a custom strategy:

    • Create a class implementing the JobShardingStrategy interface
    • Implement the sharding method
    • The sharding method receives the list of job instances, the job name, and the total sharding count
    • It returns the sharding items assigned to each job instance
    • Configure the strategy on the job
    • Set jobShardingStrategyClass to the fully qualified name of the strategy class
package com.example.springbootelasticjob.sharding;
// custom sharding strategy: round-robin assignment of items to instances
public class MyShardingStrategy implements JobShardingStrategy {
    @Override
    public Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances,
                                                    String jobName, int shardingTotalCount) {
        Map<JobInstance, List<Integer>> rtnMap = new HashMap<>();
        // queue of sharding items to hand out
        ArrayDeque<Integer> queue = new ArrayDeque<>(shardingTotalCount);
        for (int i=0;i<shardingTotalCount;i++){
            queue.add(i);
        }
        // assign sharding items to instances round-robin
        while (queue.size()>0){
            for (JobInstance jobInstance : jobInstances){
                if (queue.size()>0){
                    Integer shardingItem = queue.pop();
                    List<Integer> integers = rtnMap.get(jobInstance);
                    if (integers!=null&&integers.size()>0){
                        integers.add(shardingItem);
                    }else {
                        List<Integer> list = new ArrayList<>();
                        list.add(shardingItem);
                        rtnMap.put(jobInstance,list);
                    }
                }
            }
        }

        return rtnMap;
    }
}
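The round-robin logic above can be exercised without ZooKeeper by swapping JobInstance for plain strings. This stand-alone sketch (not part of the project) reproduces the same assignment, using computeIfAbsent in place of the explicit get-or-create branch:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinShardingDemo {

    // same round-robin assignment as MyShardingStrategy, keyed by instance name
    public static Map<String, List<Integer>> sharding(List<String> instances, int shardingTotalCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        ArrayDeque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < shardingTotalCount; i++) {
            queue.add(i);
        }
        while (!queue.isEmpty()) {
            for (String instance : instances) {
                if (queue.isEmpty()) {
                    break;
                }
                result.computeIfAbsent(instance, k -> new ArrayList<>()).add(queue.pop());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 5 shards over 2 instances: A=[0, 2, 4], B=[1, 3]
        System.out.println(sharding(List.of("A", "B"), 5));
    }
}
```

Round-robin spreads consecutive items across different servers, whereas the default average allocation gives each server a consecutive block; which is better depends on whether neighboring items have correlated load.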
package com.example.springbootelasticjob.job;

@ElasticSimpleJob(
        jobName = "myShardingJob",
        cron = "0/10 * * * * ?",
        overwrite = true,
        shardingTotalCount = 10,
        jobStrategy = MyShardingStrategy.class,
        jobEvent = true,
        jobListner = {MyNormalListener.class}
)
@Slf4j
public class MyShardingJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        log.info("my sharding item: " + shardingContext.getShardingItem());
    }
}
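For comparison with the custom strategy, the default average-allocation rule described earlier can also be re-implemented in isolation. This is a simplified sketch for illustration (not the library's own code): each server gets a consecutive block of total/servers items, and any leftover items are handed out one per server starting from the first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AverageShardingDemo {

    // average allocation: consecutive blocks plus one leftover item each,
    // keyed by server number starting at 1 to match the examples in the text
    public static Map<Integer, List<Integer>> sharding(int servers, int shardingTotalCount) {
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        int base = shardingTotalCount / servers;
        for (int s = 0; s < servers; s++) {
            List<Integer> items = new ArrayList<>();
            for (int i = s * base; i < (s + 1) * base; i++) {
                items.add(i);
            }
            result.put(s + 1, items);
        }
        // distribute the remaining items one by one, starting from server 1
        for (int leftover = base * servers; leftover < shardingTotalCount; leftover++) {
            result.get(leftover - base * servers + 1).add(leftover);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 shards over 3 servers: {1=[0, 1, 2, 9], 2=[3, 4, 5], 3=[6, 7, 8]}
        System.out.println(sharding(3, 10));
    }
}
```

The output matches the 10-shard/3-server example from the strategy overview: server 1 picks up the leftover item 9 on top of its block [0,1,2].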

Event tracing

  • Records job state changes; configure a data source and the required tables are created automatically in that database
  • For the Spring Boot integration, see SimpleJobAutoConfig and DataflowJobAutoConfig
  • For the Spring integration, see spring-config.xml
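With the plain Java API, event tracing is enabled by passing a JobEventRdbConfiguration built from a DataSource to the JobScheduler. A minimal wiring sketch, assuming the `zkCenter()` and `configuration()` helpers from the earlier setup and a DataSource configured elsewhere:

```java
// sketch: wiring event tracing into a JobScheduler (elastic-job-lite 2.x)
DataSource dataSource = ...; // a configured DataSource, e.g. HikariCP or Druid
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
// the JOB_EXECUTION_LOG and JOB_STATUS_TRACE_LOG tables are created automatically
new JobScheduler(zkCenter(), configuration(), jobEventConfig).init();
```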

Job listeners

  • Run custom logic before and after job execution
  • Two listener types:
    • Executed on every job node; no distributed coordination required (recommended)
    • Executed on a single node only in a distributed setup, so the whole job gets exactly one "before" and one "after" (use with caution)
  • Implement the listener interface with its before- and after-execution methods
  • To listen on every node, implement ElasticJobListener
  • To listen once per distributed execution, extend AbstractDistributeOnceElasticJobListener
  • Finally, pass the listener(s) into the JobScheduler
package com.example.springbootelasticjob.listener;

public class MyNormalListener implements ElasticJobListener {
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("job " + shardingContexts.getJobName() + ": before execution!");
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("job " + shardingContexts.getJobName() + ": after execution!");
    }
}
package com.example.springbootelasticjob.listener;
// distributed once-per-job listener; has known timing flaws, use with caution
public class MyDistributeListener extends AbstractDistributeOnceElasticJobListener {
    public MyDistributeListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) {
        super(startedTimeoutMilliseconds, completedTimeoutMilliseconds);
    }

    @Override
    public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) {
        System.out.println("distributed listener: before job execution");
    }

    @Override
    public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) {
        System.out.println("distributed listener: after job execution");
    }
}
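With the plain Java API, the listeners above are passed as trailing varargs when building the scheduler. A sketch, assuming the `zkCenter()` and `configuration()` helpers from the earlier setup; the timeout values are illustrative:

```java
// sketch: registering both listener types when building the scheduler
new JobScheduler(zkCenter(), configuration(),
        new MyNormalListener(),
        // timeouts (ms) to wait for all shards to start/complete before proceeding anyway
        new MyDistributeListener(5000L, 10000L)
).init();
```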
  • Update the annotation and the auto-configuration class to register the listeners
