支持集群,伪分布式,支持动态添加任务
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>2.1.5</version>
</dependency>
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import java.time.LocalTime;
/**
 * Demo simple job: each time the scheduler fires, prints the current time,
 * this instance's sharding item, the total shard count, and the task id.
 */
public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        LocalTime time = LocalTime.now();
        // FIX: added ":" after "taskId" so every label uses the same
        // label:value separator as the other fields in this message.
        System.out.println(time + ",我是分片项:" + shardingContext.getShardingItem()
                + ",总分片项:" + shardingContext.getShardingTotalCount()
                + ",taskId:" + shardingContext.getTaskId());
    }
}
/**
 * Entry point for the simple-job sample: wires a {@code MySimpleJob} against
 * a local ZooKeeper registry and starts the elastic-job scheduler.
 */
public class App {

    public static void main(String[] args) {
        System.out.println("Hello World!");
        new JobScheduler(zkCenter(), configuration()).init();
    }

    /**
     * Builds and initializes the ZooKeeper registry center
     * (server list: localhost:2181, namespace: java-simple-job).
     *
     * @return the initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zkConfig =
                new ZookeeperConfiguration("localhost:2181", "java-simple-job");
        ZookeeperRegistryCenter registryCenter = new ZookeeperRegistryCenter(zkConfig);
        // Connect to ZooKeeper before handing the center to the scheduler.
        registryCenter.init();
        return registryCenter;
    }

    /**
     * Assembles the job configuration in three layers: core config
     * (name "mySimpleJob", cron every 5 seconds, 2 shards), the simple-job
     * type binding (fully-qualified job class name), and the lite root config.
     *
     * @return the root lite job configuration
     */
    public static LiteJobConfiguration configuration() {
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("mySimpleJob", "0/5 * * * * ?", 2)
                .build();
        SimpleJobConfiguration typeConfig =
                new SimpleJobConfiguration(coreConfig, MySimpleJob.class.getCanonicalName());
        // overwrite(true): push this local config to the registry on startup.
        return LiteJobConfiguration
                .newBuilder(typeConfig)
                .overwrite(true)
                .build();
    }
}
DataflowJob&lt;T&gt;
泛型,泛型规定了抓取数据的返回类型
//模拟订单
/**
 * Demo order entity used by the dataflow-job sample.
 */
public class Order {

    private Integer orderId;
    // Processing state: 0 = unprocessed, 1 = processed.
    private Integer status;

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Order{");
        text.append("orderId=").append(orderId);
        text.append(", status=").append(status);
        text.append('}');
        return text.toString();
    }
}
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.example.model.Order;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import static java.util.stream.Collectors.toList;
/**
 * Demo dataflow job. Keeps an in-memory pool of 100 orders; each shard
 * fetches up to 10 unprocessed orders that belong to it
 * (orderId % totalShards == shardItem) and then marks them processed.
 */
public class MyDataflowJob implements DataflowJob<Order> {

    private List<Order> orders = new ArrayList<>();

    // Seed 100 unprocessed orders (orderId 1..100) when an instance is created.
    {
        for (int i = 0; i < 100; i++) {
            Order order = new Order();
            order.setOrderId(i + 1);
            order.setStatus(0);   // 0 = unprocessed
            orders.add(order);
        }
    }

    /**
     * Fetches the next batch for this shard: unprocessed orders mapped to the
     * current shard item, capped at 10 per invocation.
     *
     * @return the batch to process; empty when no matching orders remain
     */
    @Override
    public List<Order> fetchData(ShardingContext shardingContext) {
        // An order belongs to this shard when orderId % totalShards == shardItem.
        List<Order> orderList = orders.stream()
                .filter(o -> o.getStatus() == 0)
                .filter(o -> o.getOrderId() % shardingContext.getShardingTotalCount()
                        == shardingContext.getShardingItem())
                .collect(toList());
        // BUG FIX: the original called subList(0, 10) whenever the list was
        // non-empty, which throws IndexOutOfBoundsException once fewer than
        // 10 unprocessed orders remain. Cap the batch at the list size, and
        // return an empty list (not null) when nothing matched.
        List<Order> subList = orderList.subList(0, Math.min(orderList.size(), 10));
        try {
            Thread.sleep(3000);   // simulate fetch latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore interrupt status
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + ",我是分片项:" + shardingContext.getShardingItem()
                + ",我抓取的数据是:" + subList);
        return subList;
    }

