Sharding-JDBC

Sharding-JDBC定位为轻量级Java框架,在Java的JDBC层提供的额外服务。可理解为增强版的JDBC驱动,完全兼容JDBC和各种ORM 框架的使用

Sharding-JDBC主要功能

1) 数据分片:分库分表、读写分离、分片策略、分布式主键

2) 分布式事务:标准化事务接口、XA强一致性事务、柔性事务

3) 数据库治理:配置动态、服务治理、数据脱敏、链路追踪

核心概念

逻辑表:水平拆分的数据库的相同逻辑和数据结构表的总称

真实表:在分片的数据库中真实存在的物理表。

数据节点:数据分片的最小单元。由数据源名称和数据表组成

绑定表:分片规则一致的主表和子表。(binding-tables)

广播表:也叫公共表,指素有的分片数据源中都存在的表,表结构和表中的数据在每个数据库中都完全一致。例如字典表。(broadcast-tables)

分片键:用于分片的数据库字段,是将数据库(表)进行水平拆分的关键字段。SQL中若没有分片字段,将会执行全路由,性能会很差。

分片算法:通过分片算法将数据进行分片,支持通过=、BETWEEN和IN分片。分片算法需要由应用开发者自行实现,可实现的灵活度非常高。

分片策略:真正用于进行分片操作的是分片键+分片算法,也就是分片策略。在ShardingJDBC中一般采用基于Groovy表达式的inline分片策略,通过一个包含分片键的算法表达式来制定分片策略,如t_user_$->{u_id%8}标识根据u_id模8,分成8张表,表名称为t_user_0到t_user_7。

分片算法

1) 精确分片算法PreciseShardingAlgorithm

2) 范围分片算法RangeShardingAlgorithm

3) 复合分片算法ComplexKeysShardingAlgorithm

4) Hint分片算法HintShardingAlgorithm

分片策略

1) 标准分片策略StandardShardingStrateg (eq,in,between)

2) 复合分片策略ComplexShardingStrategy (多个字段联合)

3) 行表达式分片策略InlineShardingStrateg (eq)

4) Hint分片策略HintShardingStrategy (与sql无关)

5) 不分片策略NoneShardingStrategy

分片策略配置

1) 数据源分片策略

2) 表分片策略

数据分片流程

1) SQL解析:SQL解析分为词法解析和语法解析。先通过词法解析器将SQL拆分为一个个不可再分的单词。再使 用语法解析器对SQL进行理解,并最终提炼出解析上下文。

2) 查询优化:负责合并和优化分片条件,如OR等。

3) SQL路由:根据解析上下文匹配用户配置的分片策略,并生成路由路径。

4) SQL改写:将SQL改写为在真实数据库中可以正确执行的语句。

5) SQL执行:通过多线程执行器异步执行SQL。

6) 结果归并:将多个执行结果集归并以便于通过统一的JDBC接口输出。

数据分片SQL使用规范

https://shardingsphere.apache.org/document/current/cn/features/sharding/use-norms/sql/

1) 支持路由至单数据节点时,目前MySQL数据库100%全兼容,路由至多数据节点时,全面支持DQL、DML、DDL、DCL、TCL。

2) 路由至多数据节点不支持CASE WHEN、HAVING、UNION (ALL)。

3) 支持分页子查询,但其他子查询有限支持,无论嵌套多少层,只能解析第一层。

4) 由于归并的限制,子查询中包含聚合函数目前无法支持。

5) 不支持包含schema的SQL。

6) 当分片键处于运算表达式或函数中的SQL时,将采用全路由的形式获取结果。

读写分离及架构设计方案

spring.shardingsphere.sharding.master-slave-rules.ds0.masterDataSourceName=m1
spring.shardingsphere.sharding.master-slave-rules.ds0.slaveDataSourceNames[0]=m2

压测影子库 shadow

数据脱敏 encrypt

脱敏配置四部分:数据源配置,加密器配置,脱敏表配置以及查询属性配置

数据源配置:指DataSource的配置信息

加密器配置:指使用什么加密策略进行加解密。目前ShardingSphere内置了两种加解密策略: AES/MD5

脱敏表配置:指定哪个列用于存储密文数据(cipherColumn)、哪个列用于存储明文数据(plainColumn)以及用户想使用哪个列进行SQL编写(logicColumn)

查询属性的配置:当底层数据库表里同时存储了明文数据、密文数据后,该属性开关用于决定是直接查询数据库表里的明文数据进行返回,还是查询密文数据通过Encrypt-JDBC解密后返回。

