开发者

讲解如何利用 Python完成 Saga 分布式事务

开发者 https://www.devze.com 2022-11-30 14:17 出处:网络 作者: 叶东富
目录1、分布式事务2、SAGA3、SAGA实践4、处理网络异常5、处理回滚6、小结银行跨行转账业务是一个典型分布式事务场景,假设A需要跨行转账给B,那么就涉及两个银行的数据,无法通过一个数据库...
目录
  • 1、分布式事务
  • 2、SAGA

  • 3、SAGA 实践
  • 4、处理网络异常
  • 5、处理回滚
  • 6、小结

讲解如何利用 Python完成 Saga 分布式事务

银行跨行转账业务是http://www.cppcns.com一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID,只能够通过分布式事务来解决。

1、分布式事务

分布式事务在分布式环境下,为了满足可用性、性能与降级服务的需要,降低一致性与隔离性的要求,一方面遵循 BASE 理论:

  • 基本业务可用性( Basic Availability )
  • 柔性状态( Soft state )
  • 最终一致性( Eventual consistency )

另一方面,分布式事务也部分遵循 ACID 规范:

  • 原子性:严格遵循
  • 一致性:事务完成后的一致性严格遵循;事务中的一致性可适当放宽
  • 隔离性:并行事务间不可影响;事务中间结果可见性允许安全放宽
  • 持久性:严格遵循

2、SAGA

Saga 是这一篇数据库论文SAGAS提到的一个分布式事务方案。其核心思想是将长事务拆分为多个本地短事务,由 Saga 事务协调器协调,如果各个本地事务成功完成那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。

目前可用于 SAGA 的开源框架,主要为 java 语言,其中以 seata 为代表。我们的例子采用 go 语言,使用的分布式事务框架为https://github.com/yedf/dtm,它对分布式事务的支持非常优雅。下面来详细讲解 SAGA 的组成:

DTM 事务框架里,有 3 个角色,与经典的 XA 分布式事务一样:

  • AP/应用程序,发起全局事务,定义全局事务包含哪些事务分支
  • RM/资源管理器,负责分支事务各项资源的管理
  • TM/事务管理器,负责协调全局事务的正确执行,包括 SAGA 正向 /逆向操作的执行

下面看一个成功完成的 SAGA 时序图,就很容易理解 SAGA 分布式事务:

讲解如何利用 Python完成 Saga 分布式事务

3、SAGA 实践

对于我们要进行的银行转账的例子,我们将在正向操作中,进行转入转出,在补偿操作中,做相反的调整。

首先我们创建账户余额表:

CREATE TABLE dtm_busi.`user_account` ( 
  `id` int(11) AUTO_INCREMENT PRIMARY KEY, 
  `user_id` int(11) not NULL UNIQUE , 
  `balance` decimal(10,2) NOT NULL DEFAULT '0.00', 
  `create_time` datetime DEFAULT now(), 
  `update_time` datetime DEFAULT now() 
); 


我们先编写核心业务代码,调整用户的账户余额

def saga_adjust_balance(cursor, uid, amount): 
  affected = utils.sqlexec(cursor, "update dtm_busi.user_account set balance=balance+%d where user_id=%d and balance >= -%d" %(amount, uid, amount)) 
  if affected == 0: 
    raise Exception("update error, balance not enough") 


下面我们来编写具体的正向操作 /补偿操作的处理函数

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  saga_adjust_balance(c, out_uid, -30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransOutCompensate") 
def trans_out_compensate(): 
  saga_adjust_balance(c, out_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  saga_adjust_balance(c, in_uid, 30) 
  return {"dtm_result": "SUCCESS"} 
 
@app.post("/api/TransInCompensate") 
def trans_in_compensate(): 
  saga_adjust_balance(c, in_uid, -30) 
  return {"dtm_result": "SUCCESS"} 


到此各个子事务的处理函数已经 OK 了,然后是开启 SAGA 事务,进行分支调用

# 这是 dtm 服务地址 
dtm = "http://localhost:8080/api/dtmsvr" 
# 这是业务微服务地址 
svc = "http://localhost:5000/api" 
 
    req = {"amount": 30} 
    s = saga.Saga(dtm, utils.gen_gid(dtm)) 
    s.add(req, svc + "/TransOutSaga", svc + "/TransOutCompensate") 
    s.add(req, svc + "/TransInSaga", svc + "/TransInCompensate") 
    s.submit() 


至此,一个完整的 SAGA 分布式事务编写完成。

如果您想要完整运行一个成功的示例,那么参考这个例子yedf/dtmcli-py-sampltGiITCe,将它运行起来非常简单

# 部署启动 dtm 
# 需要 docker 版本 18 以上 
git clone https://github.com/yedf/dtm 
cd dtm 
docker-compose up 
 
# 另起一个命令行 
git clone https://github.com/yedf/dtmcli-py-sample 
cd dtmcli-py-sample 
pip3 install flask dtmcli requests 
flask run 
 
# 另起一个命令行 
curl localhost:5000/api/fireSaga 

4、处理网络异常

假设提交给 dtm 的事务中,调用转入操作时,出现短暂的故障怎么办?按照 SAGA 事务的协议,dtm 会重试未完成的操作,这时我们要如何处理?故障有可能是转入操作完http://www.cppcns.com成后出网络故障,也有可能是转入操作完成中出现机器宕机。如何处理才能够保障账户余额的调整是正确无问题的?

这类网络异常的妥当处理,是分布式事务中的大难题,异常情况包括三类:重复请求、空补偿、悬挂,都需要正确处理

DTM 提供了子事务屏障功能,保证上述异常情况下的业务逻辑,只会有一次正确顺序下的成功提交。(子事务屏障详情参考分布式事务最经典的七种解决方案的子事务屏障环节)

我们把处理函数调整为:

@app.post("/api/TransOutSaga") 
def trans_out_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, out_uid, -30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "SUCCESS"} 


这里的 barrier_from_req(request).call(cursor, busi_callback)调用会使用子事务屏障技术,保证 busi_callback http://www.cppcns.com回调函数仅被提交一次

您可以尝试多次调用这个 TransIn 服务,仅有一次http://www.cppcns.com余额调整。

5、处理回滚

假如银行将金额准备转入用户 2 时,发现用户 2 的账户异常,返回失败,会怎么样?我们调整处理函数,让转入操作返回失败

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  return {"dtm_result": "FAILURE"} 


我们给出事务失败交互的时序图:

讲解如何利用 Python完成 Saga 分布式事务

这里有一点,TransIn 的正向操作什么都没有做,就返回了失败,此时调用 TransIn 的补偿操作,会不会导致反向调整出错了呢?

不用担心,前面的子事务屏障技术,能够保证 TransIn 的错误如果发生在提交之前,则补偿为空操作;TransIn 的错误如果发生在提交之后,则补偿操作会将数据提交一次。

我们可以将返回错误的 TransIn 改成:

@app.post("/api/TransInSaga") 
def trans_in_saga(): 
  with barrier.AutoCursor(conn_new()) as cursor: 
    def busi_callback(c): 
      saga_adjust_balance(c, in_uid, 30) 
    barrier_from_req(request).call(cursor, busi_callback) 
  return {"dtm_result": "FAILURE"} 


最后的结果余额依旧会是对的,原理可以参考:分布式事务最经典的七种解决方案的子事务屏障环节

6、小结

在这篇文章里,我们介绍了 SAGA 的理论知识,也通过一个例子,完整给出了编写一个 SAGA 事务的过程,涵盖了正常成功完成,异常情况,以及成功回滚的情况。相信读者通过这边文章,对 SAGA 已经有了深入的理解。

文中使用的 dtm 是新开源的 golang 分布式事务管理框架,功能强大,支持 TCC 、SAGA 、XA 、事务消息等事务模式,支持 Go 、python 、php 、node 、csharp 等语言的。同时提供了非常简单易用的接口。

以上就是利用 Python 轻松完成一个 Saga 分布式事务的详细内容,更多关于Python完成一个 Saga 分布式事务的资料请关注我们其它相关文章!

0

精彩评论

暂无评论...
验证码 换一张
取 消