大数据之Spark

Spark介绍

什么是Spark

  • 专为大规模数据处理而设计的快速通用的计算引擎
  • 类 Hadoop MapReduce 的通用并行计算框架
  • 拥有 Hadoop MapReduce 所具有的优点
  • 但不同于 MapReduce 的是 Job 中间输出结果可以缓存在内存中,从而不再需要读写 HDFS ,减少磁盘数据交互
  • 因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的算法
  • Spark 是 Scala 编写,方便快速编程

Spark与MR的区别

  • 都是分布式计算框架, Spark 计算中间结果基于内存缓存, MapReduce 基于 HDFS 存储
  • 也正因此,Spark 处理数据的能力一般是 MR 的三到五倍以上
  • Spark 中除了基于内存计算这一个计算快的原因,还有 DAG(DAGShecdule) 有向无环图来切分任务的执行先后顺序

Spark API

  • 多种编程语言的支持: Scala,Java,Python,R,SQL

Spark运行模式

  • Local

    • 多用于本地测试,如在 eclipse , idea 中写程序测试等。
  • Standalone

    • Standalone 是 Spark 自带的一个资源调度框架,它支持完全分布式。
  • Yarn

    • Hadoop 生态圈里面的一个资源调度框架, Spark 也是可以基于 Yarn 来计算的。
  • Mesos

    • 资源调度框架。

SparkCore

Partition

  • 概念

    • Spark RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中
    • RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若干个Partition组成
  • 分区方式

    • HashPartitioner(哈希分区)

      • Hash分区

      • HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区。

      • 其数据分区规则为 partitionId = Key.hashCode % numPartitions

        • partitionId代表该Key对应的键值对数据应当分配到的Partition标识
        • Key.hashCode表示该Key的哈希值
        • numPartitions表示包含的Partition个数。
    • RangePartitioner(范围分区)

      • 范围分区
      • Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。
      • HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,当某几种类型数据量较多时,就会造成若干Partition中包含的数据过大
      • 在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。
      • RangePartitioner基于抽样的思想来对数据进行分区
  • HDFS与Partition

    • block

      • hdfs中的block是分布式存储的最小单元
      • 类似于盛放文件的盒子,一个文件可能要占多个盒子,但一个盒子里的内容只可能来自同一份文件
    • partition

      • spark中的partition 是弹性分布式数据集RDD的最小单元
      • RDD是由分布在各个节点上的partition组成的
      • spark在计算过程中,生成的数据在计算空间内最小单元
    • 区别

      • block位于存储空间、partition 位于计算空间,
      • block的大小是固定的、partition 大小是不固定的,
      • block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到.
    • Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元

RDD

  • 名词解释

    • RDD(Resilient Distributed Dataset) 弹性分布式数据集。
  • RDD五大属性

    • 上图
  • RDD流程图

  • 注意

    • textFile 方法底层封装的是 MR 读取文件的方式,读取文件之前先进行 split 切片,默认 split大小是一个 block 大小。

    • RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据。

    • 什么是 K,V格式的RDD ?

      • 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们就叫做 K,V格式的RDD
    • 哪里体现 RDD 的弹性(容错)?

      • partition 数量,大小没有限制,体现了 RDD 的弹性。
      • RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD
    • 哪里体现 RDD 的分布式?

      • RDD 是由 Partition 组成, partition 是分布在不同节点上的。
      • RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。
  • Lineage血统

    • RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的
    • 如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来

系统架构

  • Master ( standalone 模式):资源管理的主节点(进程)。
  • Cluster Manager :在集群上获取资源的外部服务(例如: standalone ; yarn ; mesos )。
  • Worker ( standalone 模式):资源管理的从节点(进程)或者说是是管理本机资源的进程。
  • Application :基于 Spark 的用户程序,包含 driver 程序和运行在集群上的 executor 程序,即一个完整的 spark 应用 。
  • Dirver ( program ):用来连接工作进程( worker )的程序 。
  • Executor :是在一个 worker 进程所管理的节点上为某 Application 启动的一个个进程,这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executors 。
  • Task :被发送到 executor 上的工作单元。
  • Job :包含很多任务( Task )的并行计算,和 action 算子对应。
  • Stage :一个 job 会被拆分成很多组任务,每组任务被称为 Stage (就像 MapReduce 分为MapTask 和 ReduceTask 一样)。