加密策略解析:
Encryptor:提供encrypt(), decrypt()两种方法对需要脱敏的数据进行加解密。
QueryAssistedEncryptor:即使是相同的数据,如两个用户的密码相同,它们在数据库里存储的脱敏数据也应当是不一样的。

SPI

jdk提供服务实现查找的一个工具类:java.util.ServiceLoader。

Java SPI 的具体约定为:当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。

Apache ShardingSphere所有通过SPI方式载入的功能模块:

1) SQL解析接口:用于规定用于解析SQL的ANTLR语法文件

2) 数据库协议接口:用于Sharding-Proxy解析与适配访问数据库的协议

3) 数据脱敏接口:用于规定加解密器的加密、解密、类型获取、属性设置等方式

4) 分布式主键接口:用于规定如何生成全局性的自增、类型获取、属性设置等。

5) 分布式事务接口:用于规定如何将分布式事务适配为本地事务接口。

6) XA事务管理器接口:用于规定如何将XA事务的实现者适配为统一的XA事务接口。

7) 注册中心接口:用于规定注册中心初始化、存取数据、更新数据、监控等行为。

分布式事务理论CAP和BASE

CAP(强一致性)布鲁尔定理。对于共享数据系统,最多只能同时拥有CAP其中的两个

BASE(最终一致性)

基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。它的核心思想是即使无法做到强一致性(CAP 就是强一致性),但应用可以采用适合的方式达到最终一致性。

BA指的是基本业务可用性,支持分区失败;

S表示柔性状态,也就是允许短时间内不同步;

E表示最终一致性,数据最终是一致的,但是实时是不一致的。

分布式事务模式2PC和3PC

2PC模式(强一致性)

两阶段提交,就是将事务的提交过程分为两个阶段来进行处理。事务的发起者称协调者,事务的执行者称参与者。

1) 阶段 1:准备阶段 协调者向所有参与者发送事务内容,询问是否可以提交事务,并等待所有参与者答复。 各参与者执行事务操作,但不提交事务,将 undo 和 redo 信息记入事务日志中。 如参与者执行成功,给协调者反馈 yes;如执行失败,给协调者反馈 no。

2) 阶段 2:提交阶段 如果协调者收到了参与者的失败消息或者超时,直接给每个参与者发送回滚(rollback)消息; 否则,发送提交(commit)消息。

2PC 方案存在的问题

1) 性能问题:所有参与者在事务提交阶段处于同步阻塞状态,占用系统资源,容易导致性能瓶颈。

2) 可靠性问题:如果协调者存在单点故障问题,如果协调者出现故障,参与者将一直处于锁定状态。

3) 数据一致性问题:在阶段 2 中,如果发生局部网络问题,一部分事务参与者收到了提交消息,另一部分事务参与者没收到提交消息,那么就导致了节点之间数据的不一致。

3PC模式(强一致性)

两阶段提交的改进版本,引入超时机制,将两阶段的准备阶段拆分为 2 个阶段,插入了一个 preCommit 阶段。

1) 阶段1:canCommit 协调者向参与者发送 commit 请求,参与者如果可以提交就返回 yes 响应,否则返回 no 响应。

2) 阶段2:preCommit 协调者根据阶段 1 canCommit 参与者的反应情况执行预提交事务或中断事务操作。 参与者均反馈 yes:协调者向所有参与者发出 preCommit 请求,参与者收到 preCommit 请求后,执行事务操作,但不提交;将 undo 和 redo 信息记入事务日志 中;各参与者向协调者反馈 ack 响应或 no 响应,并等待最终指令。 任何一个参与者反馈 no或等待超时:协调者向所有参与者发出 abort 请求,无论收到协调者发出的 abort 请求,或者在等待协调者请求过程中出现超时,参与者均会中断事务。

3) 阶段3:do Commit 该阶段进行真正的事务提交,根据阶段 2 preCommit反馈的结果完成事务提交或中断操作。

Sharding-JDBC整合XA原理

ShardingSphere整合XA事务时,分离了XA事务管理和连接池管理,这样接入XA时,可以做到对业务的零侵入,而且ShardingSphere集成后,可保证分片后跨库事务强一致性,XA本身也是强一致性的。

执行步骤:

1) Begin(开启XA全局事务):调用具体的XA事务管理器开启XA的全局事务。

2) 执行物理SQL:将所有SQL操作,标记为XA事务。

3) Commit/rollback(提交XA事务):收集所有投票,全部收到提交,否则回滚。

