手撸XXL-JOB(四)—远程调用定时任务

Java Socket网络编程

网络编程是Java编程中的重要组成部分,包括服务端和客户端两部分内容。Socket是Java网络编程的基本组件之一,用于在应用程序之间提供双向通信,Socket提供了一种标准的接口,允许应用程序通过网络发送和接收数据,在Java中,Socket可以分为客户端Socket和服务端Socket两种类型。
客户端Socket:客户端 Socket 用于与服务端 Socket 进行通信。客户端 Socket 通过指定服务端的 IP 地址和端口号,连接到服务端 Socket,然后发送数据到服务端 Socket。
服务端Socket:服务端 Socket 用于接收来自客户端 Socket 的连接请求,并在连接成功后,与客户端 Socket 进行通信。服务端 Socket 首先需要创建一个 ServerSocket 对象,并通过 bind 方法绑定到一个本地端口,然后等待客户端 Socket 的连接请求。
下面是Socket的一个示例:
服务端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8000);
System.out.println("Server started, waiting for client...");

Socket socket = serverSocket.accept();
System.out.println("Client connected.");

BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

String message;
while ((message = in.readLine()) != null) {
System.out.println("Client:" + message);
out.println("Server received message:" + message);
}

in.close();
out.close();
socket.close();
serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.example.demo1;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class Client {
public static void main(String[] args) {
try {
Socket socket = new Socket("localhost", 8000);
System.out.println("Connected to server.");

BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);

BufferedReader consoleIn = new BufferedReader(new InputStreamReader(System.in));
String message;
while ((message = consoleIn.readLine()) != null) {
out.println(message);
System.out.println("Server:" + in.readLine());
}
consoleIn.close();
in.close();
out.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

首先启动服务端,然后启动客户端,在客户端的控制台,输入数据,服务端能接收到数据并返回对应的响应。

远程调用定时任务

首先,我们创建两个模块,core模块包含yang-job的一些核心内容,比如IJobExecutor执行器、JobExecuteRequest执行器请求等;client模块依赖core模块,并封装和socket客户端调用相关的一些内容。
然后创建一个sample1模块,用于演示。

core模块

core目前定义了定时任务执行类和其入参、出参等信息,其中,YangJobTransferDTO包含任务类路径和任务请求,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.yang.job.dto;

import com.yang.job.execute.YangJobExecuteRequest;

import java.io.Serializable;

public class YangJobTransferDTO implements Serializable {
private String className;

private YangJobExecuteRequest yangJobExecuteRequest;

public String getClassName() {
return className;
}

public void setClassName(String className) {
this.className = className;
}

public YangJobExecuteRequest getYangJobExecuteRequest() {
return yangJobExecuteRequest;
}

public void setYangJobExecuteRequest(YangJobExecuteRequest yangJobExecuteRequest) {
this.yangJobExecuteRequest = yangJobExecuteRequest;
}
}

client模块

client模块定义了客户端所需要的一些类,其中,YangJob为注解类,对于每一个定时任务,需要加上YangJob注解,才能被正确调用。

1
2
3
4
5
6
7
8
9
package com.yang.job.client.annotations;

import java.lang.annotation.*;

@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface YangJob {
}

YangJobClientProperty为配置信息类,目前需要两个配置信息,客户端socket的ip和端口号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yang.job.client.configuration;


import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class YangJobClientProperty {
@Value("${yang-job.executor.port}")
private Integer port;

@Value("${yang-job.executor.ip}")
private String ip;


public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}
}

YangJobClientPostProcessor在SpringBoot加载完毕后,扫描bean,将实现IYongJobExecutor的bean,注册到YangJobClientManager的map中,方便后续调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.yang.job.client.schema;


import com.yang.job.client.annotations.YangJob;
import com.yang.job.client.YangJobClientManager;
import com.yang.job.execute.IYangJobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

public class YangJobClientPostProcessor implements BeanPostProcessor {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof IYangJobExecutor)) {
return bean;
}
YangJob annotation = bean.getClass().getAnnotation(YangJob.class);
if (annotation == null) {
return bean;
}
YangJobClientManager.putJobExecutor(bean.getClass().getName(), (IYangJobExecutor) bean);
return bean;
}
}

