简陋却不简单的ShardingSphere

Updated on with 0 views and 0 comments

image.png

友情提示:在看这篇文章前建议先看看 分库分表理论篇

1 使用ShardingSphere替换分区表

下边是使用ShardingSphere创建分天逻辑表的DistSQL:

CREATE DATABASE ad_report;
USE ad_report;

ADD RESOURCE u6rds_ad_report (
    URL="jdbc:mysql://rm-2zed07gjleia95ml1.mysql.rds.aliyuncs.com/ad_report?useSSL=false&useUnicode=true&characterEncoding=UTF-8&jdbcCompliantTruncation=false&zeroDateTimeBehavior=convertToNull",
    USER=xxx,
    PASSWORD=xxx
);

CREATE SHARDING ALGORITHM table_rollday (
    TYPE(NAME=interval,PROPERTIES("datetime-pattern"="yyyy-MM-dd HH:mm:ss","datetime-lower"="2022-01-01 00:00:00","datetime-upper"="2038-01-19 11:14:07","sharding-suffix-pattern"="yyyyMMdd","datetime-interval-unit"="DAYS","datetime-interval-amount"=1))
);

CREATE SHARDING TABLE RULE clicklog (
    DATANODES("u6rds_ad_report.clicklog_${20220101..20220110}"),
    TABLE_STRATEGY(TYPE=standard,SHARDING_COLUMN=time,SHARDING_ALGORITHM=table_rollday)
);

有同学可能有疑问:为什么不直接使用分区表呢?

需要注意的是,一项技术,不是用了就一定带来益处。比如显式锁功能比内置锁强大,你没玩好可能导致很不好的情况。分区也是一样,不是启动了分区数据库就会运行的更快,分区可能会给某些sql语句性能提高,但是分区主要用于数据库高可用性的管理。数据库应用分为两类,一类是OLTP(在线事务处理),一类是OLAP(在线分析处理)。对于OLAP应用分区的确可以很好的提高查询性能,因为一般分析都需要返回大量的数据,如果按时间分区,比如一个月用户行为等数据,则只需扫描响应的分区即可。在OLTP应用中,分区更加要小心,通常不会获取一张大表的10%的数据,大部分是通过索引返回几条数据即可。

比如一张表1000w数据量,如果一句select语句走辅助索引,但是没有走分区键。那么结果会很尴尬。如果1000w的B+树的高度是3,现在有10个分区。那么不是要(3+3)*10次的逻辑IO?(3次聚集索引,3次辅助索引,10个分区)。所以在OLTP应用中请小心使用分区表。

有同学可能又会问了,使用分区表查询肯定会使用分区键啊,但是请注意,如果表中存在主键或者唯一索引时,分区列必须是唯一索引的一个组成部分,请看下面栗子:

daa44d55dbfc4daea42b21702a9b108c.png

分区表还有一个坑爹的栗子:在使用key partition的时候,分区数不能设置为偶数,最好是质数,这样才可能使每个分区数量均匀。先看看分区表的表结构,使用的MySQL版本是5.7:

632aea93a16e4daa8e579173be6724f8.png

如果分区数为10,那么p0,p2,p4,p6,p8都会有数据,观察下来还是比较均匀的,但是p1,p3,p5,p7,p9这些奇数分区一条数据也没有,如下图所示:

b1e640b142424f59af0964627dafda9c.png

将分区数由10改成11后,如下图所示:

21aa44b29ef44e549f6eee0a9bd986aa.png

眼见为实了吧,就问你分区表坑爹不坑爹吧!

其实分区表还有其他限制,比方说分区数有上限,再比方说分区表添加字段很麻烦,我们使用的是MySQL 5.7,每次添加字段都是先创建一张加好字段的空表,然后再搬数据,如果直接在分区表上加字段可能会锁很长时间的表。

2 有意思的SQL改写

来看一个单库多表的查询栗子:

4720282878eb4615a93c540272eb7a84.png

说明:在ShardingSphere中,可以使用 preview 预览实际将要分发执行的SQL。

从上面这个栗子中我们可以看到ShardingSphere并没有直接将原SQL分发执行,而是做了改写:

  • 将逻辑表名clicklog改写成clicklog_yyyyMMdd
  • 添加一个查询列 id AS ORDER_BY_DERIVED_0
  • limit 1,2 改成 limit 0,3

为什么做如上改写,请看 分库分表理论篇

