百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

带你玩转CompletableFuture异步编程

wptr33 2025-02-19 14:10 21 浏览

为什么使用CompletableFuture

一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。 JDK5新增了Future接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,我们必须使用Future.get的方式阻塞调用线程,或者使用轮询方式判断 Future.isDone 任务是否结束,再获取结果。这两种处理方式都不是很优雅,相关代码如下:

    @Test
    public void testFuture() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        Future future = executorService.submit(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        });
        try {
            System.out.println(future.get());
            System.out.println("end");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
复制代码

此外就是无法解决多个异步任务需要相互依赖的场景,简单点说就是,主线程需要等待子线程任务执行完毕之后在进行执行,这个时候你可能想到了CountDownLatch,没错确实可以解决,代码如下,但是Java8以后我不在认为这是一种优雅的解决方式,接下来我们来了解下CompletableFuture的使用。

    @Test
    public void testCountDownLatch() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        CountDownLatch downLatch = new CountDownLatch(2);
        Future orderFuture = executorService.submit(() -> {
            OrderService orderService = new OrderServiceImpl();
            String result = orderService.queryOrderInfo();
            downLatch.countDown();
            return result;
        });

        Future trailFuture = executorService.submit(() -> {
            TrailService trailService = new TrailServiceImpl();
            String result = trailService.queryTrail();
            downLatch.countDown();
            return result;
        });

        try {
            downLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println(orderFuture.get() + trailFuture.get());
            System.out.println("end");
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
复制代码

使用介绍

创建实例

关于CompletableFuture的创建提供了5种方式,第一个就是创建一个具有默认结果的 CompletableFuture,不经常使用,我们常用就是runAsync和supplyAsync,重点关注下这两个静态方法。runAsync是不具有返回值,supplyAsync具有返回值,关于这两个方法都提供一个两种创建形式,一种是默认的使用公共的 ForkJoinPool 线程池执行,这个线程池默认线程数是 CPU 的核数。

另外一种使用就是提供一个主动创建线程池,这样做相比于ForkJoinPool的好处是,可以通过自定义不同场景的线程池,来进行业务划分方便发现问题,还有一个好处就是对于ForkJoinPool这种共享线程来说,一旦阻塞,会照成其他线程无法获取执行机会。

CompletionStage

关于CompletableFuture核心能力就是通过继承Future和CompletionStage来实现的,关于Future就是提供一些异步的能力,如果单存就这样CompletableFuture也就不会那么强大,所以我们核心就是介绍CompletionStage内部常用一些方法。

关于CompletionStage的方法,可以分为两种类型的接口,其中核心方法都提供异步的方式,这里关于异步的方法不进行介绍,基本上原理类似,都是新提供一个线程池去实现任务。

异步回调

关于异步回调可以分为两类,一种是有参数的返回,另外一种是无参数的返回,有参数返回的包括thenApply和thenCompose,无参数返回的包括thenRun和thenAccept。

有参数返回

thenApply 和 thenCompose表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,也可以理解为串行化的,唯一不同的是thenCompose需要返回一个新的 CompletionStage,整体的使用如下:

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "aaaaaaaaaaaaaaaaaaaaa";
        }).thenApply(x -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return x + "bbbbbbbbbbbbbbbbbbbbbbbb";
        }).thenCompose(x -> CompletableFuture.supplyAsync(x::toUpperCase));
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println("耗时" + (end - start) / 1000 + "");
    }
复制代码

无参数返回

thenAccep也是消费上一个任务的动作,将该任务的执行结果即方法返回值作为入参传递到回调方法中,只是无返回值,thenAccep与thenRun方法不同就是没有入参也没有返回值。

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "a";
        }).thenAccept(x -> {
            System.out.println(x);
        });
        first.join();
        long end = System.currentTimeMillis();
        System.out.println("耗时" + (end - start) / 1000 + "秒");
    }

复制代码

异常

CompletableFuture方法执行过程若产生异常,只有get或者join方法的才能获取到异常,针对这种情况CompletableFuture提供三种处理异常的方式。

exceptionally

exceptionally的使用方式类似于 try catch中的catch代码块中异常处理。exceptionally当某个任务执行异常时执行的回调方法,将抛出的异常作为参数传递到回调方法中,如果该任务正常执行,exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果。

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("test");
        });
        CompletableFuture two = first.exceptionally((x) -> {
            x.printStackTrace();
            return "123";
        });
        two.join();
        long end = System.currentTimeMillis();
        System.out.println("耗时" + (end - start) / 1000 + "秒");
    }
复制代码

whenComplete

whenComplete的使用类似于 try..catch..finanlly 中 finally 代码块,无论是否发生异常,都将会执行的。whenComplete当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "aa";
        }).whenComplete((x, throwable) -> {
            // 如果异常存在,打印异常,并且返回默认值
            if (throwable != null) {
                throwable.printStackTrace();
                System.out.println("失败");
            } else {
                System.out.println("成功");
            }
        });
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println("耗时" + (end - start) / 1000 + "秒");
    }