YangJobClientManager负责监听端口和管理定时任务的执行,它会监听我们配置的yang-job.execute.port端口号,然后当接收到消息时,将消息转为入参,并取出对应的定时任务执行类,执行对应的代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.yang.job.client;


import com.alibaba.fastjson.JSONObject;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.dto.ResultT;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class YangJobClientManager {
private static Map<String, IYangJobExecutor> className2JobExecutorMap = new ConcurrentHashMap<>();

private YangJobClientPropertyDTO yangJobClientPropertyDTO;

private ServerSocket serverSocket;

public YangJobClientManager(YangJobClientPropertyDTO yangJobClientPropertyDTO) {
this.yangJobClientPropertyDTO = yangJobClientPropertyDTO;
}

public void init() {
Integer port = this.yangJobClientPropertyDTO.getPort();
try {
this.serverSocket = new ServerSocket(port);
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("init success============");
new Thread(() -> {
while (true) {
try {
Socket socket = serverSocket.accept();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
String params = bufferedReader.readLine();
YangJobTransferDTO yangJobTransferDTO = JSONObject.parseObject(params, YangJobTransferDTO.class);
System.out.println(yangJobTransferDTO);
String className = yangJobTransferDTO.getClassName();
YangJobExecuteRequest yangJobExecuteRequest = yangJobTransferDTO.getYangJobExecuteRequest();
IYangJobExecutor jobExecutor = getJobExecutor(className);
if (jobExecutor != null) {
ResultT response = jobExecutor.execute(yangJobExecuteRequest);
printWriter.println(JSONObject.toJSONString(response));
}
bufferedReader.close();
printWriter.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
if (serverSocket.isClosed() || serverSocket == null) {
break;
}
}
}).start();
}

public void shutdown() {
if (this.serverSocket != null) {
try {
this.serverSocket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

public YangJobClientPropertyDTO getYangJobPropertyDTO() {
return this.yangJobClientPropertyDTO;
}

public static void putJobExecutor(String className, IYangJobExecutor iJobExecutor) {
className2JobExecutorMap.put(className, iJobExecutor);
}

public static IYangJobExecutor getJobExecutor(String className) {
return className2JobExecutorMap.get(className);
}

}

YangJobClientContext为客户端的上下文,负责监听SpringBoot刷新消息和关闭消息,并执行对应的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.yang.job.client;


import com.yang.job.client.utils.SpringContextUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ApplicationContextEvent;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;

public class YangJobClientContext implements ApplicationListener<ApplicationContextEvent> {
private static YangJobClientContext instance;

private ApplicationContext applicationContext;

@Override
public void onApplicationEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
System.out.println("刷新了=========");
YangJobClientContext.instance = this;
instance.applicationContext = applicationContext;
init();
} else if (event instanceof ContextClosedEvent) {
System.out.println("销毁了=========");
shutdown();
}
}

private void init() {
YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
yangJobClientManager.init();
}

private void shutdown() {
YangJobClientManager yangJobClientManager = SpringContextUtils.getBeanOfType(YangJobClientManager.class);
yangJobClientManager.shutdown();
}
}

YangJobClientConfiguration为配置类,负责对YangJobClientPostProcessor、YangJobClientManager和YangJobClientContext的统一配置管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.yang.job.client.configuration;


import com.yang.job.client.YangJobClientManager;
import com.yang.job.client.YangJobClientContext;
import com.yang.job.client.dto.YangJobClientPropertyDTO;
import com.yang.job.client.schema.YangJobClientPostProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class YangJobClientConfiguration {
@Autowired
private YangJobClientProperty yangJobClientProperty;

@Bean
public YangJobClientPostProcessor yangJobPostProcessor() {
return new YangJobClientPostProcessor();
}

@Bean
public YangJobClientManager yangJobClientManager() {
YangJobClientPropertyDTO yangJobClientPropertyDTO = new YangJobClientPropertyDTO();
yangJobClientPropertyDTO.setIp(yangJobClientProperty.getIp());
yangJobClientPropertyDTO.setPort(yangJobClientProperty.getPort());
return new YangJobClientManager(yangJobClientPropertyDTO);
}

@Bean
public YangJobClientContext yangJobContext() {
return new YangJobClientContext();
}
}

最后,为了使引入client依赖的应用,能自动装配我们提供的bean,我们在resources目录下创建META-INF目录,在该目录下创建spring.factories文件,文件内容如下:

1
2
3
4
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.yang.job.client.utils.SpringContextUtils,\
com.yang.job.client.configuration.YangJobClientProperty,\
com.yang.job.client.configuration.YangJobClientConfiguration

sample1

我们创建一个sample1项目,引入spring-boot-starter-web依赖和yang-client,yang-core的依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.yang</groupId>
<artifactId>yang-job-core</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yang</groupId>
<artifactId>yang-job-client</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

创建启动类

1
2
3
4
5
6
7
8
9
10
11
12
package com.yang.job.sample1;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class YangJobSample1App {
public static void main(String[] args) {
SpringApplication.run(YangJobSample1App.class, args);
}
}

创建一个任务类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.yang.job.sample1.task;

import com.yang.job.client.annotations.YangJob;
import com.yang.job.dto.ResultT;
import com.yang.job.execute.IYangJobExecutor;
import com.yang.job.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

@Component
@YangJob
public class TestTask1 implements IYangJobExecutor {
@Override
public ResultT execute(YangJobExecuteRequest yangJobExecuteRequest) {
System.out.println("开启定时任务了,入参为:" + yangJobExecuteRequest);
return ResultT.success();
}
}

添加配置文件,因为client模块的YangJobClientProperty需要有yang-job.executor.port和yang-job.executor.ip这两个配置,如果我们的配置文件中,缺少这些配置,会导致报错,无法启动项目。

1
2
3
4
5
6
7
8
9
spring:
application:
name: YangJobSample1App
yang-job:
executor:
port: 9999
ip: 127.0.0.1
server:
port: 8001

测试

我们先启动刚才的sample1项目,然后执行下列代码,来远程调用TestTask1方法执行类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1", 9999);
System.out.println("链接成功=============");
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
yangJobExecuteRequest.setJobId("1");
yangJobExecuteRequest.addParam("num", "1");
YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
yangJobTransferDTO.setClassName("com.yang.job.sample1.task.TestTask1");
yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);

printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));
System.out.println("response:" + bufferedReader.readLine());
bufferedReader.close();
printWriter.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}

