Flink 入门篇

Updated on with 0 views and 0 comments

1 Flink 是什么?

Flink 可以处理有界的数据集、也可以处理无界的数据集,它可以流式的处理数据、也可以批量的处理数据。

9cc82d8ae2a14baa896b33cee0a66cd2.png

f4479a1acea141cba45dd228044b9c55.png

75c1ec65e78d4a24adadff162dc15374.png

2 Flink Architecture

2.1 Flink 整体结构

2f87393777ce466c843f023194136042.png

  • 部署:Flink 支持本地运行、能在独立集群或者在被 YARN 或 Mesos 管理的集群上运行, 也能部署在云上

  • 运行:Flink 的核心是分布式流式数据引擎,意味着数据以一次一个事件的形式被处理

  • API:DataStream、DataSet、Table、SQL API

  • 扩展库:Flink 还包括用于复杂事件处理,机器学习,图形处理和 Apache Storm 兼容性的专用代码库

2.2 Flink 数据流编程模型抽象级别

Flink提供了不同的抽象级别以开发流式或批处理应用。

783039f1a9fc4818ab796b21ae0bdd65.png

  • 最底层提供了有状态流。它将通过 过程函数(Process Function)嵌入到 DataStream API 中。它允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。

  • DataStream / DataSet API 是 Flink 提供的核心 API ,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。

  • Table API 是以表为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

  • Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。

2.3 Flink 部署模式

Flink可以以三种方式执行应用程序:

  • Application Mode
  • Per-Job Mode
  • Session Mode

b4a73f0d6a274d2485a5efd824430f88.png

以上模式的不同之处在于:

  • 集群生命周期和资源隔离保证

  • 应用程序的main()方法是在客户端还是集群上执行的

2.4 Flink 作业提交流程

Flink作业提交流程见下图:

2a84ae4811fe4de680c553464bea43c6.png

  • Program Code:我们编写的 Flink 应用程序代码

  • Job Client:Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点。 Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户

  • Job Manager:主进程(也称为作业管理器)协调和管理程序的执行。 它的主要职责包括安排任务,管理checkpoint ,故障恢复等。机器集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容灾,高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby;Job Manager 包含 Actor system、Scheduler、Check pointing 三个重要的组件

  • Task Manager:从 Job Manager 处接收需要部署的 Task。Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点。 任务执行的并行性由每个 Task Manager 上可用的任务槽决定。 每个任务代表分配给任务槽的一组资源。 例如,如果 Task Manager 有四个插槽,那么它将为每个插槽分配 25% 的内存。 可以在任务槽中运行一个或多个线程。 同一插槽中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job,这种共享可以有更好的资源利用率
    7c4ad3cd762c4e04a589ea1e736e8d94.png

2.5 Flink 程序与数据流结构

Flink应用程序结构如下图所示:

38233b99307e4677b5e32374d8f82356.png

  • Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、Amazon Kinesis Streams、RabbitMQ、Twitter Streaming API、Apache NiFi 等,当然你也可以定义自己的 source

  • Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,操作很多,可以将数据转换计算成你想要的数据
    724583b62cf643d4abea2d3a3eae2f3e.png

  • Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink

3 State and Fault Tolerance

3.1 状态(State)

在 Flink 中,状态(State)始终与特定算子相关联。总的来说,有两种类型的状态:

  • 算子状态(operator state):算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。

  • 键控状态(keyed state):键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink 为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个 key 对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的 key。因此,具有相同 key 的所有数据都会访问相同的状态。Keyed State 很类似于一个分布式的 key-value map 数据结构,只能用于 KeyedStream(keyBy 算子处理之后)。

3.2 Checkpoint

5ba699b46f7749009d0ff40493c95511.png

Flink Checkpoint 是基于 Chandy-Lamport 算法的分布式快照。

Chandy-Lamport 算法将分布式系统抽象成 DAG(暂时不考虑有闭环的图),节点表示进程,边表示两个进程间通信的管道。分布式快照的目的是记录下整个系统的状态,即可以分为节点的状态(进程的状态)和边的状态(信道的状态,即传输中的数据)。因为系统状态是由输入的消息序列驱动变化的,我们可以将输入的消息序列分为多个较短的子序列,图的每个节点或边先后处理完某个子序列后,都会进入同一个稳定的全局状态。利用这个特性,系统的进程和信道在子序列的边界点分别进行本地快照,即使各部分的快照时间点不同,最终也可以组合成一个有意义的全局快照

e3f995a655bc49268668b1d517d44220.png

