Hadoop-MapReduce-源码跟读-ReduceTask阶段篇

一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、Reducer类

我们先看下我们写的reduce所继承的Reducer类

public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /**
   * 传递给Reducer实现的上下文
   */
  public abstract class Context 
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * 在任务开始时调用一次
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * 每个键调用一次此方法。大多数应用程序将通过重写此方法来定义其reduce类。默认实现是标识函数
   * 
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * 在任务结束时调用一次
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * 高级应用程序编写者可以使用 run(org.apache.hadop.mapreduce.Reporter.Context) 方法
   * 来控制reduce任务的工作方式
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        //如果使用了备份存储,请将其重置
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }
    } finally {
      cleanup(context);
    }
  }
}

经过该类的注释我们可以得到以下信息:

1、将map阶段输出的key相同的一组中间值缩减为一组较小的值

2、Reducer实现可以通过JobContext.getConfiguration()访问作业的配置

3、Reducer有三个主要阶段

        3.1、Shuffle:Reducer使用HTTP在网络上复制每个Mapper的排序输出

        3.2、Sort:框架合并多个Mapper的输出并按key进行排序(因为不同的Mapper可能输出相同的key)

        Shuffle 和 Sort 阶段同时发生,即当提取输出时,它们被合并

        3.3、SecondarySort:为了对迭代器返回的value进行二次排序,应用程序应该用二次关键字扩展关键字,并定义一个分组比较器。键将使用整个键进行排序,但将使用分组比较器进行分组,以决定在同一调用中发送哪些key和value到reduce。分组比较器是通过Job.setGroupingComparatorClass(Class)指定的。排序顺序由Job.setSortCompratorclass(Class)控制

4、Reduce阶段:为排序输入中的每<key,value集合>调用reduce()

5、ReduceTask的输出通常通过Context.write()写入RecordWriter

6、Reducer的输出未重新排序

三、ReduceTask是如何调起的

ReduceTask和MapTask一样都是由YarnChild启动的,详细可以看下上一篇博客<Hadoop-MapReduce-源码跟读-MapTask阶段篇>中的MapTask调起过程

四、ReduceTask运行细节(源码跟读)

这里我们就不从YarnChild开始捋了,而是从ReduceTask的run方法开始跟读

请注意:以下分析的是一个Job中一个ReduceTask的源码,一个Job可以有多个ReduceTask