算子(单文件)

转换算子

  • 概念

    • Transformations 类算子叫做转换算子(本质就是函数), Transformations 算子是延迟执行,也叫懒加载执行。
  • 常见 Transformation 类算子

    • filter :过滤符合条件的记录数, true 保留, false 过滤掉。

    • map :将一个 RDD 中的每个数据项,通过 map 中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。

    • flatMap :先 map 后 flat 。与 map 类似,每个输入项可以映射为0到多个输出项。

    • sample 随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样。

    • reduceByKey 将相同的 Key 根据相应的逻辑进行处理。

      • reduceByKey有一个操作combiner合并
        当有多个分区的时候会先将每个分区先进行reduceByKey再将结果传出
        所以在TOPN练习中reduceByKey((x,y))按理说x为sum总数,y为1但当有多个分区的时候y就会等于该分区的累加和

    • sortByKey / sortBy 作用在 K,V格式的RDD 上,对 key 进行升序或者降序排序。

行动算子

  • 概念

    • Action 类算子叫做行动算子, Action 类算子是触发执行。
    • 一个 application 应用程序中有几个 Action 类算子执行,就有几个 job 运行。
  • 常见 Action 类算子

    • count :返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。
    • take(n) :返回一个包含数据集前 n 个元素的集合。
    • first :效果等同于 take(1) ,返回数据集中的第一个元素。
    • foreach :循环遍历数据集中的每个元素,运行相应的逻辑。
    • collect :将计算结果回收到 Driver 端。

控制算子

  • 概念

    • 将 RDD 持久化,持久化的单位是 partition 。
    • 控制算子有三种, cache , persist , checkpoint 。 cache 和 persist 都是懒执行的。必须有一个 action 类算子触发执行。
    • checkpoint 算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。
  • cache

    • 默认将 RDD 的数据持久化到内存中。 cache 是懒执行。
    • cache() = persist() = persist(StorageLevel.Memory_Only)
    • rdd.cache().count() 返回的不是持久化的RDD,而是一个count的数值
  • persist

    • 可以指定持久化的级别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK 。

    • 持久化级别如下

    • cache是persist的特例,MEMORY_ONLY就是cache

  • checkpoint

    • checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系,也是懒执行。

    • 执行原理

      • 当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
      • 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。
      • Spark 框架会自动启动一个新的 job ,重新计算这个 RDD 的数据,将数据持久化到Checkpint目录中。
    • 使用 checkpoint 时常用优化手段

      • 对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache
      • 这样新启动的 job 只需要将内存中的数据拷贝到Checkpint目录中就可以,省去了重新计算这一步。
      • 但是缺点是:多花费一倍的内存空间

任务提交方式

Standalone-client

  • 提交命令

  • 执行流程

    • client 模式提交任务后,会在客户端启动 Driver 进程。
    • Driver 会向 Master 申请启动 Application 启动的资源。资源申请成功,
    • Driver 端将 task 分发到 worker 端执行,启动 executor 进程(任务的分发)。
    • Worker 端( exectuor 进程)将 task 执行结果返回到 Driver 端(任务结果的回收)
  • 总结

    • client 模式适用于测试调试程序。 Driver 进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在 Driver 端可以看到 task 执行的情况。
    • 生产环境下不能使用 client 模式,是因为:假设要提交100个 application 到集群运行,Driver 每次都会在 client 端启动,那么就会导致客户端100次网卡流量暴增的问题。

Standalone-cluster

  • 提交命令

  • 执行流程

    • cluster 模式提交应用程序后,会向 Master 请求启动 Driver 。
    • Master 接受请求,随机在集群一台节点启动 Driver 进程。
    • Driver 启动后为当前的应用程序申请资源。
    • Driver 端发送 task 到 worker 节点上执行(任务的分发)。
    • worker 上的 executor 进程将执行情况和执行结果返回给 Driver 端(任务结果的回收)。
  • 总结

    • Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的
      worker 节点都要有,因为此种方式, spark 不会自动上传包

    • 解决方案

      • 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。
      • 将所有的依赖包和文件各放一份在 worker 节点上。

