[ 2024春节 Flink打卡 ] — 理论基础

2024,游子未归乡。工作需要,flink coding。觉知此事要躬行,未休,特记
之后,文档格式整理 文尾有word链接 相关代码陆续上传
Apache Flink 是一个在有界数据流和无界数据流上进行有状态计算分布式处理引擎和框架。Flink 设计旨在所有常见的集群环境中运行,以任意规模和内存级速度执行计算。

第 1 章 Flink 快速上手
1.1 Flink vs Spark
1.1.1 数据处理架构
从根本上说,Spark 和 Flink 采用了完全不同的数据处理方式。可以说,两者的世界观是截然相反的。
Spark 以批处理为根本,并尝试在批处理之上支持流计算;在 Spark 的世界观中,万物皆批次,离线数据是一个大批次,而实时数据则是由一个一个无限的小批次组成的。所以对于流处理框架 Spark Streaming 而言,其实并不是真正意义上的“流”处理,而是“微批次”
(micro-batching)处理,如图 1-8 所示。。

图 1-8 Spark Streaming 流处理示意
而 Flink 则认为,流处理才是最基本的操作,批处理也可以统一为流处理。在 Flink 的世界观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流。

  1. 无界数据流(Unbounded Data Stream)
    所谓无界数据流,就是有头没尾,数据的生成和传递会开始但永远不会结束,如图 1-9 所示。我们无法等待所有数据都到达,因为输入是无界的,永无止境,数据没有“都到达”的时候。所以对于无界数据流,必须连续处理,也就是说必须在获取数据后立即处理。在处理无界流时,为了保证结果的正确性,我们必须能够做到按照顺序处理数据。
  2. 有界数据流(Bounded Data Stream)
    对应的,有界数据流有明确定义的开始和结束,如图 1-9 所示,所以我们可以通过获取所有数据来处理有界流。处理有界流就不需要严格保证数据的顺序了,因为总可以对有界数据集进行排序。有界流的处理也就是批处理。

图 1-9 有界流与无界流

正因为这种架构上的不同,Spark 和 Flink 在不同的应用领域上表现会有差别。一般来说, Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark 能够处理的吞吐量更大,加上其完善的生态和成熟易用的 API,目前量数据的批处理领域,Spark 能够处理的吞吐量更大,加上其完善的生态和成熟易用的 API,目前同样优势比较明显。

1.1.2 数据模型和运行架构
Spark 和 Flink 在底层实现最主要的差别就在于数据模型不同。
Spark 底层数据模型是弹性分布式数据集(RDD),Spark Streaming 进行微批处理的底层接口DStream,实际上处理的也是一组组小批数据 RDD 的集合。可以看出,Spark 在设计上本身就是以批量的数据集作为基准的,更加适合批处理的场景。
而 Flink 的基本数据模型是数据流(DataFlow),以及事件(Event)序列。Flink 基本上是完全按照 Google 的 DataFlow 模型实现的,所以从底层数据模型上看,Flink 是以处理流式数据作为设计目标的,更加适合流处理的场景。
数据模型不同,对应在运行处理的流程上,自然也会有不同的架构。Spark 做批计算,需要将任务对应的 DAG 划分阶段(Stage),一个完成后经过 shuffle 再进行下一阶段的计算。而 Flink 是标准的流式执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理。

1.1.3 Spark 还是 Flink?
通过前文的分析,我们已经可以看出,Spark 和 Flink 可以说目前是各擅胜场,批处理领域 Spark 称王,而在流处理方面 Flink 当仁不让。具体到项目应用中,不仅要看是流处理还是批处理,还需要在延迟、吞吐量、可靠性,以及开发容易度等多个方面进行权衡。
如果在工作中需要从 Spark 和 Flink 这两个主流框架中选择一个来进行实时流处理,我们更加推荐使用Flink,主要的原因有:
 Flink 的延迟是毫秒级别,而 Spark Streaming 的延迟是秒级延迟。
 Flink 提供了严格的精确一次性语义保证。
 Flink 的窗口API 更加灵活、语义更丰富。
 Flink 提供事件时间语义,可以正确处理延迟数据。
 Flink 提供了更加灵活的可对状态编程的API。