1、ReduceTask

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      //Progress帮助生成进度报告的实用程序。是层次结构。子阶段的节点通过调用addPhase()创建
      //添加copy、sort、reduce到流程中
      copyPhase = getProgress().addPhase("copy");
      sortPhase  = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    //启动将处理与父级通信的线程(创建TaskReporter并启动通信线程)
    TaskReporter reporter = startReporter(umbilical);
    
    //是否是哟个新API,默认false,可以通过mapred.reducer.new-api设置
    boolean useNewApi = job.getUseNewReducer();
    //初始化:
    //    1、构建job的上下文
    //    2、构建尝试任务的上下文
    //    3、更改任务的状态UNASSIGNED到RUNNING
    //    4、获取输出格式化类,默认是TextOutputFormat.class 可以通过mapreduce.job.outputformat.class设置
    //    5、获取输出提交器,框架依赖输出提交器做一下操作:
    //        5.1、在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录
    //        5.2、作业完成后清理作业。例如,在作业完成后删除临时输出目录
    //        5.3、设置任务临时输出
    //        5.4、检查任务是否需要提交。这是为了在任务不需要提交的情况下避免提交过程
    //        5.5、任务输出的提交
    //        5.6、放弃任务提交
    //    6、获取输出目录,并将其设置为工作目录
    //    7、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)
    //    8、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。
    //    9、更新进程树
    //    10、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)
    initialize(job, getJobID(), reporter, useNewApi);

    //检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }
    
    //初始化编解码器
    //检查是否要压缩map输出数据
    codec = initCodec();
    //RawKeyValueIterator是一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代。
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;
    
    //获取用户定义的组合器类,该类用于在将map输出发送到reduce之前组合映射输出。通常,组合器与作业的 Reducer 相同,即 getReducerClass(),可以通过mapred.combiner.class设置
    Class combinerClass = conf.getCombinerClass();
    //构建组合器的输出收集者
    //默认收集10000个<key,value>向框架汇报一次,可以通过mapreduce.task.combine.progress.records设置
    CombineOutputCollector combineCollector = 
      (null != combinerClass) ? 
     new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    //获取Shuffle类
    //默认是Shuffle.class,可以通过mapreduce.job.reduce.shuffle.consumer.plugin.class设置
    Class<? extends ShuffleConsumerPlugin> clazz =
          job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
					
    //利用反射获取ShuffleConsumerPlugin
    //ShuffleConsumerPugin用于服务Reducers。它可以从内置的ShuffleHandler或第三方辅助服务中打乱MOF文件。
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    //构建Shuffle上下文(如果是本地模式,会设置localMapFiles,即:任务map输出都是在本地)
    ShuffleConsumerPlugin.Context shuffleContext = 
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                  super.lDirAlloc, reporter, codec, 
                  combinerClass, combineCollector, 
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    //根据Shuffle上下文初始化Shuffle消费者插件,默认会调用Shuffle.init()
    //    1、获取MapTask的个数并设置剩余MapTask和完成MapTask数量
    //    2、设置失败限制:Math.max(30, totalMaps / 10) 既:最小值为30
    //    3、设置重新拉取时间、拉取失败个数(默认5)等
    //    4、创建合并管理器
    shuffleConsumerPlugin.init(shuffleContext);

    //Shuffle开始,我们看看具体实现(第2步)
    rIter = shuffleConsumerPlugin.run();

    //释放数据结构
    mapOutputFilesOnDisk.clear();
    
    //排序阶段完成
    sortPhase.complete();                         
    //开始REDUCE阶段
    setPhase(TaskStatus.Phase.REDUCE); 
    //发生状态变更到task tracker
    statusUpdate(umbilical);
    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    //获取用户定义的WritableComparable比较器,用于对reduce的输入所有key进行分组。
    RawComparator comparator = job.getOutputValueGroupingComparator();

    if (useNewApi) {
      //我们看新API
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }

    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }


private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    final RawKeyValueIterator rawIter = rIter;
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    //制作一个任务上下文,以便我们可以获得类
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    //制作用户自定义reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    //制作RecordWriter
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW = 
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    //创建reduce的上下文,comparator是vlaue的比较器
    org.apache.hadoop.mapreduce.Reducer.Context 
         reducerContext = createReduceContext(reducer, job, getTaskID(),
                                               rIter, reduceInputKeyCounter, 
                                               reduceInputValueCounter, 
                                               trackedRW,
                                               committer,
                                               reporter, comparator, keyClass,
                                               valueClass);
    try {
      //运行自定义的reduce(我们还是看WordCount的reduce)
      reducer.run(reducerContext);
    } finally {
      trackedRW.close(reducerContext);
    }
  }

2、Shuffle

Shuffe阶段会返回一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代