    /**
     * Processes the fetched batch by flipping each order's status to
     * 1 (processed); the subList view writes through to the backing pool.
     */
    @Override
    public void processData(ShardingContext shardingContext, List<Order> data) {
        data.forEach(o -> o.setStatus(1));
        try {
            Thread.sleep(5000);   // simulate processing latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore interrupt status
        }
        LocalTime time = LocalTime.now();
        System.out.println(time + ",我是分片项:" + shardingContext.getShardingItem() + ",我正在处理数据!");
    }
}
/**
 * Entry point for the dataflow-job sample: registers against ZooKeeper and
 * schedules {@code MyDataflowJob}.
 */
public class App {

    public static void main(String[] args) {
        System.out.println("Hello World!");
        new JobScheduler(zkCenter(), configurationDataflow()).init();
    }

    /**
     * Builds and initializes the ZooKeeper registry center
     * (server list: localhost:2181, namespace: java-simple-job).
     *
     * @return the initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
                "java-simple-job");
        ZookeeperRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        // Connect to ZooKeeper before handing the center to the scheduler.
        crc.init();
        return crc;
    }

    /**
     * Dataflow job configuration: fires every 10 seconds, 2 shards,
     * streaming mode enabled (fetch/process repeats until no data remains).
     *
     * @return the root lite job configuration
     */
    public static LiteJobConfiguration configurationDataflow() {
        // FIX: replaced `var` with explicit types — the build targets Java 8
        // (maven.compiler.source/target = 8), where `var` does not compile.
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder("myDataflowJob", "0/10 * * * * ?", 2)
                .build();
        // true = streaming process: keep fetching until fetchData is empty.
        DataflowJobConfiguration jtc = new DataflowJobConfiguration(jcc,
                MyDataflowJob.class.getCanonicalName(), true);
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();
        return ljc;
    }
}
/**
 * Entry point for the script-job sample: runs an external script
 * (d:/test.cmd) on the elastic-job schedule.
 */
public class App {

    public static void main(String[] args) {
        System.out.println("Hello World!");
        new JobScheduler(zkCenter(), configurationScript()).init();
    }

    /**
     * FIX: main() calls zkCenter() but the original class never defined it
     * (compile error); added here, matching the registry setup used by the
     * other samples (server: localhost:2181, namespace: java-simple-job).
     *
     * @return the initialized registry center
     */
    public static CoordinatorRegistryCenter zkCenter() {
        ZookeeperConfiguration zc = new ZookeeperConfiguration("localhost:2181",
                "java-simple-job");
        ZookeeperRegistryCenter crc = new ZookeeperRegistryCenter(zc);
        crc.init();
        return crc;
    }

    /**
     * Script job configuration: fires every 10 seconds, 2 shards, misfire
     * disabled (missed triggers are skipped rather than re-fired).
     *
     * @return the root lite job configuration
     */
    public static LiteJobConfiguration configurationScript() {
        // FIX: replaced `var` with explicit types for the Java 8 build target
        // (maven.compiler.source/target = 8).
        JobCoreConfiguration jcc = JobCoreConfiguration
                .newBuilder("myScriptJob", "0/10 * * * * ?", 2)
                .misfire(false)
                .build();
        ScriptJobConfiguration jtc = new ScriptJobConfiguration(jcc, "d:/test.cmd");
        LiteJobConfiguration ljc = LiteJobConfiguration
                .newBuilder(jtc)
                .overwrite(true)
                .build();
        return ljc;
    }
}
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<spring.version>5.1.5.RELEASE</spring.version>
<elasticjob.version>2.1.5</elasticjob.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.26</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.1</version>
</dependency>
</dependencies>