yarn-client

  • 提交命令

  • 执行流程

    • 客户端提交一个 Application ,在客户端启动一个 Driver 进程。
    • 应用程序启动后会向 RS ( ResourceManager )(相当于 standalone 模式下的 master 进程)发送请求,启动 AM ( ApplicationMaster )。
    • RS 收到请求,随机选择一台 NM ( NodeManager )启动 AM 。这里的 NM 相当于 Standalone 中的 Worker 进程。
    • AM 启动后,会向 RS 请求一批 container 资源,用于启动 Executor 。
    • RS 会找到一批 NM (包含 container )返回给 AM ,用于启动 Executor 。
    • AM 会向 NM 发送命令启动 Executor 。
    • Executor 启动后,会反向注册给 Driver , Driver 发送 task 到 Executor ,执行情况和结果返回给 Driver 端。
  • 总结

    • Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地, Driver 会与 yarn 集群中的 Executor 进行大量的通信

    • ApplicationMaster (executorLauncher)的在此模式中的作用:

      • 为当前的 Application 申请资源
      • 给 NodeManager 发送消息启动 Executor 。
      • 注意: ApplicationMaster 在此种模式下没有作业调度的功能。

yarn-cluster

  • 提交命令

  • 执行流程

    • 客户机提交 Application 应用程序,发送请求到 RS ( ResourceManager ),请求启动AM ( ApplicationMaster )。
    • RS 收到请求后随机在一台 NM ( NodeManager )上启动 AM (相当于 Driver 端)。
    • AM 启动, AM 发送请求到 RS ,请求一批 container 用于启动 Excutor 。
    • RS 返回一批 NM 节点给 AM 。
    • AM 连接到 NM ,发送请求到 NM 启动 Excutor 。
    • Excutor 反向注册到 AM 所在的节点的 Driver 。 Driver 发送 task 到 Excutor 。
  • 总结

    • Yarn-Cluster 主要用于生产环境中因为 Driver 运行在 Yarn 集群中某一台 nodeManager中,每次提交任务的 Driver 所在的机器都是不再是提交任务的客户端机器,而是多个 NM 节点中的一台不会产生某一台机器网卡流量激增的现象但同样也有缺点,任务提交后不能看到日志。只能通过 yarn 查看日志

    • ApplicationMaster 在此模式中的的作用

      • 为当前的 Application 申请资源
      • 给 NodeManger 发送消息启动 Executor 。
      • 任务调度。

算子(多文件)

转换算子

  • 转换算子join

    • leftOuterJoin
    • rightOuterJoin
    • fullOuterJoin
    • 这些 join 都是作用在 K,V 格式的 RDD 上。根据 key 值进行连接,例如: (K,V)join(K,W)返回(K,(V,W))
    • 注意: join 后的分区数与父RDD分区数多的那一个相同。
  • union

    • 合并两个数据集。两个数据集的类型要一致。
    • 返回新的 RDD 的分区数是合并 RDD 分区数的总和。
  • intersection

    • 取两个数据集的交集。
    • 注意: join 后的分区数与父RDD分区数多的那一个相同。
  • subtract

    • 取两个数据集的差集。
    • 新分区数等于前面那个RDD的分区数
  • mapPartitions

    • mapPartition与 map 类似,单位是每个 partition 上的数据。
    • Map遍历的是每个元素
      mapPartition是按分区遍历,进迭代器返回迭代器
    • 分区数不变
  • distinct(map+reduceByKey+map)

    • 对 RDD 内数据去重。
    • 会对整个对象匹配,K,V都相同才去重
    • 分区数不变
  • cogroup

    • 当调用类型 (K,V) 和 (K,W) 的数据上时,返回一个数据集 (K,(Iterable,Iterable))
    • 全外连接,相同K会放在一起,若没有迭代器返回NIL
    • 新RDD分区数为父RDD分区数多的那一个

行动算子

  • foreachPartition

    • 遍历的数据是每个 partition 的数据。
    • 参数为迭代器

窄依赖和宽依赖

窄依赖

  • 父 RDD 和子 RDD 的 partition 之间的关系是一对一的。或者父 RDD 和子 RDD 的 partition 关系是多对一的。不会有 shuffle 的产生

宽依赖

  • 父 RDD 与子 RDD 的 partition 之间的关系是一对多。会有 shuffle 的产生。

宽窄依赖图理解

Stage(阶段)

图解

