基于Apache Camel的企业集成模式实践:第九章-SAGA EIP

基于Apache Camel的企业集成模式实践:第九章-SAGA EIP

精选文章moguli202025-04-28 22:57:1210A+A-

1 总览

Saga EIP提供了一种在Camel路线中定义一系列相关动作的方法,这些动作可以成功完成,也可以不执行或补偿。Saga实现协调使用任何传输的分布式服务通信,以实现全球一致的结果。Saga EIP与传统的ACID分布式(XA)交易不同,因为保证不同参与服务的状态仅在Saga结束时才是一致的,而不会在任何中间步骤中保持一致。

Saga EIP适用于不鼓励使用分布式事务的用例。例如,允许参与Saga的服务使用任何类型的数据存储,例如经典数据库甚至NoSQL非事务性数据存储。它们也适合在无状态云服务中使用,因为它们不需要将事务日志存储在服务旁边。Saga EIP也不需要在短时间内完成,因为它们不使用数据库级别的锁,这与事务不同。因此,它们可以生存更长的时间,从几秒钟到几天。

Saga EIP不使用数据锁定。相反,他们定义了“补偿动作”的概念,该动作是在标准流遇到错误时应执行的动作,目的是还原流执行之前存在的状态。补偿动作可以使用Java或XML DSL在Camel路由中声明,并且仅在需要时(如果由于错误而取消了传奇)由Camel调用补偿动作。

2 Saga EIP选项

Saga EIP支持以下列出的6个选项:

名称

描述

默认

类型

传播

设置Saga传播模式(REQUIRED,REQUIRES_NEW,MANDATORY,SUPPORTS,NOT_SUPPORTED,NEVER)。

需要


完成模式

确定如何将Saga视为完整。设置AUTO为时,成功处理启动Saga的交换时将完成Saga,或在异常完成时对其进行补偿。设置为时MANUAL,用户必须使用saga:completesaga:compensate端点完成或补偿传奇。


SagaCompletionMode

timeoutIn毫秒

设置佐贺的最长时间。超时到期后,传奇事件将自动得到补偿(除非在此期间做出了其他决定)。


补偿金

必须调用补偿端点URI来补偿路由中所做的所有更改。与补偿URI对应的路由必须执行补偿并且完整无误。如果在补偿过程中发生错误,则Saga服务会再次调用补偿URI重试。


SagaActionUri定义

完成

Saga成功完成时调用的完成端点URI。与完成URI对应的路由必须执行完成任务并终止而不会出现错误。如果在完成过程中发生错误,则Saga服务会再次调用完成URI重试。


SagaActionUri定义

选项

允许保存当前交换的属性,以便在补偿或完成回调路由中重用它们。选项通常很有用,例如,存储和检索在补偿操作中删除的对象的标识符。选项值转换为补偿/完成交换的输入标头。


清单

3 Saga服务配置

Saga EIP要求将实现接口的服务
org.apache.camel.saga.CamelSagaService
添加到Camel上下文。骆驼目前支持以下Saga服务:

  • InMemorySagaService:这是Saga EIP的基本实现,不支持高级功能(不进行远程上下文传播,在应用程序失败的情况下也不保证一致性)。

3.1 使用内存中传奇服务

不建议将In-memory Saga服务用于生产环境,因为它不支持Saga状态的持久性(仅保留在内存中),因此在应用程序失败的情况下,它不能保证Saga EIP的一致性(例如, JVM崩溃)。另外,在使用内存中的Saga服务时,无法使用传输级标头将Saga上下文传播到远程服务(可以通过其他实现方式完成)。当您要使用内存中的saga服务时,可以添加以下代码来自定义Camel上下文。该服务属于该camel-core模块。

context.addService(new org.apache.camel.impl.saga.InMemorySagaService());

4 例子

例如,您要下一个新订单,并且系统中有两项不同的服务:一项管理订单,一项管理贷项。从逻辑上讲,如果您有足够的信用可以下订单。使用Saga EIP,您可以将Direct:Buy路线建模为Saga,由两个不同的动作组成,一个动作创建订单,一个动作获得功劳。这两个动作都必须执行,否则,如果没有信用就下达订单,都不会被认为是不一致的结果(以及没有订单的付款)。

from("direct:buy")
  .saga()
    .to("direct:newOrder")
    .to("direct:reserveCredit");