复制代码

handle

跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关。

    @Test
    public void testCompletableFuture() {
        long start = System.currentTimeMillis();
        CompletableFuture first = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            throw new RuntimeException("test");
        }).handle((x, throwable) -> {
            // 如果异常存在,打印异常,并且返回默认值
            if (throwable != null) {
                throwable.printStackTrace();
                return "异常";
            } else {
                
                return x + "aa";
            }
        });
        System.out.println(first.join());
        long end = System.currentTimeMillis();
        System.out.println("耗时" + (end - start) / 1000 + "秒");
    }
复制代码

组合关系

组合关系关系分为两种,一种是和的关系,一种是或的关系,和关系就是当所有的任务执行完成以后再继续执行,类似于CountDownLatch,或关系就是只要有一个任务执行完成以后就可以向下执行。

和关系

thenCombine /thenAcceptBoth / runAfterBoth/allOf

这四个方法可以将多个CompletableFuture组合起来,将多个CompletableFuture都执行完成以后,才能执行后面的操作,区别在于,thenCombine会将任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意多个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。allOf是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

    @Test
    public void testCompletableFuture() {
        CompletableFuture order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        });
        CompletableFuture trail = CompletableFuture.supplyAsync(() -> {
            TrailService trailService = new TrailServiceImpl();
            return trailService.queryTrail();
        });
        CompletableFuture future = order.thenCombine(trail, (a, b) -> a + b);
        CompletableFuture afterBoth = future.runAfterBoth(trail, () -> {
            System.out.println(future.join());
        });
        CompletableFuture result = CompletableFuture.allOf(afterBoth);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
复制代码

applyToEither / acceptEither / runAfterEither/anyOf