基于以上特点,使用 Flink 可以解放程序员, 加快编程效率, 把本来需要程序员花大力气手动完成的工作交给框架完成。
当然,在海量数据的批处理方面,Spark 还是具有明显的优势。而且 Spark 的生态更加成熟,也会使其在应用中更为方便。相信随着Flink 的快速发展和完善,这方面的差距会越来越小。
另外,Spark 2.0 之后新增的 Structured Streaming 流处理引擎借鉴DataFlow 进行了大量优化,同样做到了低延迟、时间正确性以及精确一次性语义保证;Spark 2.3 以后引入的连续处理(Continuous Processing)模式,更是可以在至少一次语义保证下做到 1 毫秒的延迟。而 Flink自 1.9 版本合并Blink 以来,在 SQL 的表达和批处理的能力上同样有了长足的进步。

第 3 章 Flink 部署
Flink 中的几个关键组件:客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。我们的代码,实际上是由客户端获取并做转换,之后提交给 JobManger 的。所以 JobManager 就是 Flink 集群里的“领导者”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。这里的 TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的,如图 3-1 所示。

图 3-1 Flink 集群中的主要组件
3.1 部署模式
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的部署模式,主要有以下三种:
 会话模式(Session Mode)
 单作业模式(Per-Job Mode)
 应用模式(Application Mode)
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的 main 方法到底在哪里执行——客户端(Client)还是 JobManager。
3.1.1 会话模式(Session Mode)
会话模式(Session Mode) 单作业模式(Per-Job Mode) 应用模式(Application Mode)

先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业,。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。 会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式
会话模式比较适合于单个规模小、执行时间短的大量作业。 Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理平台来启动集群,比如 YARN、Kubernetes
应用模式引入
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给 JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所谓的应用模式
单作业模式vs应用模式
单作业模式与应用模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。
第 4 章 Flink 运行时架构
4.1 系统架构
Flink 的运行时架构中,最重要的就是两大组件:作业管理器(JobManger)和任务管理器
(TaskManager)。对于一个提交执行的作业,JobManager 是真正意义上的“管理者”(Master),负责管理调度,所以在不考虑高可用的情况下只能有一个;而 TaskManager 是“工作者”
(Worker、Slave),负责执行任务处理数据,所以可以有一个或多个。Flink 的作业提交和任务处理时的系统如图 所示。

JobManager 是一个Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。
TaskManager 是 Flink 中的工作进程,负责数据流的具体计算任务(task)。
4.1.1 作业管理器(JobManager)
JobManager 是一个Flink 集群中任务管理和调度的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的 JobManager 所控制执行。
JobManger 又包含 3 个不同的组件。

  1. JobMaster
    JobMaster 是 JobManager 中最核心的组件,负责处理单独的作业(job)。JobMaster 和具体的 job 是一一对应的,多个 job 可以同时运行在一个 Flink 集群中, 每个 job 都有一个自己的 JobMaster。
    在作业提交时,JobMaster 会先接收到要执行的应用。JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”(ExecutionGraph),它包含了所有可以并发执行的任务。JobMaster 会向资源管理器(ResourceManager)发出请求,申请执行任务必要的资源。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager 上。
    而在运行过程中,JobMaster 会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
  2. 资源管理器(ResourceManager)
    ResourceManager 主要负责资源的分配和管理,在 Flink 集群中只有一个。所谓“资源”,主要是指TaskManager 的任务槽(task slots)。任务槽就是 Flink 集群中的资源调配单元,包含了机器用来执行计算的一组 CPU 和内存资源。每一个任务(task)都需要分配到一个 slot 上执行。
    Flink 的ResourceManager,针对不同的环境和资源管理平台(比如 Standalone 部署,或者 YARN),有不同的具体实现。在 Standalone 部署时,因为 TaskManager 是单独启动的(没有 Per-Job 模式),所以 ResourceManager 只能分发可用TaskManager 的任务槽,不能单独启动新 TaskManager。
    而在有资源管理平台时,就不受此限制。当新的作业申请资源时,ResourceManager 会将

有空闲槽位的 TaskManager 分配给 JobMaster。如果 ResourceManager 没有足够的任务槽,它还可以向资源提供平台发起会话, 请求提供启动 TaskManager 进程的容器。另外, ResourceManager 还负责停掉空闲的TaskManager,释放计算资源。
3. 分发器(Dispatcher)
Dispatcher 主要负责提供一个 REST 接口,用来提交作业,并且负责为每一个新提交的作业启动一个新的 JobMaster 组件。Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。Dispatcher 在架构中并不是必需的,在不同的部署模式下可能会被忽略掉。

