前言

在项目中对于多线程的使用还是非常普遍的,像并行、串行、优先级顺序执行,还有一些方法涉及到上一个方法的返回值等等。如果我们把人力放在这上面显然是有点多余的,这次给大家介绍京东零售使用的线程编排框架-asyncTool

项目引入

引入这里介绍2种方法,一种是maven引入,一种则是直接引入源码,源码结构和层级都很清晰,代码量也很少。这里推荐源码引入,不过根据具体情况具体分析,大家自行选择

maven引入

这里因为我是源码引入的,所以直接放官方的maven引入方法

外网请使用jitpack.io上打的包 先添加repositories节点

1
2
3
4
5
6
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories

然后添加如下maven依赖

1
2
3
4
5
<dependency>
<groupId>com.gitee.jd-platform-opensource</groupId>
<artifactId>asyncTool</artifactId>
<version>V1.4-SNAPSHOT</version>
</dependency>

源码引入

源码GITEE地址 直接把async这个包放入到项目共用模块即可

初步认识

Worker

worker是一个最小的任务执行单元,代表一个任务,任务中的逻辑有我们具体实现,例如查询数据库或者是一次网络调用等等。对应源码的IWorker接口。接口的T、V两个泛型,分别对应入参类型和出参类型。也就是参数类型和返回值类型。

下面是IWorker接口源码,是一个函数式接口。方法action就是我们的任务逻辑。我们需要把我们任务的具体逻辑写到该方法内。方法有2个入参,第一个object就是我们的参数,第二个是所有的任务列表,我们可以通过id获取到对应的任务,后面会说。defaultValue这个方法是默认返回值,我们可以在这里定义如果任务出错了返回的内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 每个最小执行单元需要实现该接口
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface IWorker<T, V> {
/**
* 在这里做耗时操作,如rpc请求、IO等
*
* @param object object
* @param allWrappers 任务包装
*/
V action(T object, Map<String, WorkerWrapper> allWrappers);

/**
* 超时、异常时,返回的默认值
*
* @return 默认值
*/
default V defaultValue() {
return null;
}
}

ICallback

callback则是对每个worker的回调。也是一个函数式接口,包含begin方法和result方法。begin方法是任务开始执行的一个前置方法。而result方法是任务执行完成后的一个后置方法,我们可以根据布尔值success来判断方法是否执行成功,当然也可以获取方法的入参以及返回值的详细结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 每个执行单元执行完毕后,会回调该接口</p>
* 需要监听执行结果的,实现该接口即可
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface ICallback<T, V> {

/**
* 任务开始的监听
*/
default void begin() {

}

/**
* 耗时操作执行完毕后,就给value注入值
*/
void result(boolean success, T param, WorkResult<V> workResult);
}

Async

Async是一个类,其中beginWork是所有任务的入口,我们这里只需要关心线程池的选择。默认使用的是可缓存线程池,这个线程池适合执行一些轻量级的异步任务。但如果我们的异步任务多为CPU密集型,需要大量的CPU计算,这个时候创建过多的线程反而不能提高程序的运行效率,反而会导致线程切换频繁,消耗更多的CPU资源。因此,对于线程池的选择,我们需要根据具体的业务创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 默认不定长线程池
*/
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();
/**
* 注意,这里是个static,也就是只能有一个线程池。用户自定义线程池时,也只能定义一个
*/
private static ExecutorService executorService;

/**
* 出发点
*/
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
if (workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//保存线程池变量
Async.executorService = executorService;
//定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
Set<WorkerWrapper> set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
}
return false;
}
}

这是可缓存线程池的创建参数,可以看到线程回收时间是60s

1
2
3
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

这里正好把线程池的7大参数以及创建规则放一下

  1. corePoolSize:核心线程数。线程池中始终存活的线程数,当线程池中的线程数达到这个数目后,新的任务就会被放入队列中等待。
  2. maximumPoolSize:线程池的最大线程数。线程池中允许的最大线程数。只有当任务队列满了之后,线程池才会创建超过corePoolSize的线程。
  3. keepAliveTime:线程的空闲时间。当线程池中的线程数量超过corePoolSize时,多余的线程会在空闲了keepAliveTime之后被终止。
  4. unit:时间单位。keepAliveTime的时间单位。
  5. workQueue:任务队列。用于保存等待被执行的任务的阻塞队列。
  6. threadFactory:线程工厂。用于创建新线程的工厂。
  7. handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。