从实现上看,Flink 通过在 DAG 数据源定时向数据流注入名为 Barrier 的特殊元素,将连续的数据流切分为多个有限序列,对应多个 Checkpoint 周期。每当接收到 Barrier,算子进行本地的 Checkpoint 快照,并在完成后异步上传本地快照,同时将 Barrier 以广播方式发送至下游。当某个 Checkpoint 的所有 Barrier 到达 DAG 末端且所有算子完成快照,则标志着全局快照的成功。

f7edce3407044a738baae7d8b162437c.png

在有多个输入 Channel 的情况下,为了数据准确性,算子会等待所有流的 Barrier 都到达之后才会开始本地的快照,这种机制被称为 Barrier 对齐。在对齐的过程中,算子只会继续处理的来自未出现 Barrier Channel 的数据,而其余 Channel 的数据会被写入输入队列,直至在队列满后被阻塞。当所有 Barrier 到达后,算子进行本地快照,输出 Barrier 到下游并恢复正常处理。

a888e9eaf7fd4f07a459dd8585dfada8.png

比起其他分布式快照,该算法的优势在于辅以 Copy-On-Write 技术的情况下不需要 “Stop The World” 影响应用吞吐量,同时基本不用持久化处理中的数据,只用保存进程的状态信息,大大减小了快照的大小。

目前的 Checkpoint 算法在大多数情况下运行良好,然而当作业出现反压时,阻塞式的 Barrier 对齐反而会加剧作业的反压(参考 Flink 官方文档 Checkpointing under backpressure),甚至导致作业的不稳定:

  • 首先, Chandy-Lamport 分布式快照的结束依赖于 Marker 的流动,而反压则会限制 Marker 的流动,导致快照的完成时间变长甚至超时。无论是哪种情况,都会导致 Checkpoint 的时间点落后于实际数据流较多。这时作业的计算进度是没有被持久化的,处于一个比较脆弱的状态,如果作业出于异常被动重启或者被用户主动重启,作业会回滚丢失一定的进度。如果 Checkpoint 连续超时且没有很好的监控,回滚丢失的进度可能高达一天以上,对于实时业务这通常是不可接受的。更糟糕的是,回滚后的作业落后的 Lag 更大,通常带来更大的反压,形成一个恶性循环。

  • 其次,Barrier 对齐本身可能成为一个反压的源头,影响上游算子的效率,而这在某些情况下是不必要的。比如典型的情况是一个作业读取多个 Source,分别进行不同的聚合计算,然后将计算完的结果分别写入不同的 Sink。通常来说,这些不同的 Sink 会复用公共的算子以减少重复计算,但并不希望不同 Source 间相互影响。

为了解决这个问题,Flink 在 1.11 版本引入了 Unaligned Checkpoint 的特性。要理解 Unaligned Checkpoint 的原理,首先需要了解 Chandy-Lamport 论文中对于 Marker 处理规则的描述:

4efa28f024814e3380020b86a2f3a08a.png

其中关键是 if q has not recorded its state,也就是接收到 Marker 时算子是否已经进行过本地快照。一直以来 Flink 的 Aligned Checkpoint 通过 Barrier 对齐,将本地快照延迟至所有 Barrier 到达,因而这个条件是永真的,从而巧妙地避免了对算子输入队列的状态进行快照,但代价是比较不可控的 Checkpoint 时长和吞吐量的降低。

ddbe150e5cba455db23a6e90de786505.png

4 Event Time and Watermarks

Flink 在流程序中支持不同的 Time 概念:

0d5506ac51604a41adc6a3206d9f9e05.png

  • Processing Time:是指事件被处理时机器的系统时间

  • Event Time:是事件发生的时间,一般就是数据本身携带的时间

  • Ingestion Time:是事件进入 Flink 的时间

流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生。所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的。

一旦出现乱序,如果只根据 Event Time 决定 window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了,这个特别的机制,就是 Watermark:

  • Watermark 是一种衡量 Event Time 进展的机制

  • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现。

  • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据都已经
    到达了
    ,因此,window 的执行也是由 Watermark 触发的

  • Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时
    时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 eventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime - t,那么这个窗口被触发执行

b10e4e13af78468d859fad487279989b.png


438273ac0c68416c8985edc01bd32adf.png


c85357f944874ab5abc2a45f5ca38751.png

5 Windows