在其余示例中,购买动作不会改变。用于为“新订单和储备金抵扣”操作建模的不同选项如下:

from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");

此处,传播模式设置为MANDATORY,这意味着在此路由中流动的任何交换必须已经是Saga的一部分(在本示例中就是这种情况,因为Saga是在direct:buy路由中创建的)。该直接:newOrder航线声明了一个补偿的行动,被称为直接:cancelOrder,负责撤销的情况下,佐贺被取消了订单。

每个交易所始终包含一个
Exchange.SAGA_LONG_RUNNING_ACTION
标头,在此用作订单的ID。这样可以确定在相应的补偿操作中要删除的顺序,但这不是必需的(可以将选项用作替代解决方案)。direct:newOrder的补偿动作是direct:cancelOrder,如下所示:

from("direct:cancelOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "cancelOrder")
  .log("Order ${body} cancelled");

当应取消订单时,Saga EIP实现会自动调用它。它不会因错误而终止。如果在direct:cancelOrder路由中引发错误,则EIP实现应定期重试以执行补偿操作,直至达到特定限制。这意味着任何补偿动作都必须是幂等的,因此应考虑到它可能多次触发并且在任何情况下都不会失败。如果所有重试后都无法补偿,则Saga实施应触发手动干预过程。

注意

可能发生由于Direct:newOrder路由执行的延迟,与此同时另一方取消了Saga(由于并行路由中的错误或Saga级别的超时)。因此,在调用补偿动作direct:cancelOrder时,它可能找不到取消的订单记录。为了保证完全的全局一致性,重要的是任何主要动作及其相应的补偿动作都是可交换的,例如,如果补偿发生在主要动作之前,则其效果应相同。

当不可能使用交换行为时,另一种可能的方法是始终使补偿动作失败,直到找到由主动作产生的数据(或耗尽最大重试次数)为止。这种方法可能在许多情况下都有效,但是它是启发式的

信用服务的执行几乎与订购服务相同。

from("direct:reserveCredit")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:refundCredit")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved in action ${body}");

呼吁赔偿行动:

from("direct:refundCredit")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(creditService, "refundCredit")
  .log("Credit for action ${body} refunded");

在这里,信用保留的补偿动作是退款。该示例可以与Saga EIP的两种实现一起运行,因为它不涉及远程端点。

4.1 处理完成事件

Saga完成时需要某种类型的处理。发生错误并取消Saga时,将调用补偿端点。在完成端点可以被调用时,佐贺成功完成做进一步的处理。例如,在上面的订单服务中,我们可能需要知道订单何时完成(和保留的信用)才能真正开始准备订单。如果付款未完成,我们不希望开始准备订单(与大多数现代的CPU不同,在允许您有权读取它之前,现代CPU允许您访问保留的内存)。可以使用direct:newOrder端点的修改版本轻松完成此操作:

  1. 调用完成端点:
from("direct:newOrder")
  .saga()
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
    .bean(orderManagerService, "newOrder")
    .log("Order ${body} created");
  1. 直接:cancelOrder是一样的前面的示例所示。调用成功完成,如下所示:
from("direct:completeOrder")
  .transform().header(Exchange.SAGA_LONG_RUNNING_ACTION)
  .bean(orderManagerService, "findExternalId")
  .to("jms:prepareOrder")
  .log("Order ${body} sent for preparation");

Saga完成后,订单将发送到JMS队列进行准备。像补偿动作一样,Saga协调器也可以多次调用完成动作(尤其是在发生错误(例如网络错误)的情况下)。在此示例中,侦听prepareOrder JMS队列的服务已准备好保留可能的重复项(有关如何处理重复项的示例,请参见幂等方的EIP)。

4.2 使用自定义标识符和选项

您可以使用Saga选项来注册自定义标识符。例如,信贷服务的重构如下:

  1. 生成一个自定义ID并在正文中进行设置,如下所示:
from("direct:reserveCredit")
  .bean(idService, "generateCustomId")
  .to("direct:creditReservation")
  1. 委派动作并在补偿动作中根据需要标记当前主体。
from("direct:creditReservation")
  .saga()
  .propagation(SagaPropagation.SUPPORTS)
  .option("CreditId", body())
  .compensation("direct:creditRefund")
    .bean(creditService, "reserveCredit")
    .log("Credit ${header.amount} reserved. Custom Id used is ${body}");
  1. 仅在取消传奇时,才从标题中检索CreditId选项。
