介绍
Flowable从一开始就一直支持并行执行任务,大多数时候都是使用 BPMN 并行网关。在 CMMN 中,并行执行的任务是任务执行的默认方式(如果您不熟悉 CMMN,请查看我们的 CMMN 博客系列简介)。
您可能想知道标题中的“真正的并行执行”是什么意思。为了回答这个问题,让我们用一些非常简单的 HTTP 任务在 BPMN 中做一个小练习。在每个任务名称中,我们都编写了完成单个 HTTP 任务所需的时间。通过查看下面的过程,您认为接下来的过程要花多长时间?
处理并行 HTTP 任务
如果您不熟悉Flowable或其他类似的工作流程引擎,则可能会说大约需要 800 毫秒才能完成。但是,实际上,此流程实例实际上在 2.6s 内执行。即使我们有一个描述逻辑并行业务执行的并行网关,该技术实现实际上也会按顺序完成这项工作,一个 HTTP 任务一个接一个。这样做的主要原因是事务性:Flowable保证从一个等待状态原子地移动到另一个等待状态。这样做的代价是所有任务的执行都是单线程的。
与异步标志的区别
你们中的一些人会立即说:“ Flowable已经有了使任务异步化的解决方案,为什么我们需要阅读有关并行执行的新博客?”
以上面的示例为例,将 Task1,Task2,Task3 和 Task4 标记为异步将确实使所有任务的执行都是异步的(有关更多信息,请参见 Demystifying the Asynchronous Flag)。但是,这样做有一些重要的警告。
现在,我们将有四个不同的线程/事务对同一个流程实例进行修改。当所有分支完成时加入时,这可能导致乐观的锁定异常(这是完全可以的,因为如果发生这种情况,Flowable将重试异步作业)。也有一个解决方案:将这些任务排除在外。但是,这意味着每个流程实例一次只能执行一个分支,以避免乐观锁异常。实际上,这将使执行并行化(甚至有可能将负载分散到不同的Flowable节点上),但是当发生冲突和发挥排他性时,它可能会使完整的端到端运行变慢。因此,异步/排他主要是在不同的 Flowable 节点之间分散负载的一种方法,而不是加快执行速度。当然,使用异步标志也具有其他效果,例如更快地将控制权返回给用户。但这不是本文的重点。
FUTUREJAVA 代表
在某些用例中,主要焦点是原始吞吐量,通常是在服务编排中,而不是分散负载。如果我们看一下上面的示例过程模型,自然的倾向是说这应该在〜800ms 内完成,因为那是执行时间最长的路径(第一和第三路径)所需的时间。其他分支速度更快,不应影响总执行时间。
现在,对于即将到来的 Flowable 6.6 开源版本所做的工作,现在确实可以做到这一点(如果您想通过从源代码构建引擎进行试验,代码库中已经存在该代码)。借助此功能,Flowable 将允许您编写自己的业务逻辑,以便可以完全并行地执行它。为此,我们使用 Java 8 CompletableFutures,并将工作的执行委托给来自不同线程池的新线程,从而允许当前线程继续执行和调度其他并行流。
为了使您能够编写真正的并行业务逻辑,我们添加了一些可以使用的新接口。主要接口是 FutureJavaDelegate(用于 BPMN)和 PlanItemFutureJavaDelegate(用于 CMMN)。我们将展示 BPMN 接口,但是 CMMN 具有相同的概念。通过实现这些接口,您的业务逻辑将自动符合真正的并行执行条件。
public interface FutureJavaDelegate
如您所见,该接口看起来与 JavaDelegate 非常相似。有一个 execute 方法带有 DelegateExecution 和一个新的 AsyncTaskInvoker。AsyncTaskInvoker 是一个新接口,可用于安排 Flowable 维护的公共共享线程池上的工作。但是,您可以根据需要使用自己的实现,甚至可以重用从所使用的库返回的 CompletableFuture(例如,Elasticsearch 的 Java API,MongoDB 等)。
除了 execute 方法之外,还有另一个方法(afterExecution)采用了 DelegateExecution 和执行数据。此方法是从与流程实例相同的线程和相同的事务中调用的,一旦昂贵的逻辑完成,应使用此方法在 DelegateExecution 上设置数据。不应在另一个线程上安排的工作中使用 DelegateExecution;在安排工作之前,应检索所有需要的数据。一个示例实现可能如下所示:
public class LongRunningJavaDelegate implements FutureJavaDelegate { public CompletableFuture execute(DelegateExecution execution, AsyncTaskInvoker taskInvoker) { // This is running in the same transaction as the process instance and is still possible to set and extract data from the execution String input = (String) execution.getVariable("input"); // The taskInvoker is a common invoker provided by Flowable that can be used to submit complex executions on a new thread. // However, you don't have to use it, you can use your own custom ExecutorService or return a CompletableFuture from your own services. return taskInvoker.submit(() -> { // This is running on a new thread. The execution shouldn't be used here. // There is also no transaction here. In case a new transaction is needed, then it should be managed by your own services // Perform some complex logic that takes some time, e.g. invoking an external service return "done"; }); } public void afterExecution(DelegateExecution execution, String executionData) { // This is running in the same transaction and thread as the process instance and data can be set on the execution execution.setVariable("longRunningResult", executionData); } }
除了 FutureJavaDelegate 之外,还有两个其他接口可以使您更轻松地实现业务逻辑:
- FlowableFutureJavaDelegate <输入,输出> –不需要自定义创建 CompletableFuture 的方式时可以使用,并且 Flowable 将通过 AsyncTaskInvoker 为您进行调度。它提供了一个挂钩点来在与事务相同的线程上创建输入数据,并提供了一个不返回 CompletableFuture 的 execute 方法。
- MapBasedFlowableFuture –一个接口,该接口公开委托执行的快照作为执行的输入数据,并显示执行结果的映射。结果映射的所有条目将存储在委托执行中。
可以使用 MapBasedFlowableFutureJavaDelegate 以下方式创建上面的示例实现:
public class LongRunningJavaDelegate implements MapBasedFlowableFutureJavaDelegate { public Map<string, object=""> execute(ReadOnlyDelegateExecution execution) { // The execution contains a read only snapshot of the delegate execution // This is running on a new thread. The execution shouldn't be used here. // There is also no transaction here. In case a new transaction is needed, then it should be managed by your own services // Perform some complex logic that takes some time, e.g. invoking an external service Map<string, object=""> result = new HashMap<>(); result.put("longRunningResult", "done"); // All the values from the returned map will be set on the execution return result; } }</string,></string,>
除了通过使用委托支持真正的并行执行外,我们还通过返回 CompletableFuture 的表达式来支持服务任务的此类执行。如果服务任务的表达式结果返回 CompletableFuture,我们将继续执行并行流,一旦将来完成,我们将继续该流的路径。
我们如何实现这一目标?
我们已经展示了使用 Flowable 实现真正的并行执行所需的操作。让我们快速看一下它是如何实现的,以及为什么我们以前没有这样做。
主要解决方案包括将委托类的执行分为不同的阶段。处理执行数据(例如变量)的阶段计划在调用线程上运行并参与现有事务。例如,这避免了连接多个异步路径的问题,该问题需要在同一实体上连接数据,如上所述,使用异步标志时。然后使用一个单独的线程池在一个阶段中计划将这些数据用作输入并生成输出数据的实际逻辑。最终,所有执行路径及其结果将合并到一个在事务上安全的阶段。
多年以来,我们就一直在使用此功能。到达此处花了一段时间的主要原因是,从技术上讲,它很难实现。原始的 V5 架构并不适合于此类事情,而执行它的方式意味着实现这一点极为困难。请注意,并非并非不可能(毕竟都是代码),但麻烦,并且从我们在 V5 架构上的经验来看,在所有用例中都易于出错。
但是,借助 V6 架构和我们最近进行的重构,这变得容易实现。所有的操作都在一个议程中计划(所有引擎,BPMN / CMMN / DMN /…都是这样工作的),这使我们可以计划一些特殊的操作,这些操作将在议程没有剩余时检查未来是否已经完成正常运作。这样,就可以继续执行已在议程上计划的其他并行流程。一旦没有其他正常操作,我们将执行一项操作,直到任何一个计划的期货完成(保持交易未完成)之前,该操作都将阻塞。这个未来的完成将允许在议程上计划新的正常操作,然后可以执行该操作。这将一直持续到我们进入等待状态,并且在此事务中不再执行任何其他类型的操作(包括期货)。
当然,这是其实现方式的简化。如果你有兴趣在低级别的细节那么我们建议你看看这个 PR 是添加了这个功能。
真正的并行 HTTP 任务
回到引言中的示例,我们使用了 HTTP 任务。在 Flowable 6.6+中,默认情况下,不使用这种真正的并行方法执行 HTTP 任务。我们决定保持这种方式,以确保所有当前正在运行的进程的执行顺序保持不变,并为您提供控制权,以决定您希望它们在同一线程中执行还是在新线程中执行。为此,我们添加了新的 BPMN 和 CMMN 扩展属性 flowable :parallelInSameTransaction,可以将其设置为 true 以使用新的并行方法执行 HTTP Task。如果没有设置该属性,然后在全球 defaultParallelInSameTransaction 从 HttpClientConfig 在 BPMN 和 CMMN 发动机配置来决定它应该如何执行。
我们目前正在制定基准测试,以向您展示 Flowable HTTP Tasks 的真正并行执行与同步并行执行之间的区别。我们还在尝试使用 HTTP NIO 和 Spring WebClient 来实现更高的吞吐量。