MapReduce

MapReduce详解

MapReduce设计理念

map–>映射(key value)

reduce–>归纳

mapreduce必须构建在hdfs之上一种大数据离线计算框架

  • 在线:实时数据处理
  • 离线:数据处理时效性没有在线那么强,但是相对也需要很快得到结果

mapreduce不会马上得到结果,他会有一定的延时

  • 如果数据量小,使用mapreduce反而不合适
  • 杀鸡用牛刀

原始数据–>map(Key,Value)–>Reduce

分布式计算

  • 将大的数据切分成多个小数据,交给更多的节点参与运算

计算向数据靠拢

  • 将计算传递给有数据的节点上进行工作

MR的计算流程

原始数据File

  • 1T数据被切分成块存放在HDFS上,每一个块有128M大小

数据块Block

  • hdfs上数据存储的一个单元,同一个文件中块的大小都是相同的
  • 因为数据存储到HDFS上不可变,所以有可能块的数量和集群的计算能力不匹配
  • 我们需要一个动态调整本次参与计算节点数量的一个单位
  • 我们可以动态的改变这个单位–》参与的节点

切片Split

  • 切片是一个逻辑概念

  • 在不改变现在数据存储的情况下,可以控制参与计算的节点数目

  • 通过切片大小可以达到控制计算节点数量的目的

    • 有多少个切片就会执行多少个Map任务
  • 一般切片大小为Block的整数倍(2 1/2)

    • 防止多余创建和很多的数据连接
  • 如果Split>Block ,计算节点少了

  • 如果Split<Block ,计算节点多了

  • 默认情况下,Split切片的大小等于Block的大小 ,默认128M

  • 一个切片对应一个MapTask

MapTask

  • map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中

  • 我们可以根据自己书写的分词逻辑(空格分隔).计算每个单词出现的次数

  • 这是就会产生 (Map<String,Integer>)临时数据,存放在内存中

  • 但是内存大小是有限的,如果多个任务同时执行有可能内存溢出(OOM)

  • 如果把数据都直接存放到硬盘,效率太低

  • 我们需要在OOM和效率低之间提供一个有效方案

    • 可以现在内存中写入一部分,然后写出到硬盘

环形数据缓冲区

  • 可以循环利用这块内存区域,减少数据溢写时map的停止时间

    • 每一个Map可以独享的一个内存区域
    • 在内存中构建一个环形数据缓冲区(kvBuffer),默认大小为100M
    • 设置缓冲区的阈值为80%,当缓冲区的数据达到80M开始向外溢写到硬盘
    • 溢写的时候还有20M的空间可以被使用效率并不会被减缓
    • 而且将数据循环写到硬盘,不用担心OOM问题

分区Partation

  • 根据Key直接计算出对应的Reduce

  • 分区的数量和Reduce的数量是相等的

  • hash(key) % partation = num

  • 默认分区的算法是Hash然后取余

    • Object的hashCode()—equals()
    • 如果两个对象equals,那么两个对象的hashcode一定相等
    • 如果两个对象的hashcode相等,但是对象不一定equlas

排序Sort

  • 对要溢写的数据进行排序(QuickSort)
  • 按照先Partation后Key的顺序排序–>相同分区在一起,相同Key的在一起
  • 我们将来溢写出的小文件也都是有序的

溢写Spill

  • 将内存中的数据循环写到硬盘,不用担心OOM问题
  • 每次会产生一个80M的文件
  • 如果本次Map产生的数据较多,可能会溢写多个文件

合并Merge

  • 因为溢写会产生很多有序(分区 key)的小文件,而且小文件的数目不确定
  • 后面向reduce传递数据带来很大的问题
  • 所以将小文件合并成一个大文件,将来拉取的数据直接从大文件拉取即可
  • 合并小文件的时候同样进行排序(归并排序),最终产生一个有序的大文件

拉取Fetch

  • 我们需要将Map的临时结果拉取到Reduce节点

  • 原则

    • 相同的Key必须拉取到同一个Reduce节点
    • 但是一个Reduce节点可以有多个Key
  • 未排序前拉取数据的时候必须对Map产生的最终的合并文件做全序遍历

    • 而且每一个reduce都要做一个全序遍历
  • 如果map产生的大文件是有序的,每一个reduce只需要从文件中读取自己所需的即可

合并Merge

  • 因为reduce拉取的时候,会从多个map拉取数据
  • 那么每个map都会产生一个小文件,这些小文件(文件与文件之间无序,文件内部有序)
  • 为了方便计算(没必要读取N个小文件),需要合并文件
  • 归并算法合并成2个(qishishilia)
  • 相同的key都在一起

归并Reduce

  • 将文件中的数据读取到内存中
  • 一次性将相同的key全部读取到内存中
  • 直接将相同的key得到结果–>最终结果

写出Output

  • 每个reduce将自己计算的最终结果都会存放到HDFS上

MapReduce架构

MapReduce1.x

  • client

    • 客户端发送mr任务到集群

    • 客户端的种类有很多种

      • hadoop jar wordcount.jar
  • JobTracker

    • 接受客户端的mr任务

      • 选择一个资源丰富的,执行对应的任务
      • 并且给这个任务分配资源
    • 与TaskTracker保持心跳,接受汇报信息

  • TaskTracker

    • 保持心跳,汇报资源

      • 当前机器内存,当前机器任务数
    • 当分配资源之后,开始在本机分配对应的资源给Task

    • 并且实时监控任务的执行,并汇报

  • Slot(槽)

    • 属于JobTracker分配的资源
    • 计算能力,IO能力…
    • 不管任务大小,资源是恒定的,不灵活但是好管理
  • Task(MapTask–ReduceTask)

    • 开始按照MR的流程执行业务
    • 当任务完成时,JobTracker告诉TaskTracker回收资源
  • 缺点

    • 单点故障

    • 内存扩展

    • 业务瓶颈

    • 只能执行MR的操作

      • 如果其他框架需要运行在Hadoop上,需要独立开发自己的资源调度框架

MapReduce2.x

  • 2.x开始使用Yarn(Yet Another Resource Negotiator,另一种资源协调者)统一管理资源

  • 以后其他的计算框架可以直接访问yarn获取当前集群的空闲节点

  • client

    • 客户端发送mr任务到集群

    • 客户端的种类有很多种

      • hadoop jar wordcount.jar
  • ResourceManager

    • 资源协调框架的管理者

    • 分为主节点和备用节点(防止单点故障)

      • 主备的切换基于Zookeeper进行管理
    • 时刻与NodeManager保持心跳,接受NodeManager的汇报

      • NodeManager汇报当前节点的资源情况
    • 当有外部框架要使用资源的时候直接访问ResourceManager即可

    • 如果有MR任务,先去ResourceManager申请资源,ResourceManager根据汇报相对灵活分配资源

      • 资源在NodeManager1,NodeManager1要负责开辟资源
  • NodeManager

    • 资源协调框架的执行者
    • 每一个DataNode上默认有一个NodeManager
    • NodeManager汇报自己的信息到ResourceManager
  • Container

    • 2.x资源的代名词
    • Container动态分配的
  • ApplicationMaster

    • 我们本次JOB任务的主导者
    • 负责调度本次被分配的资源Container
    • 当所有的节点任务全部完成,application告诉ResourceManager请求杀死当前ApplicationMaster线程
    • 本次任务所有的资源都会被释放
  • Task(MapTask–ReduceTask)

    • 开始按照MR的流程执行业务
    • 当任务完成时,ApplicationMaster接收到当前节点的回馈
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>