执行结果如下,说明我们能成功地进行远程调用。

添加远程任务

domain层

在上一篇文章中,我们操作的任务,都是本地任务,现在我们需要对远程任务进行操作,为了区分任务类型,我们首先在domain层添加一个任务类型枚举

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yang.job.admin.domain.enums;

public enum JobTypeEnum {
LOCAL("local", "本地任务"),
REMOTE("remote", "远程任务");

private String name;

private String description;

JobTypeEnum(String name, String description) {
this.name = name;
this.description = description;
}

public String getName() {
return name;
}

public String getDescription() {
return description;
}

public static JobTypeEnum getJobTypeByName(String name) {
for (JobTypeEnum value : values()) {
if (value.getName().equals(name)) {
return value;
}
}
return null;
}
}

然后修改YangJobModel,添加上任务类型枚举和远程任务信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.yang.job.admin.domain.model;


import com.yang.job.admin.client.dto.common.BusinessException;
import com.yang.job.admin.client.dto.common.ErrorCode;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.event.SaveJobPostEvent;
import com.yang.job.admin.domain.event.SubmitJobPostEvent;
import com.yang.job.admin.domain.event.UpdateJobPostEvent;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.event.EventCenter;
import com.yang.job.admin.infra.job.YangJobManager;
import com.yang.job.admin.infra.job.request.YangJobSubmitParam;
import com.yang.job.admin.infra.utils.CronUtils;
import com.yang.job.admin.infra.utils.SpringContextUtils;
import lombok.Data;

