什么是分布式事務(wù)問題?

單體應(yīng)用

  單體應(yīng)用中,一個業(yè)務(wù)操作需要調(diào)用三個模塊完成,此時數(shù)據(jù)的一致性由本地事務(wù)來保證。

微服務(wù)應(yīng)用

  隨著業(yè)務(wù)需求的變化,單體應(yīng)用被拆分成微服務(wù)應(yīng)用,原來的三個模塊被拆分成三個獨(dú)立的應(yīng)用,分別使用獨(dú)立的數(shù)據(jù)源,業(yè)務(wù)操作需要調(diào)用三個服務(wù)來完成。此時每個服務(wù)內(nèi)部的數(shù)據(jù)一致性由本地事務(wù)來保證,但是全局的數(shù)據(jù)一致性問題沒法保證。

小結(jié)

  在微服務(wù)架構(gòu)中由于全局?jǐn)?shù)據(jù)一致性沒法保證產(chǎn)生的問題就是分布式事務(wù)問題。簡單來說,一次業(yè)務(wù)操作需要操作多個數(shù)據(jù)源或需要進(jìn)行遠(yuǎn)程調(diào)用,就會產(chǎn)生分布式事務(wù)問題。

Seata 是什么?

  Seata 是一款開源的分布式事務(wù)解決方案,致力于提供高性能和簡單易用的分布式事務(wù)服務(wù)。Seata 將為用戶提供了 AT、TCC、SAGA 和 XA 事務(wù)模式,為用戶打造一站式的分布式解決方案。

  官網(wǎng):http://seata.io/

Seata 組成

Transaction ID(XID)

  全局唯一的事務(wù)id

三組件

  Transaction Coordinator(TC):事務(wù)協(xié)調(diào)器,維護(hù)全局事務(wù)的運(yùn)行狀態(tài),負(fù)責(zé)協(xié)調(diào)并驅(qū)動全局事務(wù)的提交或回滾

  Transaction Manager(TM):控制全局事務(wù)的邊界,負(fù)責(zé)開啟一個全局事務(wù),并最終發(fā)起全局提交或全局回滾的決議

  Resource Manager(RM):控制分支事務(wù),負(fù)責(zé)分支注冊、狀態(tài)匯報,并接受事務(wù)協(xié)調(diào)的指令,驅(qū)動分支(本地)事務(wù)的提交和回滾

Seata 分布式事務(wù)處理過程

  過程圖:

  

  說明:

  1、TM 向 TC 申請開啟一個全局事務(wù),全局事務(wù)創(chuàng)建成功并生成一個全局唯一的 XID;

  2、XID 在微服務(wù)調(diào)用鏈路的上下文中傳播;

  3、RM 向 TC 注冊分支事務(wù),將其納入 XID 對應(yīng)的全局事務(wù)的管轄;

  4、TM 向 TC 發(fā)起針對 XID 的全局提交或回滾決議;

  5、TC 調(diào)度 XID 下管轄的全部分支事務(wù)完成提交或回滾請求。

搭建demo

  • cloud-seata-order9011

    application.yml

# 端口
server:
  port: 9011
spring:
  application:
    name: seata-order-service
  #   數(shù)據(jù)源基本配置
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
    alibaba:
      seata:
        tx-service-group: my_test_tx_group
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata-order?allowPublicKeyRetrieval=true&useSSL=true
    username: root
    password: 678678
    #druid
    type: com.alibaba.druid.pool.DruidDataSource
mybatis:
  mapperLocations: classpath:mapper/*Mapper.xml
# seata
seata:
  enabled: true
  application-id: applicationName
  tx-service-group: my_test_tx_group
  enable-auto-data-source-proxy: true
  config:
    type: nacos
    nacos:
      namespace:
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"
  service:
    vgroup-mapping: my_test_tx_group

    

 

     OrderServiceImpl

 1 package com.sdkj.service.impl;
 2 
 3 import com.sdkj.dao.OrderDao;
 4 import com.sdkj.entity.Order;
 5 import com.sdkj.service.AccountService;
 6 import com.sdkj.service.OrderService;
 7 import com.sdkj.service.StorageService;
 8 import io.seata.spring.annotation.GlobalTransactional;
 9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.stereotype.Service;
11 
12 /**
13  * @Author wangshuo
14  * @Date 2022/5/30, 22:06
15  * Please add a comment
16  */
17 @Service
18 public class OrderServiceImpl implements OrderService {
19 
20     @Autowired
21     private OrderDao orderDao;
22     @Autowired
23     private StorageService storageService;
24     @Autowired
25     private AccountService accountService;
26 
27     //全局事務(wù)控制
28     @GlobalTransactional(name = "my_test_tx_group",rollbackFor = Exception.class)
29     @Override
30     public void create(Order order) {
31         System.out.println("create start");
32         //減庫存
33         storageService.decrease(order.getProductId(),order.getCount());
34         System.out.println("storage decrease success");
35         //扣減余額
36         accountService.decrease(order.getUserId(), order.getMoney());
37         System.out.println("order decrease success");
38         //修改訂單狀態(tài)
39         orderDao.update(order.getId(), order.getUserId(), 0);
40         //end
41         System.out.println("end");
42     }
43 }
  • cloud-seata-storage9012 / cloud-seata-account9013

    application.yml

# 端口
server:
  port: 9012
