学习Java函数式编程(七):CompletableFuture实战

这篇文章记录《Java 8实战》第十一章的实例,将各个例子厘清,这里主要涉及到thenApply、thenAccept、thenCompose等函数的用法,通过这些例子了解CompletableFuture基本的使用方法。

异步获取产品价格

第一个实例展示了一个基本的Future模式。

传统的获得价格的方式:

1
2
3
public double getPrice(String product) {
return calculatePrice(product);
}

单纯返回计算结果。

异步方式:

1
2
3
4
5
6
7
8
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
double price = calculatePrice(product);
futurePrice.complete(price);
}).start();
return futurePrice;
}

该方式返回一个Future对象,当需要查询价格的时候再调用方法,这里新开了一个线程计算商品价格。

两种计算价格的方式最大区别在于在主线程中调用getPriceAsync()会早早返回。

用工厂方法创建CompletableFuture对象

之前的写法太繁琐了有木有,而且还没有处理可能产生的异常。CompletableFuture提供了很多工厂方法,比如supplyAsync(),他接受一个生产者为参数,返回CompletableFuture对象:

1
2
3
public Future<Double> getPriceAsync02(String product) {
return CompletableFuture.supplyAsync(()-> calculatePrice(product));
}

这种方法和提供了错误管理机制的getPriceAsync方法(这里并没有提供错误处理)是完全等价的。

最佳价格查询器01:单纯采用stream()

多个Shop对象(这里是4家),每一个都要获取价格,第一种方式:

1
2
3
4
5
6
7
// 该方法生成多个写死了店铺名生成的传入产品的价格,以一个列表的方式返回
public List<String> findPrices(String product) {
return shops.Stream()
.map(shop -> String.format("%s price is %.2f",
shop.getName(), shop.getPrice(product)))
.collect(Collectors.toList());
}

输出:

1
2
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
stream done in 4060 msecs

串行流就是一个个挨着执行,没什么特别的。

最佳价格查询器02:采用parallelStream()

将stream()更改为parallelStream():

输出:

1
2
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74]
parallelStream done in 1043 msecs

并行流的结果确实很优秀!

最佳价格查询器03:采用CompletableFuture

首先要说明的是,这里的例子仍然采用串行流,实践起来分两步,第一步如下:

1
2
3
4
5
6
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getName() + "price is" +
shop.getPrice(product)))
.collect(Collectors.toList());

在第一步采用了map函数后就用收集器收集是因为流操作之间的延迟性,这一步之后先创建多个Future对象,因为这些对象已经创建,所以即便发生getPrice的阻塞,也不会影响那些线程的执行,因为它们已经在之前start了。

第二步:

1
2
3
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

join功能和get类似,只是不需要抛异常。

运行一下结果如下:

1
2
[BestPriceprice is123.25651664705744, LetsSaveBigprice is169.4653393606115, MyFavoriteShopprice is214.12914480588853, BuyItAllprice is184.74384995303313]
CompletableFuture done in 2028 msecs

最后的效果是02 > 03 > 01。

将上面的例子的条件更改成处理5个Shop对象,02的输出如下:

1
2
[BestPrice price is 123.26, LetsSaveBig price is 169.47, MyFavoriteShop price is 214.13, BuyItAll price is 184.74, chenchen price is 219.55]
parallelStream done in 2058 msecs

03的输出如下:

1
2
[BestPriceprice is123.25651664705744, LetsSaveBigprice is169.4653393606115, MyFavoriteShopprice is214.12914480588853, BuyItAllprice is184.74384995303313, chenchenprice is219.5526451599223]
CompletableFuture done in 2024 msecs

这个时候结果变成03 > 02 > 01。

从店铺数大于4开始,并行流和CompletableFuture开始不分伯仲,这是因为它们背后都是采用通用线程池来完成任务。

设置线程数

实例中的线程池选取了newFixedThreadPool,它的corePoolSize和maximumPoolSize是相同的,这里本人比较关心守护线程的效果,先将其设置为true:

1
2
3
4
5
6
7
8
9
10
11
private final Executor executor =
Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
);

主线程执行结束后程序就退出了,如果不将其设为守护线程,那么程序将会一直阻塞。之前在解析线程池的时候分析过shutdown()效果,newFixedThreadPool采用的是LinkedBlockingQueue(),当前任务执行完后线程池的线程不会自行结束,会阻塞在那等待新的任务。

添加折扣服务

这里模拟远程的折扣服务,它出现在获得价格的操作之后,这里简单的说一说基础逻辑:

  • 修改Shop类下的getPrice方法,将Shop名、价格、折扣码通过“:”组合成字符串,作为该方法的返回;
  • 构造Quote类,该类提供parse静态方法对上一步返回的字符串进行解析,将解析的各部分保存在Quote类字段中;
  • 创建Discount类,该类首先构造一个内部枚举类Code,保存已经设定好的折扣代码;再次提供私有方法apply(),该方法计算出折扣后的价格并返回,延迟模拟也放在该方法中;最后提供appDiscount方法,该方法将打折后的价格拼接成对用户友好的字符串并返回;

串行流执行添加折扣后的服务

和之前的逻辑相似,先采用串行流,添加findPricesDis_stream方法:

1
2
3
4
5
6
7
public List<String> findPricesDis_stream(String product) {
return shops.stream()
.map(shop -> shop.getPrice(product))
.map(Quote::parse)
.map(Discount::applyDiscount)
.collect(Collectors.toList());
}

在BestPriceFinderMain类中将相关方法切换为findPricesDis_stream并执行,最后输出结果:

1
2
[BestPrice price is 110.934, LetsSaveBig price is 135.576, MyFavoriteShop price is 192.717, BuyItAll price is 184.74, chenchen price is 208.5725]
Stream done in 10066 msecs

这个结果很好解释,整个执行属于顺序执行,期间一共阻塞过10秒。

使用CompletableFuture执行添加折扣后的服务

这里就不演示并行流了,因为它和CompletableFuture都是通过线程池来实现,但是前者随着商店数增加扩展性不够好,而后者可以指定要执行的线程数。

直接代码放送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public List<String> findPricesDis_Compl(String product) {
List<CompletableFuture<String>> priceFutures = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map( x -> x.thenApply(Quote::parse))
.map(x -> x.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)))
.collect(Collectors.toList());
return priceFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}

输出:

1
2
[BestPrice price is 110.934, LetsSaveBig price is 135.576, MyFavoriteShop price is 192.717, BuyItAll price is 184.74, chenchen price is 208.5725]
CompletableFuture done in 2056 msecs

这个方法最主要的部分在于thenApply()以及thenCompose()。为了理解这两个函数,我们再将上面的例子分开书写,一个个研究:

1
2
Stream<CompletableFuture<String>> cf01 = shops.stream().map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor));

cf01是一个Stream>类型,该串行流的流元素是CompletableFuture类型,CompletableFuture封装计算的结果的类型是String,接下来进行map函数运算:

1
2
Stream<CompletableFuture<Quote>> cf02 = cf01.map(x ->
x.thenApply(Quote::parse));

cf01的流元素是CompletableFuture类型,所以map中的x指代的是CompletableFuture类型的对象。

thenApply()之于CompletableFuture,类似于map()之于Stream。那不妨继续打比方,既然map处理的是Stream的流元素,那么thenApply处理的也是CompletableFuture对象中的“值元素”(姑且这么称吧),这个值元素之前是String类型的价格,现在作为Quote类的静态方法的参数传入,返回的是Quote对象,所以x.thenApply(Quote::parse)返回的是同一个CompletableFuture对象,只不过“值元素”更改了。

继续接下来的map运算:

1
2
3
Stream<CompletableFuture<String>> cf03 = cf02.map(x -> x.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));

cf02是Stream>类型,所以map中的x指代的是CompletableFuture类型的对象。

thenCompose()之于CompletableFuture,类似于flatMap()之于Stream。此时thenCompose方法内的quote相当于CompletableFuture对象的“值元素”,每个值元素运算后又返回CompletableFuture类型,这可以按照flatMap()之于Stream的逻辑去理解。