import java.io.Serializable;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobModel implements Serializable {
private Integer jobId;

private String jobName;

private String description;

private String cron;

private String executeClassPath;

private Runnable runnable;

private JobExecuteStrategyEnum executeStrategy;

private JobTypeEnum jobType;

private RemoteExecutorMessage remoteExecutorMessage;

private Integer enable;

private Integer open;

private Date createTime;

private Date updateTime;

private Map<String, String> featureMap = new HashMap<>();

private Map<String, String> executeParamMap = new HashMap<>();

public boolean isEnable() {
if (this.enable == null) {
return false;
}
return this.enable == 1;
}

public boolean isOpen() {
if (!isEnable()) {
return false;
}
if (this.open == null) {
return false;
}
return this.open == 1;
}

public boolean isClose() {
return !isOpen();
}

public boolean isLocalJob() {
return JobTypeEnum.LOCAL == this.jobType;
}

public boolean isRemoteJob() {
return JobTypeEnum.REMOTE == this.jobType;
}

public void setExecuteStrategy(JobExecuteStrategyEnum jobExecuteStrategyEnum) {
if (jobExecuteStrategyEnum == null) {
throw new BusinessException(ErrorCode.EXECUTE_STRATEGY_NO_EXIST);
}
this.executeStrategy = jobExecuteStrategyEnum;
}


public void submitJob() {
YangJobSubmitParam yangJobSubmitParam = convert2YangJobSubmitParam();
YangJobManager yangJobManager = getYangJobManager();
yangJobManager.submitJob(yangJobSubmitParam);
// 提交任务后,发送提交任务后置事件
SubmitJobPostEvent submitJobPostEvent = new SubmitJobPostEvent(yangJobSubmitParam);
getEventCenter().postEvent(submitJobPostEvent);
}

public void cancelJob() {
YangJobManager yangJobManager = getYangJobManager();
yangJobManager.cancelJob(this.jobId);
}

private YangJobSubmitParam convert2YangJobSubmitParam() {
YangJobSubmitParam yangJobBuildParam = new YangJobSubmitParam();
yangJobBuildParam.setJobId(this.jobId);
yangJobBuildParam.setRunnable(this.runnable);
ZonedDateTime nextExecutionTime = CronUtils.nextExecutionTime(this.cron, ZonedDateTime.now());
ZonedDateTime nextNextExecutionTime = CronUtils.nextExecutionTime(this.cron, nextExecutionTime);
long nowEochMill = ZonedDateTime.now().toInstant().toEpochMilli();
long executeEochMill = nextExecutionTime.toInstant().toEpochMilli();
long secondExecuteEochMill = nextNextExecutionTime.toInstant().toEpochMilli();
yangJobBuildParam.setInitialDelay((int)(executeEochMill - nowEochMill) / 1000);
yangJobBuildParam.setPeriod((int)(secondExecuteEochMill - executeEochMill) / 1000);
yangJobBuildParam.setJobExecuteStrategy(this.executeStrategy);
return yangJobBuildParam;
}

public void postSaveJobEvent() {
SaveJobPostEvent saveJobPostEvent = new SaveJobPostEvent(this.jobId);
getEventCenter().asyncPostEvent(saveJobPostEvent);
}

public void postUpdateJobEvent() {
UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
getEventCenter().asyncPostEvent(updateJobPostEvent);
}

public void postDeleteJobEvent() {
UpdateJobPostEvent updateJobPostEvent = new UpdateJobPostEvent(this.jobId);
getEventCenter().asyncPostEvent(updateJobPostEvent);
}

private YangJobManager getYangJobManager() {
return SpringContextUtils.getBeanOfType(YangJobManager.class);
}

private EventCenter getEventCenter() {
return SpringContextUtils.getBeanOfType(EventCenter.class);
}

}

远程任务信息类:

1
2
3
4
5
6
7
8
9
10
11
12
13
package com.yang.job.admin.domain.valueobject;

import lombok.Data;

import java.io.Serializable;

