一、分布式事务2PC原理
事务协调者向所有参与者发送预处理请求。
所有参与者处理之后回应事务协调者能否提交事务。
如果所有参与者都一致回复可提交事务,那么事务协调者向所有参与者发送提交事务请求,否则发送回滚事务请求。
所有参与者提交事务,并回应事务协调者,如果事务协调者在一定时间后还未收到回应,可能会发出超时重试请求。
AT模式
第一阶段,参与者会拦截sql,查询出需要更新的这条数据的原始数据保存为before image,然后执行sql,提交事务,并将更新后的数据也查询出来保存为after image,最后生成行锁,保证了一阶段操作的原子性。
第二阶段,如果是提交操作的话,删除保存的before image和after image以及行锁即可。如果是回滚操作,则将进行以下几个步骤:
校验数据库数据和after image数据,如果不一致则说明有脏写,需要人工处理,否则执行下一步骤。
将before image的数据逆向为sql,还原到数据库中。
删除删除保存的before image和after image以及行锁。
TCC模式
try阶段,事务参与者执行预留资源的操作,如果所有参与者的资源预留操作都可正常执行,则执行confrim操作,否则执行cancel操作。
confirm阶段,执行真正的提交操作,并清除预留操作。
cancel阶段,执行预留资源的撤销操作。
二、Seata安装配置
点击Releases · seata/seata (github.com)进入下载页面,选择最新版本(v1.6.1)下载seata并解压。
修改application.yaml文件,配置注册中心,配置中心和数据库连接信息,注意数据库驱动也要改。
# Copyright 1999-2019 Seata.io Group. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. server: port: 7091 spring: application: name: seata-server logging: config: classpath:logback-spring.xml file: path: ${user.home}/logs/seata extend: logstash-appender: destination: 127.0.0.1:4560 kafka-appender: bootstrap-servers: 127.0.0.1:9092 topic: logback_to_logstash console: user: username: seata password: seata seata: config: # support: nacos, consul, apollo, zk, etcd3 type: nacos nacos: server-addr: 127.0.0.1:8848 namespace: 75d49a80-72f9-4b15-97b8-08ad100100b0 group: SEATA_GROUP username: nacos password: nacos context-path: ##if use MSE Nacos with auth, mutex with username/password attribute #access-key: #secret-key: data-id: seataServer.properties registry: # support: nacos, eureka, redis, zk, consul, etcd3, sofa type: nacos nacos: application: seata-server server-addr: 127.0.0.1:8848 group: SEATA_GROUP namespace: 75d49a80-72f9-4b15-97b8-08ad100100b0 cluster: default username: nacos password: nacos context-path: ##if use MSE Nacos with auth, mutex with username/password attribute #access-key: #secret-key: store: # support: file 、 db 、 redis mode: db session: mode: db lock: mode: db db: datasource: druid db-type: mysql driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://localhost:3306/seata?rewriteBatchedStatements=true user: root password: 123456 min-conn: 10 max-conn: 100 global-table: global_table branch-table: branch_table lock-table: lock_table distributed-lock-table: distributed_lock query-limit: 1000 max-wait: 5000 # server: # service-port: 8091 #If not configured, the default is '${server.port} + 1000' security: secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017 tokenValidityInMilliseconds: 1800000 ignore: urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login
创建对应的数据库seata,并执行初始脚本,脚本在script/server/db/mysql.sql文件中。
在nacos中创建一个dataId为seataServer.properties,group为SEATA_GROUP的配置,并将script/config-center/config.txt中的配置全部复制到刚才创建的配置中,然后根据实际情况修改nacos中的配置,注意修改存储模式,如果存储模式修改为db,可以删除和数据库无关的其他存储模式的配置。
注意这一行配置:
service.vgroupMapping.default_tx_group=default
在后面会使用到,等号右边default是集群的名字,在application.yml文件中有配置cluster: default
。#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html #Transport configuration, for client and server transport.type=TCP transport.server=NIO transport.heartbeat=true transport.enableTmClientBatchSendRequest=false transport.enableRmClientBatchSendRequest=true transport.enableTcServerBatchSendResponse=false transport.rpcRmRequestTimeout=30000 transport.rpcTmRequestTimeout=30000 transport.rpcTcRequestTimeout=30000 transport.threadFactory.bossThreadPrefix=NettyBoss transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler transport.threadFactory.shareBossWorker=false transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector transport.threadFactory.clientSelectorThreadSize=1 transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread transport.threadFactory.bossThreadSize=1 transport.threadFactory.workerThreadSize=default transport.shutdown.wait=3 transport.serialization=seata transport.compressor=none #Transaction routing rules configuration, only for the client service.vgroupMapping.default_tx_group=default #If you use a registry, you can ignore it service.default.grouplist=127.0.0.1:8091 service.enableDegrade=false service.disableGlobalTransaction=false #Transaction rule configuration, only for the client client.rm.asyncCommitBufferLimit=10000 client.rm.lock.retryInterval=10 client.rm.lock.retryTimes=30 client.rm.lock.retryPolicyBranchRollbackOnConflict=true client.rm.reportRetryCount=5 client.rm.tableMetaCheckEnable=true client.rm.tableMetaCheckerInterval=60000 client.rm.sqlParserType=druid client.rm.reportSuccessEnable=false client.rm.sagaBranchRegisterEnable=false client.rm.sagaJsonParser=fastjson client.rm.tccActionInterceptorOrder=-2147482648 client.tm.commitRetryCount=5 client.tm.rollbackRetryCount=5 client.tm.defaultGlobalTransactionTimeout=60000 client.tm.degradeCheck=false client.tm.degradeCheckAllowTimes=10 client.tm.degradeCheckPeriod=2000 client.tm.interceptorOrder=-2147482648 client.undo.dataValidation=true client.undo.logSerialization=jackson client.undo.onlyCareUpdateColumns=true server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 client.undo.logTable=undo_log client.undo.compress.enable=true client.undo.compress.type=zip client.undo.compress.threshold=64k #For TCC transaction mode tcc.fence.logTableName=tcc_fence_log tcc.fence.cleanPeriod=1h #Log rule configuration, for client and server log.exceptionRate=100 #Transaction storage configuration, only for the server. The file, db, and redis configuration values are optional. store.mode=db store.lock.mode=db store.session.mode=db #Used for password encryption store.publicKey= #These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block. store.db.datasource=druid store.db.dbType=mysql store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://localhost:3306/seata?useUnicode=true&rewriteBatchedStatements=true store.db.user=root store.db.password=123456 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.distributedLockTable=distributed_lock store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 #Transaction rule configuration, only for the server server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.distributedLockExpireTime=10000 server.xaerNotaRetryTimeout=60000 server.session.branchAsyncQueueSize=5000 server.session.enableBranchAsyncRemove=false server.enableParallelRequestHandle=false #Metrics configuration, only for the server metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
运行bin目录下的启动文件即可。注意:使用MySQL8时,如果启动遇到Unable to load authentication plugin 'caching_sha2_password',可能是忘了修改数据库驱动为
com.mysql.cj.jdbc.Driver
,也可以进入lib/jdbc目录下把老版本的连接驱动删掉。
三、Seata结合客户端使用
创建两个微服务,这个比较随意,就不贴代码了。两个微服务主要的公共依赖如下:(还有一些常见依赖没有贴出来,例如lombok,springboot的devtools)
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jdbc</artifactId> </dependency> <dependency> <groupId>com.mysql</groupId> <artifactId>mysql-connector-j</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> </dependency>
其中A服务中独有的依赖如下:
<!--必须依赖,用于负载均衡--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> </dependency> <!--使用feign调用接口时必须依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency>
微服务B中编写了一个接口,接口中实现了对数据库B的修改操作。微服务A也编写了一个接口,该接口中对数据库A的数据进行了修改,同时通过远程调用微服务B提供的接口操作了数据库B,这里的数据库A和数据库B也可以是同一个数据库,因为是在不同的服务中调用。主要示例代码如下:
/** * 此方法中直接操作了数据库,同时通过微服务远程调用间接操作了另一个数据库。 * 两个数据库也可以是同一个。 * 然后使用 @GlobalTransactional注解,可以实现分布式事务,两个数据库中的数据要么同时修改,要么同时不修改。 * * @return */ @GetMapping("/") @GlobalTransactional public String test() { //在当前服务中修改数据库中的数据。 int i = accountRepository.decreaseAmount(1L); //通过feign远程调用另一个服务,操作数据库中的数据。 feignService.decreaseStockCount(); //模拟异常 System.out.println(1 / 0); return "ok"; }
在微服务A和B中的关于seata的配置如下,其余关于微服务本身的配置我在前面的文章有讲解,完全复制即可。注意前面几行配置,我在本文前面提到过,与这行配置
service.vgroupMapping.default_tx_group=default
一致即可。seata: tx-service-group: default_tx_group #和seata的config.txt配置文件中的service.vgroupMapping.default_tx_group=default一致 service: vgroup-mapping: - default_tx_group: default #集群名字 registry: type: nacos nacos: application: seata-server group: SEATA_GROUP server-addr: @nacos.server-addr@ #用于管理配置的nacos服务地址 namespace: @nacos.namespace@ #命名空间 username: @nacos.username@ #@***@这种写法表示从pom.xml文件读取配置 password: @nacos.password@ config: # seata客户端的配置从nacos配置中心读取 type: nacos nacos: data-id: seataServer.properties server-addr: @nacos.server-addr@ #用于管理配置的nacos服务地址 namespace: @nacos.namespace@ #命名空间 username: @nacos.username@ #@***@这种写法表示从pom.xml文件读取配置 password: @nacos.password@ group: SEATA_GROUP
在每一个微服务使用到的数据库中创建undo_log表:注意是每一个数据库都需要创建此表。
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`) ) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8;
启动程序,访问微服务A暴露的接口,我们可以打个断点看数据库中相关表数据的变化。
评论区