1、 XA START|BEGIN 开启事务,这个test就相当于是事务ID,将事务置于ACTIVE状态
XA START 'test'; 2、对一个ACTIVE状态的XA事务,执行构成事务的SQL语句。
insert...//business sql3、发布一个XA END指令,将事务置于IDLE状态
XA END 'test'; //事务结束4、对于IDLE状态的XACT事务,执行XA PREPARED指令 将事务置于PREPARED状态。
//也可以执行 XA COMMIT 'test' ON PHASE 将预备和提交一起操作。
XA PREPARE 'test'; //准备事务
//PREPARED状态的事务可以用XA RECOVER指令列出。列出的事务ID会包含gtrid,bqual,formatID和data四个字段。
XA RECOVER;5、对于PREPARED状态的XA事务,可以进行提交或者回滚。
XA COMMIT 'test'; //提交事务
XA ROLLBACK 'test'; //回滚事务。

XA事务时需要注意以下几点:

XA事务无法自动提交

XA事务效率非常低下,全局事务的状态都需要持久化。性能非常低下,通常耗时能达到本地事务的10倍。

XA事务在提交前出现故障的话,很难将问题隔离开。

Base柔性事务 Seata

几种处理模式:

  • 最大努力通知型: 即分布式事务参与方都努力将自己的事务处理结果通知给分布式事务的其他参与方,也就是只保证尽力而为,不保证一定成功。适用于很多跨公司、流程复杂的场景。例如 电商完成一笔支付需要电商自己更改订单状态,同时需要调用支付宝完成实际支付。这种场景下,如果支付宝处理订单支付出错了,就只能尽力将错误结果通知给电商网站,让电商网站回退订单状态。

  • 补偿性:不保证事务实时的对齐状态,对于未对齐的事务,事后进行补偿。同样在电商调用支付宝的这个场景中,就只能通过定期对账的方式保证在一个账期内,双方的事务最终是对齐的,至于具体的每一笔订单,只能进行最大努力通知,不保证事务对齐。

  • 异步确保型: 典型的场景就是RocketMQ的事务消息机制。通过不断的异步确认,保证分布式事务的最终一致性。

  • 两阶段型: 通常用于都是操作数据库的分布式事务场景。 第一阶段准备阶段:分布式事务的各个参与方都提交自己的本地事务,并且锁定相关的资源。第二阶段提交阶段:由一个第三方的事务协调者综合处理各方的事务执行情况,通知各个参与方统一进行事务提交或者回退。

    **与两阶段协议对应的是增强版的三阶段协议。他们的本质区别在于,两阶段协议在准备阶段需要锁定资源,例如在数据库中,就是要加行锁。防止其他事务对数据做了调整,这样会导致在第二个阶段数据无法正常回滚。而对于Redis等其他的一些数据源,无法提供对应的锁资源操作。为了适应这样的场景,就在两阶段的准备阶段之前加一个询问阶段,在这一阶段,事务协调者只是询问各个参与方是否做好了准备。例如对于Redis,可能就是表示创建好了Redis连接。对于数据库,就只是表示已经创建好了JDBC连接。然后在准备阶段,参与者统一去写redo和undo日志,记录自己的事务提交状态。然后在最后的提交阶段,由事务协调者通知各个参与方统一进行事务提交或者回滚。

    **两阶段协议与三阶段协议的本质区别在于要不要锁资源。三阶段不用锁资源,所以适用性更强,并且对于事务的一致性强度也更高。**但是在编程实现上,两阶段对业务的侵入比较小,在很多框架中,直接声明一个注解就可以完成了。而三阶段对业务的侵入就比较大了,需要所有业务都按照三阶段的要求改造成TCC的模式。所以三阶段适合于一些对分布式事务准确性和时效性要求非常高的场景,比如很多银行系统。例如在一个典型的订单那支付操作中,A需要向B支付100元。使用TCC,在try阶段,通常会要求给订单设定一个状态UPDATING,同时A减少100元,B增加100元,并且将A需要减少的100元与B需要增加的100元这两个数据都单独记录下来,相当于锁定库存。这样可以用来实现类似锁资源的效果。然后在后续的confirm或者cancel操作中,将事务最终进行对齐。在这一步,首先需要修改订单状态,然后修改A和B的账户。这里注意,给A和B调整的账户都需要从锁定的资源中取,而不能凭空修改账户的数据。

  • SAGA模式:由分布式事务的各个参与方自己提供正向的提交操作以及逆向的回滚操作。事务协调者可以在各个参与方提交事务后,随时协调各个事务参与方进行回滚。具体来说,每个SAGA事务包含T1,T2,T3…Tn操作,每个操作都对应具体的补偿操作C1,C2,C3…Cn。那么SAGA事务就需要保证: 1、所遇事务T1,T2,T3…Tn执行成功(最佳情况),2、如果有事务执行失败了, T1,T2,T3…Tj,Cj,…C3,C2,C1执行成功(0<j<n)。例如对于客户扣款100块钱的操作,电商网站和支付宝都提供扣减客户100块钱的操作作为正向事务,同时也提供给客户加100块钱余额的操作作为逆向操作。这样事务协调者可以在检查电商网站和支付宝的扣款行为后,随时通知他们进行回滚。 这种方式对业务的影响也是比较大的。适合于事务流程比较长,参与方比较多的场景。