Window 是无限数据流处理的核心,Window 将一个无限的 Stream 拆分成有限大小的“buckets”桶,我们可以在这些桶上做计算操作。Window 可以分成两类:

  • CountWindow:按照指定的数据条数生成一个 Window,与时间无关

  • TimeWindow:按照时间生成 Window

对于 TimeWindow,可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling
Window)、滑动窗口(Sliding Window)和会话窗口(Session Window),见下图:

ae0723595b954f42a8e27a85d0ad40a1.png


4527c4d3b4fc49bea859aba9aa93b59f.png


39ff2b70971545e78a9a6edde1129a39.png

还有一种特殊的窗口——全局窗口(Global Window)。这种窗口模式只有在您还指定了自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有可以处理聚合元素的自然结束点。

b1750bc21ffb42ec9b14a3bec3113d5e.png

6 Jobs and Scheduling

6.1 执行图

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图

  • StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图,用来表示程序的拓扑结构

  • JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 JobManager 的数据结构,主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,避免在不同的 slot 而进行网络IO

  • ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph,ExecutionGraph 是 JobGraph 的并行化版本,是调度层最核心的数据结构

  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构

b677318e46a9420a92ffa08a58ccaab6.png

6.2 任务链(Operator Chains)

405d225cdda24166bcb8d1fd7c3bdf07.png

  • Flink 采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过本地转发(local forward)的方式进行连接

  • 相同并行度的 one-to-one(维护分区内数据有序)操作,Flink 将这样相连的算子链接在一起形成一个 task,原来的算子成为里面的 subtask

  • 并行度相同、并且是 one-to-one 操作,两个条件缺一不可

7 细粒度资源管理

Apache Flink 努力为所有应用程序自动派生合理的默认资源需求。对于希望根据特定场景的知识调整资源消耗的用户,Flink 提供了细粒度的资源管理。

注意:此特性目前是MVP(“最小可行产品”)特性,仅对DataStream API可用。

4e4da5ec3bae4e57bb5c6b6d48ed1826.png

42a16b0f477347b2a4bb891fa8244b98.png

DEMO代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SlotSharingGroup ssgA = SlotSharingGroup.newBuilder("a")
  .setCpuCores(1.0)
  .setTaskHeapMemoryMB(100)
  .build();

SlotSharingGroup ssgB = SlotSharingGroup.newBuilder("b")
  .setCpuCores(0.5)
  .setTaskHeapMemoryMB(100)
  .build();

someStream.filter(...).slotSharingGroup("a") // Set the slot sharing group with name “a”
.map(...).slotSharingGroup(ssgB); // Directly set the slot sharing group with name and resource.

env.registerSlotSharingGroup(ssgA); // Then register the resource of group “a”

8 弹性伸缩(Elastic Scaling)

如果想在 Flink 中重新伸缩作业,老版本中需要手动完成这一操作,方法是先停止作业,然后在关闭期间使用不同的并行性从创建的保存点重新启动。新版本中提供两种新方案:

8.1 Reactive Mode

响应模式配置一个 Job,使它总是使用集群中所有可用的资源。添加 TaskManager 会使你的工作规模扩大,而移除资源则会使工作规模缩小。Flink 将管理作业的并行性,总是将其设置为可能的最高值。

响应模式在伸缩事件上会重新启动作业,从最近完成的检查点恢复作业。这意味着没有创建保存点的开销(手动调整作业时需要保存点)。此外,缩放后重新处理的数据量取决于检查点间隔,而恢复时间取决于状态大小。

83a23bb366304ed7932b59f004b2c807.png

8.2 Adaptive Scheduler

自适应调度器可以根据可用的 slot 来调整作业的并行度。如果没有足够的 slot (可能是因为提交时没有足够的资源可用,也可能是 TaskManager 在作业执行期间中断)来使用初始配置的并行度运行作业,它会自动降低并行度。如果有新的 slot 可用,作业将再次扩展,达到配置的并行度。

自适应调度器不同于响应模式,在响应模式中,配置的并行度被忽略,并被视为无穷大,让作业总是使用尽可能多的资源。

  • 自适应调度器相对于默认调度器的一个好处是,它可以优雅地处理 TaskManager 的损失,因为在这些情况下,它会缩小规模

  • 用法:

    • jobmanager.scheduler: adaptive: Change from the default scheduler to adaptive scheduler
    • cluster.declarative-resource-management.enabled Declarative resource management must be enabled (enabled by default).
  • 限制:
    5d7815cc5e2546e38093fdfe99039045.png

  • 自适应调度器的限制也适用于响应模式

9 TaskManager 容错

