引言
在XXL-JOB中,有一个xxl-job-admin项目,这个就相当于定时任务的调度平台,我们参考XXL-JOB,也添加这么一个调度平台,由于篇幅有限,我们先实现一个本地的定时任务调度平台,至于如何调用远程的定时任务,后面再进行讲解。
前期准备
首先我们创建一个springboot项目,引入下列依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| <dependencies> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.cronutils</groupId> <artifactId>cron-utils</artifactId> <version>9.2.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.3</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.1-jre</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> </dependencies>
|
创建yang-job表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| CREATE TABLE `yang_job` ( `id` int NOT NULL AUTO_INCREMENT, `job_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `description` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `cron` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `execute_strategy` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '执行策略:once/withFixedDelay/withFixedRate', `execute_class_path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `enable` tinyint(1) NOT NULL DEFAULT 1, `open` tinyint(1) NOT NULL DEFAULT 0, `create_time` datetime NOT NULL, `update_time` datetime NOT NULL, `features` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `execute_params` varchar(512) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 12 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
|
层级关系
对于分层,我们参考阿里巴巴的COLA项目,分为5层,分别是controller层、client层、applicatioon层、domain层和infrastructure层。如下图所示:
层级的依赖关系如下:
DDD四层架构,自上而下分别是用户界面层、应用层、领域层和基础设施层,上层可以依赖下层,而下层不能依赖上层。如下图所示:
COLA架构和DDD的四层架构很相似,但是它在四层架构的基础上,进一步进行了约束,如下图所示
在本项目中,尽量参考COLA架构的约束进行实现,但也会根据实际情况灵活进行修改,即便违背了COLA架构的相关约束条件,也会遵守DDD四层架构的约束。
项目搭建
client层
client其实在COLA中,属于可选的一层,主要用于存储服务对外透出的API和DTO。在client层中,所拥有的类如下所示:
api主要定义了对外提供的接口,这里我定义两个api——YangJobQueryService和YangJobService,一个是用于查询的,一个是用于命令的。
在dto中,定义了三个包,其中,common包存放的,是通用的dto,比如是结果类、错误信息类、分页类等。command包存放的,则是命令类,query包存放查询类,其余的数据实体,直接放在dto包下。
为什么要定义command和query?这个其实体现的是CQRS模式(Command Query Responsibility Seregation,命令查询职责分离)。所谓命令,指的是会引起数据发生变化操作的总称,比如新增、更新、删除等操作,该方法要么改变对象的内部状态,但不返回任何内容,要么只返回元数据。所谓查询,指不会对数据产生变化的操作,只是按照某些条件查找数据。
domain层
领域层主要封装了核心业务逻辑,并通过领域服务或领域对象的方法,对App提供业务实体和业务逻辑计算,领域是应用的核心,不依赖任何其他层次。
在目前的domain层中,其分层结构如上图所示,其中JobExecuteStrategyEnum就是我们之前提到的执行策略枚举。然后YangJobModel为具体的模型类,IJobModelRepository和JobModelQueryCondition为和数据库操作相关的类。
下面是IJobModelRepository的定义,其实这个IJobModelRepository的定义,违反我们刚才提到的”domain不依赖任何其他层次”,因为它使用到了client层提供的PageDTO,但是我觉得这里影响不大,如果说一定要遵循原则的话,可以在domain层也定义一个分页结果类,只是说在Application层中,如果涉及到分页结果的话,需要将domain层提供的分页结果转化为对外提供的分页结果。这里因为只是demo,就不做那么严格的限制了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package com.yang.job.domain.repository;
import com.yang.job.client.dto.common.PageDTO; import com.yang.job.domain.model.YangJobModel; import com.yang.job.domain.repository.request.JobModelQueryCondition;
import java.util.List;
public interface IJobModelRepository { boolean saveYangJobModel(YangJobModel yangJobModel);
boolean updateYangJobModel(YangJobModel yangJobModel);
YangJobModel getYangJobModelById(Integer id);
List<YangJobModel> getOpenYangJobModelList();
PageDTO<YangJobModel> queryYangJobModelPage(JobModelQueryCondition jobModelQueryCondition);
boolean deleteJobModel(Integer id); }
|
YangJobModel模型的代码如下所示,其实就目前来看,这个YangJobModel模型并没有什么业务逻辑,因为我们现在刚开始搭建,主要还是面向与增删改查的,所以业务逻辑相对来说较少。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| package com.yang.job.domain.model;
import com.yang.job.client.dto.common.BusinessException; import com.yang.job.client.dto.common.ErrorCode; import com.yang.job.domain.enums.JobExecuteStrategyEnum; import lombok.Data;
import java.io.Serializable; import java.util.Date; import java.util.HashMap; import java.util.Map;
@Data public class YangJobModel implements Serializable { private Integer jobId;
private String jobName;
private String description;
private String cron;
private String executeClassPath;
private Runnable runnable;
private JobExecuteStrategyEnum executeStrategy;
private Integer enable;
private Integer open;
private Date createTime;
private Date updateTime;
private Map<String, String> featureMap = new HashMap<>();
private Map<String, String> executeParamMap = new HashMap<>();
public boolean isEnable() { if (this.enable == null) { return false; } return this.enable == 1; }
public boolean isOpen() { if (!isEnable()) { return false; } if (this.open == null) { return false; } return this.open == 1; }
public boolean isClose() { return !isOpen(); }
public void setExecuteClassPath(String executeClassPath) { if (executeClassPath == null || executeClassPath.isEmpty()) { throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH); } try { Class.forName(executeClassPath); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH); } this.executeClassPath = executeClassPath; }
public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) { if (jobExecuteStrategyEnum == null) { throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST); } this.executeStrategy = jobExecuteStrategyEnum; } }
|
application层
应用层主要负责获取输入、组装上下文、参数校验、调用领域层做业务处理,如果需要的话,发送消息通知等,层次是开放的,应用层也可以绕过领域层,直接访问基础设施层。
目前application相对比较简单,convertor包用于将我们的模型转化外对外提供的DTO类。
YangJobApplicationService的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| @Service public class YangJobApplicationService implements YangJobService { @Resource private IJobModelRepository jobModelRepository;
@Resource private YangJobDTOConvertor yangJobDTOConvertor;
@Override public Response<YangJobDTO> saveYangJob(NewYangJobCommand newYangJobCommand) { YangJobModel yangJobModel = convert2YangJobModel(newYangJobCommand); if (jobModelRepository.saveYangJobModel(yangJobModel)) { YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); } return Response.fail(); }
private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) { YangJobModel yangJobModel = new YangJobModel(); yangJobModel.setJobName(newYangJobCommand.getJobName()); yangJobModel.setDescription(newYangJobCommand.getDescription()); yangJobModel.setCron(newYangJobCommand.getCron()); yangJobModel.setOpen(newYangJobCommand.getOpen()); yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy())); yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath()); yangJobModel.setExecuteParamMap(newYangJobCommand.getParams()); return yangJobModel; }
@Override public Response<YangJobDTO> updateYangJob(UpdateYangJobCommand updateYangJobCommand) { YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(updateYangJobCommand.getJobId()); if (yangJobModel == null) { return Response.fail(); }
if (updateYangJobCommand.getJobName() != null) { yangJobModel.setJobName(updateYangJobCommand.getJobName()); } if (updateYangJobCommand.getCron() != null) { yangJobModel.setCron(updateYangJobCommand.getCron()); } if (updateYangJobCommand.getOpen() != null) { yangJobModel.setOpen(updateYangJobCommand.getOpen()); } if (!updateYangJobCommand.getParams().isEmpty()) { yangJobModel.setExecuteParamMap(updateYangJobCommand.getParams()); } if (updateYangJobCommand.getExecuteClassPath() != null) { yangJobModel.setExecuteClassPath(updateYangJobCommand.getExecuteClassPath()); } if (updateYangJobCommand.getExecuteStrategy() != null) { yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(updateYangJobCommand.getExecuteStrategy())); } if (jobModelRepository.updateYangJobModel(yangJobModel)) { YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); } return Response.fail(); }
@Override public Response<YangJobDTO> deleteYangJob(DeleteYangJobCommand deleteYangJobCommand) { Integer jobId = deleteYangJobCommand.getJobId(); if (jobModelRepository.deleteJobModel(jobId)) { return Response.success(); } return Response.fail(); } }
|
YangJobQueryApplicationService的实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| @Service public class YangJobQueryApplicationService implements YangJobQueryService { @Resource private IJobModelRepository jobModelRepository;
@Resource private YangJobDTOConvertor yangJobDTOConvertor;
@Override public Response<PageDTO<YangJobDTO>> queryYangJobPage(PageYangJobQuery pageYangJobQuery) { JobModelQueryCondition jobModelQueryCondition = convert2JobModelQueryCondition(pageYangJobQuery); PageDTO<YangJobModel> modelPageDTO = jobModelRepository.queryYangJobModelPage(jobModelQueryCondition); PageDTO<YangJobDTO> pageDTO = new PageDTO<>(); pageDTO.setPageNo(modelPageDTO.getPageNo()); pageDTO.setPageSize(modelPageDTO.getPageSize()); pageDTO.setPages(modelPageDTO.getPages()); pageDTO.setTotal(modelPageDTO.getTotal());
List<YangJobDTO> dataList = modelPageDTO.getDataList().stream() .map(yangJobDTOConvertor::convert2DTO) .collect(Collectors.toList()); pageDTO.setDataList(dataList); return Response.success(pageDTO); }
@Override public Response<YangJobDTO> getYangJobDTOById(Integer id) { YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(id); YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); }
private JobModelQueryCondition convert2JobModelQueryCondition(PageYangJobQuery pageYangJobQuery) { JobModelQueryCondition jobModelQueryCondition = new JobModelQueryCondition(); jobModelQueryCondition.setPageNo(pageYangJobQuery.getPageNo()); jobModelQueryCondition.setPageSize(pageYangJobQuery.getPageSize()); jobModelQueryCondition.setOpen(pageYangJobQuery.getOpen()); return jobModelQueryCondition; } }
|
这两个类,也是一个用于命令,一个用于查询。命令查询职责分离,有一个主要的作用就是实现读写分离,有时候,读取所需要的模型和写入所需要的模型是不同的,比如说一些密码、地址等私密信息,我们在读取的时候,不会读取到这些数据,而在写入的时候,需要对这些数据做一些校验、加密等操作,因此就需要将读写模型进行分离。如果在本项目中,也要进行读写分离的话,那么我们可以将YangJobModel用于命令,然后额外创建一个YangJobQueryModel用于查询,不过一般情况下,无有必要勿增实体,目前我们的YangJobModel够用了,如果后续有相应的需求,我们再将读写分离也为时不晚。
controller层
controller层即COLA中的适配层,负责对前端展示的路由和适配,对于传统的B/S系统而言,adapter就相当于MVC中的controller层。
controller层的结构如下图所示,其实就和普通的MVC层一样,定义一些VO类,然后调用application层的方法返回信息。
infrastructure层
infrastructure层主要负责技术细节问题的处理,比如数据库的CRUD、搜索引擎、文件系统、分布式服务的RPC等,此外,领域防腐的重任也在这里,外部依赖需要通过gateway的转义处理,才能被上面的App层和domain层使用。
基础设施层的内容如下图所示,data包是数据库实体和对应的mapper,exception是全局异常处理,gateway则是我们的防腐层,job包为上一章提到的定时任务基础设施。
application.yml
application.yml配置信息如下所示:
1 2 3 4 5 6 7 8 9 10 11 12
| spring: datasource: username: root password: 3fa4d180 driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql: thymeleaf: cache: false prefix: classpath:/templates/ encoding: UTF-8 suffix: .html mode: HTML
|
测试
启动项目,通过接口调用,进行测试。因为我们的YangJobModel模型对executeClassPath做了校验,以确保执行的类实现了IJobExecutor接口,因此,为了方便测试,我们添加两个测试类,实现IJobExecutor接口进行测试。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.yang.job.application.task;
import com.yang.job.infra.job.execute.IYangJobExecutor; import com.yang.job.infra.job.execute.YangJobExecuteRequest;
import java.text.SimpleDateFormat; import java.util.Date;
public class HelloWorldJobExecutor implements IYangJobExecutor { @Override public void execute(YangJobExecuteRequest yangJobExecuteRequest) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowTime = simpleDateFormat.format(new Date()); System.out.println(String.format("Hello world! jobId: %s, nowTime:%s", yangJobExecuteRequest.getJobId(), nowTime)); } }
package com.yang.job.application.task;
import com.yang.job.infra.job.execute.IYangJobExecutor; import com.yang.job.infra.job.execute.YangJobExecuteRequest;
import java.text.SimpleDateFormat; import java.util.Date;
public class TestJobExecutor implements IYangJobExecutor { @Override public void execute(YangJobExecuteRequest yangJobExecuteRequest) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowTime = simpleDateFormat.format(new Date()); System.out.println(String.format("class: %s, Hello world! jobId:%s, nowTime:%s", this.getClass().getName(), yangJobExecuteRequest.getJobId(), nowTime )); } }
|
然后我们,启动项目,调用接口,进行测试
启动时注册开启的定时任务
上述流程跑通后,我们查看数据库,可以看到对应的记录,说明项目的增删改查基本没有什么问题,但是,我们从数据库中,可以看到,这些定时任务是开启的,而我们现在的后端代码中,还没有将这些定时任务添加到YangJobManager中进行定时任务的执行或取消。
因此,我们先修改YangJobModel模块中,添加提交任务的方法submitJob,这里就不符合COLA中domain不依赖其他层级的要求,因为它依赖了infrastructure基础设施层,但是仍然符合DDD四层架构中,上层依赖下层,而下层不依赖上层的要求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| package com.yang.job.domain.model;
import com.yang.job.client.dto.common.BusinessException; import com.yang.job.client.dto.common.ErrorCode; import com.yang.job.domain.enums.JobExecuteStrategyEnum; import com.yang.job.infra.job.YangJobManager; import com.yang.job.infra.job.request.YangJobSubmitParam; import com.yang.job.infra.utils.CronUtils; import com.yang.job.infra.utils.SpringContextUtils; import lombok.Data;
import java.io.Serializable; import java.time.ZonedDateTime; import java.util.Date; import java.util.HashMap; import java.util.Map;
@Data public class YangJobModel implements Serializable { private Integer jobId;
private String jobName;
private String description;
private String cron;
private String executeClassPath;
private Runnable runnable;
private JobExecuteStrategyEnum executeStrategy;
private Integer enable;
private Integer open;
private Date createTime;
private Date updateTime;
private Map<String, String> featureMap = new HashMap<>();
private Map<String, String> executeParamMap = new HashMap<>();
public boolean isEnable() { if (this.enable == null) { return false; } return this.enable == 1; }
public boolean isOpen() { if (!isEnable()) { return false; } if (this.open == null) { return false; } return this.open == 1; }
public boolean isClose() { return !isOpen(); }
public void setExecuteClassPath(String executeClassPath) { if (executeClassPath == null || executeClassPath.isEmpty()) { throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH); } try { Class.forName(executeClassPath); } catch (ClassNotFoundException e) { e.printStackTrace(); throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH); } this.executeClassPath = executeClassPath; }
public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) { if (jobExecuteStrategyEnum == null) { throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST); } this.executeStrategy = jobExecuteStrategyEnum; }
public void submitJob() { YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam(); YangJobManager yangJobManager = getYangJobManager(); yangJobManager.submitJob(yangJobSubmitParam); }
private YangJobSubmitParam convert2YangJobSubmitParam() { YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam(); yangJobBuildParam.setJobId(this.jobId); yangJobBuildParam.setRunnable(this.runnable); ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now()); ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime); long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli(); long executeEochMill = nextExecutionTime.toInstant().toEpochMilli(); long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli(); yangJobBuildParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000); yangJobBuildParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000); yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy); return yangJobBuildParam; }
private YangJobManager getYangJobManager() { return SpringContextUtils.getBeanOfType(YangJobManager.class); } }
|
然后,我们添加YangJobContext类,作为定时任务的上下文类,在SpringBoot刷新容器的时候,监听到该刷新事件,然后从数据库中获取开启的定时任务,将该任务提交到YangJobManager中进行定时执行。当SpringBoot关闭容器的时候,监听关闭事件,然后调用YangJobManager的shutdown,关闭对应的线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.yang.job.infra.job;
import com.yang.job.domain.gateway.repository.IJobModelRepository; import com.yang.job.domain.model.YangJobModel; import com.yang.job.infra.utils.SpringContextUtils; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ApplicationContextEvent; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent;
import java.util.List;
public class YangJobContext implements ApplicationListener<ApplicationContextEvent> { private static YangJobContext instance;
private ApplicationContext applicationContext;
@Override public void onApplicationEvent(ApplicationContextEvent event) { if (event instanceof ContextRefreshedEvent) { System.out.println("刷新了========="); YangJobContext.instance = this; instance.applicationContext = applicationContext; init(); } else if (event instanceof ContextClosedEvent) { System.out.println("销毁了========="); shutdown(); } }
private void init() { IJobModelRepository jobModelRepository = SpringContextUtils.getBeanOfType(IJobModelRepository.class); List<YangJobModel> yangJobModelList = jobModelRepository.getOpenYangJobModelList(); for (YangJobModel yangJobModel : yangJobModelList) { yangJobModel.submitJob(); } }
private void shutdown() { YangJobManager yangJobManager = SpringContextUtils.getBeanOfType(YangJobManager.class); if (yangJobManager == null) { return; } yangJobManager.shutdown(); } }
|
最后,我们添加YangJob相关的配置类,将YangJobManager和YangJobContext注册到SpringBoot容器中进行统一管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yang.job.infra.job.configuration;
import com.yang.job.infra.job.YangJobContext; import com.yang.job.infra.job.YangJobManager; import com.yang.job.infra.job.thread.YangJobThreadFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
@Configuration public class YangJobConfiguration { @Bean public ScheduledThreadPoolExecutor scheduledThreadPoolExecutor() { int availableProcessors = Runtime.getRuntime().availableProcessors(); return new ScheduledThreadPoolExecutor(availableProcessors, new YangJobThreadFactory("yang")); }
@Bean public YangJobManager yangJobManager() { return new YangJobManager.YangJobManagerBuilder() .setScheduledExecutorService(scheduledThreadPoolExecutor()) .build(); } @Bean public YangJobContext yangJobContext() { return new YangJobContext(); } }
|
添加上述代码后,我们重新启动项目,然后观察控制台,如下图所示,jobId为12的只执行一次,jobId为13的每隔5秒执行一次,这个和它们在数据库中的执行策略保持一致,前者为once,后者为withFixedDelay。
领域事件
添加定时任务后置事件
我们在添加定时任务时,接口的入参如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.yang.job.client.dto.command;
import lombok.Data;
import java.io.Serializable; import java.util.HashMap; import java.util.Map;
@Data public class NewYangJobCommand implements Serializable { private String jobName;
private String description;
private String cron;
private String executeStrategy;
private String executeClassPath;
private Integer open;
private Map<String, String> params = new HashMap<>(); }
|
我们看到,可以通过open参数,来决定新建的定时任务是否开启,在之前的测试中,我们添加的定时任务open参数都设置为1,从数据库的角度看,数据确实持久化了,并且open的值也为1,但是从行为看是不对的,我们添加该任务,并且该任务是open的,那么当我们添加成功后,应该将该任务注册到YangJobManager。
最简单的方式,便是在添加成功后,做判断,如果open为1,那么调用YangJobModel的submitJob方法
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override public Response<YangJobDTO> saveYangJob(NewYangJobCommand newYangJobCommand) { YangJobModel yangJobModel = convert2YangJobModel(newYangJobCommand); if (jobModelRepository.saveYangJobModel(yangJobModel)) { if (yangJobModel.isOpen()) { yangJobModel.submitJob(); } YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); } return Response.fail(); }
|
但上述方法有一个问题,如果后续我们多了其他需求,比如虽然open为0,但是这个任务比较特殊,初次入库时必须执行一次等等奇奇怪怪的需求,那么我们在添加成功后,需要有一大串判断逻辑,影响后续的维护。
为解决上述问题,我们可以通过事件的方式,进行解耦。在我的第一份实习中,遇到的项目,就是用事件的方式,来进行解耦,一开始我很不理解这种方式,因为发送的事件太多了,从开始到结束将近10个事件,每次看代码都要从一个事件处理类,跳到另一个事件处理类,但是后来看了一些DDD的书籍后,有点理解这种做法了,这种应该就是属于领域事件,因为这些事件,将不同子域串联起来,比如从收单到支付再到账户等域,通过领域事件,将这些不同的域串联起来,此外又不会使两个域之间存在强耦合的代码,从而方便后续的维护。
因此,这里也使用事件的方式,在将定时任务持久化到数据库后,发送一个持久化成功后置处理事件,由对应的事件处理者,进行逻辑判断以及相关的业务处理。
首先,我们在基础设施层中,添加事件的相关类。首先是我们定义一个事件类:
1 2 3 4 5
| package com.yang.job.infra.event;
public interface IEvent <T> { T getData(); }
|
然后定义对应的事件处理类:
1 2 3 4 5
| package com.yang.job.infra.event;
public interface IEventHandler <Event extends IEvent> { void execute(Event event); }
|
添加EventCenter事件中心类,用于推送同步事件或异步事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package com.yang.job.infra.event;
import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit;
public class EventCenter { private EventBus eventBus = new EventBus();
private AsyncEventBus asyncEventBus;
private ExecutorService asyncEventExecutorService;
private static Map<String, IEventHandler> eventHandlerMap = new ConcurrentHashMap<>();
public EventCenter(ExecutorService executorService) { this.asyncEventExecutorService = executorService; asyncEventBus = new AsyncEventBus(executorService); }
public void postEvent(IEvent iEvent) { String eventClassName = iEvent.getClass().getName(); IEventHandler iEventHandler = eventHandlerMap.get(eventClassName); eventBus.register(iEventHandler); eventBus.post(iEvent); }
public void asyncPostEvent(IEvent iEvent) { String eventClassName = iEvent.getClass().getName(); IEventHandler iEventHandler = eventHandlerMap.get(eventClassName); asyncEventBus.register(iEventHandler); asyncEventBus.post(iEvent); }
public void shutdown() { if (this.asyncEventExecutorService == null) { return; } if (!asyncEventExecutorService.isShutdown()) { asyncEventExecutorService.shutdown(); try { if (!asyncEventExecutorService.awaitTermination(10, TimeUnit.SECONDS)) { asyncEventExecutorService.shutdownNow(); } } catch (InterruptedException e) { e.printStackTrace(); } } }
public static void registerEventHandler(String eventName, IEventHandler iEventHandler) { eventHandlerMap.put(eventName, iEventHandler); } }
|
因为我们使用的是google的guava事件框架,在guava中,事件处理者要在相关的方法上,添加@Subscribe注解,以便能正确监听到事件,但很多时候,我们在定义事件处理者的时候,经常会忘记加上一些注解,导致后续达不到预期的效果,难以排除问题,因此,这里添加EventBeanPostProcessor类,实现BeanPostProcessor接口,在SpringBoot的bean构造完毕后,我们检查这些事件处理者bean,判断其是否由subscribe注解。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package com.yang.job.infra.event.schema;
import com.google.common.eventbus.Subscribe; import com.yang.job.client.dto.common.BusinessException; import com.yang.job.client.dto.common.ErrorCode; import com.yang.job.infra.event.EventCenter; import com.yang.job.infra.event.IEventHandler; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Component public class EventBeanPostProcessor implements BeanPostProcessor
Method subscribeMethod = getSubscribeMethod(bean); Class<?>[] parameterTypes = subscribeMethod.getParameterTypes(); String eventName = parameterTypes[0].getName(); EventCenter.registerEventHandler(eventName, (IEventHandler) bean); return bean; }
private Method getSubscribeMethod(Object bean)
Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length > 1)
if (parameterTypes[0].isInterface())
++ subscribeCount; subscribeMethod = method; } if (subscribeCount == 0)
else if (subscribeCount > 1)
return subscribeMethod; } }
|
然后添加EventBusContext上下文类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yang.job.infra.event;
import com.yang.job.infra.utils.SpringContextUtils; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ApplicationContextEvent; import org.springframework.context.event.ContextClosedEvent; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.core.Ordered;
public class EventBusContext implements ApplicationListener<ApplicationContextEvent>, Ordered { private static EventBusContext instance;
private ApplicationContextEvent applicationContextEvent;
@Override public void onApplicationEvent(ApplicationContextEvent event) { if (event instanceof ContextRefreshedEvent) { System.out.println("刷新了=============="); EventBusContext.instance = this; instance.applicationContextEvent = applicationContextEvent; } else if (event instanceof ContextClosedEvent) { System.out.println("销毁了=============="); EventCenter eventcenter = SpringContextUtils.getBeanOfType(EventCenter.class); eventcenter.shutdown(); } }
@Override public int getOrder() { return 30; } }
|
最后,添加event相关的配置类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.yang.job.infra.event.configuration;
import com.yang.job.infra.event.EventBusContext; import com.yang.job.infra.event.EventCenter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
import java.util.concurrent.*;
@Configuration public class EventBusConfiguration { @Bean public ExecutorService asyncEventExecutorService() { return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2 + 1, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy()); }
@Bean public EventCenter eventCenter() { return new EventCenter(asyncEventExecutorService()); }
@Bean public EventBusContext eventBusContext() { return new EventBusContext(); } }
|
上述event基础设施搭建完毕后,我们开始添加对应的事件。但是这个事件应该放在哪一层,是比较困惑我的,因为这个不算领域事件,毕竟我们项目中并没有涉及到其他的域,但是如果放在application层又不太好,毕竟application只是对业务流程的编排,是一层薄薄的封装,而这些事件处理者是承担一部分业务逻辑的,最后决定还是放在domain层,理由是domain层主要是业务逻辑的封装,而事件也是业务逻辑的一部分。
我们添加定时任务添加成功后置事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package com.yang.job.domain.event;
import com.yang.job.infra.event.IEvent;
public class SaveJobPostEvent implements IEvent<Integer> { private Integer jobId;
public SaveJobPostEvent(Integer jobId) { this.jobId = jobId; }
@Override public Integer getData() { return this.jobId; } }
|
添加对应的事件处理者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yang.job.domain.event.handler;
import com.google.common.eventbus.Subscribe; import com.yang.job.domain.event.SaveJobPostEvent; import com.yang.job.domain.gateway.repository.IJobModelRepository; import com.yang.job.domain.model.YangJobModel; import com.yang.job.infra.event.IEventHandler; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component public class SaveJobPostEventHandler implements IEventHandler<SaveJobPostEvent> { @Resource private IJobModelRepository jobModelRepository;
@Override @Subscribe public void execute(SaveJobPostEvent event) { Integer jobId = event.getData(); YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(jobId); if (yangJobModel == null) { return; } if (!yangJobModel.isOpen()) { return; } yangJobModel.submitJob(); } }
|
然后我们修改YangJobApplicationService的saveYangJob方法,在持久化数据库成功后,发送持久化成功后置事件:
1 2 3 4 5 6 7 8 9 10 11
| @Override public Response<YangJobDTO> saveYangJob(NewYangJobCommand newYangJobCommand) { YangJobModel yangJobModel = convert2YangJobModel(newYangJobCommand); if (jobModelRepository.saveYangJobModel(yangJobModel)) { yangJobModel.postSaveJobEvent(); YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); } return Response.fail(); }
|
修改YangJobModel,加上推送事件的方法:
1 2 3 4 5 6 7 8
| public void postSaveJobEvent() { SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId); getEventCenter().asyncPostEvent(saveJobPostEvent); }
private EventCenter getEventCenter() { return SpringContextUtils.getBeanOfType(EventCenter.class); }
|
重新启动项目,我们再次添加一个定时任务,并将定时任务设置为open
添加成功后,我们查看控制台,可以看到,确实有发送对应的事件,将定时任务注册到YangJobManger中。
更新定时任务后置事件
添加更新定时任务后置事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| package com.yang.job.domain.event;
import com.yang.job.infra.event.IEvent;
public class UpdateJobPostEvent implements IEvent<Integer> { private Integer jobId;
public UpdateJobPostEvent(Integer jobId) { this.jobId = jobId; } @Override public Integer getData() { return this.jobId; } }
|
添加对应的事件处理者,当我们更新定时任务后,可能更新的内容是将定时任务关闭或开启,也可能是对应的策略,因此,我们的事件处理者会首先关闭对应的定时任务,然后再判断定时任务是否开启,开启的话,重新将任务注册到YangJobManager中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yang.job.domain.event.handler;
import com.google.common.eventbus.Subscribe; import com.yang.job.domain.event.UpdateJobPostEvent; import com.yang.job.domain.gateway.repository.IJobModelRepository; import com.yang.job.domain.model.YangJobModel; import com.yang.job.infra.event.IEventHandler; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component public class UpdateJobPostEventHandler implements IEventHandler<UpdateJobPostEvent> { @Resource private IJobModelRepository jobModelRepository;
@Override @Subscribe public void execute(UpdateJobPostEvent event) { Integer jobId = event.getData(); YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(jobId); if (yangJobModel == null) { return; } yangJobModel.cancelJob(); if (yangJobModel.isOpen()) { yangJobModel.submitJob(); } } }
|
修改YangJobModel,加上取消任务和推送更新后置事件的方法
1 2 3 4 5 6 7 8 9
| public void cancelJob() { YangJobManager yangJobManager = getYangJobManager(); yangJobManager.cancelJob(this.jobId); }
public void postUpdateJobEvent() { UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId); getEventCenter().asyncPostEvent(updateJobPostEvent); }
|
最后,我们修改YangJobApplicationService的updateYangJob方法,在持久化数据库后,发送更新定时任务后置事件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Override public Response<YangJobDTO> updateYangJob(UpdateYangJobCommand updateYangJobCommand) { YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(updateYangJobCommand.getJobId()); if (yangJobModel == null) { return Response.fail(); }
if (updateYangJobCommand.getJobName() != null) { yangJobModel.setJobName(updateYangJobCommand.getJobName()); } if (updateYangJobCommand.getCron() != null) { yangJobModel.setCron(updateYangJobCommand.getCron()); } if (updateYangJobCommand.getOpen() != null) { yangJobModel.setOpen(updateYangJobCommand.getOpen()); } if (!updateYangJobCommand.getParams().isEmpty()) { yangJobModel.setExecuteParamMap(updateYangJobCommand.getParams()); } if (updateYangJobCommand.getExecuteClassPath() != null) { yangJobModel.setExecuteClassPath(updateYangJobCommand.getExecuteClassPath()); } if (updateYangJobCommand.getExecuteStrategy() != null) { yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(updateYangJobCommand.getExecuteStrategy())); } if (jobModelRepository.updateYangJobModel(yangJobModel)) { // 发送事件 yangJobModel.postUpdateJobEvent(); YangJobDTO yangJobDTO = yangJobDTOConvertor.convert2DTO(yangJobModel); return Response.success(yangJobDTO); } return Response.fail(); }
|
重启项目,我们先观察控制台,可以看到jobId为14的任务每5秒执行一次
然后我们调用close接口,关闭jobId为14的定时任务。
调用成功后,再次查看控制台,可见此时该定时任务不再执行。
删除定时任务后置事件
因为我们这里的删除,其实都是逻辑删除,本质上其实就是更新了enable字段,因此,删除成功后,推送更新定时任务后置事件即可,不过,这里虽然没有专门的删除定时任务后置事件,但还是在YangJobModel中,添加一个postDeleteJobEvent方法。
1 2 3 4
| public void postDeleteJobEvent() { UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId); getEventCenter().asyncPostEvent(updateJobPostEvent); }
|
然后我们修改YangJobApplicationService的deleteYangJob方法,在删除操作成功后,调用postDeleteJobEvent方法,推送对应的事件。
1 2 3 4 5 6 7 8 9 10
| @Override public Response<YangJobDTO> deleteYangJob(DeleteYangJobCommand deleteYangJobCommand) { Integer jobId = deleteYangJobCommand.getJobId(); if (jobModelRepository.deleteJobModel(jobId)) { YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(jobId); yangJobModel.postDeleteJobEvent(); return Response.success(); } return Response.fail(); }
|
提交任务后置事件
之前我们提到,任务的执行策略有四种——立即执行、执行一次、任务执行完毕后间隔执行、任务执行开始后间隔执行。但是我们现在的系统中,假设前两种策略对应的定时任务执行过了,那么按道理来说,不应该再继续执行了,可是,当我们重启系统时,因为它们在数据库中的open字段值为1,也就是任务状态是开启的,所以会再次被执行。
为解决上述问题,我们在提交任务给YangJobManager后,发送一个提交任务后置事件,在对应的事件处理者中,判断执行策略,如果是前两种策略,那么我们提交成功后需要把任务状态设置为关闭。
首先,我们添加提交任务后置事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package com.yang.job.domain.event;
import com.yang.job.infra.event.IEvent; import com.yang.job.infra.job.request.YangJobSubmitParam;
public class SubmitJobPostEvent implements IEvent<YangJobSubmitParam> { private YangJobSubmitParam yangJobSubmitParam;
public SubmitJobPostEvent(YangJobSubmitParam yangJobSubmitParam) { this.yangJobSubmitParam = yangJobSubmitParam; }
@Override public YangJobSubmitParam getData() { return yangJobSubmitParam; } }
|
添加对应的事件处理者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.yang.job.domain.event.handler;
import com.google.common.eventbus.Subscribe; import com.yang.job.domain.enums.JobExecuteStrategyEnum; import com.yang.job.domain.event.SubmitJobPostEvent; import com.yang.job.domain.gateway.repository.IJobModelRepository; import com.yang.job.domain.model.YangJobModel; import com.yang.job.infra.event.IEventHandler; import com.yang.job.infra.job.request.YangJobSubmitParam; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component public class SubmitJobPostEventHandler implements IEventHandler<SubmitJobPostEvent> { @Resource private IJobModelRepository jobModelRepository;
@Override @Subscribe public void execute(SubmitJobPostEvent event) { YangJobSubmitParam yangJobSubmitParam = event.getData(); JobExecuteStrategyEnum jobExecuteStrategy = yangJobSubmitParam.getJobExecuteStrategy(); if (jobExecuteStrategy == JobExecuteStrategyEnum.IMMEDIATE_EXECUTE || jobExecuteStrategy == JobExecuteStrategyEnum.ONCE) { YangJobModel yangJobModel = jobModelRepository.getYangJobModelById(yangJobSubmitParam.getJobId()); if (!yangJobModel.isOpen()) { return; } yangJobModel.setOpen(0); jobModelRepository.updateYangJobModel(yangJobModel); } } }
|
最后,我们修改YangJobModel的submitJob方法,在提交任务后,发送提交任务后置事件
1 2 3 4 5 6 7 8
| public void submitJob() { YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam() YangJobManager yangJobManager = getYangJobManager() yangJobManager.submitJob(yangJobSubmitParam) // 提交任务后,发送提交任务后置事件 SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam) getEventCenter().postEvent(submitJobPostEvent) }
|
上述准备完毕后,我们启动项目:
在数据库中,jobId=12的,执行策略为once,那么在启动项目后,发送提交任务后置事件,会将jobId的open属性重新设置为0
我们启动项目后,刷新数据库,可见open确实被重新设置为0。