4.1.2 任务管理器(TaskManager)
TaskManager 是 Flink 中的工作进程,负责数据流的具体计算任务(task)。Flink 集群中必须至少有一个TaskManager;当然由于分布式计算的考虑,通常会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。Slot 是资源调度的最小单位,slots的数量限制了TaskManager 能够并行处理的任务数量。
启动之后,TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后, TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用,JobMaster 就可以分配任务来执行了。
在执行过程中,TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager
交换数据。

4.1.3 并行度(Parallelism)

  1. 并行子任务和并行度
    把一个算子操作,“复制”多份到多个节点,数据来了之后就可以到其中任意一个执行。这样一来,一个算子操作就被拆分成了多个并行的“子任务”(subtasks),再将它们分发到不同节点,就真正实现了并行计算。
    在Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。

图 4-8 并行数据流
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
如图 4-8 所示,当前数据流中有 Source、map()、keyBy()/window()/apply()、Sink 四个算子,除最后 Sink,其他算子的并行度都为 2。整个程序包含了 7 个子任务,至少需要 2 个分区来并行执行。我们可以说,这段流处理程序的并行度就是 2。
2. 并行度的设置
在 Flink 中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。
(1) 代码中设置
我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度:
stream.map((_,1)).setParallelism(2)
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用执行环境的 setParallelism()方法,全局设定并行度:
env.setParallelism(2)
这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于 keyBy()方法返回的不是算子,所以无法对 keyBy()设置并行度。
(2) 提交作业时设置
在使用 flink run 命令提交作业时,可以增加-p 参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:

如果我们直接在Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
(3) 配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度:

parallelism.default: 2
这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是提交时的-p 参数,都不是必须的。所以,在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。
4.1.4 任务(Tasks)和任务槽(Task Slots)

  1. 任务槽(Task Slots)
    Flink 中每一个 worker(也就是 TaskManager)都是一个 JVM 进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。
    为了控制并发量,我们需要在 TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的任务槽(task slots)。
    每个任务槽(task slot)其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。

  2. 任务槽数量的设置

图 4-13 TaskManager 的 slot 与任务分配

我们可以通过集群的配置文件来设定 TaskManager 的 slots 数量:
taskmanager.numberOfTaskSlots: 8
通过调整 slots 的数量,我们就可以控制子任务之间的隔离级别。
需要注意的是,slots 目前仅仅用来隔离内存,不会涉及CPU 的隔离。
3. 任务对任务槽的共享

图 4-14 子任务共享Slot
默认情况下,Flink 允许子任务共享 slots。如图 4-14 所示,只要属于同一个作业,那么对于不同任务节点的并行子任务,就可以放到同一个slot 上执行。
如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slots,我们也可以通过设置“slot 共享组”(SlotSharingGroup)手动指定:
.map((_,1)).slotSharingGroup(“1”);
这样,只有属于同一个 slot 共享组的子任务,才会开启 slots 共享;不同组之间的任务是完全隔离的,必须分配到不同的 slots 上。
4. 任务槽和并行度的关系
slots 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。简单来说,task slots 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数 taskmanager.numberOfTaskSlots 进行配置; 而并行度( parallelism ) 是动态概念, 也就是 TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。

第 5 章 DataStream API

5.1 转换算子(Transformation)

图 5-4 转换算子(Transformation)

5.1.1 用户自定义函数(UDF)
Flink 的DataStream API 编程风格其实是一致的:基本上都是基于 DataStream 调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。
这些接口有一个共同特点:全部都以算子操作名称 + Function 命名,例如源算子需要实现 SourceFunction 接口, map 算子需要实现 MapFunction 接口, reduce() 算子需要实现 ReduceFunction 接口。我们不仅可以通过自定义函数类或者匿名类来实现接口,也可以直接传入 Lambda 表达式。这就是所谓的用户自定义函数(user-defined function,UDF)。

接下来我们就对这几种编程方式做一个梳理总结。

  1. 函数类(Function Classes)
    对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 暴露了所有UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。
    所以最简单直接的方式,就是自定义一个函数类,实现对应的接口。
    下面例子实现了FilterFunction 接口,用来筛选 url 中包含“home”的内容:

当然还可以通过匿名类来实现 FilterFunction 接口:
为了类可以更加通用,我们还可以将用于过滤的关键字“home”抽象出来作为类的属性,调用构造方法时传进去。

对于 Scala 这样的函数式编程语言,更为简单的写法是直接传入一个 Lambda 表达式:
stream.filter(_.url.contains(“home”)).print()
这样我们用一行代码就可以搞定,显得更加简洁明晰。
2. 富函数类(Rich Function Classes)
“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其 Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、 RichReduceFunction 等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
典型的生命周期方法有:
 open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 流的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在open()方法中完成。
 close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,
实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。来看一个例子:

输出结果是:
一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,推荐的最

佳实践如下:
另外,富函数类提供了 getRuntimeContext()方法(我们在本节的第一个例子中使用了一下),可以获取到运行时上下文的一些信息,例如程序执行的并行度,任务名称,以及状态
(state)。这使得我们可以大大扩展程序的功能,特别是对于状态的操作,使得 Flink 中的算子具备了处理复杂业务的能力。
5.2 自定义源算子(Source)
创建一个自定义的数据源,实现 SourceFunction 接口。主要重写两个关键方法:
run()和 cancel()。
 run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
 cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
5.3 自定义 Sink 输出
与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkFunction抽象类,只要实现它,通过简单地调用 DataStream 的 addSink()方法就可以自定义写入任何外部存储。

第 6 章 Flink 中的时间和窗口
6.1 时间语义
6.1.1 Flink 中的时间语义

图 6-1 流式数据的生成与处理
6.2 水位线(Watermark)

1、生成原理
watermark= 当前最大事件时间 - 乱序程度 -1ms
BoundedOutOfOrdernessWatermark 中明确指定
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));

