这篇文章记录《Java 8实战》第十一章的实例,将各个例子厘清,这里主要涉及到thenApply、thenAccept、thenCompose等函数的用法,通过这些例子了解CompletableFuture基本的使用方法。
异步获取产品价格
第一个实例展示了一个基本的Future模式。
传统的获得价格的方式:
|
|
单纯返回计算结果。
异步方式:
|
|
该方式返回一个Future
两种计算价格的方式最大区别在于在主线程中调用getPriceAsync()会早早返回。
用工厂方法创建CompletableFuture对象
之前的写法太繁琐了有木有,而且还没有处理可能产生的异常。CompletableFuture提供了很多工厂方法,比如supplyAsync(),他接受一个生产者为参数,返回CompletableFuture对象:
|
|
这种方法和提供了错误管理机制的getPriceAsync方法(这里并没有提供错误处理)是完全等价的。
最佳价格查询器01:单纯采用stream()
多个Shop对象(这里是4家),每一个都要获取价格,第一种方式:
|
|
输出:
|
|
串行流就是一个个挨着执行,没什么特别的。
最佳价格查询器02:采用parallelStream()
将stream()更改为parallelStream():
输出:
|
|
并行流的结果确实很优秀!
最佳价格查询器03:采用CompletableFuture
首先要说明的是,这里的例子仍然采用串行流,实践起来分两步,第一步如下:
|
|
在第一步采用了map函数后就用收集器收集是因为流操作之间的延迟性,这一步之后先创建多个Future对象,因为这些对象已经创建,所以即便发生getPrice的阻塞,也不会影响那些线程的执行,因为它们已经在之前start了。
第二步:
|
|
join功能和get类似,只是不需要抛异常。
运行一下结果如下:
|
|
最后的效果是02 > 03 > 01。
将上面的例子的条件更改成处理5个Shop对象,02的输出如下:
|
|
03的输出如下:
|
|
这个时候结果变成03 > 02 > 01。
从店铺数大于4开始,并行流和CompletableFuture开始不分伯仲,这是因为它们背后都是采用通用线程池来完成任务。
设置线程数
实例中的线程池选取了newFixedThreadPool,它的corePoolSize和maximumPoolSize是相同的,这里本人比较关心守护线程的效果,先将其设置为true:
|
|
主线程执行结束后程序就退出了,如果不将其设为守护线程,那么程序将会一直阻塞。之前在解析线程池的时候分析过shutdown()效果,newFixedThreadPool采用的是LinkedBlockingQueue
添加折扣服务
这里模拟远程的折扣服务,它出现在获得价格的操作之后,这里简单的说一说基础逻辑:
- 修改Shop类下的getPrice方法,将Shop名、价格、折扣码通过“:”组合成字符串,作为该方法的返回;
- 构造Quote类,该类提供parse静态方法对上一步返回的字符串进行解析,将解析的各部分保存在Quote类字段中;
- 创建Discount类,该类首先构造一个内部枚举类Code,保存已经设定好的折扣代码;再次提供私有方法apply(),该方法计算出折扣后的价格并返回,延迟模拟也放在该方法中;最后提供appDiscount方法,该方法将打折后的价格拼接成对用户友好的字符串并返回;
串行流执行添加折扣后的服务
和之前的逻辑相似,先采用串行流,添加findPricesDis_stream方法:
|
|
在BestPriceFinderMain类中将相关方法切换为findPricesDis_stream并执行,最后输出结果:
|
|
这个结果很好解释,整个执行属于顺序执行,期间一共阻塞过10秒。
使用CompletableFuture执行添加折扣后的服务
这里就不演示并行流了,因为它和CompletableFuture都是通过线程池来实现,但是前者随着商店数增加扩展性不够好,而后者可以指定要执行的线程数。
直接代码放送:
|
|
输出:
|
|
这个方法最主要的部分在于thenApply()以及thenCompose()。为了理解这两个函数,我们再将上面的例子分开书写,一个个研究:
|
|
cf01是一个Stream
|
|
cf01的流元素是CompletableFuture
thenApply()之于CompletableFuture,类似于map()之于Stream
。那不妨继续打比方,既然map处理的是Stream的流元素,那么thenApply处理的也是CompletableFuture对象中的“值元素”(姑且这么称吧),这个值元素之前是String类型的价格,现在作为Quote类的静态方法的参数传入,返回的是Quote对象,所以x.thenApply(Quote::parse)返回的是同一个CompletableFuture对象,只不过“值元素”更改了。
继续接下来的map运算:
|
|
cf02是Stream类型的对象。
thenCompose()之于CompletableFuture,类似于flatMap()之于Stream
。此时thenCompose方法内的quote相当于CompletableFuture对象的“值元素”,每个值元素运算后又返回CompletableFuture类型,这可以按照flatMap()之于Stream的逻辑去理解。
最后的结果是2056 msecs,是因为thenCompose方法对两个异步操作进行了流水线处理,也就是第一个操作完成后,将结果作为参数传递给第二个操作。
这些函数都有Async版本,名称中不含Async的函数会在和前一个任务一样的线程中运行,而带有Async字样的版本会提交到一个线程池,这样每个任务都是由不同线程处理的。
如果不采用thenCompose方法而是采用thenApply方法,也就是:
|
|
结果如下:
|
|
和之前的结果没多差。
第一时间显示可返回产品价格
之前所有的delay函数都是延迟一秒,这里将延迟的时间设置成一个随机数,更好模拟真实情形:
|
|
将randomDelay写在calculatePrice方法里。接下来在PriceFinder类中添加相关函数。
将前面ch03单独抽离出来构造一个新的方法findPricesStream:
|
|
接着将thenAccept运用在新的实例中:
|
|
thenAccept函数监听任务的完成,它以CompletableFuture执行完毕后的返回值做参数,可以回调主线程的函数,这样就不需要调用get方法。allOf在这里是指列表中的CompletableFuture都完成任务后再往下执行,输出结果如下:
|
|
为了看清allOf的作用,以及演示toArray终端操作,将代码更改如下:
|
|
这里去掉含带allOf的语句,在主线程的最后添加一个10秒的睡眠,运行结果如下:
|
|
所以说allOf函数阻塞了主线程,目的就是等待并行都完成。
小结
本实战的难点在于对一些细节的掌控,通过CompletableFuture可以实现原本Future不能直接提供的需求。