最后的结果是2056 msecs,是因为thenCompose方法对两个异步操作进行了流水线处理,也就是第一个操作完成后,将结果作为参数传递给第二个操作。

这些函数都有Async版本,名称中不含Async的函数会在和前一个任务一样的线程中运行,而带有Async字样的版本会提交到一个线程池,这样每个任务都是由不同线程处理的。

如果不采用thenCompose方法而是采用thenApply方法,也就是:

1
Stream<CompletableFuture<String>> cf04 = cf02.map(x -> x.thenApply(Discount::applyDiscount));

结果如下:

1
2
[BestPrice price is 110.934, LetsSaveBig price is 135.576, MyFavoriteShop price is 192.717, BuyItAll price is 184.74, chenchen price is 208.5725]
parallelStream done in 2053 msecs

和之前的结果没多差。

第一时间显示可返回产品价格

之前所有的delay函数都是延迟一秒,这里将延迟的时间设置成一个随机数,更好模拟真实情形:

1
2
3
4
5
6
7
8
9
public static void randomDelay() {
int delay = 500 + random02.nextInt(2000);
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

将randomDelay写在calculatePrice方法里。接下来在PriceFinder类中添加相关函数。

将前面ch03单独抽离出来构造一个新的方法findPricesStream:

1
2
3
4
5
6
7
8
9
public Stream<CompletableFuture<String>> findPricesStream(String product) {
return shops.stream()
.map(shop -> CompletableFuture.supplyAsync(
() -> shop.getPrice(product), executor))
.map( x -> x.thenApply(Quote::parse))
.map(x -> x.thenCompose(quote ->
CompletableFuture.supplyAsync(
() -> Discount.applyDiscount(quote), executor)));
}

接着将thenAccept运用在新的实例中:

1
2
3
4
5
6
7
8
9
10
11
public void printPricesStream(String product) {
long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream(product)
.map(f -> f.thenAccept(s -> System.out.println(s + " (done in" + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
}

thenAccept函数监听任务的完成,它以CompletableFuture执行完毕后的返回值做参数,可以回调主线程的函数,这样就不需要调用get方法。allOf在这里是指列表中的CompletableFuture都完成任务后再往下执行,输出结果如下:

1
2
3
4
5
6
LetsSaveBig price is 174.02900000000002 (done in2251 msecs)
chenchen price is 214.871 (done in2276 msecs)
MyFavoriteShop price is 173.774 (done in2823 msecs)
BestPrice price is 117.567 (done in3153 msecs)
BuyItAll price is 169.8895 (done in3428 msecs)
All shops have now responded in 3428 msecs

为了看清allOf的作用,以及演示toArray终端操作,将代码更改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void printPricesStream(String product) {
long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream(product)
// .map(f -> f.thenAccept(s -> System.out.println(s + " (done in" + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
// .toArray(size -> new CompletableFuture[size]);
.map(f -> f.thenAccept(s -> {
System.out.println(s + " (done in" + ((System.nanoTime() - start) / 1_000_000) + " msecs)");
}))
.toArray(size -> new CompletableFuture[size]);
// CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
}

这里去掉含带allOf的语句,在主线程的最后添加一个10秒的睡眠,运行结果如下:

1
2
3
4
5
6
All shops have now responded in 9 msecs
BuyItAll price is 169.8895 (done in1616 msecs)
chenchen price is 214.871 (done in2232 msecs)
BestPrice price is 117.567 (done in2447 msecs)
MyFavoriteShop price is 173.774 (done in3167 msecs)
LetsSaveBig price is 174.02900000000002 (done in3238 msecs)

所以说allOf函数阻塞了主线程,目的就是等待并行都完成。

小结

本实战的难点在于对一些细节的掌控,通过CompletableFuture可以实现原本Future不能直接提供的需求。

参考

Java 8 CompletableFuture 教程

CompletableFuture | thenApply vs thenCompose