Presto 优化拾遗

Updated on with 0 views and 0 comments

1 Grouped Execution

为了方便大家理解 Grouped Execution 的原理,我们先来介绍两个概念:分桶Hash Join

1.1 分桶

  • 其实 Hive 表中桶的概念就是 MapReduce 的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。

  • 而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件,通过文件夹可以查询里面存放的文件,但文件夹本身和数据的内容毫无关系。

  • 桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。

3feceb9471624cf7bb1d604223d93f9d.png

1.2 Hash Join

主要分为两个阶段:建立阶段(build phase)和探测阶段(probe phase)

  • Bulid Phase:选择一个表(一般情况下是较小的那个表,以减少建立哈希表的时间和空间),对其中每个元组上的连接属性(join attribute)采用哈希函数得到哈希值,从而建立一个哈希表。

  • Probe Phase:对另一个表,扫描它的每一行并计算连接属性的哈希值,与bulid phase建立的哈希表对比,若有落在同一个 bucket 的,如果满足连接谓词(predicate)则连接成新的表。

e9e0e73469e3401bb8c604f6d75d99ff.png

1.3 Grouped Execution 原理

所谓的 Grouped Execution 针对的是那种数据摆放上进行了 bucketed 存储的数据查询的一种优化手段。

  • 比如两个表 JOIN,在没有 Grouped Execution 的时候做 hash join,一般会把小表作为 build 表发到每个 worker 上,然后针对大表的数据做 probe,这样内存占用由整个小表数据量的大小决定
    c5311708347d48e6b29eab36a846ad82.png

  • 而如果这两个表是 bucketed 表,而且 bucket 的字段就是这个 JOIN 的字段,那么不同 bucket 之间的数据天然就 JOIN 不上,那么两个表之间的 JOIN 可以转变为相对应的 bucket 之间的 JOIN,最后再做个汇总即可,这样每个 worker 上的内存量会大大降低。
    eae48645e2214900b35ec01bb62bde87.png

2 Recoverable Grouped Execution

单个 bucket 的 JOIN 如果失败是可以单独重试的,这也就引出了 Facebook 做的第二个优化: Lifespan 重试,通过 Lifespan 级别的重试可以提高大查询的成功率。

9c104ff8de8145659bf21bf5f9cdb3b9.png

3 Exchange Materialization

上面两种优化手段的应用场景是很受限的,必须要 bucketed table,而且要用 bucketed 字段 JOIN 才能有用。确实,如果故事只到这里就结束就很没意思了,因此 Facebook 提出了第三个优化手段: Exchange Materialization。

  • 我们知道 Presto 的 Exchange 本来是流式的,上游把数据通过 HTTP 发给下游,下游如果处理不过来会反压上游,中间是没有数据落盘的,如下图所示:
    392d8f10aae24c3ea8e503637f97bcd9.png

  • Exchange Materialization 则是要把数据落到盘上,并且按照 JOIN 的 key 组织成 bucketed table,那么从这个 Exchange 节点开始往后就可以应用上面的优化了,如下图所示:
    71bc93a9b1fc478187468f782bf0139b.png

  • Exchange Materialization 启用参数:

SET SESSION exchange_materialization_strategy='ALL';
SET SESSION partitioning_provider_catalog='hive';
SET SESSION hash_partition_count = 4096;

4 Spill to Disk

默认情况下,如果查询执行所请求的内存超过会话属性 query_max_memoryquery_max_memory_per_node,Presto 就会终止查询。这种机制确保了查询分配内存的公平性,防止了内存分配导致的死锁。当集群中有很多小查询时,它是有效的,但会导致杀死不符合限制的大型查询。

4.1 原理简介

为了克服这种低效率,Presto 引入了可撤销内存的概念。Query 可以请求不计入限制的内存,但内存管理器可以在任何时候撤销这些内存。当内存被撤销时,查询运行程序将中间数据从内存溢出到磁盘,然后继续处理它。

在实践中,当集群空闲且所有内存都可用时,内存密集型查询可能会使用集群中的所有内存。另一方面,当集群没有太多空闲内存时,同一个查询可能被迫使用磁盘作为中间数据的存储。与完全在内存中运行的查询相比,强制溢出到磁盘的查询的执行时间可能要长几个数量级。