所以从广义上来看,ShardingSphere支持的这种XA事务其实也是属于一种柔性事务。但是一般情况下,BASE柔性事务特指Seata框架提供的柔性事务,因为BASE实际上是集成了阿里对于分布式事务的所有研究,而阿里的这些研究成果,最终都沉淀到了Seata框架中。ShardingSphere中对于柔性事务的支持,其实也是更多的基于Seata的AT模式,来实现的两阶段提交。这里要注意的是,虽然XA和AT都是基于两阶段协议提供的实现,但是AT模式相比XA模式,简化了对于资源锁的要求,所以可以认为在大部分的业务场景下,AT模式比XA模式性能稍高。

事务demo

    @Transactional@ShardingTransactionType(TransactionType.BASE) //LOCAL,XA,BASEpublic void insertProductTransaction() {}

properties demo

spring.shardingsphere.datasource.names=m1,m2spring.shardingsphere.datasource.m1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.m1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.m1.url=jdbc:mysql://xxxxxxxx:3316/shardingdb1
spring.shardingsphere.datasource.m1.username=root
spring.shardingsphere.datasource.m1.password=xxxxxxxxspring.shardingsphere.datasource.m2.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.m2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.m2.url=jdbc:mysql://xxxxxxxx:3316/shardingdb2
spring.shardingsphere.datasource.m2.username=root
spring.shardingsphere.datasource.m2.password=xxxxxxxx#spring.shardingsphere.sharding.master-slave-rules.ds0.masterDataSourceName=m1
#spring.shardingsphere.sharding.master-slave-rules.ds0.slaveDataSourceNames[0]=m2#spring.shardingsphere.sharding.tables.t_product.actualDataNodes=ds0.t_productspring.shardingsphere.sharding.tables.product.actualDataNodes=m$->{1..2}.product_$->{1..2}spring.shardingsphere.sharding.tables.product.key-generator.column=pid
spring.shardingsphere.sharding.tables.product.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.product.key-generator.props.worker.id=1spring.shardingsphere.sharding.tables.inventory.actualDataNodes=m$->{1..2}.inventory_$->{1..2}spring.shardingsphere.sharding.tables.inventory.key-generator.column=iid
spring.shardingsphere.sharding.tables.inventory.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.inventory.key-generator.props.worker.id=1spring.shardingsphere.sharding.tables.product.database-strategy.inline.sharding-column=pid
spring.shardingsphere.sharding.tables.product.database-strategy.inline.algorithm-expression=m$->{pid%2+1}spring.shardingsphere.sharding.tables.product.table-strategy.inline.sharding-column=pid
spring.shardingsphere.sharding.tables.product.table-strategy.inline.algorithm-expression=product_$->{((pid+1)%4).intdiv(2)+1}spring.shardingsphere.sharding.tables.category.actualDataNodes=m$->{1..2}.category_$->{1..2}
spring.shardingsphere.sharding.tables.category.key-generator.column=cid
spring.shardingsphere.sharding.tables.category.key-generator.type=SNOWFLAKE
spring.shardingsphere.sharding.tables.category.key-generator.props.worker.id=1spring.shardingsphere.sharding.broadcast-tables[0]=category#spring.shardingsphere.sharding.binding-tables[0]=product,inventoryspring.shardingsphere.props.sql.show=true
standard.....
complex.....
hint.....

Sharding-JDBC 源码流程

Sharding-Proxy


下载: https://shardingsphere.apache.org/document/current/cn/downloads/

  1. conf/server.yaml、conf/config-sharding.yaml (config-encrypt.yaml config-master_slave.yaml config-shadow.yaml logback.xml)
  2. If you want to connect to MySQL, you should manually copy MySQL driver to lib directory.
  3. ./bin/start.sh 3316
  4. tail -f logs/stdout.log
  5. mysql -h0.0.0.0 -P3316 -usharding -psharding
  6. show databases;
  7. use sharding_db
  8. show tables;