public RawKeyValueIterator run() throws IOException, InterruptedException {
    //缩放我们每次RPC调用获取的最大事件数,以缓解ApplicationMaster上的OOM问题
    // TODO: 在 HADOOP-8942 之后应该没有必要这样做
    //MIN_EVENTS_TO_FETCH = 100
    //MAX_RPC_OUTSTANDING_EVENTS = 3000000
    //我们为此作业配置的reduce任务数。默认为1
    //因此eventsPerReducer的取值范围是 [100,3000000]
    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
        MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
    //MAX_EVENTS_TO_FETCH = 10000
    //eventsPerReducer = [100,3000000]
    //因此 maxEventsToFetch 取值范围是 [100,10000]
    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

    //启动MapTask完成事件获取线程EventFetcher
    //EventFetcher 会在TaskTracker中查询给定事件ID中的一组MapTask完成事件
(TaskCompletionEvent.Status.SUCCEEDED)
    //处理任务完成事件:
    //    1.将SUCCEEDED映射保存在knownOutput中以获取输出。
    //    2.将OBSOLETE/FAILED/KILLED状态的map保存在obsoleteOutput中,以停止从这些map中获取。
    //    3.从neededOutput中删除TIPFAILED状态的map,因为我们根本不需要它们的输出。
    final EventFetcher<K, V> eventFetcher =
        new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
            maxEventsToFetch);
    eventFetcher.start();
    
    //启动map输出提取器线程Fetcher
    //localMapFiles:
    //为map和reduce的瞬态存储操作工作区域。
    //map和reduce任务使用此类来标识中间文件需要写入/读取的目录。
    //如果job为本地作业那么localMapFiles是不为空的,既isLocal=true 如果是分布式isLocal=false
    boolean isLocal = localMapFiles != null;
    //在Shuffle阶段,reduce运行的默认并行传输数。默认5个,可以通过mapreduce.reduce.shuffle.parallelcopies设置
    final int numFetchers = isLocal ? 1 :
        jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
    //生成Fetcher数组
    Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];
    
    
    if (isLocal) {
      //如果是本地模式,就创建一个LocalFetcher线程即可
      //LocalJobRunner使用LocalFetcher执行本地文件系统获取
      fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
          merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
          localMapFiles);
      fetchers[0].start();
    } else {
      //分布式环境下需要创建多个Fetcher从不同节点拉取数据
      for (int i=0; i < numFetchers; ++i) {
        fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,
                                       reporter, metrics, this, 
                                       reduceTask.getShuffleSecret());
        fetchers[i].start();
      }
    }
    
    //等待Shuffle成功完成
    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
      reporter.progress();
      
      synchronized (this) {
        if (throwable != null) {
          throw new ShuffleError("error in shuffle in " + throwingThreadName,
                                 throwable);
        }
      }
    }

    //停止map完成事件获取线程,因为Shuffle阶段已经完了
    eventFetcher.shutDown();
    
    //停止map输出获取线程
    for (Fetcher<K, V> fetcher : fetchers) {
      fetcher.shutDown();
    }
    
    //停止调度
    scheduler.close();

    copyPhase.complete(); //复制已完成
    //设置此任务的当前阶段为SORT
    taskStatus.setPhase(TaskStatus.Phase.SORT);
    //向 task tracker 发送状态更新
    reduceTask.statusUpdate(umbilical);

    //完成正在进行的合并
    RawKeyValueIterator kvIter = null;
    try {
      //kvIter 就是<key,value>的迭代器了,也就是传给reduce方法的值,那么SORT阶段就是在这里做了,下面我们详细看下merger.close()中的实现(第3步)
      kvIter = merger.close();
    } catch (Throwable e) {
      throw new ShuffleError("Error while doing final merge ", e);
    }

    // Sanity check
    synchronized (this) {
      if (throwable != null) {
        throw new ShuffleError("error in shuffle in " + throwingThreadName,
                               throwable);
      }
    }
    
    return kvIter;
  }

3、MergeManagerImpl

在这里会对从各个map拉取的数据做排序、合并处理

public RawKeyValueIterator close() throws Throwable {
    //等待正在进行的合并完成
    if (memToMemMerger != null) { 
      memToMemMerger.close();
    }
    inMemoryMerger.close();
    onDiskMerger.close();
    //内存中的map输出数据
    List<InMemoryMapOutput<K, V>> memory = 
      new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
    inMemoryMergedMapOutputs.clear();
    memory.addAll(inMemoryMapOutputs);
    inMemoryMapOutputs.clear();
    //磁盘中的map输出数据
    List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
    onDiskMapOutputs.clear();
    return finalMerge(jobConf, rfs, memory, disk);
  }