2、生成方式分类
周期性:200ms
间歇性
3、flink怎么处理乱序、迟到数据
a.使用watermark 指定乱序程度 水位线生成策略
.assignTimestampsAndWatermarks( WatermarkStrategy.forBoundedOutOfOrdernessEvent

b.指定窗口允许迟到

.allowedLateness(Time.minutes(1))

c.侧输出流
.sideOutputLateData(outputTag)
水位线生成器
.withTimestampAssigner(
new SerializableTimestampAssigner[Event] {
override def extractTimestamp(t: Event, l: Long): Long = t.timestamp
}
)

6.2.1 什么是水位线
7 水位线的特性
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
我们可以总结一下水位线的特性:
 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
 水位线是基于数据的时间戳生成的
 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
 水位线可以通过设置延迟,来保证正确处理乱序数据
 一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据
水位线是 Flink 流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理
6.3 窗口(Window)
根据分配数据的规则,窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

第 7 章 处理函数
之前所介绍的流处理 API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于 DataStream 进行转换的;所以可以统称为DataStream API,这也是 Flink 编程的核心。而我们知道,为了让代码有更强大的表现力和易用性,Flink 本身提供了多层API,DataStream API 只是中间的一环,如图 7-1 所示:

在更底层,我们可以不定义任何具体的算子(比如map(),filter(),或者 window()),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。

7.1 基本处理函数(ProcessFunction)
处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。我们知道在 Flink 中几乎所有转换算子都提供了对应的函数类接口,处理函数也不例外;它所对应的函数类,就叫作ProcessFunction。

7.1.1 处理函数的功能和使用
我们之前学习的转换算子,一般只是针对某种具体操作来定义的,能够拿到的信息比较有限。比如 map()算子,我们实现的 MapFunction 中,只能获取到当前的数据,定义它转换之后的形式;而像窗口聚合这样的复杂操作,AggregateFunction 中除数据外,还可以获取到当前的状态(以累加器 Accumulator 形式出现)。另外我们还介绍过富函数类,比如 RichMapFunction,它提供了获取运行时上下文的方法 getRuntimeContext(),可以拿到状态,还有并行度、任务名称之类的运行时信息。
但是无论哪种算子,如果我们想要访问事件的时间戳,或者当前的水位线信息,都是完全做不到的。这时就需要使用处理函数(ProcessFunction)。
处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件
(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态
(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
处理函数的使用与基本的转换操作类似,只需要直接基于 DataStream 调用 process()方法就可以了。方法需要传入一个 ProcessFunction 作为参数,用来定义处理逻辑。
stream.process(new MyProcessFunction)
这里 ProcessFunction 不是接口, 而是一个抽象类, 继承了AbstractRichFunction; MyProcessFunction 是它的一个具体实现。所以所有的处理函数,都是富函数(RichFunction),富函数可以调用的东西这里同样都可以调用。

package com.xm.chapter07
import com.xm.chapter05.{ClickSource, Event}
import org.apache.flink.streaming.api.functions._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

env
  .addSource(new ClickSource)
  .assignAscendingTimestamps(_.timestamp)
  .process(new ProcessFunction[Event, String] {
    // 每来一条元素都会调用一次
    override def processElement(i: Event, context: ProcessFunction[Event, String]#Context, collector: Collector[String]): Unit = {
      if (i.user.equals("Mary")) {
        // 向下游发送数据
        collector.collect(i.user)
      } else if (i.user.equals("Bob")) {
        collector.collect(i.user) 
        collector.collect(i.user)
      }
      // 打印当前水位线
      println(context.timerService.currentWatermark())
    }
  })
  .print()

env.execute()

}
}
7.1.1 ProcessFunction 解析
在源码中我们可以看到,抽象类 ProcessFunction 继承了 AbstractRichFunction,有两个泛型类型参数:I 表示 Input,也就是输入的数据类型;O 表示Output,也就是处理完成之后输出的数据类型。
内部单独定义了两个方法:一个是必须要实现的抽象方法 processElement();另一个是非抽象方法 onTimer()。

7.1.2 处理函数的分类
Flink 中的处理函数其实是一个大家族,ProcessFunction 只是其中一员。
Flink 提供了 8 个不同的处理函数:
(1) ProcessFunction
最基本的处理函数,基于DataStream 直接调用 process()时作为参数传入。
(2) KeyedProcessFunction

对流按键分区后的处理函数,基于 KeyedStream 调用 process()时作为参数传入。要想使用定时器,必须基于KeyedStream。
(3) ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream 调用 process()时作为参数传入。
(4) ProcessAllWindowFunction
同样是开窗之后的处理函数,基于AllWindowedStream 调用 process()时作为参数传入。
(5) CoProcessFunction
合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用 process()时作为参数传入。
(6) ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用 process()时作为参数传入。
(7) BroadcastProcessFunction
广播连接流处理函数,基于 BroadcastConnectedStream 调用 process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
(8) KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用 process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。
7.2 按键分区处理函数(KeyedProcessFunction)
在 Flink 程序中,为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy()算子对数据流进行“按键分区”,得到一个 KeyedStream。而只有在 KeyedStream 中,才支持使用 TimerService 设置定时器的操作。所以一般情况下,我们都是先做了 keyBy()分区之后,再去定义处理操作;代码中更加常见的处理函数是 KeyedProcessFunction。
7.3 侧输出流(Side Output)
处理函数还有另外一个特有功能,就是将自定义的数据放入“侧输出流”(side output)输出。这个概念我们并不陌生,之前在讲到窗口处理迟到数据时,最后一招就是输出到侧输出流。而这种处理方式的本质,其实就是处理函数的侧输出流功能。
具体应用时,只要在处理函数的 processElement()或者 onTimer()方法中,调用上下文的 output()方法就可以了。
这里 output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据。
我们可以在外部先将 OutputTag 声明出来:

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用 getSideOutput()
方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。
第 8 章 多流转换
多流转换可以分为“分流”和“合流”两大类。目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union()、connect()、join()等接口进行连接合并操作。
8.1.1 联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union),如图 8-2所示。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
8.1.2 连接(Connect)

  1. 连接流(ConnectedStreams)
  2. CoProcessFunction
  3. 广播连接流(BroadcastConnectedStream)
    8.2 基于时间的合流——双流联结(Join)
    对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来,“配对”去做处理。这种需求与关系型数据库中表的 join 操作非常相近。事实上,Flink 中两条流的 connect()操作,就可以通过 keyBy()指定键进行分组后合并,实现了类似于 SQL 中的 join 操作;另外 connect()支持处理函数,可以使用自定义状态和 TimerService 灵活实现各种需求,其实已经能够处理双流合并的大多数场景。
    不过处理函数是底层接口,所以尽管 connect()能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要设置定时器、自定义触发逻辑来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink 的DataStrema API 提供了两种内置的join()算子。