7fa81d868d6f4518918ff4693083b01b.png

在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接,甚至直接挂掉。我们在日志中常见的就是 TaskManager Lost 这样的报错。对于这种情况需要进行作业重启,在重启的过程中需要重新申请资源和重启 TaskManager 进程,这种性能消耗代价是非常高昂的。

对于稳定性要求相对比较高的作业,Flink1.12 提供了一个新的 feature,能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager,这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复,而不需要等待一个重新的资源申请的过程。

通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于所需要的总共的资源数量,会多出两个 TaskManager。

任务可能是相对比较均匀的分布在上面,在能够在利用空闲 TaskManager 的同时,也能够达到一个相对比较好的负载。 一旦发生故障的时候,可以去先把任务快速的调度到现有的还存活的 TaskManager 当中,然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。

10 任务平铺分布

任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能部分 manager 放的任务很满,有的则放的比较松散。

在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数能够控制它,去进行一个相对比较均衡的调度。

205b6fb7d6c546708c5b32d1169d13ba.png

注意:

  • 第一,这个参数只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是根据你作业的需求来决定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的调度需求。在每次调度任务的时候,实际上只能看到当前注册上来的那一个 TaskManager,Flink 没办法全局的知道后面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么特性打开了之后好像并没有起到一个很好的效果,这是第一件事情。

  • 第二个需要注意的点是,这里面我们只能决定每一个 TaskManager 上有多少空闲 slot,然而并不能够决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是完全看不到 task 的。

11 大规模作业调度优化

11.1 边的时间复杂度问题

Flink 提交作业时会生成一个作业的 DAG 图,由多个顶点组成,顶点对应着我们实际的处理节点,如 Map。每个处理节点都会有并发度,此前 Flink 的实现里,当我们把作业提交到 JM 之后,JM 会对作业展开,生成一个 Execution Graph。

如下图,作业有两个节点,并发度分别为 2 和 3。在 JM 中实际维护的数据结构里,会分别维护 2 个 task 和 3 个 task,并由 6 条执行边组成,Flink 基于此数据结构来维护整个作业的拓扑信息。在这个拓扑信息的基础上,Flink 可以单独维护每个 task 的状态,当任务挂了之后以识别需要拉起的 task。

如果以这种 all-to-all 的通信,也就是每两个上下游 task 之间都有边的情况下,上游并发 * 下游并发,将出现 O(N^2) 的数据结构。这种情况下,内存的占用是非常惊人的,如果是 10k * 10k 的边,JM 的内存占用将达到 4.18G。此外,作业很多的计算复杂度都是和边的数量相关的,此时的空间复杂度为 O(N^2) 或 O(N^3),如果是 10k * 10k 的边,作业初次调度时间将达到 62s。

c7f50befa03e4e57ac096e730aeeeaf3.png

可以看出,除了初始调度之外,对于批作业来说,有可能是上游执行完之后继续执行下游,中间的调度复杂度都是 O(N^2) 或 O(N^3),这样就会导致很大的性能开销。另外,内存占用很大的话,GC 的性能也不会特别好。

40db3e11779842ce94637c4d71f4c7dd.png

11.2 Execution Graph 的对称性

针对 Flink 在大规模作业下内存和性能方面存在的一些问题,经过一些深入分析,可以看出上述例子中上下游节点之间是有一定对称性的。Flink 中 “边” 的类型可以分为两种:

870c0a51c4f14e07bd9153e377ea95cb.png

  • 一种是 Pointwise 型,上游和下游是一一对应的,或者上游一个对应下游几个,不是全部相连的,这种情况下,边的数量基本是线性的 O(N), 和算子数在同一个量级。

  • 另一种是 All-to-all 型,上游每一个 task 都要和下游的每一个 task 相连,在这种情况下可以看出,每一个上游的 task 产生的数据集都要被下游所有的 task 消费,实际上是一个对称的关系。只要记住上游的数据集会被下游的所有 task 来消费,就不用再单独存中间的边了。

所以,Flink 在 1.13 中对上游的数据集和下游的节点分别引入了 ResultPartitionGroup 和 VertexGroup 的概念。尤其是对于 All-to-all 的边,因为上下游之间是对称的,可以把所有上游产生的数据集放到一个 Group 里,把下游所有的节点也放到一个 Group 里,在实际维护时不需要存中间的边的关系,只需要知道上游的哪个数据集是被下游的哪个 Group 消费,或下游的哪个顶点是消费上游哪个 Group 的数据集。 通过这种方式,减少了内存的占用。