简介

  • stage

    • 根据RDD依赖关系形成一个DAG有向无环图
    • 若父RDD和子RDD不需要shuffle(窄依赖)我们可以将它们连接在一起,减少数据的网络传输
  • pipeline

    • 将窄依赖的RDD连接到一起,当前RDD链和其他RDD链不相关
    • 子RDD链不必等父RDD全部执行完才开始执行
    • 只需要等当前链的上一个task计算结果,当前task就可以执行

stage切割规则

  • 从后往前,遇到宽依赖就切割 stage

    • 1.从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
    • 2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的;
    • 3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask;
    • 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤;
  • 总结

    • 由于spark中stage的划分是根据shuffle来划分的,而宽依赖必然有shuffle过程,因此可以说spark是根据宽窄依赖来划分stage的。

stage计算模式

  • pipeline

    • 管道计算模式, pipeline 只是一种计算思想、模式

    • 在spark中pipeline是一个partition对应一个partition,所以在stage内部只有窄依赖

    • 数据一直在管道里面什么时候数据会落地

      • 对 RDD 进行持久化( cache , persist )。
      • shuffle write 的时候。
  • Stage 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定的

    • 如何改变 RDD 的分区数

      • reduceByKey(XXX,3)
      • GroupByKey(4)
      • sc.textFile(path,numpartition)
  • 使用算子时传递 分区num参数 就是分区 partition 的数量

Spark资源调度和任务调度

调度流程

  • 启动集群后, Worker 节点会向 Master 节点汇报资源情况, Master 掌握了集群资源情况。

  • 当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图。

  • 任务提交后, Spark 会在 Driver 端创建两个对象: DAGScheduler 和 TaskScheduler ,DAGScheduler 是任务调度的高层调度器,是一个对象

  • DAGScheduler 的主要作用就是将 DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage ,然后将这些 Stage 以 TaskSet 的形式提交给 TaskScheduler ( TaskScheduler 是任务调度的低层调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中的并行的 task 任务)

  • TaskSchedule 会遍历 TaskSet 集合,拿到每个 task 后会将 task 发送到 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。

  • task 在 Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试3次。如果重试3次依然失败,那么这个 task 所在的 stage 就失败了。

  • stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到 TaskScheduler , Stage 默认重试4次。如果重试4次以后依然失败,那么这个 job 就失败了。 job 失败了, Application 就失败了

  • TaskScheduler 不仅能重试失败的 task ,还会重试 straggling (落后,缓慢) task ( 也就是执行速度比其他task慢太多的task )。如果有运行缓慢的 task 那么 TaskScheduler 会启动一个新的task 来与这个运行缓慢的 task 执行相同的处理逻辑。两个 task 哪个先执行完,就以哪个 task的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行可以通过 spark.speculation 属性来配置

    • 注意

      • 对于 ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
      • 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有 task 重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

流程图解

粗细粒度资源申请

  • 粗粒度资源申请(Spark)

    • 在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。
    • 优点:在 Application 执行之前,所有的资源都申请完毕,每一个 task 直接使用资源就可以了,不需要 task 在执行前自己去申请资源, task 启动就快了, task 执行快了, stage 执行就快了,job 就快了, application 执行就快了。
    • 缺点:直到最后一个 task 执行完成才会释放资源,集群的资源无法充分利用。
  • 细粒度资源申请(MR)

    • Application 执行之前不需要先去申请资源,而是直接执行,让 job 中的每一个 task 在执行前自己去申请资源, task 执行完成就释放资源。
    • 优点:集群的资源可以充分利用。
    • 缺点: task 自己去申请资源, task 启动变慢, Application 的运行就响应的变慢了。

算子(分区)

转换算子

  • mapPartitionsWithIndex

    • 类似于 mapPartitions ,除此之外还会携带分区的索引值。
    • 参数为分区索引和每个分区的迭代器
  • repartition (重新分区)

    • 增加或减少分区。此算子会产生 shuffle 。
  • coalesce

    • 常用来减少分区,算子中第二个参数是减少分区的过程中是否产生 shuffle
    • true 为产生 shuffle , false 不产生 shuffle 。默认是 false 。
    • 如果 coalesce 设置的分区数比原来的 RDD 的分区数还多的话,第二个参数设置为 false 不会起作用(转换之后分区数大于之前),如果设置成 true ,效果和 repartition 一样
    • repartition(numPartitions) = coalesce(numPartitions,true)
  • groupByKey

    • 作用在 K,V 格式的 RDD 上。根据 Key 进行分组。作用在 (K,V) ,返回 (K,Iterable)
  • zip

    • 合并RDD,将两个 RDD 中的元素( KV格式/非KV格式 )变成一个 KV 格式的 RDD ,两个 RDD 的个数必须相同。其中一个RDD的元素为K一个为V
  • zipWithIndex

    • 该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从0开始)组合成 (K,V) 对

