Mapreduce排序器对象获取规则源码分析

写在前面

本篇博客背景

​ 在使用Mapreduce程序时,不难看出Map阶段的输出键值对其实是默认按照字典排序的,而我们在使用该程序的时候也会因为需要满足某种需求自定义排序规则。当前我们有两种实现自定义排序的方式:


两种实现自定义排序的方式

  1. 直接让参与排序的对象实现WritableComparable接口,并在接口中实现compareTo方法,当运行的时候hadoop会自动帮助我们生成WritableComparator对象。

    具体可见下例:

    phoneBean.java

    package com.yxp.complication.exercise.sorttest;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author 尤小鹏
     * 切忌一味模仿!
     * 2021  11 14  17:50
     * description:
     */
    public class PhoneBean implements WritableComparable<PhoneBean> {
    
        public int upFlow;
        public int downFlow;
        public int sumFlow;
    
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
    
        public int getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow() {
            this.sumFlow=upFlow+downFlow;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
    
           this.upFlow = in.readInt();
           this.downFlow=in.readInt();
            this.sumFlow=in.readInt();
        }
    
        @Override
        public String toString() {
            return upFlow + "t" + downFlow + "t" + sumFlow ;
        }
    
        //降序
        @Override
        public int compareTo(PhoneBean o) {
            return -(this.sumFlow-o.sumFlow);
        }
    }
    
    
  2. 自定义一个比较器对象,继承WritableComparator类并重写compare方法,在该方法中定义比较规则,注意在自定义的比较器对象中通过调用父类的super方法将自定义的比较器对象和要参与比较的对象关联。最后在Driver类中设置该比较器类为作业的比较器类。

具体可见下例:

PhoneBeanComparator.java

package com.yxp.complication.exercise.sorttest;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author 尤小鹏
 * 切忌一味模仿!
 * 2021  11 15  8:56
 * description:
 */
public class PhoneBeanComparator extends WritableComparator {

    public PhoneBeanComparator() {
        super(PhoneBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        PhoneBean abean = (PhoneBean) a;
        abean.setSumFlow();

        PhoneBean bbean = (PhoneBean) b;
        bbean.setSumFlow();
        return -(abean.sumFlow - bbean.sumFlow);
    }
}


我的疑问

疑问便是第一种实现方式是如何得到排序器对象从而排序的,因为它根本没有实例化啊!虽然结论是Hadoop帮他构造的,但是只知道这个未免有些粗浅了,因此我们要找到源头!

探索底层规则,记得找源码,看源码,理解源码。


思路展开

我们都知道使用job对象可以设置提交作业的各种参数,如:Mapper类、Reducer类、Map端的输出键值对类型,程序的输出键值对类型……由此自然而然的联想到程序的比较器是不是也可以在这里设置呢?

寻找过程

getSortComparator()–>

我尝试着使用job.comparator然后聪明的IEDA就为我找到了下面的这个方法。

image-20211115111247356

ctrl+鼠标左键进入该方法,方法体如下:

 /**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator<?> getSortComparator() {
    return conf.getOutputKeyComparator();
  }

第二行描述足够明显,于是点进7行中的getOutputKeyComparator()方法——方法体如下:

/**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

在这里逻辑已经相对具体了,我们把聚焦在getClass()的三个参数上。

第一个:JobContext.KEY_COMPARATOR,关于这个变量的定义是这样的:

public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";

第二个:null,作为第一个变量的默认值。

第三个:RawComparator.class,直接对对象的字节表示进行操作的Comparator 。

它出现在这里是什么意思呢?或者说:

getClass()的三个参数代表什么意义呢?还是要走进去看一看。

   * @param name the class name. (类名)
   * @param defaultValue default value.(默认值)
   * @param xface the interface implemented by the named class.(由类名实现的接口)
   * @return property value as a <code>Class</code>, 
   *         or <code>defaultValue</code>.
   */
  public <U> Class<? extends U> getClass(String name, 
                                         Class<? extends U> defaultValue, 
                                         Class<U> xface) 

所以我们可以理解了,getClass()的功能是获取比较器的类,如果用户设置了就使用设置的类,没有设置的话值为null。

那么下面的代码看起来也相对好理解一些了。

1-2行:如果不为null(设置了比较器的类),就使用反射的工具类,将这个类实例化。

3行:返回WritableComparator的get()方法的返回值。这是建立在上面的判断不成立的情况,也就是说如果我们没有设置比较器类就会调用该语句,我仿佛接近真相了。

该方法的两个参数:

  1. getMapOutputKeyClass().asSubclass(WritableComparable.class)作用是

    获取Map的输出key的类,将其作为WritableComparable类的子类

  2. this表示是该作业的conf对象。

 if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);

走进get方法体——>

 /** Get a comparator for a {@link WritableComparable} implementation. */
  public static WritableComparator get(
      Class<? extends WritableComparable> c, Configuration conf) {
      
      //从比较器集合(comparators)中获取参数对应的比较器comparator,但是当前比较的类型得是Hadoop自带的数据类型,否则是拿不到的。
      
    WritableComparator comparator = comparators.get(c);
      
      //两个判断是否为null,第一个判断式为了防止一些极端情况可能会发生GCC回收机制导致比较器被回收
      
    if (comparator == null) {
        
        //如果被回收了,强制初始化
        
      forceInit(c);
      
        //再获取一次构造器
        
      comparator = comparators.get(c);
      
        //如果还是没有,说明排序的对象不是hadoop自带的数据类型,比如我们自己建的对象。
        
      if (comparator == null) {
      
          //为它创造一个WritableComparator对象实例,使第二种实现方式有了等同第一种实现方式的效果。
          
        comparator = new WritableComparator(c, conf, true);
      
      }
        
    }
      //为该构造器对象传入方法里传入的conf对象。
      
    ReflectionUtils.setConf(comparator, conf);
    return comparator;
  }

总结

Mapreduce程序在对Key排序时,

如果已经给Job对象设置好了比较器的类,会为它创建实例并按照其中的排序规则对Key进行排序,此为第二种自定义排序实现方式的逻辑。

否则,会从Hadoop自带的数据类型对应的比较器集合中寻找当前需要比较对象的比较器,

​	如果没有找到,为了防止极端情况发生导致原本该存在的比较器被回收,会进行一次强制初始化,然后再从集合中寻找它。

​		这一次如果再找不到,会判断该对象不是Hadoop自带的数据类型,如果需要比较的类实现了WritableComparable接口,则为它创造一个WritableComparator对象实例,使第二种实现方式有了等同第一种实现方式的效果。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇

)">
下一篇>>