CompletableFuture

2020-04-28   


CompletableFuture

场景

  最近在工作中遇到了短信发送的业务,由于不打算采用以前的存库+定时任务轮询调用的方式而是改用异步+多线程的方式直接调用,同时对于短信的状态、消息需要冗余为以后的失败重发兜底,因此打算采用CompletableFuture。

简介

  CompletableFuture是Java对Future的拓展。当我们使用submit提交任务时会返回一个Future对象,通过它我们可以判断任务执行的成功与否并且获取返回值,但是它的get方法会阻塞当前线程,使用起来并不友好。
  CompletableFuture实现了Future和CompletionStage接口,它提供了一系列的创建、链式调用和组合Future的方法,并且对于异常也有相应的处理。
  相比于CountDownLatch及CyclicBarrier,个人觉得使用起来更为简洁_(:з」∠)_。
completablefuture1

demo
    /**
     * 多线程异步调用
     *
     * @param smsEntityList
     */
    @Async
    public void send(List<SmsEntity> smsEntityList) {
        if (CollectionUtils.isEmpty(smsEntityList)) {
            return;
        }
        //可以传入一个线程池,默认使用ForkJoinPool
        List<CompletableFuture<Void>> futureList = smsEntityList.stream().map(smsEntity -> CompletableFuture.supplyAsync(() -> sendSms(smsEntity), threadPoolTaskExecutor)
                .thenAccept(uId -> {
                    //sendSms执行完成之后获得返回值进行业务处理
                    smsEntity.setUId(uId);
                    smsEntity.setStatus(1);
                    //todo
                }).exceptionally(ex -> {
                    //发生异常情况的业务处理
                    smsEntity.setStatus(0);
                    log.error("send sms exception:" + ex);
                    return null;
                })).collect(Collectors.toList());

        //组合所有CompletableFuture
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));

        //所有线程执行完成之后统一业务处理
        allFutures.thenAccept(param -> {
            //todo save smsEntityList
        });
    }


    /**
     * 具体短信发送
     *
     * @param smsEntity
     * @return
     */
    public String sendSms(SmsEntity smsEntity) {
        //todo
        return UUID.randomUUID().toString();
    }
api简单使用

runAsync():返回一个新的CompletableFuture<Void>对象,通常当无需对任务返回结果进行处理时使用。

CompletableFuture<Void> run = CompletableFuture.runAsync(()-> System.out.println("hello"));

supplyAsync():返回一个新的CompletableFuture<U>对象,将result作为输出返回,通常当后台任务需要返回一些结果时使用。

CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hello");
        return "world";
    });
    System.out.println(supply.get());  //world

thenApply():返回一个CompletableFuture<U>对象,可以获得前一个任务的返回值并产生新的返回值。

CompletableFuture<String> thenApply = CompletableFuture.supplyAsync(() -> "world").thenApply(str -> "Hello " + str);

thenAccept():返回一个CompletableFuture<Void>对象,可以获取前一个任务的返回值但没有新的返回值。

CompletableFuture.supplyAsync(() -> "world").thenAccept(System.out::println);

thenRun():返回一个CompletableFuture<Void>对象,并且无法获取前一个任务的返回值。

CompletableFuture.runAsync(() -> {
    //todo
}).thenRun(() -> {
    //todo
});

CompletableFuture.allOf():组合多个CompletableFuture,可以在所有任务完成之后执行其他操作。

CompletableFuture.anyOf():组合多个CompletableFuture,当任意一个任务完成之后就能获取该任务结果。

exceptionally():回调异常处理。

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> "hello").exceptionally(ex -> {
    log.error("exception:" + ex);
    return ex.getMessage();
});

handle():无论是否发送异常都会被调用。

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> "hello").handle((res, ex) -> {
    if (ex != null) {
        log.error("exception:" + ex);
        return ex.getMessage();
    }
    return res + "world";
});
其他

这个博客的介绍感觉写得不错的,可以参考下。
https://www.callicoder.com/java-8-completablefuture-tutorial

Q.E.D.