6、分布式事务

分布式事务

分布式事务概念

img

产生的背景

分布式事务解决方案

全局事务(2PC)

2 Prepared Commit

全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型--X/Open Distributed Transcation Processing Reference Model。它规定,要实现分布式事务的话,就需要三种角色:

阶段一:表决阶段,所有参与者都预提交(只执行sql,但不提交)事务,并将能都成功的信息反馈发给协调者。

事务协调者(事务管理器)给每个参与者(资源管理器)发送 Prepare 消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的 redo 和 undo 日志,但不提交,到达一种“万事俱备,只欠东风”的状态。

阶段二:执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。

如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)

image-20200414235849852

优缺点

TCC两阶段补偿性方案

数据库的事务,为逻辑业务处理

TCC,Try Confirm Cancel,它属于补偿型事务。顾名思义,TCC实现分布式事务一共有三个步骤:

image-20200928141556991

分布式事务中间件

Seata

官网地址:http://seata.io/zh-cn/

Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。致力于在微服务架构下提供高性能和简单易用的分布式事务服务。(AT模式是阿里首推模式,阿里云上有商用版本的GTS[Global Transaction service全局事务服务])。

它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。

发展史

2014 - 阿里中间件团队发布txc(taobao transaction constructor)在阿里内部提供分布式事务服务; 2016 - txc经过改造和升级,变成了gts(global transaction service)在阿里云作为服务对外开放,也成为当时唯一一款对外的服务; 2019 - 阿里经过txc和gts的技术积累,决定开源(Apache开源协议)。并且,在github上发起了一个项目叫做fescar(fast easy commit and rollback)开始拥有了社区群体; 2019 - fescar被重命名为了seata(simple extensiable autonomous transaction architecture),项目迁移到了新的github地址。

全局事务

AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。

Seata把一个全局事务,看做是由一组分支事务组成的一个大的事务(分支事务可以直接认为就是本地事务)

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

Seata把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图:

image-20210112182411510

Seata中的三个组件

Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。

Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。

Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

image-20210112182522859

Seata管理分布式事务的典型流程

  1. TM创建全局事务,告知TC,TC生成全局唯一的XId
  2. 在TM中调用分支事务RM,将xid传给RM。
  3. RM通过xid注册的TC中,告知TC他是哪一个全局事务的分支事务。
  4. TM告知TC,可以进行提交或者回滚了。
  5. 由TC通知TM对应的RM进行提交或回滚。

img

Seata安装与启动

1、下载seata

下载地址:https://github.com/seata/seata/releases

2、修改

将原来的模板全部删除,粘贴下面的

registry块用以控制将TC注册在哪里(注册中心),config块用以控制将TC的配置文件交由谁来管理(配置中心)

registry {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
}

config {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = ""
    cluster = "default"
  }
}

3、修改

定义某个事务组,属于哪个TC集群

image-20210827180716703

service.vgroup_mapping.micro-product=default
service.vgroup_mapping.micro-orders=default

其中,micro-product和micro-orders是事务分组的名字,其后的取值“default”是TC集群的名字

4、将seata的配置文件存入配置中心

确保nacos已经运行

在conf目录下,执行nacos-config.sh脚本,该脚本能把seata TC所需要的配置存入nacos配置中心

image-20210827161245989

进入nacos控制台,查看配置列表,可以看到很多Group为SEATA_GROUP的配置

image-20210827161327966

5、启动seata服务(TC)

bin目录下

默认占用的端口是8091,可以在启动seata-server时,通过-p指定端口

seata-server.bat -p 8091 -m file

启动后,在nacos的服务列表下面可以看到一个名为serverAddr的服务

image-20210827161649776

微服务中使用Seata

1、在需要进行事务管理的微服务中添加依赖

如果只加依赖,不进行下面的配置,启动会报错

依赖中封装了(TM、RM)

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

2、每个微服务添加初始化数据表

初始化数据表:在每个微服务的项目数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表

-- the table to store seata xid data
-- 0.7.0+ add context
-- you must to init this sql for you business databese. the seata server not need it.
-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
drop table `undo_log`;
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

3、在需要进行事务管理的微服务添加配置类

在star-orders和star-product的config包中,创建以下配置类,在其中配置代理数据源,该代理数据源主要添加了日志记录功能,TC通过该日志才能够进行回滚!也就是说这一步必不可少!!

//普通数据源
@Configuration
public class DataSourceProxyConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")  //读取yml中的配置
    public DriverManagerDataSource dataSource(){
        return new DriverManagerDataSource();
    }
    // 必须在代理数据源上添加@Primary,保证在自动装配时优先注入代理数据源
    @Primary
    @Bean
    public DataSourceProxy dataSource(DriverManagerDataSource driverManagerDataSource) {
        return new DataSourceProxy(driverManagerDataSource);
    }
}
//druid数据源
@Configuration
public class DataSourceProxyConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource") //读取yml中的配置
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }
    
    // 必须在代理数据源上添加@Primary,保证在自动装配时优先注入代理数据源
    @Primary
	@Bean
    public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