那么是不是每执行一条SQL都需要进行这么复杂的改写呢?不是的!!!请看下面这个栗子:

3e6642bd7230443ea8e4f46e5ff9fc92.png

上面这个栗子只是将逻辑表名替换成了实际表名,没有做其他改写。原因就是根据Sharding字段可以判断只需要查询一张表,不存在多表查询的情况,所以不需要做额外的SQL改写

3 强大却简陋的DistSQL

灵活的规则配置和资源管控能力是 Apache ShardingSphere 的特点之一。 在使用ShardingSphere-Proxy时,开发者虽然可以像使用数据库一样操作数据,但却需要通过YAML文件(或注册中心)配置资源和规则。 然而,YAML格式的展现形式,以及注册中心动态修改带来的操作习惯变更,对于运维工程师并不友好。

DistSQL让用户可以像操作数据库一样操作 Apache ShardingSphere,让用户用最规范、标准、熟悉的查询方式操纵及管理ShardingSphere分布式数据库生态所有数据库资源及元数据信息,使其从面向开发人员的框架和中间件转变为面向运维人员的数据库产品。

DistSQL细分为RDL、RQL 和RAL三种类型:

  • RDL(Resource & Rule Definition Language):负责资源和规则的创建、修改和删除;
  • RQL(Resource & Rule Query Language):负责资源和规则的查询和展现;
  • RAL(Resource & Rule Administration Language):负责 Hint、事务类型切换、分片执行计划查询等管理功能。

但是,目前DistSQL并非ShardingSphere宣传的那么美好,我们来看一个糟心的栗子:

a4f7473932154fb4a4286d95dc0f9e0d.png

我们使用DistSQL提供的ALTER语句修改分天表的日期上限,因为 timestamp 类型的字段最大值为 2038-01-19 11:14:07,所以这里尝试将日期上限修改成 20380119。不幸的是,这条ALTER语句执行了很久都没有结束,一直卡着,如下图所示:

8260a9f055444688920825a2f765bc6f.png

为了分析这个问题,我们再次搬出碉堡的 arthas,先来看看thread情况:

e46bf153d20e46baa46e735481fe29f6.png

很容易就能发现有个线程CPU使用率最高,接下来看看这个线程的堆栈信息:

965b28b42190471b94fe61b3ad8594f2.png

从堆栈信息中我们发现了 getDataSourceNames 这个方法,再dump一次堆栈信息:

176fa99e03184a3cb7c38926ce3b65df.png

这回出现了一个新的方法 isValidDataNode,找出源码看看:

cdaa215f4e0240caa685dfb80e976e22.png

看到上面这段逻辑是不是有骂娘的冲动?本来一层循环就能解决的事,非要搞的这么复杂。。。

再来看看具体的参数调用:

2f88f8d6c4a74933a21e1fb092a4033d.png

原本我们设定的日期后缀其实被当作数值处理,每次对数值加1,然后再加上双层循环,简直就是丧心病狂。看到这里大家应该能够理解为什么说DistSQL代码简陋了吧!

下面是修改后的代码:

3789547194784806bc1c30ff704b3c55.png

重新部署ShardingSphere后,再次执行那个ALTER语句瞬间完成:

aeca133aa37e45dda2f585c2a9562c22.png

4 添加分天Sharding上限约束条件

一般情况下,当天的实时数据我们会写到当天的数据表中,但是我们在查询的时候很有可能查当天某个时间点之后的数据,如下图所示:

8b6dd35a9489492fb4eef75798c22489.png

从图中我们可以看到实际执行的查询会分发到查询时间点之后的所有表,问题是我们可能并没有预先创建所有的表,可能只是创建了往后几天的表,那么这种情况下执行就可能报错了。

针对上述问题,我们来看下该如何解决。首先我们要明确这个问题并不是什么性能问题,而是代码逻辑问题,说白了就是SQL路由问题,直接使用arthas可能找不到切入点,这时候可以看看ShardingSphere官网有没有相关内容。虽然官网做的很简陋,不过只要我们足够耐心还是可以找到如下内容:

e4fe5305aded4ce6b1448ac88d87116e.png

找到切入口之后,查看相关代码:

067711d257d44eb4b21ee4cc0fb2a015.png

为了确定上面列出来的方法是否在SQL路由过程中被调用,我们还是祭出杀手锏 arthas,这里我们先在arthas做好 watch 准备,如下图所示:

2663f0ff39ea497d9704c4d8b46faf1a.png