这些参数的推荐设置规则如下:

  • corePoolSizemaximumPoolSize 的选择取决于具体需求和硬件资源。一般来说,如果任务是CPU密集型的,可以设为CPU核心数+1(这里对核心数+1做一个解释:设置线程数为 CPU 核心数 + 1,就是为了在我们的任务运行的时候,如果有其他系统任务需要运行,那么多出来的那一个线程就可以暂停运行,释放 CPU 资源给其他任务使用。这样,我们的任务在大多数情况下都可以使用到全部的 CPU 核心,同时也不会阻止其他任务运行),如果任务是IO密集型的,可以设为CPU核心数*2。maximumPoolSize一般设为系统资源允许的最大线程数。这里再对CPU密集型和IO密集型做一下解释。CPU密集型是我们的任务中需要大量的CPU计算,例如图像处理、科学计算等,CPU是瓶颈,这种情况下我们提高线程的数量并不会增加效率,反而会增加切换线程的开销。而IO密集型则是更多的网络请求、文件读写等,这种情况下IO操作是瓶颈,需要增加更多的线程来并行执行IO操作,提高效率。
  • keepAliveTime 的设置取决于任务特性。如果在短时间内会有大量的并发请求,那么我们可以设置的时间长一点,相反,如果我们的并发没有那么高并且想要节省一些系统资源,可以设置时间小一点,以便及时回收空闲线程
  • unit 通常设为 TimeUnit.SECONDS,具体可以根据需要来设置。
  • workQueue 一般使用 LinkedBlockingQueue 或者ArrayBlockingQueue。ArrayBlockingQueue读写共用一把锁ReentrantLock,而LinkedBlockingQueue读写锁是分离的,分别是takeLock和putLock,因此,在高并发的情况下,LinkedBlockingQueue可以一遍出队一遍入队的操作,效率相比ArrayBlockingQueue更高一点。
  • threadFactory 默认可以使用 Executors.defaultThreadFactory()。
  • handler 有四种策略可以选择:AbortPolicy(默认,直接抛出异常)、CallerRunsPolicy(由调用线程处理任务)、DiscardOldestPolicy(丢弃队列中最旧的任务)和 DiscardPolicy(直接丢弃任务,不做任何处理)。一般多使用CallerRunsPolicy

实战运用

1 -> 2(2个任务同时执行)

首先我创建了一个任务叫Work1,实现了IWorker和ICallback接口,指定该任务的入参和出参都是String。任务逻辑在action中,首先睡眠1s,打印一句话,然后返回入参即完成任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Work1 implements IWorker<String, String>, ICallback<String, String> {
private static final Logger logger = LoggerFactory.getLogger(Work1.class);

@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【默认线程】开始线程任务,方法入参[{}]", object);
return object;
}

@Override
public String defaultValue() {
return "【默认线程】线程执行失败";
}


@Override
public void begin() {
logger.info("【默认线程】线程开始执行,线程名称:[{}]", Thread.currentThread().getName());
}

@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
logger.info("【默认线程】线程执行完成!");
} else {
logger.error("【默认线程】线程执行失败!");
}
}
}

下面是一个测试类,我这里使用的是自定义线程池。同时创建了2个work1的对象。设置好定时器,启动任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class SoraTestApplication {

private final Logger logger = LoggerFactory.getLogger(SoraTestApplication.class);

private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(9, 20, 300, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(512), new ThreadPoolExecutor.CallerRunsPolicy());

@Test
public void test1() throws ExecutionException, InterruptedException {
Work1 work1 = new Work1();
Work1 work2 = new Work1();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(work1)
.callback(work1)
.param("第一个任务")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(work2)
.callback(work2)
.param("第二个任务")
.build();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 通过beginWork开始执行任务,第一个参数表示任务超时时间,第二个参数表示异步任务,可以为多个
Async.beginWork(10000, threadPool,workerWrapper,workerWrapper2);
stopWatch.stop();
logger.info("线程任务执行完成:共耗时[{}]MS", stopWatch.getLastTaskTimeMillis());
logger.info("任务1返回值:[{}]", workerWrapper.getWorkResult().getResult());
logger.info("任务2返回值:[{}]", workerWrapper2.getWorkResult().getResult());
}

}

这里我直接放返回值截图,我们对返回值进行解析,首先我们通过前2行可以发现,启用了2个线程,继续往后打印了每个线程任务的参数,同时开始执行。两个任务执行完成的时间总共是1006毫秒,返回值也成功获取。