配置:ConfigurationPropertyKey.java

案例 sharding-jdbc

pom.xml

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--spring-mvc的依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--加入springcloud alibaba--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.1.4</version></dependency><!--MyBatis分页插件starter--><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId><version>1.2.10</version></dependency><!--MyBatis分页插件--><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper</artifactId><version>5.1.8</version></dependency><!-- sharding 分库分表--><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><version>4.1.1</version></dependency><!-- transaction 2pc XA 2阶段提交分布式事务 --><!--     <dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-transaction-xa-core</artifactId><version>4.1.1</version><exclusions><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency>--><!-- Base - 基于华为servicecomb的saga柔性事务,shardingjdbc正式版之后没有进行维护,需要自己重构saga工程 --><!--<dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-jdbc-spring-boot-starter</artifactId><version>4.0.0-RC2</version></dependency>--><!--<dependency><groupId>io.shardingsphere</groupId><artifactId>sharding-transaction-base-saga</artifactId><version>4.0.0-RC2</version></dependency>--><!-- seata柔性事务-增强版的saga模型 --><!--        <dependency>--><!--            <groupId>org.apache.shardingsphere</groupId>--><!--            <artifactId>sharding-transaction-base-seata-at</artifactId>--><!--            <version>4.1.1</version>--><!--        </dependency>--><!--       <dependency>--><!--         <groupId>io.seata</groupId>--><!--            <artifactId>seata-all</artifactId>--><!--         <version>1.4.0</version>--><!--           <exclusions>--><!--             <exclusion>--><!--                  <artifactId>druid</artifactId>--><!--                 <groupId>com.alibaba</groupId>--><!--             </exclusion>--><!--         </exclusions>--><!--        </dependency>-->

application.yml

spring:
#  datasource:
#    url: jdbc:mysql://mysql.localhost.com:3306/tl_mall_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8
#    username: root
#    password: xxxxx
#    druid:
#      initial-size: 5 #连接池初始化大小
#      min-idle: 10 #最小空闲连接数
#      max-active: 20 #最大连接数
#      web-stat-filter:
#        exclusions: "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*" #不统计这些请求数据
#      stat-view-servlet: #访问监控网页的登录用户名和密码
#        login-username: druid
#        login-password: druid#分库分表配置shardingsphere:#数据源配置datasource:names: ds-master,ds-slaveds-master:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://mysql.localhost.com:3306/tl_mall_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8initialSize: 5minIdle: 10maxActive: 30validationQuery: SELECT 1 FROM DUALusername: rootpassword: xxxxxds-slave:type: com.alibaba.druid.pool.DruidDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://mysql.localhost.com:3306/tl_mall_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8initialSize: 5minIdle: 10maxActive: 30validationQuery: SELECT 1 FROM DUALusername: rootpassword: xxxxxsharding:default-data-source-name: ds-masterdefault-database-strategy:none:tables:oms_order:actual-data-nodes: ds-master.oms_order_$->{0..31}table-strategy:complex:sharding-columns: id,member_idalgorithm-class-name: com.mx.use.shardingjdbc.sharding.OmsOrderShardingAlgorithm
#          key-generator:
#            column: id
#            type: CUSTOM
#            props:
#              worker.id: 123
#              redis:
#                prefix: 'order_id_prefix:'oms_order_item:actual-data-nodes: ds-master.oms_order_item_$->{0..31}table-strategy:complex:sharding-columns: order_idalgorithm-class-name: com.mx.use.shardingjdbc.sharding.OmsOrderItemShardingAlgorithm
#          key-generator:
#            column: id
#            type: SNOWFLAKE
#            props:
#              worker.id: 123binding-tables: oms_order,oms_order_itembroadcastTables:- oms_company_address- oms_order_operate_history- oms_order_return_apply- oms_order_return_reason- oms_order_setting#读写分离配置masterSlaveRules:name: ds_msmasterDataSourceName: ds-masterslaveDataSourceNames: [ds-slave]loadBalanceAlgorithmType: ROUND_ROBINprops:sql:show: true

分片策略 – 复合型