当我们执行preview语句后就能捕获到哪个方法被调用:

f18f0b1f1cfd48caabe83be8be448dee.png

我们可以看到上面方法被调用后的返回值是 ShardingConditions 对象,其内部有一个条件List,这个List只有一个ShardingCondition,取值范围是 [2022-01-18 16:00:00, 正无穷大),我们要改造的其实就是这个取值范围。

继续跟代码,会找到 WhereClauseShardingConditionEngine 这个类,还是使用arthas的watch命令观察参数情况,最终会定位到如下方法:

86f90c2b87ac400c9754168df0469eea.png

接下来要改造就比较简单了,下面是修改后的代码:

f0a7b3797a4f4650aa490c2c89e5c97d.png

代码改造之后的效果图:

2eb3548221d04299885cad18e26238c6.png

5 重新认识监控

ShardingSphere并不负责如何采集、存储以及展示应用性能监控的相关数据,而是将SQL解析与SQL执行这两块数据分片的最核心的相关信息发送至 APM(应用性能监控)系统,并交由其处理。 换句话说,ShardingSphere仅负责产生具有价值的数据,并通过标准协议递交至相关系统。

ShardingSphere可以通过三种方式对接应用性能监控系统:

  • 第一种方式是使用 OpenTracing API 发送性能追踪数据。面向OpenTracing协议的APM产品都可以与ShardingSphere自动对接,比如SkyWalking、Zipkin和Jaeger。

  • 第二种方式是使用SkyWalking的自动探针。 ShardingSphere团队与SkyWalking团队共同合作,在SkyWalking中实现了ShardingSphere自动探针,可以将相关的应用性能数据自动发送到SkyWalking中。

  • 第三种方式是使用OpenTelemetry发送性能追踪数据。OpenTelemetry在2019年由OpenTracing和OpenCencus合并而来。

接下来我们重点了解下APM。目前APM的主要功能着眼于分布式系统的性能诊断,其主要功能包括调用链展示,应用拓扑分析等。这里需要提到一个概念:可观测性 (Observability)。可观测性有三种数据模型:追踪 (Traces)、度量 (Metrics)、日志 (Logs)。

c08e72ac29c54e44bb1eeeca6a3907d2.png

这三种数据模型不是相互独立的,之间有一定联系。

5.1 Logs

首先Logs最好理解,就是对各个应用中打印的log进行收集和提供查询能力。

Logs系统的重要性不言而喻,通常我们在排查特定的请求的时候,是非常依赖于上下文的日志的。

以前我们都是通过terminal登录到机器里面去查log,但是由于集群化和微服务化的原因,继续使用这种方式工作效率会比较低,因为你可能需要登录好几台机器搜索日志才能找到需要的信息,所以需要有一个地方中心化存储日志,并且提供日志查询。

Logs的典型实现是ELK(ElasticSearch、Logstash、Kibana),三个项目都是由Elastic开源,其中最核心的就是ES的储存和查询的性能得到了大家的认可,经受了非常多公司的业务考验。

33961e573c5c4aab9545cf3964a3af80.png

我们回过头来分析Logs系统,Logs系统的数据来自于应用中打印的日志,它的特点是数据量可能很大,取决于应用开发者怎么打日志,Logs系统需要存储全量数据,通常都要支持至少1周的储存。

每条日志包含 ipthreadclasstimestamptraceIdmessage 等信息,它涉及到的技术点非常容易理解,就是日志的存储和查询。

使用也非常简单,排查问题时,通常先通过关键字搜到一条日志,然后通过它的traceId来搜索整个链路的日志。

5.2 Metrics

Metrics是指对系统中某一类信息的统计聚合。这里的信息可以包括:JVM、CPU、数据库连接池等里面的具体指标。

说到监控,就不得不提 Prometheus + Grafana 这对组合,它们对机器健康情况、URL 访问统计、QPS、P90、P99 等等这些需求,支持得非常好,它们用来做监控大屏是非常合适的,但是通常不能帮助我们排查问题,它看到的是系统压力高了、系统不行了,但不能一下子看出来为啥高了、为啥不行了。

d8c1b3a1e4c4474790c378f0058a200a.png

5.3 Traces

再来看看 Traces 系统,它用于记录整个调用链路

