基于Apache Camel的企业集成模式实践:第九章-SAGA EIP
1 总览
Saga EIP提供了一种在Camel路线中定义一系列相关动作的方法,这些动作可以成功完成,也可以不执行或补偿。Saga实现协调使用任何传输的分布式服务通信,以实现全球一致的结果。Saga EIP与传统的ACID分布式(XA)交易不同,因为保证不同参与服务的状态仅在Saga结束时才是一致的,而不会在任何中间步骤中保持一致。
Saga EIP适用于不鼓励使用分布式事务的用例。例如,允许参与Saga的服务使用任何类型的数据存储,例如经典数据库甚至NoSQL非事务性数据存储。它们也适合在无状态云服务中使用,因为它们不需要将事务日志存储在服务旁边。Saga EIP也不需要在短时间内完成,因为它们不使用数据库级别的锁,这与事务不同。因此,它们可以生存更长的时间,从几秒钟到几天。
Saga EIP不使用数据锁定。相反,他们定义了“补偿动作”的概念,该动作是在标准流遇到错误时应执行的动作,目的是还原流执行之前存在的状态。补偿动作可以使用Java或XML DSL在Camel路由中声明,并且仅在需要时(如果由于错误而取消了传奇)由Camel调用补偿动作。
2 Saga EIP选项
Saga EIP支持以下列出的6个选项:
名称 | 描述 | 默认 | 类型 |
传播 | 设置Saga传播模式(REQUIRED,REQUIRES_NEW,MANDATORY,SUPPORTS,NOT_SUPPORTED,NEVER)。 | 需要 | |
完成模式 | 确定如何将Saga视为完整。设置AUTO为时,成功处理启动Saga的交换时将完成Saga,或在异常完成时对其进行补偿。设置为时MANUAL,用户必须使用saga:complete或saga:compensate端点完成或补偿传奇。 | SagaCompletionMode | |
timeoutIn毫秒 | 设置佐贺的最长时间。超时到期后,传奇事件将自动得到补偿(除非在此期间做出了其他决定)。 | 长 | |
补偿金 | 必须调用补偿端点URI来补偿路由中所做的所有更改。与补偿URI对应的路由必须执行补偿并且完整无误。如果在补偿过程中发生错误,则Saga服务会再次调用补偿URI重试。 | SagaActionUri定义 | |
完成 | Saga成功完成时调用的完成端点URI。与完成URI对应的路由必须执行完成任务并终止而不会出现错误。如果在完成过程中发生错误,则Saga服务会再次调用完成URI重试。 | SagaActionUri定义 | |
选项 | 允许保存当前交换的属性,以便在补偿或完成回调路由中重用它们。选项通常很有用,例如,存储和检索在补偿操作中删除的对象的标识符。选项值转换为补偿/完成交换的输入标头。 | 清单 |
3 Saga服务配置
Saga EIP要求将实现接口的服务
org.apache.camel.saga.CamelSagaService添加到Camel上下文。骆驼目前支持以下Saga服务:
- InMemorySagaService:这是Saga EIP的基本实现,不支持高级功能(不进行远程上下文传播,在应用程序失败的情况下也不保证一致性)。
3.1 使用内存中传奇服务
不建议将In-memory Saga服务用于生产环境,因为它不支持Saga状态的持久性(仅保留在内存中),因此在应用程序失败的情况下,它不能保证Saga EIP的一致性(例如, JVM崩溃)。另外,在使用内存中的Saga服务时,无法使用传输级标头将Saga上下文传播到远程服务(可以通过其他实现方式完成)。当您要使用内存中的saga服务时,可以添加以下代码来自定义Camel上下文。该服务属于该camel-core模块。
context.addService(new org.apache.camel.impl.saga.InMemorySagaService());
4 例子
例如,您要下一个新订单,并且系统中有两项不同的服务:一项管理订单,一项管理贷项。从逻辑上讲,如果您有足够的信用可以下订单。使用Saga EIP,您可以将Direct:Buy路线建模为Saga,由两个不同的动作组成,一个动作创建订单,一个动作获得功劳。这两个动作都必须执行,否则,如果没有信用就下达订单,都不会被认为是不一致的结果(以及没有订单的付款)。
from("direct:buy")
.saga()
.to("direct:newOrder")
.to("direct:reserveCredit");
在其余示例中,购买动作不会改变。用于为“新订单和储备金抵扣”操作建模的不同选项如下:
from("direct:newOrder")
.saga()
.propagation(SagaPropagation.MANDATORY)
.compensation("direct:cancelOrder")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(orderManagerService, "newOrder")
.log("Order ${body} created");
此处,传播模式设置为MANDATORY,这意味着在此路由中流动的任何交换必须已经是Saga的一部分(在本示例中就是这种情况,因为Saga是在direct:buy路由中创建的)。该直接:newOrder航线声明了一个补偿的行动,被称为直接:cancelOrder,负责撤销的情况下,佐贺被取消了订单。
每个交易所始终包含一个
Exchange.SAGA_LONG_RUNNING_ACTION标头,在此用作订单的ID。这样可以确定在相应的补偿操作中要删除的顺序,但这不是必需的(可以将选项用作替代解决方案)。direct:newOrder的补偿动作是direct:cancelOrder,如下所示:
from("direct:cancelOrder")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(orderManagerService, "cancelOrder")
.log("Order ${body} cancelled");
当应取消订单时,Saga EIP实现会自动调用它。它不会因错误而终止。如果在direct:cancelOrder路由中引发错误,则EIP实现应定期重试以执行补偿操作,直至达到特定限制。这意味着任何补偿动作都必须是幂等的,因此应考虑到它可能多次触发并且在任何情况下都不会失败。如果所有重试后都无法补偿,则Saga实施应触发手动干预过程。
注意
可能发生由于Direct:newOrder路由执行的延迟,与此同时另一方取消了Saga(由于并行路由中的错误或Saga级别的超时)。因此,在调用补偿动作direct:cancelOrder时,它可能找不到取消的订单记录。为了保证完全的全局一致性,重要的是任何主要动作及其相应的补偿动作都是可交换的,例如,如果补偿发生在主要动作之前,则其效果应相同。
当不可能使用交换行为时,另一种可能的方法是始终使补偿动作失败,直到找到由主动作产生的数据(或耗尽最大重试次数)为止。这种方法可能在许多情况下都有效,但是它是启发式的。
信用服务的执行几乎与订购服务相同。
from("direct:reserveCredit")
.saga()
.propagation(SagaPropagation.MANDATORY)
.compensation("direct:refundCredit")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved in action ${body}");
呼吁赔偿行动:
from("direct:refundCredit")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(creditService, "refundCredit")
.log("Credit for action ${body} refunded");
在这里,信用保留的补偿动作是退款。该示例可以与Saga EIP的两种实现一起运行,因为它不涉及远程端点。
4.1 处理完成事件
Saga完成时需要某种类型的处理。发生错误并取消Saga时,将调用补偿端点。在完成端点可以被调用时,佐贺成功完成做进一步的处理。例如,在上面的订单服务中,我们可能需要知道订单何时完成(和保留的信用)才能真正开始准备订单。如果付款未完成,我们不希望开始准备订单(与大多数现代的CPU不同,在允许您有权读取它之前,现代CPU允许您访问保留的内存)。可以使用direct:newOrder端点的修改版本轻松完成此操作:
- 调用完成端点:
from("direct:newOrder")
.saga()
.propagation(SagaPropagation.MANDATORY)
.compensation("direct:cancelOrder")
.completion("direct:completeOrder")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(orderManagerService, "newOrder")
.log("Order ${body} created");
- 该直接:cancelOrder是一样的前面的示例所示。调用成功完成,如下所示:
from("direct:completeOrder")
.transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
.bean(orderManagerService, "findExternalId")
.to("jms:prepareOrder")
.log("Order ${body} sent for preparation");
Saga完成后,订单将发送到JMS队列进行准备。像补偿动作一样,Saga协调器也可以多次调用完成动作(尤其是在发生错误(例如网络错误)的情况下)。在此示例中,侦听prepareOrder JMS队列的服务已准备好保留可能的重复项(有关如何处理重复项的示例,请参见幂等方的EIP)。
4.2 使用自定义标识符和选项
您可以使用Saga选项来注册自定义标识符。例如,信贷服务的重构如下:
- 生成一个自定义ID并在正文中进行设置,如下所示:
from("direct:reserveCredit")
.bean(idService, "generateCustomId")
.to("direct:creditReservation")
- 委派动作并在补偿动作中根据需要标记当前主体。
from("direct:creditReservation")
.saga()
.propagation(SagaPropagation.SUPPORTS)
.option("CreditId", body())
.compensation("direct:creditRefund")
.bean(creditService, "reserveCredit")
.log("Credit ${header.amount} reserved. Custom Id used is ${body}");
- 仅在取消传奇时,才从标题中检索CreditId选项。
from("direct:creditRefund")
.transform(header("CreditId")) // retrieve the CreditId option from headers
.bean(creditService, "refundCredit")
.log("Credit for Custom Id ${body} refunded");
的直接:creditReservation端点可以通过传播模式设定为被称为佐贺之外,SUPPORTS。这样,可以在佐贺路线中声明多个选项。
4.3 设置超时
在Saga EIP上设置超时可确保在机器故障的情况下Saga不会永远卡住。Saga EIP实现在未明确指定的所有Saga EIP上均设置了默认超时。当超时时间到期时,除非之前已做出其他决定,否则Saga EIP将决定取消Saga(并补偿所有参与者)。
可以对Saga参与者设置超时,如下所示:
from("direct:newOrder")
.saga()
.timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
.propagation(SagaPropagation.MANDATORY)
.compensation("direct:cancelOrder")
.completion("direct:completeOrder")
// ...
.log("Order ${body} created");
所有参与者(例如,信贷服务,订单服务)都可以设置自己的超时时间。这些超时的最小值作为它们组合在一起时的传奇超时。也可以在Saga级别指定超时,如下所示:
from("direct:buy")
.saga()
.timeout(5, TimeUnit.MINUTES) // timeout at saga level
.to("direct:newOrder")
.to("direct:reserveCredit");
4.4 选择传播
上面的示例使用MANDATORY,SUPPORTS和REQUIRED传播模式,这是在未指定其他任何内容的情况下使用的默认传播。这些传播模式将交易环境中使用的等效模式1:1映射。
传播 | 描述 |
REQUIRED | 加入现有的Saga或创建一个新的Saga(如果不存在)。 |
REQUIRES_NEW | 始终创建一个新的传奇。暂停旧的Saga,并在新的Saga终止后恢复它。 |
MANDATORY | 传奇故事必须已经存在。现有的佐贺县加入。 |
SUPPORTS | 如果Saga已经存在,请加入。 |
NOT_SUPPORTED | 如果Saga已经存在,它将被挂起并在当前块完成时恢复。 |
NEVER | 在Saga中绝不能调用当前块。 |
4.5 使用手动完成(高级)
当无法以同步方式全部执行Saga,但例如需要使用异步通信通道与外部服务进行通信时,则无法将完成模式设置为AUTO(默认),因为交换时Saga尚未完成创建完成。对于执行时间较长(小时,天)的Saga EIP,通常是这种情况。在这些情况下,应使用MANUAL完成模式。
from("direct:mysaga")
.saga()
.completionMode(SagaCompletionMode.MANUAL)
.completion("direct:finalize")
.timeout(2, TimeUnit.HOURS)
.to("seda:newOrder")
.to("seda:reserveCredit");
为seda:newOrder和seda:reserveCredit添加异步处理。这些将异步回调发送到seda:operationCompleted。
from("seda:operationCompleted") // an asynchronous callback
.saga()
.propagation(SagaPropagation.MANDATORY)
.bean(controlService, "actionExecuted")
.choice()
.when(body().isEqualTo("ok"))
.to("saga:complete") // complete the current saga manually (saga component)
.end()
您可以添加direct:finalize端点以执行最终操作。
将完成模式设置为MANUAL意味着在直接路线:mysaga中处理交换时,传奇未完成,但会持续更长的时间(最长持续时间设置为2小时)。当两个异步操作都完成时,传奇就完成了。使用Camel Saga Component的saga:complete端点完成调用。有一个类似的端点可用于手动补偿Saga(saga:compensate)。
5. XML配置
Saga功能可供想要使用XML配置的用户使用。以下代码段显示了一个示例:
<route>
<from uri="direct:start"/>
<saga>
<compensation uri="direct:compensation" />
<completion uri="direct:completion" />
<option optionName="myOptionKey">
<constant>myOptionValue</constant>
</option>
<option optionName="myOptionKey2">
<constant>myOptionValue2</constant>
</option>
</saga>
<to uri="direct:action1" />
<to uri="direct:action2" />
</route>