/*** @description: 自定义分片算法,* 以纯粹的订单id为分片键会无法支持按用户id快速查询订单,* 所以截取客户id后2位拼凑到订单id上,分表时以订单id的后两位取模分片。* 这其实是个客户id和订单id都需要作为一个分片键,比较适用于复合分片算法* 复合分片算法配合复合策略使用,支持精确查询与部分范围查询**/
@Slf4j
public class OmsOrderShardingAlgorithm implements ComplexKeysShardingAlgorithm<String> {/* 订单编号列名 */private static final String COLUMN_ORDER_SHARDING_KEY = "id";/* 客户id列名*/private static final String COLUMN_CUSTOMER_SHARDING_KEY = "member_id";@Overridepublic Collection<String> doSharding(Collection<String> availableTargetNames, ComplexKeysShardingValue<String> complexKeysShardingValue) {/*处理 = 以及 in */if (!complexKeysShardingValue.getColumnNameAndShardingValuesMap().isEmpty()) {Map<String, Collection<String>> columnNameAndShardingValuesMap = complexKeysShardingValue.getColumnNameAndShardingValuesMap();if(columnNameAndShardingValuesMap.containsKey(COLUMN_ORDER_SHARDING_KEY) || columnNameAndShardingValuesMap.containsKey(COLUMN_CUSTOMER_SHARDING_KEY)){/*获取订单编号*/Collection<String> orderSns = complexKeysShardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_ORDER_SHARDING_KEY, new ArrayList<>(1));/* 获取客户id*/Collection<String> customerIds = complexKeysShardingValue.getColumnNameAndShardingValuesMap().getOrDefault(COLUMN_CUSTOMER_SHARDING_KEY, new ArrayList<>(1));/*合并订单id和客户id到一个容器中*/List<String> ids = new ArrayList<>(16);if (Objects.nonNull(orderSns)) ids.addAll(ids2String(orderSns));if (Objects.nonNull(customerIds)) ids.addAll(ids2String(customerIds));return ids.stream()/*截取 订单号或客户id的后2位*/.map(id -> id.substring(id.length() - 2))/* 去重*/.distinct()/* 转换成int*/.map(Integer::new)/* 对可用的表名求余数,获取到真实的表的后缀*/.map(idSuffix -> idSuffix % availableTargetNames.size())/*转换成string*/.map(String::valueOf)/* 获取到真实的表*/.map(tableSuffix -> availableTargetNames.stream().filter(targetName -> targetName.endsWith(tableSuffix)).findFirst().orElse(null)).filter(Objects::nonNull).collect(Collectors.toList());}}/*处理类似between and 范围查询*/else if(!complexKeysShardingValue.getColumnNameAndRangeValuesMap().isEmpty()){log.info("[MyTableComplexKeysShardingAlgorithm] complexKeysShardingValue: [{}]", complexKeysShardingValue);Set<String> tableNameResultList = new LinkedHashSet<>();int tableSize = availableTargetNames.size();/* 提取范围查询的范围*/Range<String> rangeUserId = complexKeysShardingValue.getColumnNameAndRangeValuesMap().get(COLUMN_ORDER_SHARDING_KEY);Long lower = Long.valueOf(rangeUserId.lowerEndpoint());Long upper = Long.valueOf(rangeUserId.lowerEndpoint());/*根据order_sn选择表*/for (String tableNameItem : availableTargetNames) {if (tableNameItem.endsWith(String.valueOf(lower % (tableSize -1 )))|| tableNameItem.endsWith(String.valueOf(upper % (tableSize -1 )))) {tableNameResultList.add(tableNameItem);}if (tableNameResultList.size() >= tableSize) {return tableNameResultList;}}return tableNameResultList;}log.warn("无法处理分区,将进行全路由!!");return availableTargetNames;}/*转换成String*/private List<String> ids2String(Collection<?> ids) {List<String> result = new ArrayList<>(ids.size());for(Object id : ids){String strId = Objects.toString(id);String idFact = strId.length()==1 ? "0"+strId : strId;result.add(idFact);}return result;}
}

dao

PortalOrderDao