前面介绍的Logs系统使用的是开发者打印的日志,所以它是最贴近业务的。而Traces系统就离业务更远一些了,它关注的是一个请求进来以后,经过了哪些应用、哪些方法,分别在各个节点耗费了多少时间,在哪个地方抛出的异常等,用来快速定位问题。

731c5325262f4ca787ef94cad06d13c9.png

在微服务时代,追踪不只是单体程序调用栈的追踪;一个外部请求导致的内部服务的调用轨迹的收集,更是它的重要内容。因此,分布式系统中的追踪在国内通常被称为“全链路追踪”,许多资料中也把它叫做是“分布式追踪”(Distributed Tracing)。

经过多年的发展,Traces系统虽然在服务端的设计很多样,但是客户端的设计慢慢地趋于统一,所以有了OpenTracing项目,我们可以简单理解为它是一个规范,它定义了一套API,把客户端的模型固化下来。当前比较主流的Traces系统中,Jaeger、SkyWalking是使用这个规范的,而 Zipkin、Pinpoint没有使用该规范。

6 有挑战的分布式JOIN

6.1 MySQL相关的JOIN算法

先来回顾一下单机版MySQL的Join算法。

6.1.1 Nested-Loop Join Algorithm(嵌套循环Join算法)

最简单的Join算法及外循环读取一行数据,根据关联条件列到内循环中匹配关联,在这种算法中,我们通常称外循环表为驱动表,称内循环表为被驱动表。

Nested-Loop Join算法的伪代码如下:

331e97dc444c449ea2192d23e55906a7.png

6.1.2 Block Nested-Loop Join Algorithm(BNL算法)

BNL算法是对Nested-Loop Join算法的优化。具体做法是将外循环的行缓存起来,读取缓冲区中的行,减少内循环表被扫描的次数。例如,外循环表与内循环表均有100行记录,普通的嵌套内循环表需要扫描100次,如果使用块嵌套循环,则每次外循环读取10行记录到缓冲区中,然后把缓冲区数据传递给下一个内循环,将内循环读取到的每行和缓冲区中的10行进行比较,这样内循环表只需要扫描10次即可完成,使用块嵌套循环后内循环整体扫描次数少了一个数量级。使用块嵌套循环,内循环表扫描方式应是全表扫描,因为是内循环表匹配Join Buffer中的数据的。使用块嵌套循环连接,MySQL会使用连接缓冲区(Join Buffer),且会遵循下面一些原则:

  • 连接类型为ALL、index、range,会使用到Join Buffer。
  • Join Buffer是由 join_buffer_size 变量控制的。
  • 每次连接都使用一个Join Buffer,多表的连接可以使用多个Join Buffer。
  • Join Buffer只存储与查询操作相关的字段数据,而不是整行记录。

BNL算法的伪代码如下:

e3e1129004964924a309cf62be1f7caa.png

对上面的过程解释如下:

①将t1、t2的连接结果放到缓冲区中,直到缓冲区满为止。
②遍历t3,与缓冲区内的数据匹配,找到匹配的行,发送到客户端。
③清空缓冲区。
④重复上面的步骤,直至缓冲区不满。
⑤处理缓冲区中剩余的数据,重复步骤②。
假设S是每次存储t1、t2组合的大小,C是组合的数量,则t3被扫描的次数为:(S * C) / join_buffer_size + 1

由此可见,随着join_buffer_size的增大,t3被扫描的次数会减少,如果join_buffer_size足够大,大到可以容纳所有t1和t2连接产生的数据,那么t3只会被扫描一次。

来看一个具体的案例:

f1974c3c8cf7437b8cb8044137911250.png

aff4d818066d481299348699bc9400d4.png

6.1.3 Index Nested-Loop Join Algorithm(INLJ算法)

索引嵌套循环连接算法是基于嵌套循环算法的改进版,其优化的思路,主要是为了减少了内层循环匹配次数,就是通过外层数据循环与内存索引数据进行匹配,这样就避免了内层循环数据逐个与外层循环的数据进行对比,从 原来的匹配次数 = 外层所有行数据 * 内层所有行数据 优化成 外层所有行数据 * 索引树的高度,极大的提高的查询效率。

SQL案例:

4877e30a287b4cd28b362488ef2680ef.png

上面SQL大致执行流程如下图所示:

0533e0969cd24825b9024469534dad43.png

  • 从t2表中读取一行记录
  • 从第1步记录中,取出关联字段a到t表的辅助索引树中进行查找
  • 从t1表中取出辅助索引树中满足条件的记录拿出主键ID到主键索引中根据主键ID将剩下字段的数据取出与t2中获取到的结果进行合并,将结果放入结果集
  • 循环上面三个步骤,直到无法满足条件,将结果集返回给客户端

基于嵌套循环连接算法进行优化,虽然还是双层循环进行匹配数据,但是内层循环(被驱动表)是使用索引树的高度决定循环次数的,这样的话,无论驱动表和被驱动表的数据多大,效率还是很高的。

6.1.4 Batched Key Access(BKA)

BKA是对BNL算法的更进一步扩展及优化,其作用是在表连接时可以进行顺序I/O,所以BKA是在MRR(参考 MySQL 索引拾遗)基础之上实现的,同时BKA支持内连接、外连接和半连接操作。

当两个表连接时,在没有BKA的情况下如下图所示,可以看到访问t2表时是随机I/O

03adc420b70049cb918d5de409adf9c7.png

有了BKA之后如下图所示,可以看到对t2表进行连接访问时,先将t1中相关的字段放入Join buffer中,然后利用MRR特性接口进行排序(根据rowid),排序之后即可通过rowid到t2表中进行查找。

811a1090261d4f19bff39ffadff52d91.png

这里也有一个隐含的条件,就是关联字段需要有索引,否则还是会使用BNL算法的

6.2 Hash Join

在8.0.18之前,MySQL只支持NestLoopJoin算法。虽然MySQL对于Join做过若干优化,比如NBLJ、INLJ以及BKA等,但这些代替不了HashJoin的作用。关于HashJoin的详细介绍可以参考 Presto 优化拾遗

Hash join 跟 Block Nested-Loop Join 类似,都是把一批数据放进hash table里面再与另一个表进行join操作,区别是Block Nested-Loop Join有固定的buffer大小,而hash join 没有固定的大小,是把整张表的数据都放进内存中的hash table里面。如果放不下的话怎么办呢?放不下的话就会分成若干个partition,写入磁盘的temporary segment。

所以hash join这种方式适用于表比较小的情况,适用于较小的表整张表都能放入内存中的情况,如果一次放不完,IO次数就会变多,影响性能。时间复杂度为O(n)级别。

6.3 Merge Join

Merge join又称Sorted-Merge join就是先对两张表所关联的列进行排序。例如有两张表tA和tB:

06fd752f7b9745a58e16c11cab06914a.png

执行下面SQL:

select * from tA left join tB on tA.id=tB.id;

先对tA和tB分别按照id进行排序,那么tA和tB的id的顺序都变成了1、2、3、4。

这样再做join的时候,就能从上到下一一对应了。Merge join的时间复杂度为O(nlogn)级别(即排序的时间复杂度),不如Hash join,但是若两张表的关联列本来就是有序的,那就省去了排序的过程,这时候时间复杂度为O(n)级别,优于Hash join,节约了找hash值的时间。

6.4 JOIN在MapReduce中的实现

直接上具体的栗子:

select 
    u.name
    , o.orderid 
from order o 
join user u on o.uid = u.uid;

在map的输出value中为不同表的数据打上tag标记,在reduce阶段根据tag判断数据来源。MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式):

58580c4554f54a87bdadb21e97065f4f.png

从上图可以看到key相同的记录经过shuffle后排在一起

6.5 还在开发中的Federation执行引擎

ShardingSphere由Federation执行引擎(开发中)提供支持,对关联查询、子查询等复杂查询进行优化,同时支持跨多个数据库实例的分布式查询,内部使用关系代数优化查询计划,通过最优计划查询出结果。

f68e864ddd094de992b1acee2be5191e.png

ShardingSphere的3个产品的数据分片主要流程是完全一致的,按照是否进行查询优化,可以分为Standard内核流程和Federation执行引擎流程:

  • Standard内核流程SQL 解析 => SQL 路由 => SQL 改写 => SQL 执行 => 结果归并 组成,主要用于处理标准分片场景下的 SQL 执行

  • Federation执行引擎流程SQL 解析 => 逻辑优化 => 物理优化 => 优化执行 => Standard 内核流程 组成,Federation执行引擎内部进行逻辑优化和物理优化,在优化执行阶段依赖Standard内核流程,对优化后的逻辑SQL进行路由、改写、执行和归并


标题:简陋却不简单的ShardingSphere
作者:yanghao
地址:http://solo.fancydigital.com.cn/articles/2022/01/21/1642779824092.html