系统越来越大,数据量越来越多,就不可避免的需要做数据库的分库,分片,分表
不同的数据定位在不同的数据库中,不同的数据库又对应着不同的微服务,这就变成了跨库操作,以前的本地事务@Transactional就没用了
不同的数据库对应的不同的微服务,每个微服务又有了它自己的业务逻辑,并且微服务之间又是相互独立的,此时:微服务相互调用过程中,就很难做到保证整个业务链条过程中,业务保持完整性
不同的数据库,还有可能分配在不同的机房中,甚至有可能在不同的网络中,进一步加深了咱们分布式事务控制难度
2 Prepared Commit
全局事务基于DTP模型实现。DTP是由X/Open组织提出的一种分布式事务模型--X/Open Distributed Transcation Processing Reference Model。它规定,要实现分布式事务的话,就需要三种角色:
阶段一:表决阶段,所有参与者都预提交(只执行sql,但不提交)事务,并将能都成功的信息反馈发给协调者。
事务协调者(事务管理器)给每个参与者(资源管理器)发送 Prepare 消息,每个参与者要么直接返回失败(如权限验证失败),要么在本地执行事务,写本地的 redo 和 undo 日志,但不提交,到达一种“万事俱备,只欠东风”的状态。
阶段二:执行阶段,协调者根据所有参与者的反馈,通知所有参与者,步调一致地执行提交或者回滚。
如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据协调者的指令执行提交或者回滚操作,释放所有事务处理过程中使用的锁资源。(注意:必须在最后阶段释放锁资源)
数据库的事务,为逻辑业务处理
TCC,Try Confirm Cancel,它属于补偿型事务。顾名思义,TCC实现分布式事务一共有三个步骤:
Try:尝试待执行的业务
这个过程并未执行业务,只是完成所有业务的一致性检查,并预留好执行所需的全部资源
Confirm:确认执行业务
确认执行业务操作,不做任何业务检查,只使用Try阶段预留的业务资源。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
Cancel:取消待执行的业务
取消Try阶段预留的业务资源。通常情况下,采用TCC则认为Cancel阶段也是一定会成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
官网地址:http://seata.io/zh-cn/
Seata是由阿里中间件团队发起的开源项目 Fescar,后更名为Seata,它是一个是开源的分布式事务框架。致力于在微服务架构下提供高性能和简单易用的分布式事务服务。(AT模式是阿里首推模式,阿里云上有商用版本的GTS[Global Transaction service全局事务服务])。
它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0侵入的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。
2014 - 阿里中间件团队发布txc(taobao transaction constructor)在阿里内部提供分布式事务服务; 2016 - txc经过改造和升级,变成了gts(global transaction service)在阿里云作为服务对外开放,也成为当时唯一一款对外的服务; 2019 - 阿里经过txc和gts的技术积累,决定开源(Apache开源协议)。并且,在github上发起了一个项目叫做fescar(fast easy commit and rollback)开始拥有了社区群体; 2019 - fescar被重命名为了seata(simple extensiable autonomous transaction architecture),项目迁移到了新的github地址。
AT 模式是一种无侵入的分布式事务解决方案。在 AT 模式下,用户只需关注自己的“业务 SQL”,用户的 “业务 SQL” 作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。
Seata把一个全局事务,看做是由一组分支事务组成的一个大的事务(分支事务可以直接认为就是本地事务)
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
Seata把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务,下图是全局事务与分支事务的关系图:
Transaction Coordinator (TC): 事务协调器,维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚。
Transaction Manager (TM): 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或全局回滚的决议。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。
下载地址:https://github.com/seata/seata/releases
将原来的模板全部删除,粘贴下面的
registry块用以控制将TC注册在哪里(注册中心),config块用以控制将TC的配置文件交由谁来管理(配置中心)
registry {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
config {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
cluster = "default"
}
}
定义某个事务组,属于哪个TC集群
service.vgroup_mapping.micro-product=default
service.vgroup_mapping.micro-orders=default
其中,micro-product和micro-orders是事务分组的名字,其后的取值“default”是TC集群的名字
确保nacos已经运行
在conf目录下,执行nacos-config.sh脚本,该脚本能把seata TC所需要的配置存入nacos配置中心
进入nacos控制台,查看配置列表,可以看到很多Group为SEATA_GROUP的配置
bin目录下
默认占用的端口是8091,可以在启动seata-server时,通过-p
指定端口
seata-server.bat -p 8091 -m file
启动后,在nacos的服务列表下面可以看到一个名为serverAddr的服务
如果只加依赖,不进行下面的配置,启动会报错
依赖中封装了(TM、RM)
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
初始化数据表:在每个微服务的项目数据库中加入一张undo_log表,这是Seata记录事务日志要用到的表
-- the table to store seata xid data
-- 0.7.0+ add context
-- you must to init this sql for you business databese. the seata server not need it.
-- 此脚本必须初始化在你当前的业务数据库中,用于AT 模式XID记录。与server端无关(注:业务数据库)
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
drop table `undo_log`;
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
在star-orders和star-product的config包中,创建以下配置类,在其中配置代理数据源,该代理数据源主要添加了日志记录功能,TC通过该日志才能够进行回滚!也就是说这一步必不可少!!
//普通数据源
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource") //读取yml中的配置
public DriverManagerDataSource dataSource(){
return new DriverManagerDataSource();
}
// 必须在代理数据源上添加@Primary,保证在自动装配时优先注入代理数据源
@Primary
@Bean
public DataSourceProxy dataSource(DriverManagerDataSource driverManagerDataSource) {
return new DataSourceProxy(driverManagerDataSource);
}
}
//druid数据源
@Configuration
public class DataSourceProxyConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource") //读取yml中的配置
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
// 必须在代理数据源上添加@Primary,保证在自动装配时优先注入代理数据源
@Primary
@Bean
public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
}
启动类上去掉默认的数据库链接,否则容易造成死循环
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
用于告诉应用中的TM,去哪里找TC
registry {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
}
}
config {
type = "nacos"
nacos {
serverAddr = "localhost"
namespace = ""
}
}
用于告诉应用中的TM,去哪里找TC
要与nacos-config.txt中的service.vgroup_mapping.micro-orders=default一致
spring:
cloud:
alibaba:
seata:
tx-service-group: micro-orders
添加注解@GlobalTransactional
@GetMapping("/addOrderProduct/{uid}")
@GlobalTransactional
public ResultVO addOrderProduct(@PathVariable Integer uid){
}
测试时,记得关闭micro-orders中的ProductService的后备方法
C:Consistency -- 一致性,在分布式系统完成某写操作后任何读操作,都应该获取到该写操作写入的那个最新的值。相当于要求分布式系统中的各节点时时刻刻保持数据的一致性。
A:Availability -- 可用性, 一直可以正常的做读写操作。简单而言就是客户端一直可以正常访问并得到系统的正常响应。用户角度来看就是不会出现系统操作失败或者访问超时等问题。
P:Partition Tolerance -- 分区容错性,指的分布式系统中的某个节点或者网络分区出现了故障的时候,整个系统仍然能对外提供满足一致性和可用性的服务。也就是说部分故障不影响整体使用。
(1) CA: 优先保证一致性和可用性,放弃分区容错。 这也意味着放弃系统的扩展性,系统不再是分布式的,有违设计的初衷。
(2) CP: 优先保证一致性和分区容错性,放弃可用性。在数据一致性要求比较高的场合(譬如:zookeeper,Hbase) 是比较常见的做法,一旦发生网络故障或者消息丢失,就会牺牲用户体验,等恢复之后用户才逐渐能访问。
(3) AP: 优先保证可用性和分区容错性,放弃一致性。NoSQL中的Cassandra 就是这种架构。跟CP一样,放弃一致性不是说一致性就不保证了,而是逐渐的变得一致。
Redis加锁的命令:SET lock_key random_value NX PX 5000
@PostMapping("/shopAndReduceStoreRedis")
public ResultVO shopAndReduceStore(){
ResultVO resultVO = null;
//time为锁的释放时间,如果达到3秒,那么会自动释放锁
Long time = 3000L;
//设置效果相当于redis命令setnx lock lock,如果当前的key存在,那么无法复制和创建,返回false
Boolean addLockSuccess = redisTemplate.opsForValue().setIfAbsent("lock", "lock",time,TimeUnit.SECONDS);
//判断是否成功添加锁,如果没有,则循环,直到成功添加锁
while (!addLockSuccess){
addLockSuccess = redisTemplate.opsForValue().setIfAbsent("lock", "lock",time,TimeUnit.SECONDS);
//等待一会儿再尝试
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//守护线程,对主线程进行续命
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
sleep(time/2);
redisTemplate.expire("lock",time/2,TimeUnit.SECONDS);
}
});
thread.setDaemon(true);
thread.start();
try {
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if (store == 0){
resultVO = ResultVO.fail("库存不足");
}else {
redisTemplate.opsForValue().set("store",store--);
resultVO = ResultVO.success("购买成功");
}
}catch (Exception e){
e.printStackTrace();
resultVO = ResultVO.fail("购买异常!");
}finally {
//释放锁
redisTemplate.delete("lock");
return resultVO;
}
}
<!-- 导入Redisson分布锁技术 -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
/**
* Redisson配置类
* 主要提供:Redis分布式锁的功能
*/
@Configuration
public class RedissonConfiguration {
/**
* 根据配置信息,产生一个Redisson的客户端对象
* @return
*/
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
return Redisson.create(config);
}
}
@Resource
private RedissonClient redissonClient;
@PostMapping("/shopAndReduceStore")
public ResultVO shopAndReduceStore(){
RLock lock = null;
ResultVO resultVO = null;
try {
//获取锁对象
lock = redissonClient.getLock("lock");
//加锁,30秒释放锁,防止死锁出现
lock.lock(30L, TimeUnit.SECONDS);
Integer store = (Integer) redisTemplate.opsForValue().get("store");
if (store == 0){
resultVO = ResultVO.fail("库存不足");
}else {
redisTemplate.opsForValue().set("store",store--);
resultVO = ResultVO.success("购买成功");
}
}catch (Exception e){
e.printStackTrace();
resultVO = ResultVO.fail("购买异常!");
}finally {
//释放锁
lock.unlock();
return resultVO;
}
}