41df853227e04b97a52fbcf6fdd33860.png

另外,在实际做一些调度相关计算的时候,比如在批处理里,假如所有的边都是 blocking 边的情况下,每个节点都属于一个单独的 region。之前计算 region 之间的上下游关系,对上游的每个顶点,都需要遍历其下游的所有顶点,所以是一个 O(N^2) 的操作。 而引入 ConsumerGroup 之后,就会变成一个 O(N) 的线性操作。

ea4873e1010d4a678f38717726d47a73.png

经过以上数据结构的优化,在 10k * 10k 边的情况下,可以将 JM 内存占用从 4.18G 缩小到 12.08M, 初次调度时间长从 62s 缩减到 12s。这个优化其实是非常显著的,对用户来说,只要升级到 Flink 1.13 就可以获得收益,不需要做任何额外的配置。

a92a91732c3941e3917b1bb0e28f079b.png

12 Sort-Merge Shuffle

一般情况下,批的作业是在上游跑完之后,会先把结果写到一个中间文件里,然后下游再从中间文件里拉取数据进行处理。这种方式的好处就是可以节省资源,不需要上游和下游同时起来,在失败的情况下,也不需要从头执行。

那么,shuffle 过程中,中间结果是如何保存到中间文件,下游再拉取的?

12.1 Hash Shuffle

之前 Flink 引入的是 Hash shuffle,再以 All-to-all 的边举例,上游 task 产生的数据集,会给下游的每个 task 写一个单独的文件,这样系统可能会产生大量的小文件。并且不管是使用文件 IO 还是 mmap 的方式,写每个文件都至少使用一块缓冲区,会造成内存浪费。下游 task 随机读取的上游数据文件,也会产生大量随机 IO。

所以,之前 Flink 的 Hash shuffle 应用在批处理中,只能在规模比较小或者在用 SSD 的时候,在生产上才能比较 work。在规模比较大或者 SATA 盘上是有较大的问题的。

b32d12da024e4e63abe2e3a27d9ed16a.png

12.2 Sort Shuffle

经过 Flink 1.12 和 Flink 1.13 两个版本,引入了一种新的基于 Sort Merge 的 shuffle。这个 Sort 并不是指对数据进行 Sort,而是对下游所写的 task 目标进行 Sort。

大致的原理是,上游在输出数据时,会使用一个固定大小的缓冲区,避免缓冲区的大小随着规模的增大而增大,所有的数据都写到缓冲区里,当缓冲区满时,会做一次排序并写到一个单独文件里,后面的数据还是基于此缓存区继续写,续写的一段会拼到原来的文件后面。最后单个的上游任务会产生一个中间文件,由很多段组成,每个段都是有序的结构。

d2bbe93c31004702b65d8f9428257ea7.png

和其他的批处理的框架不太一样,这边并不是基于普通的外排序。一般的外排序是指会把这些段再做一次单独的 merge,形成一个整体有序的文件,这样下游来读的时候会有更好的 IO 连续性,防止每一段每一个 task 要读取的数据段都很小。但是,这种 merge 本身也是要消耗大量的 IO 资源的,有可能 merge 的时间带来的开销会远超过下游顺序读带来的收益。

所以,这里采用了另外一种方式:在下游来请求数据的时候,比如下图中的 3 个下游都要来读上游的中间文件,会有一个调度器对下游请求要读取的文件位置做一个排序,通过在上层增加 IO 调度的方式,来实现整个文件 IO 读取的连续性,防止在 SATA 盘上产生大量的随机 IO。

3526a68db2f24f2897191d640d87eadd.png

在 SATA 盘上,相对于 Hash shuffle,Sort shuffle 的 IO 性能可以提高 2~8 倍。通过 Sort shuffle,使得 Flink 批处理基本达到了生产可用的状态,在 SATA 盘上 IO 性能可以把磁盘打到 100 多M,而 SATA 盘最高也就能达到 200M 的读写速度。

为了保持兼容性,Sort shuffle 并不是默认启用的,用户可以控制下游并发达到多少来启用 Sort Merge Shuffle。并且可以通过启用压缩来进一步提高批处理的性能。Sort Merge shuffle 并没有额外占用内存,现在占用的上游读写的缓存区,是从 framework.off-heap 中抽出的一块。

c8f066fa34fc4691a341d9a5a6d75f5e.png


标题:Flink 入门篇
作者:yanghao
地址:http://solo.fancydigital.com.cn/articles/2021/12/27/1640592794683.html