spring:
  application:
    name: seata-storage-service
  #   數(shù)據(jù)源基本配置
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
    alibaba:
      seata:
        tx-service-group: my_test_tx_group
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata-storage?allowPublicKeyRetrieval=true&useSSL=true
    username: root
    password: 678678
feign.hystrix.enabled: true
hystrix:
  command:
    default:
      circuitBreaker:
        sleepWindowInMilliseconds: 30000
        requestVolumeThreshold: 10
      execution:
        isolation:
          strategy: SEMAPHORE
          thread:
            timeoutInMilliseconds: 100000
# seata
seata:
  enabled: true
  application-id: applicationName
  tx-service-group: my_test_tx_group
  enable-auto-data-source-proxy: true
  config:
    type: nacos
    nacos:
      namespace:
      serverAddr: 127.0.0.1:8848
      group: SEATA_GROUP
      userName: "nacos"
      password: "nacos"
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      namespace:
      userName: "nacos"
      password: "nacos"
  service:
    vgroup-mapping: my_test_tx_group

    

 

     StorageService

 1 package com.sdkj.service;
 2 
 3 /**
 4  * @Author wangshuo
 5  * @Date 2022/5/31, 9:31
 6  * AT模式
 7  */
 8 public interface StorageService {
 9 
10     Integer decrease(Long productId, Integer count);
11 }

    StorageServiceImpl

 1 package com.sdkj.service.impl;
 2 
 3 import com.sdkj.dao.StorageMapper;
 4 import com.sdkj.service.StorageService;
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.stereotype.Service;
 7 
 8 /**
 9  * @Author wangshuo
10  * @Date 2022/5/31, 9:42
11  * Please add a comment
12  */
13 @Service
14 public class StorageServiceImpl implements StorageService {
15 
16     @Autowired
17     private StorageMapper storageMapper;
18 
19     @Override
20     public Integer decrease(Long productId, Integer count) {
21         return storageMapper.decrease(productId, count);
22     }
23 }

    TCCStorageService

 1 package com.sdkj.service;
 2 
 3 import io.seata.rm.tcc.api.BusinessActionContext;
 4 import io.seata.rm.tcc.api.BusinessActionContextParameter;
 5 import io.seata.rm.tcc.api.LocalTCC;
 6 import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
 7 
 8 /**
 9  * @Author wangshuo
10  * @Date 2022/6/1, 17:34
11  * TCC模式
12  * 接口被seata管理 根據(jù)事務(wù)的狀態(tài)完成提交或回滾的操作
13  */
14 @LocalTCC
15 public interface TCCStorageService {
16 
17     @TwoPhaseBusinessAction(name = "StorageTcc",commitMethod = "addCommit",rollbackMethod = "addRollBack")
18     Integer decrease(@BusinessActionContextParameter(paramName = "productId") Long productId,
19                      @BusinessActionContextParameter(paramName = "count") Integer count);
20 
21     public boolean addCommit(BusinessActionContext context);
22 
23     public boolean addRollBack(BusinessActionContext context);
24 }

    TCCStorageServiceImpl

 1 package com.sdkj.service.impl;
 2 
 3 import com.alibaba.fastjson.JSON;
 4 import com.sdkj.service.TCCStorageService;
 5 import io.seata.rm.tcc.api.BusinessActionContext;
 6 import org.springframework.stereotype.Service;
 7 
 8 /**
 9  * @Author wangshuo
10  * @Date 2022/6/1, 17:40
11  * Please add a comment
12  */
13 @Service
14 public class TCCStorageServiceImpl implements TCCStorageService {
15 
16     //try階段
17     @Override
18     public Integer decrease(Long productId, Integer count) {
19         /*
20             新增
21                 到數(shù)據(jù)庫一條狀態(tài)為嘗試的數(shù)據(jù)
22             修改
23                 數(shù)據(jù)庫設(shè)計時多加一個對應(yīng)的凍結(jié)字段,try階段修改先修改try對應(yīng)字段
24          */
25         return null;
26     }
27 
28     //commit階段
29     @Override
30     public boolean addCommit(BusinessActionContext context) {
31         //獲取參數(shù)對象(Json格式)
32         Object productId = context.getActionContext("productId");
33         Long pid = JSON.parseObject(productId.toString(), Long.class);//也可以直接轉(zhuǎn)化為實(shí)體類中的數(shù)據(jù)
34 
35         //提交到數(shù)據(jù)庫
36         return false;
37     }
38 
39     //
40     @Override
41     public boolean addRollBack(BusinessActionContext context) {
42         //獲取參數(shù)對象(Json格式)
43         Object productId = context.getActionContext("productId");
44         Long pid = JSON.parseObject(productId.toString(), Long.class);//也可以直接轉(zhuǎn)化為實(shí)體類中的數(shù)據(jù)
45 
46         /*
47             新增
48                 刪除對應(yīng)數(shù)據(jù)
49             修改
50                 將凍結(jié)字段置為0或初始值
51          */
52         return false;
53     }
54 }

 

AT / TCC優(yōu)缺點(diǎn)對比

  • AT優(yōu)點(diǎn):

    使用簡單

    沒有入侵性

  • AT缺點(diǎn):

    性能較差(鎖)

  • TCC優(yōu)點(diǎn):

    性能較好

  • TCC缺點(diǎn):

    編碼 / 數(shù)據(jù)庫表結(jié)構(gòu)更加復(fù)雜

    有一定入侵性