注:SQL 中 join 一般会翻译为“连接”;我们这里为了区分不同的算子,一般的合流操作
connect()翻译为“连接”,而把 join()翻译为“联结”。

8.2.1 窗口联结(Window Join)
基于时间的操作,最基本的当然就是时间窗口了。我们之前已经介绍过 Window API 的用法,主要是针对单一数据流在某些时间段内的处理计算。那如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计,又该怎么做呢?
Flink 为这种场景专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

8.2.2 间隔联结(Interval Join)
Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

第 9 章 状态编程

在流处理中,数据是连续不断到来和处理的。每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

9.1.1 有状态算子
在 Flink 中,算子任务可以分为无状态和有状态两种情况。
无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如图 9-1 所示。例如,可以将一个字符串类型的数据拆分开作为元组输出;也可以对数据做一些计算,比如每个代表数量的字段加 1。我们之前讲到的基本转换算子,如 map()、filter()、 flatMap(),计算时不依赖其他数据,就都属于无状态的算子。

图 9-1 无状态算子
而有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的 “其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。比如,做求和(sum)计算时,需要保存之前所有数据的和,这就是状态;窗口算子中会保存已经到达的所有数据,这些也都是它的状态。

图 9-2 有状态算子处理流程
如图 9-2 所示为有状态算子的一般处理流程,具体步骤如下。
(1) 算子任务接收到上游发来的数据;
(2) 获取当前状态;
(3) 根据业务逻辑进行计算,更新状态;
(4) 得到计算结果,输出发送到下游任务。
9.1.2 状态的管理
大数据的场景下,我们必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题就会随之而来了。
 状态的访问权限。我们知道 Flink 上的聚合和窗口操作,一般都是基于 KeyedStream的,数据会按照 key 的哈希值进行分区,聚合处理的结果也应该是只对当前 key 有效。然而同一个分区(也就是 slot)上执行的任务实例,可能会包含多个key 的数据,它们同时访问和更改本地变量,就会导致计算结果错误。所以这时状态并不是单纯的本地变量。
 容错性,也就是故障后的恢复。状态只保存在内存中显然是不够稳定的,我们需要将它持久化保存,做一个备份;在发生故障后可以从这个备份中恢复状态。
 我们还应该考虑到分布式应用的横向扩展性。比如处理的数据量增大时,我们应该相应地对计算资源扩容,调大并行度。这时就涉及了状态的重组调整。
可见状态的管理并不是一件轻松的事。好在 Flink 作为有状态的大数据流式处理框架,已经帮我们搞定了这一切。Flink 有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态的高效存储和访问、持久化保存和故障恢复,以及资源扩展时的调整。这样,我们只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上。

9.1.3 状态的分类

  1. 托管状态(Managed State)和原始状态(Raw State)
    Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
    具体来讲,托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。
    而对比之下,原始状态就全部需要自定义了。Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。
    所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用Flink 提供的算子或者自定义托管状态来实现需求。
  2. 算子状态(Operator State)和按键分区状态(Keyed State)
    接下来我们的重点就是托管状态(Managed State)。
    我们知道在Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slots)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink
    能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
    而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy()进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。
    基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。
    (1) 算子状态(Operator State)
    状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的,如图 9-3 所示。

图 9-3 算子状态(Operator State)
算子状态可以用在所有算子上,使用的时候其实就跟一个本地变量没什么区别——因为本地变量的作用域也是当前任务实例。在使用时,我们还需进一步实现CheckpointedFunction 接口。
(2) 按键分区状态(Keyed State)
状态是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流
(KeyedStream)中,也就keyBy()之后才可以使用,如图 9-4 所示。

图 9-4 按键分区状态(Keyed State)
按键分区状态应用非常广泛。之前讲到的聚合算子必须在 keyBy()之后才能使用,就是因为聚合的结果是以Keyed State 的形式保存的。另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。
所以即使是 map()、filter()这样无状态的基本转换算子,我们也可以通过富函数类给它们 “追加”Keyed State,或者实现CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,不愧是“有状态的流处理”。
无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。关于状态的具体使用,我们会在下面继续展开讲解。
9.2 按键分区状态(Keyed State)
在实际应用中,我们一般都需要将数据按照某个 key 进行分区,然后再进行计算处理;所以最为常见的状态类型就是 Keyed State。之前介绍到 keyBy 之后的聚合、窗口计算,算子所持有的状态,都是Keyed State。
另外,我们还可以通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是Keyed State。

