基于网球体育数据的MapReduce实现案例

一、环境准备

编辑器:vscode

JDK版本:JDK1.8

项目管理器:maven

二、项目结构以及坐标依赖

项目结构:


在这里插入图片描述

坐标依赖:

	<dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.5</version>
        </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.7.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-yarn-common</artifactId>
      <version>2.7.5</version>
    </dependency>

三、数据结构

数据下载地址:

链接:https://pan.baidu.com/s/10h_3TL27zYO0_WRTTfn2CQ
提取码:8888

数据预览:
在这里插入图片描述
去掉首行索引保存为.csv文件,上传到hadoop存储即可,存储路径自定义,只需后续在代码中修改即可。

四、项目代码

①MapReduce作业一

目标:job_counter: 统计每个球员的总的ACE数据

文件结构:


在这里插入图片描述

map_ace.java代码:

package job_counter;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class map_ace extends Mapper<LongWritable,Text,Text,LongWritable> {
    /*
    输入的:k1 v1
                k1                           v1
longwriteable    行号                      text "1,2014,1,Thanasi Kokkinakis,Igor Sijsling,1,FirstRound,1,1,570"

    输出的:list(k2 v2)
                k2                            v2
    text        name           longwriteable ace
                A                              8
                A                              5
                B                              3
                
    */



    //map方法统计ACE球的个数
    /*
    输入(行,行值)
    输出(名字,ACE个数)
    */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //1.每行文本拆分,并获取值
        String[] line=value.toString().split(",");
        String name=line[3];
        String ace=line[15];
        //2.写入上下文
        Text text=new Text();
        LongWritable longWritable=new LongWritable();
        text.set(name);
        longWritable.set(Long.parseLong(ace));
        context.write(text,longWritable);
    }
}

reduce_ace.java代码:

package job_counter;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduce_ace extends Reducer<Text, LongWritable, Text, LongWritable> {
    //reduce方法统计总和
    /*  meger list(k2,v2)====>k2 list(v2)===A <8,5>
        输入的: k2    v2
                A       <8,5>
                B       <10,7>
    输出的:  k3    v3
            name    ace_totle
    */

    @Override
    protected void reduce(Text key,Iterable<LongWritable> values,Context context) throws IOException, InterruptedException {
        //遍历集合,将每个值累加
        long point=0;
        for (LongWritable value : values) {
            point=point+value.get();
            
        }
        //将k3,v3写入上下文
        context.write(key, new LongWritable(point));
    }
}

main_ace.java代码:

package job_counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class main_ace extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        //1.创建一个job任务对象
        Configuration conf=super.getConf();
        Job job=Job.getInstance(conf,"main_ace");
        //打包运行时函数
        job.setJarByClass(main_ace.class);
        //2.配置job对象
        //第一步:指定文件的读取方式和路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.96.138:9000/user/hadoop/job_data"));
        
        //第二步,指定map阶段的处理方式
        job.setMapperClass(map_ace.class);
        //设置map阶段k2类型
        job.setMapOutputKeyClass(Text.class);
        //设置map阶段v2类型
        job.setMapOutputValueClass(LongWritable.class);
        //shuffe使用默认
        /*
        k1,v1 ===>k2,v2 ===>k3,v3
        meger list(k2,v2)====>k2 list(v2)===A <8,5>
        */
        //第七步指定reduce阶段的处理方式和数据类型
        job.setReducerClass(reduce_ace.class);
        //k3类型
        job.setOutputKeyClass(Text.class);
        //v3类型
        job.setOutputValueClass(LongWritable.class);


         //判断文件路径是否存在
         Path output= new Path("hdfs://192.168.96.138:9000/user/hadoop/counter_out_ace");
         FileSystem fs = FileSystem.get(conf);
         if (fs.exists(output)) {
             fs.delete(output, true);
         }
 
        //设置输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, output);

        //等待任务结束
        boolean b1=job.waitForCompletion(true);

        return b1 ? 0:1;
    }
    public static void main(String[] args) throws Exception {
        Configuration configuration=new Configuration();
        //start job
        int run=ToolRunner.run(configuration, new main_ace(), args);
        System.exit(run);
    }
}

②MapReduce作业二
目标: job_class: 对%DF的球员根据四分位数分成四组
文件结构:
在这里插入图片描述
map_class.java代码:

package job_class;


import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class map_class extends Mapper<LongWritable, Text, Text, NullWritable> {
    ///对每个运动员进行四分位分类操作
    /*
    输入(行,行值)
    输出(行值,null)
    */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //写入上下文
        context.write(value,NullWritable.get());
    }
}

part_class.java代码:

package job_class;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class part_class extends Partitioner<Text,NullWritable>{
    /*
    1:定义分区
    2:返回对应的分区编号
    */
    /*四分位数
    25	0.111111111111110
	50	0.181818181818182
	75	0.300000000000000

    */
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        // TODO Auto-generated method stub
        //1,:拆分文本,获得字段
        String[] split=text.toString().split(",");
        String df=split[18];//%DF的坐标
        double low = 0.111111111111110;
        double zhong=0.181818181818182;
        double hig=0.300000000000000;
        //判断df与四分位数的关系
        if(Double.parseDouble(df)<low){//小于25%
            return 3;
        }
        else if(Double.parseDouble(df)>low & Double.parseDouble(df)<=zhong){//大于25%小于50%
            return 2; 
        }
        else if(Double.parseDouble(df)>zhong & Double.parseDouble(df)<=hig){//大于50%小于75%
            return 1;
        }
        else{//大于75%
            return 0;
        }
    }
}

reduce_class.java代码:

package job_class;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class reduce_class extends Reducer<Text, NullWritable, Text, NullWritable> {
    //reduce方法不做处理,直接输出
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }  
}

main_class.java代码:

package job_class;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class main_class extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        // 1.创建一个job任务对象
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "main_class");
        // 打包运行时函数
        job.setJarByClass(main_class.class);
        // 2.配置job对象
        // 第一步:指定文件的读取方式和路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://192.168.96.138:9000/user/hadoop/job_data"));

        // 第二步,指定map阶段的处理方式
        job.setMapperClass(map_class.class);
        // 设置map阶段k2类型
        job.setMapOutputKeyClass(Text.class);
        // 设置map阶段v2类型
        job.setMapOutputValueClass(NullWritable.class);
        // shuffe使用part_class
        job.setPartitionerClass(part_class.class);
        //设置4个任务
        job.setNumReduceTasks(4);
        /*
         * k1,v1 ===>k2,v2 ===>k3,v3
         */
        // 第七步指定reduce阶段的处理方式和数据类型
        job.setReducerClass(reduce_class.class);
        // k3类型
        job.setOutputKeyClass(Text.class);
        // v3类型
        job.setOutputValueClass(NullWritable.class);

        // 判断文件路径是否存在
        Path output = new Path("hdfs://192.168.96.138:9000/user/hadoop/class_out_%DF");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        // 设置输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, output);

        // 等待任务结束
        boolean b1 = job.waitForCompletion(true);

        return b1 ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // start job
        int run = ToolRunner.run(configuration, new main_class(), args);
        System.exit(run);
    }
}

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
 

)">
下一篇>>