from("direct:creditRefund")
  .transform(header("CreditId")) // retrieve the CreditId option from headers
  .bean(creditService, "refundCredit")
  .log("Credit for Custom Id ${body} refunded");

直接:creditReservation端点可以通过传播模式设定为被称为佐贺之外,SUPPORTS。这样,可以在佐贺路线中声明多个选项。

4.3 设置超时

在Saga EIP上设置超时可确保在机器故障的情况下Saga不会永远卡住。Saga EIP实现在未明确指定的所有Saga EIP上均设置了默认超时。当超时时间到期时,除非之前已做出其他决定,否则Saga EIP将决定取消Saga(并补偿所有参与者)。

可以对Saga参与者设置超时,如下所示:

from("direct:newOrder")
  .saga()
  .timeout(1, TimeUnit.MINUTES) // newOrder requires that the saga is completed within 1 minute
  .propagation(SagaPropagation.MANDATORY)
  .compensation("direct:cancelOrder")
  .completion("direct:completeOrder")
    // ...
    .log("Order ${body} created");

所有参与者(例如,信贷服务,订单服务)都可以设置自己的超时时间。这些超时的最小值作为它们组合在一起时的传奇超时。也可以在Saga级别指定超时,如下所示:

from("direct:buy")
  .saga()
  .timeout(5, TimeUnit.MINUTES) // timeout at saga level
    .to("direct:newOrder")
    .to("direct:reserveCredit");

4.4 选择传播

上面的示例使用MANDATORYSUPPORTSREQUIRED传播模式,这是在未指定其他任何内容的情况下使用的默认传播。这些传播模式将交易环境中使用的等效模式1:1映射。

传播

描述

REQUIRED

加入现有的Saga或创建一个新的Saga(如果不存在)。

REQUIRES_NEW

始终创建一个新的传奇。暂停旧的Saga,并在新的Saga终止后恢复它。

MANDATORY

传奇故事必须已经存在。现有的佐贺县加入。

SUPPORTS

如果Saga已经存在,请加入。

NOT_SUPPORTED

如果Saga已经存在,它将被挂起并在当前块完成时恢复。

NEVER

在Saga中绝不能调用当前块。

4.5 使用手动完成(高级)

当无法以同步方式全部执行Saga,但例如需要使用异步通信通道与外部服务进行通信时,则无法将完成模式设置为AUTO(默认),因为交换时Saga尚未完成创建完成。对于执行时间较长(小时,天)的Saga EIP,通常是这种情况。在这些情况下,应使用MANUAL完成模式。

from("direct:mysaga")
  .saga()
  .completionMode(SagaCompletionMode.MANUAL)
  .completion("direct:finalize")
  .timeout(2, TimeUnit.HOURS)
    .to("seda:newOrder")
    .to("seda:reserveCredit");

为seda:newOrder和seda:reserveCredit添加异步处理。这些将异步回调发送到seda:operationCompleted。

from("seda:operationCompleted") // an asynchronous callback
  .saga()
  .propagation(SagaPropagation.MANDATORY)
    .bean(controlService, "actionExecuted")
    .choice()
      .when(body().isEqualTo("ok"))
        .to("saga:complete") // complete the current saga manually (saga component)
    .end()

您可以添加direct:finalize端点以执行最终操作。

将完成模式设置为MANUAL意味着在直接路线:mysaga中处理交换时,传奇未完成,但会持续更长的时间(最长持续时间设置为2小时)。当两个异步操作都完成时,传奇就完成了。使用Camel Saga Component的saga:complete端点完成调用。有一个类似的端点可用于手动补偿Saga(saga:compensate)。

5. XML配置

Saga功能可供想要使用XML配置的用户使用。以下代码段显示了一个示例:

<route>
  <from uri="direct:start"/>
  <saga>
    <compensation uri="direct:compensation" />
    <completion uri="direct:completion" />
    <option optionName="myOptionKey">
      <constant>myOptionValue</constant>
    </option>
    <option optionName="myOptionKey2">
      <constant>myOptionValue2</constant>
    </option>
  </saga>
  <to uri="direct:action1" />
  <to uri="direct:action2" />
</route>
点击这里复制本文地址 以上内容由莫古技术网整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

莫古技术网 © All Rights Reserved.  滇ICP备2024046894号-2