9.2.1 基本概念和特点
按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以key 为作用范围进行隔离。
在进行按键分区之后,具有相同键的所有数据,都会分配到同一个并行子任务中;所以如果当前任务定义了状态,Flink 就会在当前并行子任务实例中,为每个键值维护一个状态的实例。于是当前任务就会为分配来的所有数据,按照key 维护和处理对应的状态。
在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。
需要注意,使用Keyed State 必须基于KeyedStream。没有进行 keyBy 分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。
9.3 算子状态(Operator State)
除按键分区状态(Keyed State)之外,另一大类受控状态就是算子状态(Operator State)。从某种意义上说,算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同key 的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法也会有一些区别。

9.3.1 基本概念和特点
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State。
算子状态的实际应用场景不如Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有key 定义的场景。
当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

9.3.2 状态类型
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState 和 BroadcastState。

  1. 列表状态(ListState)
    与 Keyed State 中的 ListState 一样,将状态表示为一组数据的列表。
    与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
    当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(Round Robin),与之前介绍的 rebanlance()数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。
  2. 联合列表状态(UnionListState)
    与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
    UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
  3. 广播状态(BroadcastState)
    有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。
    因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
    在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。
在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

9.4 状态持久化和状态后端
在 Flink 的状态管理机制中,很重要的一个功能就是对状态进行持久化(persistence)保存,这样就可以在发生故障后进行重启恢复。Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

9.4.1 检查点(Checkpoint)
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
如果保存检查点之后又处理了一些数据,然后发生了故障,那么重启恢复状态之后这些数据带来的状态改变会丢失。为了让最终处理结果正确,我们还需要让源(Source)算子重新读取这些数据,再次处理一遍。这就需要流的数据源具有“数据重放”的能力,一个典型的例子就是Kafka,我们可以通过保存消费数据的偏移量、故障重启后重新提交来实现数据的重放。这是对“至少一次”(at least once)状态一致性的保证,如果希望实现“精确一次”(exactly once)的一致性,还需要数据写入外部系统时的相关保证。关于这部分内容我们会在第 10 章继续讨论。
默认情况下, 检查点是被禁用的, 需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。

这里传入的参数是检查点的间隔时间,单位为毫秒。关于检查点的详细配置,可以参考第
10 章的内容。
除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。

9.4.2 状态后端(State Backends)
检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;
TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向 JobManager 返回确认信息。这个过程是分布式的,当 JobManger 收到所有 TaskManager 的返回信息后,就会确认当前检查点成功保存,如图 9-5 所示。而这一切工作的协调,就需要一个“专职人员”来完成。

图 9-5 检查点的保存
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

  1. 状态后端的分类
    状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。 Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是HashMapStateBackend。
    (1) 哈希表状态后端(HashMapStateBackend)
    这种方式就是我们之前所说的,把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
    对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。
    HashMapStateBackend 是将本地状态全部放入内存的,这样可以获得最快的读写速度,使计算性能达到最佳;代价则是内存的占用。它适用于具有大状态、长窗口、大键值状态的作业,对所有高可用性设置也是有效的。
    (2) 内嵌RocksDB 状态后端(EmbeddedRocksDBStateBackend)
    RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置
    EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB
    默认存储在TaskManager 的本地数据目录里。
    与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在 RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key 的比较也会按照字节进行,而不是直接调用 hashCode()和 equals()方法。
    对于检查点,同样会写入到远程的持久化文件系统中。
    EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。
    由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。
  2. 如何选择正确的状态后端
    HashMap 和RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。在实际应用中,选择那种状态后端,主要是需要根据业务需求在处理性能和应用的扩展性上做一个选择。
    HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
    而 RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend 慢一个数量级。
    我们可以发现,实际应用就是权衡利弊后的取舍。
  3. 状态后端的配置
    在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。
    (1) 配置默认的状态后端
    在 flink-conf.yaml 中,可以使用 state.backend 来配置默认状态后端。
    配置项的可能值为 hashmap,这样配置的就是 HashMapStateBackend;也可以是 rocksdb,这样配置的就是EmbeddedRocksDBStateBackend。
    下面是一个配置HashMapStateBackend 的例子:

这里的 state.checkpoints.dir 配置项,定义了状态后端将检查点和元数据写入的目录。
(2) 为每个作业(Per-job)单独配置状态后端
每个作业独立的状态后端,可以在代码中,基于作业的执行环境直接设置。代码如下:

上面代码设置的是HashMapStateBackend,如果想要设置 EmbeddedRocksDBStateBackend,可以用下面的配置方式:

需要注意,如果想在IDE 中使用 EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

第 10 章 容错机制
10.1 检查点(Checkpoint)
检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”。

10.1.1 检查点的保存
什么时候进行检查点的保存呢?最理想的情况下,我们应该“随时”保存,也就是每处理完一个数据就保存一下当前的状态;这样如果在处理某条数据时出现故障,我们只要回到上一个数据处理完之后的状态,然后重新处理一遍这条数据就可以。

  1. 周期性的触发保存
    “随时存档”确实恢复起来方便,可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存,当大量数据同时到来时,就会耗费很多资源来频繁做检查点,数据处理的速度就会受到影响。所以更好的方式是,每隔一段时间去做一次存档,这样既不会影响数据的正常处理,也不会有太大的延迟。在 Flink 中,检查点的保存是周期性触发的,间隔时间可以进行设置。
    所以检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
  2. 保存的时间点
    我们保存状态的策略是:当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。
    首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(Source)任务向数据源重新提交偏移量、请求重放数据就可以了。这需要源任务可以把偏移量作为算子状态保存下来,而且外部数据源能够重置偏移量;Kafka 就是满足这些要求的一个最好的例子,我们会在后面详细讨论。
    10.1.2 检查点算法
    在 Flink 中,对检查点的保存采用了基于 Chandy-Lamport 算法的分布式快照,下面我们就来详细了解一下。
  3. 检查点分界线(Barrier)

把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的 “分界线”(Checkpoint Barrier)。
10.1.3 保存点(Savepoint)
除了检查点(checkpoint)外,Flink 还提供了另一个非常独特的镜像保存功能——保存点
(savepoint)。
第 12 章 Flink CEP
所谓CEP,其实就是“复杂事件处理(Complex Event Processing)”的缩写;而 Flink CEP,就是 Flink 实现的一个用于复杂事件处理的库(library)。
那到底什么是“复杂事件处理”呢?就是可以在事件流里,检测到特定的事件组合并进行处理,比如说“连续登录失败”,或者“订单支付超时”等等。
具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是“复杂事件”;然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。
总结起来,复杂事件处理(CEP)的流程可以分成三个步骤:
(1) 定义一个匹配规则
(2) 将匹配规则应用到事件流上,检测满足规则的复杂事件
(3) 对检测到的复杂事件进行处理,得到结果进行输出

图 12-1 复杂事件模式匹配

12.1.2 应用场景
CEP 主要用于实时流数据的分析处理。CEP 可以帮助在复杂的、看似不相关的事件流中找出那些有意义的事件组合,进而可以接近实时地进行分析判断、输出通知信息或报警。这在企业项目的风控管理、用户画像和运维监控中,都有非常重要的应用。
 风险控制
设定一些行为模式,可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付(刷单),就可以向用户发送通知信息,或是进行报警提示、由人工进一步判定用户是否有违规操作的嫌疑。这样就可以有效地控制用户个人和平台的风险。
 用户画像
利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有特定行为习惯的一些用户,做出相应的用户画像。基于用户画像可以进行精准营销,即对行为匹配预定义规则的用户实时发送相应的营销推广;这与目前很多企业所做的精准推荐原理是一样的。
 运维监控
对于企业服务的运维管理,可以利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。
CEP 的应用场景非常丰富。很多大数据框架,如 Spark、Samza、Beam 等都提供了不同的 CEP 解决方案,但没有专门的库(library)。而 Flink 提供了专门的CEP 库用于复杂事件处理,可以说是目前CEP 的最佳解决方案。

文档
https://mp.csdn.net/mp_download/manage/download/UpDetailed
github

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码

)">
< <上一篇
下一篇>>