请注意,启用 spill-to-disk 并不能保证能够执行所有内存密集型的查询。查询运行程序仍然有可能无法将中间数据划分为足够小的块,使每个块都能装入内存,从而导致从磁盘加载数据时出现内存不足错误。

4.2 支持的操作

  • Joins

    • 当任务并发大于 1 时,将对 build 表进行分区,分区的数量等于 task.concurrency 配置值
    • 在对 build 表进行分区时,spill-to-disk 机制可以减少连接操作所需的峰值内存使用。当查询接近内存限制时,build 表分区的一个子集将溢出到磁盘,join 另一侧表中的记录也会落到相同的分区。
    • 然后,就可以 one-by-one 读取溢出的分区数据以完成连接操作。
    • 需要注意的是 join_spill_enabled 默认值是 false,可通过 set session join_spill_enabled=true; 启用
  • Aggregations

    • 如果正在聚合的 group 中数量很大,则可能需要大量的内存。
    • 当启用 spill-to-disk 时,如果没有足够的内存,则将中间累积聚合结果写入磁盘。
    • 当内存可用时,它们被加载回来进行合并。
    • 开启 spill_enabled 参数后无需单独设置其他参数

5 Dynamic Filtering

早在2005年,Oracle 数据库就支持比较丰富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才开始支持这个功能。为了让大家快速理解这个功能,我们先来看一个 Spark 的栗子:

5.1 Spark 动态分区裁减

SELECT * FROM fact_iteblog
JOIN dim_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol) 
WHERE dim_iteblog.othercol > 10

通过 Spark 的动态分区裁减,可以将执行计划修改成如下形式:

2acb4864b4c64045b862697f0c2d3b5c.png

可见,在扫描 fact_iteblog 表时,如果能自动加上了类似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的过滤条件,那么当 fact_iteblog 表有大量的分区,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查询语句只返回少量的分区,这样可以大大提升查询性能。

5.2 Presto 动态过滤

Presto 也支持上面的功能,这个功能称为动态过滤(Dynamic Filtering)。事实上,Presto 的动态过滤比 Spark 的动态分区裁减要丰富。因为 Spark 动态分区裁减只能用于分区表,而 Presto 的动态过滤支持分区表和非分区表,Presto 的动态分区包含 Partition Pruning(分区表) 以及 Row filtering(非分区表)。直到 Presto 0.241,这个功能正式加入到 master 分支。

注意事项:

  • 目前 PrestoDB 的实现中只有 Hive 数据源支持动态过滤
  • 而且非分区表动态过滤只支持 ORC 数据格式
  • 另外,Presto 的 dynamic filtering 和 grouped_execution 不能同时使用
  • 并且需要设置以下参数
set session enable_dynamic_filtering=true;
set session hive.pushdown_filter_enabled=true;

6 Alluxio Cache Service

提高Presto查询延迟的一个常见优化是缓存工作集,以避免来自远程数据源或通过慢速网络的不必要的I/O。Presto 利用 Alluxio 作为缓存层,主要有两种使用方式:Alluxio File System 和 Alluxio Structured Data Service。
注意:目前我们公司 Presto 集群尚未加入 Alluxio 缓存服务,这个优化手段可以作为一个知识储备。

6.1 Alluxio File System

Alluxio File System 将 Presto Hive Connector 作为一个独立的分布式缓存文件系统服务于 HDFS 或 AWS S3、GCP、Azure blob store 等对象存储之上。用户可以通过文件系统接口明确地了解缓存的使用情况并控制缓存。例如,可以预加载 Alluxio 目录中的所有文件,为 Presto 查询预热缓存,并设置缓存数据的 TTL(生存时间),以回收缓存容量。举个栗子:

2919a26a744a40078e75c3fc2477b8e4.png

3a9ade1153cf47ef962632281a942d4a.png

注意对比 Hive 表的 location,前缀换成了 alluxio://

6.2 Alluxio Structured Data Service

Alluxio Structured Data Service 通过基于 Alluxio File System 的目录和缓存文件系统与 Presto 进行交互。这种方式有额外的优势,在不用修改 Hive 表的 location 的前提下,就能无缝访问现有的 Hive 表,并通过合并小文件转换输入文件的格式进一步性能优化。具体做法如下:

a9edc16f2eee49ff89fb47496d7f3843.png


标题:Presto 优化拾遗
作者:yanghao
地址:http://solo.fancydigital.com.cn/articles/2021/10/12/1634022224465.html