LiteFlow - WhenCondition 和异步超时机制
WhenCondition
对应 EL 规则中的 WHEN 关键字,会被包装为
WhenCondition 组件
主要的并发编排逻辑主要由 WhenCondition
的实现方法执行
属性
WHEN 关键字支持的属性:
ignoreError:调用链调用失败时是否继续往下执行group:并发分组;该属性已经弃用,因为可以使用不同的WHEN进行分组控制,例如THEN(WHEN(a,b),WHEN(c,d))any:任意节点执行成功就继续向下执行threadExecutorClass:线程池名称;实例化后的线程池会被放在ExecutorHelper的Map<String, ExecutorService> executorServiceMap属性
1 | public class WhenCondition extends Condition { |
任务编排
WhenCondition 实现的 Condition 抽象方法
executeCondition
核心操作就是将任务包装为 CompletableFuture:
- 过滤前后置组件
- 过滤
isAccessfalse;一是过滤掉不需要执行的组件,二是不执行的组件一定是执行完成最快的,会造成any属性结果的混乱 - 对
Condition下的Map<String, List<Executable>> executableGroup包装为CompletableFuture,编排异步任务 - 在包装为
CompletableFuture过程中使用了CompletableFutureTimeout工具,套入CompletableFutureTimeout方法进行超时判断,如果超时则用WhenFutureObj.timeOut返回超时的对象 - 最终将所有需要执行的任务放进集合
List<CompletableFuture<WhenFutureObj>>
根据 any 属性对任务集合再次进行编排封装
如果是 any 属性,则使用
CompletableFuture.anyOf 进行编排,否则使用
CompletableFuture.allOf
1 | if (this.isAny()) { |
最终等待任务执行完成
1 | try { |
超时实现
Java 8 的 CompletableFuture 并没有 timeout
机制,虽然可以在 get 的时候指定 timeout,但是是一个同步堵塞的操作
一般的实现方案是启动一个 ScheduledThreadpoolExecutor
线程在 timeout * 时间后直接调用
CompletableFuture.completeExceptionally(new TimeoutException())
* 然后用 acceptEither() 或者 applyToEither
看是执行完成还是超时
Java 9 引入了 orTimeout 和
completeOnTimeOut 两个方法支持 异步 timeout
机制,底层也是使用 ScheduledThreadpoolExecutor
进行实现的
这里 LiteFlow 将任务和执行和超时封装成了一个 API 工具
CompletableFutureTimeout
在上面任务包装过程中,调用了
CompletableFutureTimeout.completeOnTimeout 方法
1 | public static <T> CompletableFuture<T> completeOnTimeout(T t, CompletableFuture<T> future, long timeout, |
- 泛型 T 下的
t,在任务编排中是WhenFutureObj对象 future是通过CompletableFuture.supplyAsync(new ParallelSupplier(executable, currChainName, slotIndex), parallelExecutor)编排的任务CompletableFuture对象,返回结果依然是WhenFutureObjtimeout是超时时间unit是超时时间单位
timeoutAfter 创建出超时异常任务后,由业务任务
future 的 applyToEither 编排两个任务
超时异常任务
调用 timeoutAfter 包装超时任务
1 | public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) { |
delayer 是一个
ScheduledThreadPoolExecutor
创建出一个 CompletableFuture,如果该任务在
get 时未完成,则抛出 TimeoutException 异常
WhenFutureObj
WhenFutureObj 是任务结果的包装对象
对于一个任务执行完成的状态有三种情况:
- 成功
- 失败
- 超时
在 ParallelSupplier
包装过程中对应了成功、失败两种返回
1 | public class ParallelSupplier implements Supplier<WhenFutureObj> { |
在超时任务中对应了超时的状态返回
WhenFutureObj.timeOut(executable.getId())
对象 build 方法
1 | public class WhenFutureObj { |
主要就是设置
success、timeout、异常信息等异步执行结果
结果解析
过滤出已经完成的任务,放到集合中
对于没有完成的任务就执行 cancel 操作
1 | List<WhenFutureObj> allCompletableWhenFutureObjList = completableFutureList.stream().filter(f -> { |
集合中的任务 get 出任务的产出,即包装的
WhenFutureObj
根据 WhenFutureObj 结果属性判断后续操作
- 输出超时信息
- 如果配置中
isIgnoreError不忽略异常- 根据
interrupted[0]集合中的中断情况抛出异常throw new WhenExecuteException(StrUtil.format("requestId [{}] when execute interrupted. errorResume [false].", slot.getRequestId())) - 循环判断
CompletableFuture的返回值,如果异步执行失败,则抛出相应的业务异常;如果是超时,这里就会抛出在超时任务编排时抛出的TimeoutException
- 根据
- 如果忽略异常;则对中断输出 warn 日志
补充
CompletableFuture cancel
在 WhenCondition 执行流程中,对于包装的
anyOf CompletableFuture
执行完成后,存在没有结束的任务,则会调用 CompletableFuture
的 cancel 方法
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
方法参数 mayInterruptIfRunning
实际并没有使用,也就是说整个 CompletableFuture
的执行并不会被中断
在官方文档中也说
@param mayInterruptIfRunning this value has no effect in this implementation because interrupts are not used to control processing.
这个值在这个实现中没有作用,因为中断不用于控制处理
该方法实现自 Future,其定义的
mayInterruptIfRunning 并没有使用
StackOverFlow 中也有人提出了这样的问题,认为违反了
Java.util.concurrent.Future 中对 cancel
方法定义的约定
其原因在评论中有所讨论:
As mentioned in my previous (edited) comment, the
CompletableFuturedoes not hold a reference to either the actual work or theThreadprocessing it. This is at least implied in the class documentation: "Since (unlike FutureTask) this class has no direct control over the computation that causes it to be completed, cancellation is treated as just another form of exceptional completion"由于与
FutureTask不同,CompletableFuture对执行任务的线程没有直接控制,因此取消被视为另一种形式的异常完成
CompletableFuturedoes not hold reference to thread, which I think it is also a problem, It is not consistency withFuture
CompletableFuture没有引用线程,我认为这也是一个问题,它与Future不一致
参考
java - Why does CompletableFuture implement the Future interface - Stack Overflow