4、修改微服务启动类

启动类上去掉默认的数据库链接,否则容易造成死循环

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)

5、微服务添加seata的配置文件registry.conf

用于告诉应用中的TM,去哪里找TC

registry {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = ""
  }
}
config {
  type = "nacos"
  nacos {
    serverAddr = "localhost"
    namespace = ""
  }
}

6、修改微服务的配置文件application.yml

用于告诉应用中的TM,去哪里找TC

要与nacos-config.txt中的service.vgroup_mapping.micro-orders=default一致

spring: 
	cloud:
		alibaba:
      		seata:
        		tx-service-group: micro-orders

7、在微服务控制层方法,开启全局事务

添加注解@GlobalTransactional

@GetMapping("/addOrderProduct/{uid}")
@GlobalTransactional
public ResultVO addOrderProduct(@PathVariable Integer uid){

}

测试时,记得关闭micro-orders中的ProductService的后备方法

CAP理论

C:Consistency -- 一致性,在分布式系统完成某写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。

A:Availability -- 可用性, 一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。

P:Partition Tolerance -- 分区容错性,指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。

image-20210830165854653

CAP三者不可兼得,该如何取舍

(1) CA: 优先保证一致性和可用性,放弃分区容错。 这也意味着放弃系统的扩展性,系统不再是分布式的,有违设计的初衷。

(2) CP: 优先保证一致性和分区容错性,放弃可用性。在数据一致性要求比较高的场合(譬如:zookeeper,Hbase) 是比较常见的做法,一旦发生网络故障或者消息丢失,就会牺牲用户体验,等恢复之后用户才逐渐能访问。

(3) AP: 优先保证可用性和分区容错性,放弃一致性。NoSQL中的Cassandra 就是这种架构。跟CP一样,放弃一致性不是说一致性就不保证了,而是逐渐的变得一致。

分布式锁

分布式锁的特点

Redission

image-20210830172444706

分布式锁的实现原理

使用Reids,自己进行实现

@PostMapping("/shopAndReduceStoreRedis")
public ResultVO shopAndReduceStore(){
    ResultVO resultVO = null;
    //time为锁的释放时间,如果达到3秒,那么会自动释放锁
    Long time = 3000L;
    //设置效果相当于redis命令setnx lock lock,如果当前的key存在,那么无法复制和创建,返回false
    Boolean addLockSuccess = redisTemplate.opsForValue().setIfAbsent("lock", "lock",time,TimeUnit.SECONDS);
    //判断是否成功添加锁,如果没有,则循环,直到成功添加锁
    while (!addLockSuccess){
        addLockSuccess = redisTemplate.opsForValue().setIfAbsent("lock", "lock",time,TimeUnit.SECONDS);
        //等待一会儿再尝试
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //守护线程,对主线程进行续命
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            sleep(time/2);
            redisTemplate.expire("lock",time/2,TimeUnit.SECONDS);
        }
    });
    thread.setDaemon(true);
    thread.start();
    try {
        Integer store = (Integer) redisTemplate.opsForValue().get("store");
        if (store == 0){
            resultVO = ResultVO.fail("库存不足");
        }else {
            redisTemplate.opsForValue().set("store",store--);
            resultVO = ResultVO.success("购买成功");
        }
    }catch (Exception e){
        e.printStackTrace();
        resultVO = ResultVO.fail("购买异常!");
    }finally {
        //释放锁
        redisTemplate.delete("lock");
        return resultVO;
    }
}

使用Redission实现

1、导入依赖
<!-- 导入Redisson分布锁技术 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>
2、编写Redission的配置类
/**
 * Redisson配置类
 * 主要提供:Redis分布式锁的功能
 */
@Configuration
public class RedissonConfiguration {
    
    /**
     * 根据配置信息,产生一个Redisson的客户端对象
     * @return
     */
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}
3、代码中具体使用
@Resource
private RedissonClient redissonClient;

@PostMapping("/shopAndReduceStore")
public ResultVO shopAndReduceStore(){
    RLock lock = null;
    ResultVO resultVO = null;
    try {
        //获取锁对象
        lock = redissonClient.getLock("lock");
        //加锁,30秒释放锁,防止死锁出现
        lock.lock(30L, TimeUnit.SECONDS);
        Integer store = (Integer) redisTemplate.opsForValue().get("store");
        if (store == 0){
            resultVO = ResultVO.fail("库存不足");
        }else {
            redisTemplate.opsForValue().set("store",store--);
            resultVO = ResultVO.success("购买成功");
        }
    }catch (Exception e){
        e.printStackTrace();
        resultVO = ResultVO.fail("购买异常!");
    }finally {
        //释放锁
        lock.unlock();
        return resultVO;
    }
}