为了方便大家理解 Grouped Execution 的原理,我们先来介绍两个概念:分桶 和 Hash Join。
其实 Hive 表中桶的概念就是 MapReduce 的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。
而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件,通过文件夹可以查询里面存放的文件,但文件夹本身和数据的内容毫无关系。
桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。
主要分为两个阶段:建立阶段(build phase)和探测阶段(probe phase)
Bulid Phase:选择一个表(一般情况下是较小的那个表,以减少建立哈希表的时间和空间),对其中每个元组上的连接属性(join attribute)采用哈希函数得到哈希值,从而建立一个哈希表。
Probe Phase:对另一个表,扫描它的每一行并计算连接属性的哈希值,与bulid phase建立的哈希表对比,若有落在同一个 bucket 的,如果满足连接谓词(predicate)则连接成新的表。
所谓的 Grouped Execution 针对的是那种数据摆放上进行了 bucketed 存储的数据查询的一种优化手段。
比如两个表 JOIN,在没有 Grouped Execution 的时候做 hash join,一般会把小表作为 build 表发到每个 worker 上,然后针对大表的数据做 probe,这样内存占用由整个小表数据量的大小决定
而如果这两个表是 bucketed 表,而且 bucket 的字段就是这个 JOIN 的字段,那么不同 bucket 之间的数据天然就 JOIN 不上,那么两个表之间的 JOIN 可以转变为相对应的 bucket 之间的 JOIN,最后再做个汇总即可,这样每个 worker 上的内存量会大大降低。
单个 bucket 的 JOIN 如果失败是可以单独重试的,这也就引出了 Facebook 做的第二个优化: Lifespan 重试,通过 Lifespan 级别的重试可以提高大查询的成功率。
上面两种优化手段的应用场景是很受限的,必须要 bucketed table,而且要用 bucketed 字段 JOIN 才能有用。确实,如果故事只到这里就结束就很没意思了,因此 Facebook 提出了第三个优化手段: Exchange Materialization。
我们知道 Presto 的 Exchange 本来是流式的,上游把数据通过 HTTP 发给下游,下游如果处理不过来会反压上游,中间是没有数据落盘的,如下图所示:
Exchange Materialization 则是要把数据落到盘上,并且按照 JOIN 的 key 组织成 bucketed table,那么从这个 Exchange 节点开始往后就可以应用上面的优化了,如下图所示:
Exchange Materialization 启用参数:
SET SESSION exchange_materialization_strategy='ALL';
SET SESSION partitioning_provider_catalog='hive';
SET SESSION hash_partition_count = 4096;
默认情况下,如果查询执行所请求的内存超过会话属性 query_max_memory
或 query_max_memory_per_node
,Presto 就会终止查询。这种机制确保了查询分配内存的公平性,防止了内存分配导致的死锁。当集群中有很多小查询时,它是有效的,但会导致杀死不符合限制的大型查询。
为了克服这种低效率,Presto 引入了可撤销内存的概念。Query 可以请求不计入限制的内存,但内存管理器可以在任何时候撤销这些内存。当内存被撤销时,查询运行程序将中间数据从内存溢出到磁盘,然后继续处理它。
在实践中,当集群空闲且所有内存都可用时,内存密集型查询可能会使用集群中的所有内存。另一方面,当集群没有太多空闲内存时,同一个查询可能被迫使用磁盘作为中间数据的存储。与完全在内存中运行的查询相比,强制溢出到磁盘的查询的执行时间可能要长几个数量级。
请注意,启用 spill-to-disk 并不能保证能够执行所有内存密集型的查询。查询运行程序仍然有可能无法将中间数据划分为足够小的块,使每个块都能装入内存,从而导致从磁盘加载数据时出现内存不足错误。
Joins
task.concurrency
配置值join_spill_enabled
默认值是 false,可通过 set session join_spill_enabled=true;
启用Aggregations
spill_enabled
参数后无需单独设置其他参数早在2005年,Oracle 数据库就支持比较丰富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才开始支持这个功能。为了让大家快速理解这个功能,我们先来看一个 Spark 的栗子:
SELECT * FROM fact_iteblog
JOIN dim_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10
通过 Spark 的动态分区裁减,可以将执行计划修改成如下形式:
可见,在扫描 fact_iteblog 表时,如果能自动加上了类似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的过滤条件,那么当 fact_iteblog 表有大量的分区,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查询语句只返回少量的分区,这样可以大大提升查询性能。
Presto 也支持上面的功能,这个功能称为动态过滤(Dynamic Filtering)。事实上,Presto 的动态过滤比 Spark 的动态分区裁减要丰富。因为 Spark 动态分区裁减只能用于分区表,而 Presto 的动态过滤支持分区表和非分区表,Presto 的动态分区包含 Partition Pruning(分区表) 以及 Row filtering(非分区表)。直到 Presto 0.241,这个功能正式加入到 master 分支。
注意事项:
Hive
数据源支持动态过滤ORC
数据格式set session enable_dynamic_filtering=true;
set session hive.pushdown_filter_enabled=true;
提高Presto查询延迟的一个常见优化是缓存工作集,以避免来自远程数据源或通过慢速网络的不必要的I/O。Presto 利用 Alluxio 作为缓存层,主要有两种使用方式:Alluxio File System 和 Alluxio Structured Data Service。
注意:目前我们公司 Presto 集群尚未加入 Alluxio 缓存服务,这个优化手段可以作为一个知识储备。
Alluxio File System 将 Presto Hive Connector 作为一个独立的分布式缓存文件系统服务于 HDFS 或 AWS S3、GCP、Azure blob store 等对象存储之上。用户可以通过文件系统接口明确地了解缓存的使用情况并控制缓存。例如,可以预加载 Alluxio 目录中的所有文件,为 Presto 查询预热缓存,并设置缓存数据的 TTL(生存时间),以回收缓存容量。举个栗子:
注意对比 Hive 表的 location,前缀换成了 alluxio://
Alluxio Structured Data Service 通过基于 Alluxio File System 的目录和缓存文件系统与 Presto 进行交互。这种方式有额外的优势,在不用修改 Hive 表的 location 的前提下,就能无缝访问现有的 Hive 表,并通过合并小文件或转换输入文件的格式进一步性能优化。具体做法如下: