select
project
, media
, spot
, bu
, sum(1) as cnt
, sum(case when time_diff <= 30 then 1 else 0 end) as cnt_1
from (
select project, media, spot, t_time, ext_user_id, bu
, if(pre_time_diff <= post_time_diff, pre_time_diff, post_time_diff) as time_diff
from (
select project, media, spot, t_time, ext_user_id, bu
, date_diff('second', lag_time, t_time) as pre_time_diff
, date_diff('second', t_time, lead_time) as post_time_diff
from (
select bu, project, media, spot, t_time, ext_user_id
, lag(t_time, 1, cast('1970-01-01 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lag_time
, lead(t_time, 1, cast('2099-12-31 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lead_time
from (
select split_part(b.name, '#', 2) as project
, split_part(b.name, '#', 3) as media
, split_part(b.name, '#', 4) as spot
, 'ftx' as bu
, cast(t_time as timestamp) as t_time
, ext_user_id
from (
select ext_order_id
, ext_user_id
, replace("timestamp", '_', ' ') as t_time
from dwd.d_ad_ftx_impression_data
where ext_dsp_id = 23
and thisdate = '2021-10-20'
and cast("hour" as int) between 0 and 21
) t
join
(
select id, concat('阿里#', split_part(name, '阿里#', 2)) as name
from u6rds.ftx."order"
where name like '%阿里#%'
) b on t.ext_order_id = b.id
-- union all
-- select split_part(b.name, '#', 2) as project
-- , split_part(b.name, '#', 3) as media
-- , split_part(b.name, '#', 4) as spot
-- , 'dsp' as bu
-- , cast(t_time as timestamp) as t_time
-- , ext_user_id
-- from (
-- select ext_adgroup_id
-- , ext_user_id
-- , replace("timestamp", '_', ' ') as t_time
-- from dwd.d_ad_impression a
-- where ext_advertiser_id = 2003127
-- and thisdate = '2021-10-20'
-- and cast("hour" as int) between 0 and 20
-- ) a
-- join
-- (
-- select id, concat('阿里#', split_part(name, '阿里#', 2)) as name
-- from u6rds.ad_fancy.ad_group
-- where advertiser_id = 2003127
-- and name like '%阿里#%'
-- ) b on a.ext_adgroup_id = b.id
) t
) t1
) t2
) t3
group by 1, 2, 3, 4
注意上面这条SQL中有部分内容被注释掉了,提交查询后很快就可以出结果,如下图所示:
如果去掉注释,即加上【union all】的部分,那么查询会一直卡住,直至timeout。当我们直接查union all的这部分SQL,会发现结果集并不大,并且也是很快可以出结果,如下图所示:
按照我们正常的逻辑来理解,union all这部分结果集不大,并且不是做join,所以不会因为叉乘放大结果集,所以不应该导致查询查不出来啊!?
为了研究这个问题,我们先给大家铺垫一些Presto的基础知识。
Presto的架构实际上就是一套分布式的SQL执行架构,它最大的特点就是存储和计算分离,Presto只负责计算,存储的部分由数据源自身提供。
在Presto的架构中,有两种角色的节点:
Coordinator,负责接收SQL Query,解析SQL,生成和优化执行计划,生成和调度Task到Worker上。Coordinator将一个完整的Query,拆分成了多个Stage,每个Stage拆分出多个可以并行的Task。
Worker,负责执行Coordinator发给它的Task,有部分Task负责到外部存储系统拉取数据,这部分Task会先执行,之后再执行那些负责计算的Task。
Presto的设计理念来源于一个叫Volcano的并行数据库,该数据库提出了一个并行执行SQL的模型,即火山模型(其实还有两个常见的模型:物化模型 和 向量化/批处理模型)。
Volcano模型核心思想就是 Operator Model
和 Iterator Model
。
Operator Model:即通过各种Operator组成一棵树,树的根节点负责结果输出,树的叶子节点是各种TableScan。这棵树被称作Plan(执行计划),数据库里又被细分为逻辑执行计划和物理执行计划。这棵树是由SQL经过词法、语法分析及语义分析后,生成一个AST(Abstract Syntax Tree),一般经过Visitor模式遍历后生成。原始数据通过叶子节点TableScan读取数据,然后经过各个Operator的计算,包括(TableScan、Project、Filter、Exchange、Agg、Join、TaskOutput等)产出结果。
Iterator Model:顾名思义就是一个递归迭代过程,Plan树的各节点有三个状态,Open、GetNext及Close。从根节点Open开始递归调用 GetNext 获取数据,即父节点递归调用子节点接口直到没有结果为止,然后Close。
简言之,Volcano模型将关系代数中每一种操作抽象为一个Operator,将整个SQL构建成一个Operator树,从根节点到叶子结点自上而下地递归调用next()函数。
下面看一个具体的SQL栗子:
SELECT Id, Name, Age, (Age - 30) * 50 AS Bonus
FROM People
WHERE Age > 30
对应火山模型如下:
简单解释一下上图的含义:
可以看到,Volcano模型是十分简单的,而且它对每个算子的接口都进行了一致性的封装。也就是说,从父节点来看,子节点具体是什么类型的算子并不重要,只需要能源源不断地从子节点的算子中Fetch到数据行就可以。这样的特性也给优化器从外部调整执行树而不改变计算结果创造了方便。
目前大部分单机或分布式的数据库以及SQL执行引擎,都是这种Operator-Based
执行模型。Volcano执行模型诞生于20多年前,那时候数据读取IO瓶颈更严重,所以Vocalno模型虽然CPU不友好,但也不要紧。现如今IO的性能有了较大提升,相比之下CPU的计算显得更加吃力一些,因此Volcano模型的问题就比较突出了。当然为了运行效率,现代的数据库或查询引擎是做过很多优化的,我们最常听到的几种优化是:
批量化(Batch Processing):next()方法从每次只处理一条记录,改为处理多条,平摊了函数调用成本。
向量化执行(Vectorized Execution):包含CPU的SIMD指令、loop unrolling(循环展开),还有列式存储和计算。这些底层的软件编码优化,大大提高了处理一批数据的性能。
代码生成(Code Generation):Volcano的operator链在执行时,需要层层调用next()带来深层次的调用栈,我们可以利用自动代码生成一个铺平的方法,去掉函数的调用,把层层调用的operator计算逻辑都安置在一起,经过数据实测,CPU能节省70%~90%的时间分片,去做更多真正有意义的计算逻辑。
Presto查询执行模型如下图所示:
MPP的理念就是能尽量细粒度的将SQL并行执行,以一个SQL两个表JOIN后Agg为例,那么每个表都可以单独并行执行去Scan数据(互不影响),然后进行Join和Agg。所以执行计划(Plan)将执行PlanFragment,即将一个树分块变为各个子树,每个子树可以并行的在多台机器上执行,这个Fragment被称为Stage。
根据Stage的用途,分为四种:
总结下来,对于Query的每个Stage,就干了三件事:
Split包含的信息可以让Presto Task知道去哪里拉取上游的数据,它是数据分区的基本单位(如果你愿意把它叫做partition也可以,就像Kafka那样)。上游数据源Connector的Split是ConnectorSplit,上游Stage的Split是RemoteSplit。RemoteSplit其实是ConnectorSplit接口的一个实现类,Presto在类似的逻辑上实现了高度的统一抽象。
Presto代码中定义的Operator与Volcano执行模型的Operator含义是相同的,如下是Operator接口的定义:
// File: Operator.java
// package: io.prestosql.operator
// Interface: Operator
// Description: Operator接口定义了很多方法,这里为了说明方便,我们只摘出来几个最重要的。
public interface Operator extends AutoCloseable {
void addInput(Page page);
Page getOutput();
@Override
default void close() throws Exception {}
...
}
Operator有许多实现类,代表了不同的Operator计算逻辑,常见的Operator举例如下:
可能你已经注意到,Operator的接口定义中,无论是addInput()的入参还是getOutput()的返回值,它们都是Page,也就是Operator的操作对象是Page。而在Volcano执行模型中,每次调用Operator::next()的操作对象是Row(数据中的一条记录)。这就是Presto做的“More than Volcano”的地方,它不仅一次处理多条记录,还做了更性能优化的优化,即用列式存储的存储和计算方式(Columnar Storage或者Vectorized Execution)。
在Presto内部数据计算时,它用了自己的方式来存储与计算列式格式的数据,分为三个层次——Slice、Block、Page。Slice表示一个Single Value,Block表示一列,类似于Parquet中的Column,Page表示多行记录,但是它们是以多列多个Block的方式组织在一起的,类似Parquet中的Row Group,这种组织方式,不同行相同列的字段都顺序相邻,更容易按列读取与计算。
TaskExecutor是Presto Task的执行池,它以单例的方式在Presto Worker上启动,内部维护了一个Java线程池ExecutorSerivce用于提交运行任务,所以无论某个Presto Worker上有多个Task在运行,TaskExecutor只有一个,这个设计,对于多租户来说不是很友好。
Driver是Task的Operator链执行的驱动器,由它来推动数据穿梭于Operator。在具体的实现上,Volcano执行模型是自顶向下的pull数据,Presto Driver与Volcano执行模型不一样,它是自底向上的push数据。
对于划分了多个Stage的Query,数据依赖关系上相邻的两个Stage必然存在着数据交换,而同一个Stage的所有Task之间是没有数据交换的。Presto的数据交换采用的是Pull Based方式,如下图所示:
其实用“数据交换(Exchange)”这个词语并不准确,Stage之间并没有交换数据,而只是后面执行的Stage从前面执行的Stage拉取数据。
Presto在实现这一套机制的时候,做了比较好的抽象,Stage间的数据交换连同包含TableScanOperator的Stage从Connector取数据这部分,统一实现为拉取数据源(Connector)的ConnectorSplit拉取逻辑,只不过Stage从Connector拉取的是某个Connector实现的ConnectorSplit(如HiveConnector的HiveSplit),Stage之间拉取的是RemoteSplit(RemoteSplit实现了ConnectorSplit的接口)。
Stage里有很多Operator,这些Operator可能并行度是不一样的,比如Scan数据并行就很大,但是最后聚合数据,并行一般为1。所以PlanFragment又会被切分为若干Pipeline,每个Pipeline由一组Operator组成,这些 Operator 被设置同样的并行度。Pipeline之间会通过LocalExchangeOperator来传递数据。
SQL是一种声明式的编程语言,能够很清晰的表达用户想要什么,正是因为它学习难度比较低、易用性比较高,已经成为数据库和大数据计算领域最常用的业务计算逻辑编写的方式。然而,在生产环境中,有很多系统没有对外暴露SQL执行接口,对内也没有SQL执行能力,如Elasticsearch和HBase;而有些系统虽然有SQL接口,但是没有海量数据计算能力,如MySQL;还有另外一些系统用MapReduce完成SQL计算,时延太长不满足业务需求,如Hive。Presto这个SQL执行框架主要解决的问题是为这些数据源提供了一种通用统一的SQL执行能力,在海量数据规模下,还具备了高性能、分布式的计算能力。
一个SQL进入到Presto系统中,分别完成了以下几个关键步骤,最终将结果输出:
我们结合Presto的结构设计再来看看SQL查询过程,注意这个过程中Coordinator和Worker的分工:
接下来我们通过一个简单的SQL来看看这个过程:
select col1 from test.db.tab
SqlParser的实现,使用了Antlr4
作为解析工具。它首先定义了Antlr4的SQL语法文件,见:SqlBase.g4,之后用它的代码生成工具自动生成了超过13K行SQL解析逻辑,SQL解析完成后会生成AST。
抽象语法树(AST)是用一种树形结构来表示SQL想要表述的语义,将一段SQL字符串结构化,以支持SQL执行引擎根据AST生成SQL执行计划。在Presto中,Node表示树的节点的抽象。根据语义不同,SQL AST中有多种不同类型的节点,它们继承自Node节点,如下所示:
我们来看看上面那个简单SQL的AST:
这棵树的根节点是一个Query类的对象,它有一个成员body,指向一个QuerySpecification对象。QuerySpecification有一个select成员,指向一个Select类的对象,Select类中有selectItems成员,对应语法定义中querySpecification里面可能出现的多个selectItem,以此类推。
一旦生成语法树,LogicalPlanner 将会据此生成逻辑执行计划。这个阶段分为两步执行:
LogicalPlanner对语法树进行从根节点开始的递归遍历,生成一个未经优化的逻辑计划,如下图所示:
上图中的表遍历节点TableScanNode是在遍历到语法树的Table节点时生成的,图中4个映射节点ProjectNode,靠近TableScanNode的两个是在遍历QuerySpecification节点时生成的,另外两个是在遍历Query节点时生成的。最后的OutputNode,是遍历完语法树之后,才生成的输出节点。
我们可以看到,未经优化的逻辑计划其实包含非常多冗余的ProjectNode,这时候,LogicalPlanner会进行第二步:对计划进行一系列的优化。
在LogicalPlanner类中,有一个planOptimizers列表,其中的每一个元素是一个优化器接口PlanOptimizer的实现。每个PlanOptimizer的实现都带有一个重写器Rewriter,用于对逻辑计划进行递归遍历,写出优化后的逻辑计划。LogicalPlanner循环地对上一步生成的逻辑计划应用planOptimizers列表的每一个优化器,最终得到下图所示的优化过的执行计划:
对于我们的示例查询在第一步生成的逻辑执行计划,真正生效的优化器只有两个:一个是IterativeOptimizer,另外一个是AddExchanges。IterativeOptimizer将逻辑计划中冗余的ProjectNode全部去掉了,这是IterativeOptimizer对RemoveRedundantIdentityProjections规则的应用。而AddExchanges优化器在OutputNode和TableScanNode之间,加上了一个ExchangeNode,用于在不同节点之间交换查询数据。
去掉冗余的ProjectNode的好处是显而易见的:去掉多余的ProjectNode可以提高查询的执行效率。而之所以需要增加ExchangeNode,是因为我们的最终输出OutputNode需要在Coordinator上执行,而TableScanNode一般需要调度到Worker上执行,所以两者之间需要加上一个ExchangeNode以交换数据。
Presto接下来会通过PlanFragmenter对优化后的逻辑执行计划进行拆分,分为若干个子计划SubPlan,这也是对优化后的逻辑执行计划进行自顶向下的再一次递归遍历完成的。
上图是拆分好的子计划,可以看到逻辑执行计划被拆分为两个子计划。对于逻辑计划,Presto的拆分的逻辑是:将ExchangeNode转换为RemoteSourceNode,然后为ExchangeNode的sources中的每个元素,新建一个子计划SubPlan。这么拆分可以使DistributedExecutionPlanner将ExchangeNode的sources对应的每一个SubPlan转换为一个Stage,然后分发到不同的Worker上执行。
接下来,DistributedExecutionPlanner将上面拆分好的子计划,转换为分布式执行计划。DistributedExecutionPlanner的转换逻辑是:将每一个SubPlan转换为一个StageExecutionPlan。
从上图可以看到,分布式执行计划与拆分后的子计划是非常相似的。区别在于,对于那些fragment里面有TableScanNode的StageExecutionPlan,它会额外维护一个splitSources。SplitSource定义了一个表如何被划分成若干个Batch,以便于后续并行处理。
接下来进入的是执行计划的实际调度阶段,如下图所示:
SqlQueryScheduler在创建的时候,会为分布式执行计划中的每一个 StateExecutionPlan创建一个对应的SqlStageExecution对象和对应的 StageScheduler(为了保持简洁,上图仅画了一个SqlStageExecution和一个StageScheduler,但实际上,对应我们的示例查询,SqlStageExecution和StageScheduler应该各有两个,分别对应分布式执行计划中的两个StateExecutionPlan。并且SqlQueryScheduler创建的是StageScheduler子类的实例,分别是FixedCountScheduler和SourcePartitionedScheduler)。
此后,SqlQueryScheduler通过AllAtOnceExecutionPolicy(还有一种Policy是PhasedExecutionPolicy),创建AllAtOnceExecutionSchedule。AllAtOnceExecutionSchedule在SqlQueryScheduler调用其getStagesToSchedule时,会一次性返回全部未调度的SqlStageExecution的集合。SqlQueryScheduler接下来会遍历这个集合,并调用集合中每个SqlStageExecution对应的StageScheduler的schedule方法,这个方法最终会调用到SqlStageExecution的scheduleTask。 scheduleTask将会创建HttpRemoteTask,并通过HttpRemoteTask,以Restful的方式,将Stage发送到worker节点。此后的执行,将会在worker上进行。
加上union all的执行计划:
去掉union all的执行计划:
对比上面两个执行计划,可以发现去掉union all之后内部的inner join是可以正常查询完,但是加上union all,同样的inner join一直跑不完。
初步推测可能是窗口函数和union all产生了不和谐,我们来看看跟窗口函数相关的Stage,如下图所示:
查询执行很久后WindowOperator没有任何输出,推测可能是阻塞了。
再来看看去掉union all的WindowOperator,如下图所示:
从上图可以看到有union all的数据量级和没有union all数据量级相差不大(这也印证了一点:数据量并没有被叉乘放大),但是在没有union all的情况下窗口函数可以正常输出。
在有union all的情况下,我们来看一个正常结束的Stage:
正常结束的Stage中所有的task对应的OutputBuffer都为0,说明OutputBuffer都已经被下游拉走了。
在有union all的情况下,不能正常结束的Stage如下图所示:
上图Stage中的task都是RUNNING状态,并且所有的task都处理完了各自的split,但是OutputBuffer并没有清空,说明下游没有拉取这些OutputBuffer,这直接导致这个Stage不能正常结束,而是一直在等待OutputBuffer被下游拉取。
为什么下游Stage没有拉取上游OutputBuffer呢?我们看下上游Stage的情况:
从上图可以看到上游Stage只有一个task在RUNNING,其他的task都已经正常结束了。并且这个RUNNING的task有32个split是blocked,这个时候应该不是计算资源不够用导致的block。注意,这个task的OutputBuffer的大小有38.6M。
目前所掌握的信息还不足以让我们得出什么有用的结论,好在Presto提供了task状态的API,方便我们观察,如下图所示:
从task状态信息中,我们很快发现 NO_MORE_BUFFERS
。
OutputBuffer相关参数看下图:
对比上面出现的38.6M,很明显OutputBuffer已经超过阈值了,不能再往OutputBuffer里写了,这才是造成阻塞的直接原因。
另外,有一点需要特别指出来的是:不加union all的时候查询也不一定能够正常出结果。比如下图的task状态信息:
直接原因我们找到了,那么有union all和没有union all到底有什么区别呢?为什么加了union all会造成OutputBuffer写满了呢?请看下图:
在外层有窗口函数的情况下,加了union all之后出现了数据倾斜,而不加union all则没有数据倾斜。数据量最大的task使用的CPU Time只有49.5s,这也说明这个task卡住了,虽然它的状态是RUNNING。
为了印证数据倾斜这个结论,这里我专门针对窗口函数的逻辑加了一个ORDEY BY语句,如下图所示:
来看看这个SQL的执行计划:
从上图可以看到union all上部分的inner join正常结束了。
写到这里可能有同学还是疑惑,为啥Presto没有对union all做优化呢?如果你认真看过我之前写的一篇博客(可能颠覆我们认知的【ORDER BY】小知识点)可能就会打消一些疑虑了。核心结论见下图,简单来说就是,单机版的MySQL都会选择不优化union all。
改写SQL,先分开做window,然后再union all:
select
project
, media
, spot
, bu
, sum(1) as cnt
, sum(case when time_diff <= 30 then 1 else 0 end) as cnt_1
from (
select project, media, spot, t_time, ext_user_id, bu
, if(pre_time_diff <= post_time_diff, pre_time_diff, post_time_diff) as time_diff
from (
select project, media, spot, t_time, ext_user_id, bu
, date_diff('second', lag_time, t_time) as pre_time_diff
, date_diff('second', t_time, lead_time) as post_time_diff
from (
select bu, project, media, spot, t_time, ext_user_id
, lag(t_time, 1, cast('1970-01-01 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lag_time
, lead(t_time, 1, cast('2099-12-31 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lead_time
from (
select split_part(b.name, '#', 2) as project
, split_part(b.name, '#', 3) as media
, split_part(b.name, '#', 4) as spot
, 'ftx' as bu
, cast(t_time as timestamp) as t_time
, ext_user_id
from (
select ext_order_id
, ext_user_id
, replace("timestamp", '_', ' ') as t_time
from dwd.d_ad_ftx_impression_data
where ext_dsp_id = 23
and thisdate = '2021-10-20'
and cast("hour" as int) between 0 and 21
) t
join
(
select id, concat('阿里#', split_part(name, '阿里#', 2)) as name
from u6rds.ftx."order"
where name like '%阿里#%'
) b on t.ext_order_id = b.id
) t
union all
select bu, project, media, spot, t_time, ext_user_id
, lag(t_time, 1, cast('1970-01-01 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lag_time
, lead(t_time, 1, cast('2099-12-31 00:00:00' as timestamp))
over (partition by project,media,spot,ext_user_id order by t_time) as lead_time
from (
select split_part(b.name, '#', 2) as project
, split_part(b.name, '#', 3) as media
, split_part(b.name, '#', 4) as spot
, 'dsp' as bu
, cast(t_time as timestamp) as t_time
, ext_user_id
from (
select ext_adgroup_id
, ext_user_id
, replace("timestamp", '_', ' ') as t_time
from dwd.d_ad_impression a
where ext_advertiser_id = 2003127
and thisdate = '2021-10-20'
and cast("hour" as int) between 0 and 20
) a
join
(
select id, concat('阿里#', split_part(name, '阿里#', 2)) as name
from u6rds.ad_fancy.ad_group
where advertiser_id = 2003127
and name like '%阿里#%'
) b on a.ext_adgroup_id = b.id
) tt
) t1
) t2
) t3
group by 1, 2, 3, 4
先分开做window再做union all的执行计划:
有窗口函数时内部如果使用union all会使得查询阻塞:
我们来看看这个唯一在RUNNING的split:
Stage 1的执行计划如下:
登录到唯一RUNNING的split的机器上,使用arthas命令:
看看具体的线程堆栈:
可以发现Presto有动态生成代码,反编译看看具体代码:
我们还可以看看调用动态生成的方法的参数信息:
如果需要分析性能问题,arthas里集成了async-profiler:
来看看生成的火焰图:
先说这么多,你学废了吗?当然arthas里还有很多其他的命令,有兴趣的话自己可以下去研究研究。