private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
                                       List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
                                       List<CompressAwarePath> onDiskMapOutputs
                                       ) throws IOException {
    LOG.info("finalMerge called with " +
        inMemoryMapOutputs.size() + " in-memory map-outputs and " +
        onDiskMapOutputs.size() + " on-disk map-outputs");
    //获取在ReduceTask中可用的最大内存限制
    final long maxInMemReduce = getMaxInMemReduceLimit();
    // merge config params
    Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
    Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
    //是否应保留失败任务的临时文件?默认false,可以通过mapreduce.task.files.preserve.failedtasks设置
    boolean keepInputs = job.getKeepFailedTaskFiles();
    final Path tmpDir = new Path(reduceId.toString());
    //获取用于比较key的比较器,必须是RawComparator 的子类,默认为null,可以通过mapreduce.job.output.key.comparator.class设置
    //如果没有指定比较器,那么会获取WritableComparable(可序列化可比较)实现的比较器
    //RawComparator接口允许其实现直接比较数据流中的记录,无需先把数据流饭序列化为对象,这样便避免了新建对象的额外开销。
    final RawComparator<K> comparator =
      (RawComparator<K>)job.getOutputKeyComparator();

    //腾出内存所需的段
    List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
    long inMemToDiskBytes = 0;
    boolean mergePhaseFinished = false;
    if (inMemoryMapOutputs.size() > 0) {
      TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
      //这里会和ReduceTask中可用的最大内存限制做比较,把大于内存限制的部分数据放到磁盘上,内存中保存尽可能多的数据
      inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                memDiskSegments,
                                                maxInMemReduce);
      final int numMemDiskSegments = memDiskSegments.size();
      //ioSortFactor 是对文件进行排序时要同时合并的流的数量。这决定了打开的文件句柄的数量。
      //默认是10 可以通过mapreduce.task.io.sort.factor设置
      if (numMemDiskSegments > 0 &&
            ioSortFactor > onDiskMapOutputs.size()) {
        
        //如果我们到达这里,这意味着我们有少于io.sort.factor磁盘段,
        //并且这将增加1(内存段合并的结果)。由于这个总数仍然是<=io.sort.factor,
        //我们将不再进行任何中间合并,所有这些磁盘段的合并将直接提供给reduce方法
        
        mergePhaseFinished = true;
        //必须溢写到磁盘,但不能保留在mem中进行中间合并
        //创建一个本地reduce输入文件名,以.merged为后缀
        final Path outputPath = 
          mapOutputFile.getInputFileForWrite(mapId,
                                             inMemToDiskBytes).suffix(
                                                 Task.MERGED_OUTPUT_PREFIX);
        //开始合并,下面我们详细看下合并细节。(看第3.1步)
        final RawKeyValueIterator rIter = Merger.merge(job, fs,
            keyClass, valueClass, memDiskSegments, numMemDiskSegments,
            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
            mergePhase);

        //用CryptoOutputStream封装给定的FSDataOutputStream。
        //流所需的数据缓冲区大小由“mapreduce.job.encrypted-intermediate-data.buffer.kb”作业配置变量指定
        FSDataOutputStream out =
            IntermediateEncryptedStream.wrapIfNecessary(job,
                fs.create(outputPath), outputPath);
        Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
            codec, null, true);
        try {
          //合并输出到文件,默认合并10000条记录就向MR ApplicationMaster发送进度通知
          Merger.writeFile(rIter, writer, reporter, job);
          writer.close();
          onDiskMapOutputs.add(new CompressAwarePath(outputPath,
              writer.getRawLength(), writer.getCompressedLength()));
          writer = null;
          //添加到最终磁盘输出的列表中
        } catch (IOException e) {
          if (null != outputPath) {
            try {
              fs.delete(outputPath, true);
            } catch (IOException ie) {
              // NOTHING
            }
          }
          throw e;
        } finally {
          if (null != writer) {
            writer.close();
          }
        }
        LOG.info("Merged " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes to disk to satisfy " +
                 "reduce memory limit");
        inMemToDiskBytes = 0;
        memDiskSegments.clear();
      } else if (inMemToDiskBytes != 0) {
        LOG.info("Keeping " + numMemDiskSegments + " segments, " +
                 inMemToDiskBytes + " bytes in memory for " +
                 "intermediate, on-disk merge");
      }
    }

    //磁盘上的段处理
    List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
    long onDiskBytes = inMemToDiskBytes;
    long rawBytes = inMemToDiskBytes;
    CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
        new CompressAwarePath[onDiskMapOutputs.size()]);
    for (CompressAwarePath file : onDisk) {
      long fileLength = fs.getFileStatus(file).getLen();
      onDiskBytes += fileLength;
      rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;

      LOG.debug("Disk file: " + file + " Length is " + fileLength);
      diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
                                         (file.toString().endsWith(
                                             Task.MERGED_OUTPUT_PREFIX) ?
                                          null : mergedMapOutputsCounter), file.getRawDataLength()
                                        ));
    }
    LOG.info("Merging " + onDisk.length + " files, " +
             onDiskBytes + " bytes from disk");
    Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
      public int compare(Segment<K, V> o1, Segment<K, V> o2) {
        if (o1.getLength() == o2.getLength()) {
          return 0;
        }
        return o1.getLength() < o2.getLength() ? -1 : 1;
      }
    });

    // build final list of segments from merged backed by disk + in-mem
    //从备份磁盘和内存构建最终的段数据列表
    List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
    long inMemBytes = createInMemorySegments(inMemoryMapOutputs, 
                                             finalSegments, 0);
    LOG.info("Merging " + finalSegments.size() + " segments, " +
             inMemBytes + " bytes from memory into reduce");
    if (0 != onDiskBytes) {
      final int numInMemSegments = memDiskSegments.size();
      diskSegments.addAll(0, memDiskSegments);
      memDiskSegments.clear();
      //只有当将要进行中间合并时,才通过mergePhase
      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
      RawKeyValueIterator diskMerge = Merger.merge(
          job, fs, keyClass, valueClass, codec, diskSegments,
          ioSortFactor, numInMemSegments, tmpDir, comparator,
          reporter, false, spilledRecordsCounter, null, thisPhase);
      diskSegments.clear();
      if (0 == finalSegments.size()) {
        return diskMerge;
      }
      finalSegments.add(new Segment<K,V>(
            new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
    }
    //开始合并(合并细节看第3.1步)
    return Merger.merge(job, fs, keyClass, valueClass,
                 finalSegments, finalSegments.size(), tmpDir,
                 comparator, reporter, spilledRecordsCounter, null,
                 null);
  
  }

3.1、Merger

Merger是Map和Reduce任务用于合并其内存和磁盘段的实用程序类

最终会调用Merger内部类MergeQueue中的merge()

RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
                                     int factor, int inMem, Path tmpDir,
                                     Counters.Counter readsCounter,
                                     Counters.Counter writesCounter,
                                     Progress mergePhase)
        throws IOException {
      LOG.info("Merging " + segments.size() + " sorted segments");

      /*
       * 如果内存中有段,则它们首先出现在段列表中,然后是已排序的磁盘段。
       * 否则(如果只有磁盘段),则如果段列表中有多个因子段,则它们是已排序的段。
       */
      int numSegments = segments.size();
      int origFactor = factor;
      int passNo = 1;
      if (mergePhase != null) {
        mergeProgress = mergePhase;
      }

      //计算要合并的输入字节的预期大小,将用于计算合并进度。
      //这模拟了上面的merge(),并试图获得所有合并中要合并的字节数(假设合并时没有调用合并器)
      long totalBytes = computeBytesInMerges(factor, inMem);
      if (totalBytes != 0) {
        progPerByte = 1.0f / (float)totalBytes;
      }
      
      //从构造函数中创建的排序map中创建MergeStreams,并将最终输出转储到文件中
      do {
        //获取此合并过程的因子。我们假设内存中的段是段列表中的第一个条目,并且传递因子不适用于它们
        factor = getPassFactor(factor, passNo, numSegments - inMem);
        if (1 == passNo) {
          factor += inMem;
        }
        List<Segment<K, V>> segmentsToMerge =
          new ArrayList<Segment<K, V>>();
        int segmentsConsidered = 0;
        int numSegmentsToConsider = factor;
        long startBytes = 0; //此合并的段的起始字节
        while (true) {
          //提取最小的“factor”段数对空段调用清理(无 key/value 数据)
          List<Segment<K, V>> mStream = 
            getSegmentDescriptors(numSegmentsToConsider);
          for (Segment<K, V> segment : mStream) {
            //在最后可能的时刻初始化段;这有助于确保我们在需要缓冲区之前不会使用它们
            segment.init(readsCounter);
            long startPos = segment.getReader().bytesRead;
            boolean hasNext = segment.nextRawKey();
            long endPos = segment.getReader().bytesRead;
            
            if (hasNext) {
              startBytes += endPos - startPos;
              segmentsToMerge.add(segment);
              segmentsConsidered++;
            }
            else {
              segment.close();
              numSegments--; //我们忽略该段进行合并
            }
          }
          //如果我们有所需数量的分段,或者查看所有可用的分段,我们就会中断
          if (segmentsConsidered == factor || 
              segments.size() == 0) {
            break;
          }
            
          numSegmentsToConsider = factor - segmentsConsidered;
        }
        
        //将流馈送到优先级队列
        initialize(segmentsToMerge.size());
        clear();
        for (Segment<K, V> segment : segmentsToMerge) {
          put(segment);
        }
        
        //如果剩余的段数较少,则只返回迭代器,否则执行另一个单级合并
        if (numSegments <= factor) {
          if (!includeFinalMerge) { //用于reduce任务

            //重置totalBytesProcessed并重新计算剩余段的totalBytes,
            //以跟踪最终合并的进度。最终合并被视为reduce阶段的进展,即reduce任务的第三阶段。
            totalBytesProcessed = 0;
            totalBytes = 0;
            for (int i = 0; i < segmentsToMerge.size(); i++) {
              totalBytes += segmentsToMerge.get(i).getRawDataLength();
            }
          }
          if (totalBytes != 0) //偏执
            progPerByte = 1.0f / (float)totalBytes;
          
          totalBytesProcessed += startBytes;         
          if (totalBytes != 0)
            mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
          else
            mergeProgress.set(1.0f); //最后一次传输,没有剩余的分段-我们就完成了
          
          LOG.info("Down to the last merge-pass, with " + numSegments + 
                   " segments left of total size: " +
                   (totalBytes - totalBytesProcessed) + " bytes");
          return this;
        } else {
          LOG.info("Merging " + segmentsToMerge.size() + 
                   " intermediate segments out of a total of " + 
                   (segments.size()+segmentsToMerge.size()));
          
          long bytesProcessedInPrevMerges = totalBytesProcessed;
          totalBytesProcessed += startBytes;

          //如果在空间限制下可用,我们希望在多个磁盘上分散创建临时文件
          long approxOutputSize = 0; 
          for (Segment<K, V> s : segmentsToMerge) {
            approxOutputSize += s.getLength() + 
                                ChecksumFileSystem.getApproxChkSumLength(
                                s.getLength());
          }
          Path tmpFilename = 
            new Path(tmpDir, "intermediate").suffix("." + passNo);

          Path outputFile =  lDirAlloc.getLocalPathForWrite(
                                              tmpFilename.toString(),
                                              approxOutputSize, conf);

          FSDataOutputStream out = fs.create(outputFile);
          out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
              outputFile);
          Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
              codec, writesCounter, true);
          writeFile(this, writer, reporter, conf);
          writer.close();
          
          //我们完成了一次单级合并;现在清理优先级队列
          this.close();

          //将新创建的分段添加到要合并的分段列表中
          Segment<K, V> tempSegment = 
            new Segment<K, V>(conf, fs, outputFile, codec, false);

          //在排序列表中插入新的合并段
          int pos = Collections.binarySearch(segments, tempSegment,
                                             segmentComparator);
          if (pos < 0) {
            //二进制搜索失败。所以要插入的位置是pos-1
            pos = -pos-1;
          }
          segments.add(pos, tempSegment);
          numSegments = segments.size();
          
          //从totalBytes中减去新段的预期大小和实际大小之间的差值(新段的预计大小为inputBytesOfThisMerge)。
          //若合并中未调用合并器,则预期大小和实际大小将匹配(几乎)
          long inputBytesOfThisMerge = totalBytesProcessed -
                                       bytesProcessedInPrevMerges;
          totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
          if (totalBytes != 0) {
            progPerByte = 1.0f / (float)totalBytes;
          }
          
          passNo++;
        }
        //我们只担心第一次通过合并因子。因此,将系数重置为原来的值
        factor = origFactor;
      } while(true);
    }

4、WordCount中的reduce

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    //key为从map输出收集的去重后的key,value为按照key进行分组合并排序后的value迭代器
    //当处理当下key时,会对下一个key的value按照reduce上下文传入的比较器进行排序
    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      //默认调用TextOutputFormat.write()进行输出
      context.write(key, result);
    }
  }

5、TextOutputFormat

    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      //换行符“n”
      out.write(NEWLINE);
    }

五、总结

1、初始化:比如构建作业和尝试任务的上下文、更新任务状态,构建输出提交器等

2、Shuffle:根据本地模式和集群模式生成不同的线程(Fetcher)组来收集map端的输出

3、Sort:对Shuffle的结果进行排序合并

4、SecondarySort:对相同key的value进行二次排序

5、构建自定义reducer、RecordWriter、reduce的上下文

6、运行用户自定义的Reduce

7、无序输出结果,一个reduce输出一份结果

 

 

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>