@Data
public class RemoteExecutorMessage implements Serializable {
private String ip;

private Integer port;
}

接着我们添加一个features枚举,用于记录映射features字段中各个key表示的含义,因为我们现在表的设计中没有任务类型字段和远程信息相关的字段,所以会将这些信息添加到features字段中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yang.job.admin.domain.enums;

public enum JobModelFeatureEnum {
JOB_TYPE("jobType", "任务类型"),
REMOTE_EXECUTOR_IP("executorIp", "执行器ip"),
REMOTE_EXECUTOR_PORT("executorPort", "执行器端口"),
REMOTE_EXECUTOR_MESSAGE("r_executor_m", "远程执行器的信息");

private String name;

private String description;

JobModelFeatureEnum(String name, String description) {
this.name = name;
this.description = description;
}


public String getName() {
return name;
}

public String getDescription() {
return description;
}
}

client层

我们修改原先的NewYangJobCommand类,加上任务类型属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yang.job.admin.client.dto.command;


import lombok.Data;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

@Data
public class NewYangJobCommand implements Serializable {
private String jobName;

private String description;

private String cron;

private String executeStrategy;

private String jobType;

private String executeClassPath;

private Integer open;

private Map<String, String> params = new HashMap<>();
}

然后修改YangJobDTO类,也加上jobType属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.yang.job.admin.client.dto;


import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Data
public class YangJobDTO implements Serializable {
private Integer jobId;

private String jobName;

private String description;

private String cron;

private String executeStrategy;

private String executeClassPath;

private String jobType;

private Integer enable;

private Integer open;

private Date createTime;

private Date updateTime;

private Map<String, String> featureMap = new HashMap<>();

private Map<String, String> executeParamMap = new HashMap<>();
}

application层

接着修改YangJobApplicationService类的convertYangJobModel方法,将jobType任务类型和远程任务信息添加到YangJobModel中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private YangJobModel convert2YangJobModel(NewYangJobCommand newYangJobCommand) {
String jobType = newYangJobCommand.getJobType();
JobTypeEnum jobTypeEnum = JobTypeEnum.getJobTypeByName(jobType);
if (jobType == null) {
throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
}
YangJobModel yangJobModel = new YangJobModel();
yangJobModel.setJobName(newYangJobCommand.getJobName());
yangJobModel.setDescription(newYangJobCommand.getDescription());
yangJobModel.setCron(newYangJobCommand.getCron());
yangJobModel.setOpen(newYangJobCommand.getOpen());
yangJobModel.setExecuteStrategy(JobExecuteStrategyEnum.getJobExecuteStrategyByName(newYangJobCommand.getExecuteStrategy()));
yangJobModel.setExecuteClassPath(newYangJobCommand.getExecuteClassPath());
yangJobModel.setExecuteParamMap(newYangJobCommand.getParams());
yangJobModel.setJobType(jobTypeEnum);
if (jobTypeEnum == JobTypeEnum.REMOTE) {
String ip = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_IP.getName());
String port = newYangJobCommand.getParams().get(JobModelFeatureEnum.REMOTE_EXECUTOR_PORT.getName());
if (ip == null || port == null) {
throw new BusinessException(ErrorCode.PARAM_VALID_ERROR);
}
RemoteExecutorMessage remoteExecutorMessage = new RemoteExecutorMessage();
remoteExecutorMessage.setIp(ip);
remoteExecutorMessage.setPort(Integer.valueOf(port));
yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);
} else {
if (yangJobModel.getExecuteClassPath() == null || yangJobModel.getExecuteClassPath().isEmpty()) {
throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
}
try {
Class.forName(yangJobModel.getExecuteClassPath());
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new BusinessException(ErrorCode.UN_LEGAL_CLASS_PATH);
}
}
return yangJobModel;
}

infra层