行动算子

  • countByKey

    • 作用到 K,V 格式的 RDD 上,根据 Key 计数相同 Key 的数据集元素个数。
  • countByValue

    • 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
  • reduce

    • 根据聚合逻辑聚合数据集中的每个元素,迭代

广播变量和累加器

广播变量

  • 广播变量理解图

    • 若每个Excutor中Task都需要用到某个变量,就需将变量传到每一个Excutor中,则可使用广播变量
  • 广播变量使用

  • 注意事项

    • 广播变量只能在 Driver 端定义,不能在 Executor 端定义。
    • 在 Driver 端可以修改广播变量的值,在 Executor 端无法修改广播变量的值。

累加器

  • 累加器理解图

    • Driver端定义变量,将变量封装到Task中发送到Executor,在Excutor端对变量进行累加,但dirver端变量的值并不会发生改变
    • 可以用到累加器,Excutor端累加器加1,然后更新Driver端累加器
  • 累加器的使用

  • 注意事项

    • 累加器在 Driver 端定义赋初始值,累加器只能在 Driver 端读取,在 Excutor 端更新。

SparkShuffle

SparkShuffle概念

  • reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>对的形式,这样每一个key对应一个聚合起来的value

  • 如何聚合

    • Shuffle Write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。
    • Shuffle Read:reduce task就会从上一个stage的所有task所在的机器上寻找属于自己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合。
  • Spark中有两种Shuffle类型,HashShuffle和SortShuffle

    • Spark1.2之前是HashShuffle
    • Spark1.2引入SortShuffle
    • spark2.0就只有sortshuffle。

HashShuffle

  • 普通机制

    • 执行流程

      • 每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。
      • 每个buffer文件最后对应一个磁盘小文件。
      • reduce task来拉取对应的磁盘小文件。
    • 总结

      • map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。

      • 产生的磁盘小文件的个数:

        • M(map task的个数)*R(reduce task的个数)
    • 产生的磁盘小文件过多,会导致以下问题

      • 在Shuffle Write过程中会产生很多写磁盘小文件的对象。
      • 在Shuffle Read过程中会产生很多读取磁盘小文件的对象。
      • 在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存的话,就会OOM。
      • 在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。
  • 合并机制

    • 执行流程

      • 合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制
      • 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
      • 假设第一个stage有50个task,第二个stage有100个task,总共还是有10个Executor,每个Executor执行5个task。那么原本使用未经优化的HashShuffleManager时,每个Executor会产生500个磁盘文件,所有Executor会产生5000个磁盘文件的。但是此时经过优化之后,每个Executor创建的磁盘文件的数量的计算公式为:CPU core的数量 * 下一个stage的task数量。也就是说,每个Executor此时只会创建100个磁盘文件,所有Executor只会创建1000个磁盘文件。
    • 总结

      • 产生磁盘小文件的个数: C(core的个数)*R(reduce的个数)

SortShuffle

(类似MapReduce的shuffle)

  • 普通机制

    • 执行流程

      • map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是 5M 。
      • 在 shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过 5M 时,比如现在内存结构中的数据为 5.01M ,那么他会申请 5.01*2-5=5.02M 内存给内存数据结构
      • 如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。
      • 在溢写之前内存结构中的数据会进行排序分区
      • 然后开始溢写磁盘,写磁盘是以 batch 的形式去写(批量),一个 batch 是1万条数据。
      • map task 执行完成后,会将这些 磁盘小文件 合并成一个大的磁盘文件,同时生成一个 索引文件 。
      • reduce task 去 map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
    • 总结

      • 产生磁盘小文件的个数: 2*M(map task的个数)
  • bypass机制

    • 触发条件

      • shuffle reduce task 的数量小于 spark.shuffle.sort.bypassMergeThreshold 的参数值。这个值默认是 200 。
      • 不需要进行 map 端的预聚合,比如 groupBykey , join 。
      • 产生的磁盘小文件为: 2*M(map task的个数) 。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>