/*** 前台订单自定义Dao*/
@Mapper
public interface PortalOrderDao {/*** 查询会员的订单* @param memberId 会员ID* @param status 订单状态* @return*/List<OmsOrderDetail> findMemberOrderList(@Param("memberId") Long memberId, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mx.use.shardingjdbc.dao.PortalOrderDao">...<select id="findMemberOrderList" resultMap="orderDetailMap">SELECTo.id,o.status,o.total_amount,o.pay_amount,o.order_sn,o.member_id,ot.id ot_id,ot.product_id ot_product_id,ot.product_name ot_product_name,ot.product_pic ot_product_pic,ot.product_price ot_product_price,ot.product_sku_id ot_product_sku_id,ot.product_sku_code ot_product_sku_code,ot.product_quantity ot_product_quantity,ot.product_attr ot_product_attrFROMoms_order oLEFT JOINoms_order_item ot ON o.id = ot.order_idWHEREo.delete_status = 0 and o.member_id=#{memberId}<if test="status != null">and o.status=#{status}</if>ORDER BY o.create_time desc</select>
</mapper>

service

@Autowired
private PortalOrderDao portalOrderDao;/*** 查询用户订单* @param memberId 会员ID* @param status  订单状态*/@Overridepublic CommonResult<List<OmsOrderDetail>> findMemberOrderList(Integer pageSize, Integer pageNum, Long memberId, Integer status) {PageHelper.startPage(pageNum,pageSize);return CommonResult.success(portalOrderDao.findMemberOrderList(memberId,status));}

ShardingSphere 分布式事务 整合Seata

直接整合seata – 异常

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><version>2.2.8.RELEASE</version><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId></exclusion></exclusions>
</dependency><dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.5.1</version>
</dependency>
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',`xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',`log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',`log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
seata:application-id: demo-order-currtx-service-group: default_tx_groupregistry:type: nacosnacos:application: seata-serverserver-addr: nacos.localhost.com:8848group: SEATA_GROUPconfig:nacos:server-addr: nacos.localhost.com:8848namespace: seata-configgroup: SEATA_GROUPdata-id: seataServer.properties
//此处不能使用@GlobalTransactional
@GlobalTransactional(name = "generateOrder",rollbackFor = Exception.class)
public CommonResult generateOrder(OrderParam orderParam, Long memberId)

执行异常

此处是本项目接入seata最难的地方:原因在于订单表用了分库分表技术(shardingsphere),seata不能对逻辑表进行解析。不能简单的在全局事务发起方使用@GlobalTransactional

Apache ShardingSphere 分布式事务 整合seata

  • Local 本地事务
  • XA 事务
  • BASE 柔性事务(Seata AT)

https://shardingsphere.apache.org/document/current/cn/reference/transaction/base-transaction-seata/

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',`xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',`context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',`log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',`log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',`log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
  1. 引入依赖
<!-- seata柔性事务-增强版的saga模型 --><dependency><groupId>org.apache.shardingsphere</groupId><artifactId>sharding-transaction-base-seata-at</artifactId><version>4.1.1</version></dependency>
  1. 配置 resources/seata.conf
    包含 Seata 柔性事务的应用启动时,用户配置的数据源会根据 seata.conf 的配置,适配为 Seata 事务所需的 DataSourceProxy,并且注册至 RM 中。
client {application.id = demo-order-currtransaction.service.group = default_tx_group
}
  1. resources/application.yml
  • enable-auto-data-source-proxy false
  • service.vgroup-mapping.default_tx_group default
seata:application-id: demo-order-currtx-service-group: default_tx_group#关闭数据源自动代理,交给sharding-jdbcenable-auto-data-source-proxy: falseregistry:type: nacosnacos:application: seata-serverserver-addr: nacos.localhost.com:8848group: SEATA_GROUPconfig:nacos:server-addr: nacos.localhost.com:8848namespace: seata-configgroup: SEATA_GROUPdata-id: seataServer.propertiesservice:vgroup-mapping:default_tx_group: default
  1. 开启全局事务配置
//注意:@GlobalTransactional 和 @ShardingTransactionType 不能同时出现,此处不能使用 @GlobalTransactional
//@GlobalTransactional(name = "generateOrder",rollbackFor = Exception.class)
//全局事务交给@SeataATShardingTransactionManager管理
@ShardingTransactionType(TransactionType.BASE)
@Transactional
public CommonResult generateOrder(OrderParam orderParam, Long memberId)

MySQL(十一):分库分表方案-ShardingSphere相关推荐

  1. 最全的MySQL 常用分库分表方案,都在这里!

    点击上方蓝色字体,选择"标星公众号" 优质文章,第一时间送达关注公众号后台回复pay或mall获取实战项目资料视频 点击此链接:一套的SpringCloud版聚合支付项目,资料文档 ...

  2. Mysql的分库分表(基于shardingsphere)

    一.名词解释 库:database:表:table:分库分表:sharding 二.数据库架构演变 刚开始我们只用单机数据库就够了,随后面对越来越多的请求,我们将数据库的写操作和读操作进行分离, 使用 ...

  3. 互联网公司MySQL常用分库分表方案总结

    更多内容关注微信公众号:fullstack888 一.数据库瓶颈 1.IO瓶颈 2.CPU瓶颈 二.分库分表 1.水平分库 2.水平分表 3.垂直分库 4.垂直分表 三.分库分表工具 四.分库分表步骤 ...

  4. MySQL第六讲 MySQL分库分表方案

    分库分表概念        分库分表就是业务系统将数据写请求分发到master节点,而读请求分发到slave 节点的一种方案,可以大大提高整个数据库集群的性能.但是要注意,分库分表的 一整套逻辑全部是 ...

  5. 最全的MySQL分库分表方案总结

    " 面试中我们经常会碰到的关于分库分表的问题!今天就给大家介绍互联网公司常用 MySQL 分库分表方案!希望对大家的面试有所帮助! 数据库瓶颈 不管是 IO 瓶颈,还是 CPU 瓶颈,最终都 ...

  6. MySQL(十):分库分表方案

    分库分表 是将数据拆分成不同的存储单元. 从分拆的角度上,可以分为垂直分片和水平分片. 垂直分片:按照业务来对数据进行分片,又称为纵向分片. 垂直分片往往需要对架构和设计进行调整.通常来讲,是来不及应 ...

  7. MySQL数据库的分库分表方案

    MySQL数据库的分库分表方案 一. 数据库瓶颈 不管是IO瓶颈,还是CPU瓶颈,最终都会导致数据库的活跃连接数增加,进而逼近甚至达到数据库可承载活跃连接数的阈值.在业务Service来看就是,可用数 ...

  8. MySQL:互联网公司常用分库分表方案汇总!

    本文来源: cnblogs.com/littlecharacter/p/9342129.html 一.数据库瓶颈 不管是IO瓶颈,还是CPU瓶颈,最终都会导致数据库的活跃连接数增加,进而逼近甚至达到数 ...

  9. MySQL主从(MySQL proxy Lua读写分离设置,一主多从同步配置,分库分表方案)

    Mysql Proxy Lua读写分离设置 一.读写分离说明 读写分离(Read/Write Splitting),基本的原理是让主数据库处理事务性增.改.删操作(INSERT.UPDATE.DELE ...

最新文章

  1. linux 脚本设置时间戳,修改linux系统时间的方法(date命令)
  2. Nginx+Tomcat集群与负载均衡
  3. python 将图片旋转多个角度_【技术】把你的朋友变成表情包?Python:So easy
  4. 没有J2EE容器的JNDI和JPA
  5. python 数据框缺失值_Python:处理数据框中的缺失值
  6. Docker日志收集最佳实践
  7. Webstorm React Nodejs 整合
  8. 清空对象里面所有的value值_Python 面向对象之组合用法
  9. S3C6410 SD卡启动uboot分析(详细)
  10. DCEP | 农行将上线数字人民币刷脸支付
  11. 算法设计与分析(第2版)屈婉玲 刘田 张立昂 王捍贫编著 第四章课后习题答案
  12. python获取当前时间戳_Python获取时间戳代码实例
  13. 机器学习之数据预备、清洗与特征工程
  14. 全世界国家中英文名称以及地区区号json格式【资源】
  15. 「Python编程规范」语句分隔符号
  16. 台式计算机显示不了无线网络,我是台式电脑,插上无线网卡怎么我的链接里不显示无线...
  17. python把int转为str_python中int与str互转方法
  18. 计算机专业买什么牌子的笔记本,买笔记本电脑什么牌子好(2020年6月笔记本电脑推荐)...
  19. 制造业数字化转型的意义是什么?
  20. 如何在1到100的整数数组中找到缺失的数字

热门文章

  1. xplorer2 Pro(资源管理器) v5.0.0
  2. qbs java_Qt构建工具QBS之零 —— QBS 概览
  3. 虚拟 sim 卡服务器,基于虚拟SIM卡的内置多虚拟SIM卡方法
  4. 狼滕图----狼的格言
  5. 内网服务器设置NAT123端口映射,方便外网连接;如何测试端口连通情况。
  6. 如何在苹果Mac中将 APFS 格式 U 盘抹成通用格式?
  7. 管理者的角色修炼-第三课-赢在执行
  8. 微信个人号裂变引流技巧,微信个人号粉丝裂变
  9. 转载别人的ftp,觉得目录结构不错,学习
  10. 配有p4cpu的微型计算机,2011江苏省基层公共基础知识最新考试试题库(完整版)