最后修改基础设施层,首先修改YangJobModelConvertor类,将RemoteMessage和JobType转化到features中,以及从features中取出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package com.yang.job.admin.infra.gatewayimpl.repository.convertor;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.enums.JobExecuteStrategyEnum;
import com.yang.job.admin.domain.enums.JobModelFeatureEnum;
import com.yang.job.admin.domain.enums.JobTypeEnum;
import com.yang.job.admin.domain.model.YangJobModel;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.admin.infra.data.YangJobData;
import com.yang.job.admin.infra.job.thread.RemoteJobExecuteThread;
import com.yang.job.admin.infra.utils.FeaturesUtils;
import com.yang.job.core.dto.YangJobTransferDTO;
import com.yang.job.core.execute.IYangJobExecutor;
import com.yang.job.core.execute.YangJobExecuteRequest;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class YangJobModelConvertor {
public YangJobData convert2Data(YangJobModel yangJobModel) {
if (yangJobModel == null) {
return null;
}
YangJobData yangJobData = new YangJobData();
yangJobData.setJobId(yangJobModel.getJobId());
yangJobData.setJobName(yangJobModel.getJobName());
yangJobData.setDescription(yangJobModel.getDescription());
yangJobData.setCron(yangJobModel.getCron());
yangJobData.setExecuteClassPath(yangJobModel.getExecuteClassPath());
yangJobData.setEnable(yangJobModel.getEnable());
yangJobData.setOpen(yangJobModel.getOpen());
yangJobData.setCreateTime(yangJobModel.getCreateTime());
yangJobData.setUpdateTime(yangJobModel.getUpdateTime());
Map<String, String> featureMap = yangJobModel.getFeatureMap();
featureMap.put(JobModelFeatureEnum.JOB_TYPE.getName(), yangJobModel.getJobType().getName());
featureMap.put(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName(), JSONObject.toJSONString(yangJobModel.getRemoteExecutorMessage()));
yangJobData.setFeatures(FeaturesUtils.convert2Features(featureMap));
yangJobData.setExecuteParams(FeaturesUtils.convert2Features(yangJobModel.getExecuteParamMap()));
yangJobData.setExecuteStrategy(yangJobModel.getExecuteStrategy().getName());
return yangJobData;
}

public YangJobModel convert2Model(YangJobData yangJobData) {
if (yangJobData == null) {
return null;
}
YangJobModel yangJobModel = new YangJobModel();
yangJobModel.setJobId(yangJobData.getJobId());
yangJobModel.setDescription(yangJobData.getDescription());
yangJobModel.setCron(yangJobData.getCron());
yangJobModel.setJobName(yangJobData.getJobName());
yangJobModel.setExecuteClassPath(yangJobData.getExecuteClassPath());
yangJobModel.setEnable(yangJobData.getEnable());
yangJobModel.setOpen(yangJobData.getOpen());
yangJobModel.setCreateTime(yangJobData.getCreateTime());
yangJobModel.setUpdateTime(yangJobData.getUpdateTime());
yangJobModel.setFeatureMap(FeaturesUtils.convert2FeatureMap(yangJobData.getFeatures()));
yangJobModel.setExecuteParamMap(FeaturesUtils.convert2FeatureMap(yangJobData.getExecuteParams()));
JobExecuteStrategyEnum executeStrategy = JobExecuteStrategyEnum.getJobExecuteStrategyByName(yangJobData.getExecuteStrategy());
if (executeStrategy == null) {
throw new RuntimeException("执行策略有误!");
}

JobTypeEnum jobType = JobTypeEnum.getJobTypeByName(yangJobModel.getFeatureMap().get(JobModelFeatureEnum.JOB_TYPE.getName()));
yangJobModel.setJobType(jobType);
String remoteMessageStr = yangJobModel.getFeatureMap().get(JobModelFeatureEnum.REMOTE_EXECUTOR_MESSAGE.getName());
RemoteExecutorMessage remoteExecutorMessage = JSONObject.parseObject(remoteMessageStr, RemoteExecutorMessage.class);
yangJobModel.setRemoteExecutorMessage(remoteExecutorMessage);

yangJobModel.setExecuteStrategy(executeStrategy);
yangJobModel.setRunnable(buildRunnable(yangJobModel));

return yangJobModel;
}

private Runnable buildRunnable(YangJobModel yangJobModel) {
if (yangJobModel.isLocalJob()) {
String executeClassPath = yangJobModel.getExecuteClassPath();
try {
Class<?> aClass = Class.forName(executeClassPath);
if (!IYangJobExecutor.class.isAssignableFrom(aClass)) {
throw new RuntimeException("该类必须实现IYangJobExecutor接口");
}
IYangJobExecutor executor = (IYangJobExecutor) aClass.newInstance();
YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
Runnable runnable = () -> executor.execute(yangJobExecuteRequest);
return runnable;
} catch (InstantiationException | IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
System.out.println(String.format("%s 类路径对应的类不存在", executeClassPath));
e.printStackTrace();
}
} else {
RemoteExecutorMessage remoteExecutorMessage = yangJobModel.getRemoteExecutorMessage();
String executeClassPath = yangJobModel.getExecuteClassPath();

YangJobTransferDTO yangJobTransferDTO = new YangJobTransferDTO();
yangJobTransferDTO.setClassName(executeClassPath);

YangJobExecuteRequest yangJobExecuteRequest = convert2YangJobExecuteRequest(yangJobModel);
yangJobTransferDTO.setYangJobExecuteRequest(yangJobExecuteRequest);

return new RemoteJobExecuteThread(remoteExecutorMessage, yangJobTransferDTO);
}
return null;
}

private static YangJobExecuteRequest convert2YangJobExecuteRequest(YangJobModel yangJobModel) {
YangJobExecuteRequest yangJobExecuteRequest = new YangJobExecuteRequest();
yangJobExecuteRequest.setJobId(yangJobModel.getJobId().toString());
yangJobExecuteRequest.setParams(yangJobModel.getExecuteParamMap());
return yangJobExecuteRequest;
}
}

