CompletableFuture
场景
最近在工作中遇到了短信发送的业务,由于不打算采用以前的存库+定时任务轮询调用的方式而是改用异步+多线程的方式直接调用,同时对于短信的状态、消息需要冗余为以后的失败重发兜底,因此打算采用CompletableFuture。
简介
CompletableFuture是Java对Future的拓展。当我们使用submit提交任务时会返回一个Future对象,通过它我们可以判断任务执行的成功与否并且获取返回值,但是它的get方法会阻塞当前线程,使用起来并不友好。
CompletableFuture实现了Future和CompletionStage接口,它提供了一系列的创建、链式调用和组合Future的方法,并且对于异常也有相应的处理。
相比于CountDownLatch及CyclicBarrier,个人觉得使用起来更为简洁_(:з」∠)_。
demo
/**
* 多线程异步调用
*
* @param smsEntityList
*/
@Async
public void send(List<SmsEntity> smsEntityList) {
if (CollectionUtils.isEmpty(smsEntityList)) {
return;
}
//可以传入一个线程池,默认使用ForkJoinPool
List<CompletableFuture<Void>> futureList = smsEntityList.stream().map(smsEntity -> CompletableFuture.supplyAsync(() -> sendSms(smsEntity), threadPoolTaskExecutor)
.thenAccept(uId -> {
//sendSms执行完成之后获得返回值进行业务处理
smsEntity.setUId(uId);
smsEntity.setStatus(1);
//todo
}).exceptionally(ex -> {
//发生异常情况的业务处理
smsEntity.setStatus(0);
log.error("send sms exception:" + ex);
return null;
})).collect(Collectors.toList());
//组合所有CompletableFuture
CompletableFuture<Void> allFutures = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
//所有线程执行完成之后统一业务处理
allFutures.thenAccept(param -> {
//todo save smsEntityList
});
}
/**
* 具体短信发送
*
* @param smsEntity
* @return
*/
public String sendSms(SmsEntity smsEntity) {
//todo
return UUID.randomUUID().toString();
}
api简单使用
runAsync():返回一个新的CompletableFuture<Void>对象,通常当无需对任务返回结果进行处理时使用。
CompletableFuture<Void> run = CompletableFuture.runAsync(()-> System.out.println("hello"));
supplyAsync():返回一个新的CompletableFuture<U>对象,将result作为输出返回,通常当后台任务需要返回一些结果时使用。
CompletableFuture<String> supply = CompletableFuture.supplyAsync(() -> {
System.out.println("Hello");
return "world";
});
System.out.println(supply.get()); //world
thenApply():返回一个CompletableFuture<U>对象,可以获得前一个任务的返回值并产生新的返回值。
CompletableFuture<String> thenApply = CompletableFuture.supplyAsync(() -> "world").thenApply(str -> "Hello " + str);
thenAccept():返回一个CompletableFuture<Void>对象,可以获取前一个任务的返回值但没有新的返回值。
CompletableFuture.supplyAsync(() -> "world").thenAccept(System.out::println);
thenRun():返回一个CompletableFuture<Void>对象,并且无法获取前一个任务的返回值。
CompletableFuture.runAsync(() -> {
//todo
}).thenRun(() -> {
//todo
});
CompletableFuture.allOf():组合多个CompletableFuture,可以在所有任务完成之后执行其他操作。
CompletableFuture.anyOf():组合多个CompletableFuture,当任意一个任务完成之后就能获取该任务结果。
exceptionally():回调异常处理。
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> "hello").exceptionally(ex -> {
log.error("exception:" + ex);
return ex.getMessage();
});
handle():无论是否发送异常都会被调用。
CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> "hello").handle((res, ex) -> {
if (ex != null) {
log.error("exception:" + ex);
return ex.getMessage();
}
return res + "world";
});
其他
这个博客的介绍感觉写得不错的,可以参考下。
https://www.callicoder.com/java-8-completablefuture-tutorial
Q.E.D.