image-20230603170106537

当我把超时时间改为1毫秒,我们再来看一下结果,很明显会因为我们任务的休眠1s而超时,返回值是我们defaultValue中设置的对应值。回调函数也返回了false

image-20230603170334248

1 - 23(首先执行第1个任务,第1个任务完成后同时执行第2个任务和第3个任务)

这里修改起来也很简单,我们先创建第三个任务,然后在第一个任务中用next方法将第二个任务和第三个任务绑定到自己的后面。这样第一个任务执行完后就会同时去执行第二个任务和第三个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void test1() throws ExecutionException, InterruptedException {
Work1 work1 = new Work1();
Work1 work2 = new Work1();
Work1 work3 = new Work1();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(work2)
.callback(work2)
.param("第二个任务")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(work3)
.callback(work3)
.param("第三个任务")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(work1)
.callback(work1)
.param("第一个任务")
.next(workerWrapper2,workerWrapper3)
.build();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Async.beginWork(10000, threadPool,workerWrapper);
stopWatch.stop();
logger.info("线程任务执行完成:共耗时[{}]MS", stopWatch.getLastTaskTimeMillis());
}

通过控制台可以发现首先开启了1个线程去执行第一个任务,第一个任务回调完成后开启2个线程,去执行2、3任务。总耗时2s

image-20230603171210336

12 -> 3(第一个任务和第二个任务同时执行,两个都完成后执行第三个任务)

我们需要在第一个任务和第二个任务后面同时使用next方法指向第三个任务,在beginWork开启任务的时候同时执行第一个任务和第二个任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void test1() throws ExecutionException, InterruptedException {
Work1 work1 = new Work1();
Work1 work2 = new Work1();
Work1 work3 = new Work1();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(work3)
.callback(work3)
.param("第三个任务")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(work2)
.callback(work2)
.param("第二个任务")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(work1)
.callback(work1)
.param("第一个任务")
.next(workerWrapper3)
.build();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Async.beginWork(10000, threadPool,workerWrapper,workerWrapper2);
stopWatch.stop();
logger.info("线程任务执行完成:共耗时[{}]MS", stopWatch.getLastTaskTimeMillis());
}

成功实现我们的要求。这个时候可能有人会说这些都没用呀,如果涉及到返回值要怎么办,你又怎么获取到对应任务的返回值?别急,还记得我之前说过的id吗,每一个worker都是有一个id的。下面我们再来说一种情况

image-20230603171619678

1 -> 2 -> 3(第二个任务依赖于第一个任务的返回值、第三个任务依赖于第二个任务的返回值)

这里我改变了原有的work1逻辑。同时加入了2个新work。目前共有三个work。

work1(入参:随机字符串 返回值:1-2随机数)返回1-2的随机数

work2(入参:用户对象 出参:用户对象)判断work1的返回值,如果取模为0,则对传入的用户对象年龄+1,否则返回null

Work3(入参:一个map 出参:一个map)对work2的返回值做判断,如果不为空则将用户对象加入到map中

看完3个work,我们可以得知只有2种情况。一种是work1返回了1,那么work2返回null,work3不会执行。map为空。第二种是work1返回2,work2中对用户对象年龄+1,同时work3加入到map并返回。

我们先来看下三个work代码,因为我们的第二个任务和第三个任务都是需要依赖上一个方法的返回值,我们需要通过next方法将3个方法的拼接起来,之后通过action方法中的allWrappers对象,get到对应的异步任务id,进而获取到对应的返回值方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// work1
public class Work1 implements IWorker<String, Integer>, ICallback<String, Integer> {
private static final Logger logger = LoggerFactory.getLogger(Work1.class);

@Override
public Integer action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
logger.info("【默认线程】开始线程任务,方法入参[{}]", object);
return RandomUtil.randomInt(1,3);
}

@Override
public Integer defaultValue() {
return 0;
}


@Override
public void begin() {
logger.info("【默认线程】线程开始执行,线程名称:[{}]", Thread.currentThread().getName());
}

@Override
public void result(boolean success, String param, WorkResult<Integer> workResult) {
if (success) {
logger.info("【默认线程】线程执行完成!");
} else {
logger.error("【默认线程】线程执行失败!");
}
}

}