然后添加一个RemoteJobExecuteThread类,该类实现runnable接口,当我们的任务类型为远程调用时,其YangJobModel的runnable属性为remoteJobExecuteThread类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.yang.job.admin.infra.job.thread;

import com.alibaba.fastjson.JSONObject;
import com.yang.job.admin.domain.valueobject.RemoteExecutorMessage;
import com.yang.job.core.dto.YangJobTransferDTO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;

public class RemoteJobExecuteThread implements Runnable {
private YangJobTransferDTO yangJobTransferDTO;

private RemoteExecutorMessage remoteExecutorMessage;

public RemoteJobExecuteThread(RemoteExecutorMessage remoteExecutorMessage, YangJobTransferDTO yangJobTransferDTO) {
this.remoteExecutorMessage = remoteExecutorMessage;
this.yangJobTransferDTO = yangJobTransferDTO;
}

@Override
public void run() {
try {
String ip = remoteExecutorMessage.getIp();
Integer port = remoteExecutorMessage.getPort();
Socket socket = new Socket(ip, port);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(), true);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

printWriter.println(JSONObject.toJSONString(yangJobTransferDTO));

bufferedReader.close();
printWriter.close();
socket.close();
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

测试

我们先启动之前的sample1项目,然后启动yang-job-admin,调用http://localhost:8080/job添加任务,请求体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"jobName": "RemoteJobExecutor",
"description":"RemoteJobExecutor",
"cron": "0/10 * * * * ?",
"executeStrategy": "withFixedDelay",
"executeClassPath": "com.yang.job.sample1.task.TestTask1",
"open":1,
"jobType":"remote",
"params":{
"executorIp":"127.0.0.1",
"executorPort":"9999"
}
}

添加成功后,我们查看Sample1项目的控制台,可以看到,每10秒,这个TestTask1任务会被调用一次

参考文章

https://www.yihuo.tech/programming/server-stack/exploring-the-java-network-programming-paradigm-socket-udp-nio-and-netty-in-focus/


手撸XXL-JOB(四)—远程调用定时任务
https://cxydhi.github.io/2024/05/14/手撸XXL-JOB(四)—远程调用定时任务/
作者
沉河不浮
发布于
2024年5月14日
许可协议