为了了解Mapreduce切片机制,我开始了第一次源码探索……
说在前面
今天的视频在讲解源码,不出意外的我又在迷糊中听了大半,老师在后面摇了好几下我才醒,然后就被安排了一个伟大而艰巨的任务——学会自己看源码……
怎么办呢?那就看嘛!在看之前我还不忘百度搜索一下看源码的好处。下方为知乎某大佬原话
“我为什么读源码”
很多人一定和我一样的感受:源码在工作中有用吗?用处大吗?很长一段时间内我也有这样的疑问,认为哪些有事没事扯源码的人就是在装,只是为了提高他们的逼格而已。
那为什么我还要读源码呢?一刚开始为了面试,后来为了解决工作中的问题,再后来就是个人喜好了。说的好听点是有匠人精神;说的委婉点是好奇(底层是怎么实现的);说的不自信点是对黑盒的东西我用的没底,怕用错;说的简单直白点是提升自我价值,为了更高的薪资待遇(这里对真正的技术迷说声抱歉)。
源码中我们可以学到很多东西,学习别人高效的代码书写、学习别人对设计模式的熟练使用、学习别人对整个架构的布局,等等。如果你还能找出其中的不足,那么恭喜你,你要飞升了!会使用固然重要,但知道为什么这么使用同样重要。从模仿中学习,从模仿中创新。
读源码不像围城(外面的人想进来,里面的人想出去),它是外面的人不想进来,里面的人不想出去;当我们跨进城内,你会发现(还是城外好,皮!)城内风光无限,源码的海洋任我们遨游!
走!看源码!看源码!
人话模式
1.探索前的热身
一个结论:
FileInputFormat 是InputFormat的子实现类,实现切片逻辑
TextInputFormat是FileInputFormat的子实现类,实现读取数据逻辑
先探索一下TextInputFormat怎么就实现了读取数据的逻辑吧!
故事得从提交作业开始说起……
脑海里对提交作业后有个大概的思路:提交job–>然后……->mapreduce。中间省略若干步骤(为什么不写大家懂得都懂)
在提交job的方法文档注释上我发现了以下珍贵文案,于是三年没学英语的我站出来了!
/**
* Internal method for submitting jobs to the system.
译:系统提交文件的内部方法。
*
* <p>The job submission process involves:
译:作业提交过程涉及到:
* <ol>
* <li>
* Checking the input and output specifications of the job.
译:检查作业的输入和输出的规格。
* </li>
* <li>
* Computing the {@link InputSplit}s for the job.
译:计算作业的输入切片。
* </li>
* <li>
* Setup the requisite accounting information for the
* {@link DistributedCache} of the job, if necessary.
译:如果必要的话,为作业的分布式缓存设置必要的计算信息。
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system
* directory on the distributed file-system.
译:拷贝作业的jar包和配置到hdfs的MapReduce系统的目录
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally
* monitoring it's status.
译:提交作业到jobTracker(大概是作业追踪器)然后可选地追踪它的状态。
所以!大概思路就是上面的译文了!
总结一下:
-
作业提交过程涉及到: 1. **检查作业的输入和输出的规格。** 2. **计算作业的输入切片。** 3. **如果必要的话,为作业的分布式缓存设置必要的计算信息。** 4. **拷贝作业的jar包和配置到hdfs的MapReduce系统的目录** 5. **提交作业到jobTracker(大概是作业追踪器)然后可选地追踪它的状态。**
让我们把今天的目光聚焦到切片机制。我们往下找啊找….终于找到了写切片的地方啦,进入writeSplits方法。
方法体如下:
writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
你还别说,确实…看不大懂。不过我们看第6、7行还是没问题的,这个逻辑判断就是看你使用的Mapper是旧版的还是新版的,好,我们走进writeNewSplits……
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
代码依旧那么复杂呀,不过我还记得第一步是检查输入文件的规格,因此我们要看看它是怎么检查的。于是走进第六行job.getInputFormatClass方法。
显然这是个接口方法,我们要做的是找到它具体的实现逻辑,因此点击左边框内的小绿点找它的具体实现类,如下:
我们知道现在提交的job与map和reduce无关,而最有可能的是最JobContext的实现方法,因此点进去
/**
* Get the {@link InputFormat} class for the job.
*
* @return the {@link InputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
注意力只要放在第十行代码那里,INPUT_FORMAT_CLASS_ATTR的值为TextInputFormat.class,所以,当我们不指定输入文件规格的时候呢,默认的规格是文本输入规格,这也就是为什么我们在对文件做Mapreduce的时候需要指定输入和输出文件规格了!
这也佐证了为什么TextInputFormat负责读取数据的逻辑了!
看似没用, 其实锻炼了我们探索的能力!
2.探索ing
探索1的结论还记得吗?
FileInputFormat 是InputFormat的子实现类,实现切片逻辑。
TextInputFormat是FileInputFormat的子实现类,实现读取数据逻辑。
怎么样通过抽象方法找到它的实现呢?这就用到啦!
教你们个小技巧,首先找到InputFormat这个抽象类,我们通过(ALT+7)类结构惊奇的发现:
它定义了两个抽象方法恰恰是切片(getSplits)和读取数据(createRecordReader)。我们目的专一。
点击左框绿点,找到它的实现逻辑—-FileInputFormat类
准备好了吗?准备好了!
各单位注意!!!源码来啦!!!
现在是2021年11月12日21:37:56,会是什么时候搞完睡觉呢?
源码
各单位注意,中文注释是小鹏自己理解来写的,其他都是源码作者写的。
public List<InputSplit> getSplits(JobContext job) throws IOException {
//StopWtch是测时单位,可以以纳秒为单位,这里开始计时,到结尾我们可以观察到有对应的stop方法,用作统计该切分使用的时间,不用关注
StopWatch sw = new StopWatch().start();
/*将两个变量较大值作为切片的最小值,前者是固定值1,后者可以配置,如果没有配置的话使用默认值,可以在mapred-site.xml的"mapreduce.input.fileinputformat.split.minsize"
查看到,默认值为0。
*/
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
//切片最大值默认为Long类型的最大边界值2**63-1,也可以通过"mapreduce.input.fileinputformat.split.maxsize"设置。
long maxSize = getMaxSplitSize(job);
// generate splits
//新建一个InputSplit类型的集合用来存放所有切片对象,也将splits作为本方法的返回值
List<InputSplit> splits = new ArrayList<InputSplit>();
//通过job提供的输入路径拿到当前目录下所有文件的详情
List<FileStatus> files = listStatus(job);
//顾名思义,忽略文件夹
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
//遍历列表中文件的详情
for (FileStatus file: files) {
//如果是目录或者是忽略文件夹则跳过
if (ignoreDirs && file.isDirectory()) {
continue;
}
//获取文件路径
Path path = file.getPath();
//获取文件的内容大小
long length = file.getLen();
//如果不是空文件
if (length != 0) {
//BlockLocation[]包括块的网络位置、包含块副本的主机的信息以及其他块元数据(例如与块关联的文件偏移量、长度、是否损坏等)
BlockLocation[] blkLocations;
//判断instanceof左边显式声明的类型与右边操作元是否是同种类或存在继承关系
//如果是本地文件
if (file instanceof LocatedFileStatus) {
//获取块位置信息
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
//否则是集群文件
FileSystem fs = path.getFileSystem(job.getConfiguration());
//获取块位置信息
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//是否可切片,通常是正确的,但如果文件是流压缩的,则不会。
if (isSplitable(job, path)) {
//获取数据块大小
long blockSize = file.getBlockSize();
//计算切片的大小-->一般情况下永远都是块大小 128M
//方法内部通过 默认的最大切片值和数据块大小的较小值 与最小切片大小 比较的较大值作为切片大小
/*方法如下
*protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
*/
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
//将文件的剩余大小赋值
long bytesRemaining = length;
//判断当前文件的剩余内容是否要继续切片。公式:bytesRemaining)/splitSize > SPLIT_SLOP
//解读:文件剩余大小/切片的大小 是否> split_slop(值为1.1) 如果除出来的值>1.1就会继续切,否则就将剩余内容作为一个片处理。目的是为了让每一个MapTask处理的数据更加的均衡。
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
//获取需要切分的块在文件中的偏移量,从此处开始切
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
//制作一个切片对象并加入到存放切片对象的数组中
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
//文件剩余内容减去该切片的大小
bytesRemaining -= splitSize;
}
//这里的逻辑判断是建立在文件剩余大小/切片的大小 <= split_slop(值为1.1) 的情况,也就是说只剩下最后一点点了,这个时候再创建一个片并添加到切片对象的数组中本次切片就圆满完成了
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
}
//这里的else对应的是上访的是否可以切分,不能的话就会开始日志或者debug来袭。。
else { // not splitable
if (LOG.isDebugEnabled()) {
// Log only if the file is big enough to be splitted
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
//这里是在文件长度不为0但不能切分的条件下执行的只创建一个切片对象(因为不能切分说明该文件很小)
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
}
//这里对应的是文件长度为0的情况
else {
//Create empty hosts array for zero length files
//创建一个空的对象数组
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
//保存输入文件的数量作为加载源
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
//停止计时,并打印切片数目和花费时间
//返回切分后的对象
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
说在后面
现在的时间是2021年11月12日23:30:42。
第一次认真的探索源码,我有点喜欢上了这种探索的感觉,一定会有下一次,下下次以及很多次的。