这四个方法可以将多个CompletableFuture组合起来,只需要其中一个CompletableFuture执行完成以后,就能执行后面的操作,applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值,注意多个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。anyOf多个任务只要有一个任务执行完成,后续任务就可执行。

    @Test
    public void testCompletableFuture() {
        CompletableFuture order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        });
        CompletableFuture trail = CompletableFuture.supplyAsync(() -> {
            TrailService trailService = new TrailServiceImpl();
            return trailService.queryTrail();
        });
        CompletableFuture future = order.applyToEither(trail, (result) -> result);
        CompletableFuture afterBoth = future.runAfterEither(trail, () -> {
            System.out.println(future.join());
        });
        CompletableFuture result = CompletableFuture.anyOf(afterBoth,order);
        try {
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
复制代码

项目中如何使用

CompletableFuture在自定义线程时,默认使用的线程池是 ForkJoinPool.commonPool(),对于我们用java常做的IO密集型任务,默认线程池是远远不够使用的;在双核及以下机器上,默认线程池又会退化为为每个任务创建一个线程,相当于没有线程池。因此对于CompletableFuture在项目中的使用一定要自定义线程池,同时又要注意自定义线程池,线程池有个容量满了的拒绝策略,如果采用丢弃策略的拒绝策略,并且allOf方法和get方法如果没有设置超时则会无限期的等待下去,接下来我们通过自定义线程使用CompletableFuture。

  1. 自定义线程池,此处通过继承ThreadPoolExecutor,重写了shutdown() 、shutdownNow() 、beforeExecute() 和 afterExecute()方法来统计线程池的执行情况,此处还可以结合Spring和appllo实现自定义扩展线程池,下一篇可以聊聊扩展思路以及实现方案,不同对的业务场景使用的不同的线程池,一是方便出现问题的排查,另外就是类似于Hystrix隔离的方案;
package com.zto.lbd;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池监控类
 *
 * @author wangtongzhou 18635604249
 * @since 2022-02-23 07:27
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /**
     * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
     */
    private ConcurrentHashMap startTimes;

    /**
     * 线程池名称,一般以业务名称命名,方便区分
     */
    private String poolName;

    /**
     * 调用父类的构造方法,并初始化HashMap和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param poolName        线程池名称
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue workQueue, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }


    /**
     * 调用父类的构造方法,并初始化HashMap和线程池名称
     *
     * @param corePoolSize    线程池核心线程数
     * @param maximumPoolSize 线程池最大线程数
     * @param keepAliveTime   线程的最大空闲时间
     * @param unit            空闲时间的单位
     * @param workQueue       保存被提交任务的队列
     * @param threadFactory   线程工厂
     * @param poolName        线程池名称
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                             TimeUnit unit, BlockingQueue workQueue,
                             ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }


    /**
     * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况
     */
    @Override
    public void shutdown() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} 关闭线程池, 已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     * 线程池立即关闭时,统计线程池情况
     */
    @Override
    public List shutdownNow() {
        // 统计已执行任务、正在执行任务、未执行任务数量
        LOGGER.info("{} 立即关闭线程池,已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /**
     * 任务执行之前,记录任务开始时间
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /**
     * 任务执行之后,计算任务结束时间
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、
        // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、
        // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止
        LOGGER.info("{}-pool-monitor: " +
                        "任务耗时: {} ms, 初始线程数: {}, 核心线程数: {}, 正在执行的任务数量: {}, " +
                        "已完成任务数量: {}, 任务总数: {}, 队列里任务数量: {}, 池中存在的最大线程数: {}, " +
                        "最大线程数: {},  线程空闲时间: {}, 线程池是否关闭: {}, 线程池是否终止: {}",
                this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /**
     * 生成线程池所用的线程,改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪
     */
    static class MonitorThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /**
         * 初始化线程工厂
         *
         * @param poolName 线程池名称
         */
        MonitorThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}
复制代码
  1. 使用自定义线程池的CompletableFuture;
    private final static BlockingQueue workQueue = new ArrayBlockingQueue<>(100);
    private final static ThreadPoolMonitor threadPoolMonitor = new ThreadPoolMonitor(5, 10, 100L,
            TimeUnit.SECONDS, workQueue, "monitor");

    @Test
    public void testCompletableFuture() {
        CompletableFuture order = CompletableFuture.supplyAsync(() -> {
            OrderService orderService = new OrderServiceImpl();
            return orderService.queryOrderInfo();
        },threadPoolMonitor);
        String result=order.join();
        assertTrue(Objects.nonNull(result));
    }
复制代码

相关推荐

开发者必看的八大Material Design开源项目

MaterialDesign是介于拟物和扁平之间的一种设计风格,自从它发布以来,便引起了很多开发者的关注,在这里小编介绍在Android开发者当中里最受青睐的八个MaterialDesign开源项...

另类插这么可爱,一定是…(另类t恤)

IT之家(www.ithome.com):另类插图:这么可爱,一定是…OSXMavericks和Yosemite打破了苹果对Mac操作系统传统的命名方式,使用加州的某些标志性景点来替换猫...

Android常用ADB命令(安卓adb工具是什么)

杀死应用①根据包名获取APP的PIDadbshellps|grep应用包名②执行kill命令...

微软Mac版PowerPoint测试Reading Order Pane功能

IT之家5月20日消息,微软公司昨日(5月19日)发布博文,邀请Microsoft365Insiders成员,测试macOS新版PowerPoint演示文稿应用,重点引入...

Visual Studio跨平台开发实战(4):Xamarin Android控制项介绍

前言不同于iOS,Xamarin在VisualStudio中针对Android,可以直接设计使用者界面.在本篇教学文章中,笔者会针对Android的专案目录结构以及基本控制项进行介绍,包...

用云存储30分钟快速搭建APP,你信吗?

背景不管你承认与否,移动互联的时代已经到来,这是一个移动互联的时代,手机已经是当今世界上引领潮流的趋势,大型的全球化企业和中小企业都把APP程序开发纳入到他们的企业发展策略当中。但随着手机APP上传的...

谷歌P图神器来了!不用学不用教,输入一句话,分分钟给结果

Pine发自凹非寺量子位|公众号QbitAI当你拍照片时,“模特不好好配合”怎么办?...

iOS文本编辑控件UITextField和UITextVie

记录一个菜鸟的IOS学习之旅,如能帮助正在学习的你,亦枫不胜荣幸;如路过的大神如指教几句,亦枫感激涕淋!细心的朋友可能已经注意到了,IOS学习之旅系列教程在本篇公众号的文章中,封面已经换成美女图片了,...

Android入门图文教程集锦(android 入门教程)

Android入门视频教程集锦AndroidStudio错误gradientandroid:endXattributenotfound...

如何使用Android自定义复合视图(如何使用android自定义复合视图)

在最近的一个客户应用中,我遇到了一个需求,根据选定的值来生成指定数量的编辑框字段,这样用户可以输入人物信息。最初我的想法是把这些逻辑放到Fragment中,只是根据选中值的变化来向线性布局容器中增加编...

原生安卓开发app的框架frida常用关键代码定位

前言有时候可能会对APP进行字符串加密等操作,这样的话你的变量名等一些都被混淆了,看代码就可能无从下手...

教程10 | 三分钟搞定一个智能输入法程序

一案例描述1、考核知识点网格布局线性布局样式和主题Toast2、练习目标掌握网格布局的使用掌握Toast的使用掌握线性布局的使用...

(Android 8.1) 功能与新特性(android的功能)

和你一起终身学习,这里是程序员AndroidAndroid8.1(API级别27)为用户和开发人员引入了各种新特性和功能。本文档重点介绍了开发人员的新功能。通过本章阅读,您将获取到以下内容:Andr...

怎样设置EditText内部文字被锁定不可删除和修改

在做项目的时候,我曾经遇到过这样的要求,就是跟百度贴吧客户端上的一样,在回复帖子的时候,在EditText中显示回复人的名字,而且这个名字不可以修改和删除,说白了就是不可操作,只能在后面输入内容。在E...

如何阻止 Android 活动启动时 EditText 获得焦点

技术背景在Android开发中,当活动启动时,EditText有时会自动获得焦点并弹出虚拟键盘,这可能不是用户期望的行为。为了提升用户体验,我们需要阻止...