// work2
public class Work2 implements IWorker<User, User>, ICallback<User, User> {

private static final Logger logger = LoggerFactory.getLogger(Work2.class);


@Override
public void begin() {
logger.info("【用户年龄转换线程】正在对用户年龄+1");
}

@Override
public void result(boolean success, User param, WorkResult<User> workResult) {
if (success) {
logger.info("【用户年龄转换线程】线程任务执行完成!");
} else {
logger.error("【用户年龄转换线程】线程执行失败!输入用户[{}]", param);
}
}

@Override
public User action(User object, Map<String, WorkerWrapper> allWrappers) {
// 获取第一个异步任务的结果
WorkerWrapper stWorker = allWrappers.get("st");
// 对结果进行判断
if (stWorker != null && (Integer) stWorker.getWorkResult().getResult() % 2 == 0) {
logger.info("【用户年龄转换线程】第一个异步任务返回值为[{}],正在对用户年龄进行转换", stWorker.getWorkResult().getResult());
object.setAge(object.getAge() + 1);
return object;
} else {
logger.warn("第一个异步任务结果返回值为[{}],不符合年龄转换条件,终止转换", stWorker.getWorkResult().getResult());
}
return null;
}

@Override
public User defaultValue() {
logger.error("【用户年龄转换线程】转换年龄出错!已返回默认用户对象");
return new User("default",0);
}
}


// work3
public class Work3 implements IWorker<HashMap<String, Object>, HashMap<String, Object>>, ICallback<HashMap<String, Object>, HashMap<String, Object>> {

private static final Logger logger = LoggerFactory.getLogger(Work3.class);


@Override
public void begin() {
logger.info("【最终汇总线程】正在将用户对象加入到map");
}

@Override
public void result(boolean success, HashMap<String, Object> param, WorkResult<HashMap<String, Object>> workResult) {
if (success) {
logger.info("【最终汇总线程】线程任务执行完成!");
} else {
logger.error("【最终汇总线程】线程执行失败!输入对象[{}]", param);
}
}

@Override
public HashMap<String, Object> action(HashMap<String, Object> object, Map<String, WorkerWrapper> allWrappers) {
// 获取第二个异步任务的结果
WorkerWrapper ndWorker = allWrappers.get("nd");
// 对结果进行判断
if (ndWorker != null && ndWorker.getWorkResult().getResult() != null) {
User user = (User)ndWorker.getWorkResult().getResult();
logger.info("【最终汇总线程】第二个异步任务返回用户名称为[{}],正在对用户加入到集合", user.getName());
object.put(user.getName(), user);
return object;
} else {
logger.warn("【最终汇总线程】第二个异步任务返回值为null,终止汇总");
}
return object;
}

@Override
public HashMap<String, Object> defaultValue() {
return new HashMap<>();
}
}

下面是测试代码。我们通过id设置每个异步任务的唯一标识(默认是UUID),第一个异步任务的入参我们随便传,不影响最终结果。第二个入参我们固定传入一个用户对象,传入的时候年龄为27,第三个异步任务传入一个空的map。现在我们尝试执行一下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Test
public void test2() throws ExecutionException, InterruptedException {
Work1 work1 = new Work1();
Work2 work2 = new Work2();
Work3 work3 = new Work3();
HashMap<String, Object> hashMap = new HashMap<>();
WorkerWrapper<HashMap<String, Object>, HashMap<String, Object>> workerWrapper3 = new WorkerWrapper.Builder<HashMap<String, Object>, HashMap<String, Object>>()
.worker(work3)
.callback(work3)
.id("rd")
.param(hashMap)
.build();
WorkerWrapper<User, User> workerWrapper2 = new WorkerWrapper.Builder<User, User>()
.worker(work2)
.callback(work2)
.id("nd")
.next(workerWrapper3)
.param(new User("sora", 27))
.build();
WorkerWrapper<String, Integer> workerWrapper = new WorkerWrapper.Builder<String, Integer>()
.worker(work1)
.callback(work1)
.id("st")
.next(workerWrapper2)
.param("test")
.build();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Async.beginWork(10000, threadPool,workerWrapper);
stopWatch.stop();
logger.info("线程任务执行完成:共耗时[{}]MS", stopWatch.getLastTaskTimeMillis());
logger.info("最终集合为[{}]", hashMap);
}

可以看到这次是work1返回了2,第二个任务和第三个任务自然就是通的。最终集合内的对象年龄从27变为了28

image-20230603181535546

而这次work1返回了1,后两个任务条件未满足,自然终止掉了。

image-20230603182147945

那这次对于asyncTool就先了解到这,个人感觉还是不错的一个框架。