Hadoop实战——MapReduce实现主播的播放量等数据的统计及TopN排序(第二篇)

本次实战项目一共分三篇教学(第三篇下周更新)

第一篇:对主播文本数据的清洗,从大量数据中获取我们所需要的数据(如播放量,时长等)

第二篇:对清洗后的数据进行统计求和处理操作,按照主播id号依次整齐显示

第三篇:对统计好的数据进行TopN展示的操作,排序规则可自定义(如播放量,粉丝数量),N的大小也可以自定义

所有技术、实战教学文章总目录点击此处链接


目录

前言:

一、流程简介

 二、创建Maven工程项目

 (1)新建maven

 (2)添加依赖

三、编写MapReduce程序

(1)自定义数据类

(2)Mapper类

(3)Reduce类

(4)主类(入口类)

四、编译打包jar上传

五、拷贝数据集

 六、执行Jar包程序

 Gitee仓库Hadoop项目下载地址

其他系列技术教学、实战开发


前言:

在第一篇教学我们已经将主播的数据进行了初步的提取,但是数据看起来杂乱无章,这篇文章教大家如何对提取出来的数据进一步处理

一、流程简介

第一篇提取好的数据如下:

 通过文本数据我们可以发现一个主播的id号有多条数据记录,那么我们要做的就是要把这些相同的id将他对应数据进行累计求和处理操作,同时对id进行一个升序操作,看起来更加整洁

这次涉及到统计求和、排序等操作,用到了Reduce,整体项目流程如下:

 二、创建Maven工程项目

 (1)新建maven

打开IDEA,新建一个项目,,在左侧一栏选择maven工程,点击下一步

 添加项目名称,点击完成

 创建之后,右下角会弹出提示,选择Auto自动导入依赖(没有也没关系,待会添加依赖也会弹出)

 (2)添加依赖

展开项目目录,编辑pom.xml配置文件,添加如下依赖

<dependencies>
        <!-- Hadoop所需对应依赖包 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- 编译打包项目的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

 项目创建完毕

三、编写MapReduce程序

如果对Map和Reduce对应阶段的任务和处理结果不熟悉的可以看我之前那一篇 单词统计排序教程,有详细介绍

(1)自定义数据类

该类的作用:方便统计主播的指标数据,需要把这些字段整合到一个对象中,以便日后维护

新建一个VideoInfoWritable类继承Writable类,设置四个属性,自己补上get()和set()方法

public class VideoInfoWritable extends Writable {
    private long gold; //金币
    private long watchnumpv; //观看时长
    private long follower; //粉丝数量
    private long length; //总播放时长
}

复写Writable类的两个方法,一个readFileds输入数据,一个write输出数据

@Override
public void readFields(DataInput dataInput) throws IOException {
    this.gold = dataInput.readLong();
    this.watchnumpv = dataInput.readLong();
    this.follower = dataInput.readLong();
    this.length = dataInput.readLong();
}

@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(gold);
    dataOutput.writeLong(watchnumpv);
    dataOutput.writeLong(follower);
    dataOutput.writeLong(length);
}

(2)Mapper类

该类的作用:对数据进行按行读取,切割获取对应字段数据,封装字段数据到(1)自定义的类对象中

新建一个videoInfoMap类继承Mapper类,复写mapper方法

public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {
    @Override
    public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String line = v1.toString(); //读取清先之后的每一行数据
        String[] fields = line.split("t"); //通过"t"对数据进行切割
        String id = fields[0]; //获取主播的id
        /**
         * 获取主播的其他数据
         * gold-------->>金币
         * watchnumpv-->>播放量
         * follower---->>粉丝
         * length------>>开播时长
         */
        long gold = Long.parseLong(fields[1]);
        long watchnumpv = Long.parseLong(fields[2]);
        long follower = Long.parseLong(fields[3]);
        long length = Long.parseLong(fields[4]);
        Text k2 = new Text();
        k2.set(id);
        // 封装到自定义的VideoInfoWritable类对象中
        VideoInfoWritable v2 = new VideoInfoWritable();
        v2.set(gold, watchnumpv, follower, length);
        context.write(k2, v2);
    }
}

(3)Reduce类

该类的作用:遍历具有相同主播id的VideoInfoWritable类对象,获取对应的四个字段值,进行求和

新建一个videoInfoReduce类,继承Reduce类,复写reduce方法

public class VideoInfoReduce extends Reducer<Text,VideoInfoWritable, Text,VideoInfoWritable> {
    @Override
    protected void reduce(Text k2,Iterable<VideoInfoWritable> v2s,Context context) throws IOException,InterruptedException{
        // 从v2s中把相同的k2的value取出来,进行遍历,进行累加求和。
        long goldsum=0;
        long watchnumpvsum=0;
        long followersum=0;
        long lengthsum=0;
        /**
         * v2s:相同主播id的对应对象集合(VideoInfoWritable类对象,有四个属性)
         * 遍历具有相同id的对象,获取对应四个字段值,进行求和
         */
        for(VideoInfoWritable v2:v2s){
            goldsum+=v2.getGold();
            watchnumpvsum+=v2.getWatchnumpv();
            followersum+=v2.getFollower();
            lengthsum+=v2.getLength();
        }
        // 将求和统计好的封装进来,写入context中,交由Job主类打印输出
        Text k3=k2;
        VideoInfoWritable v3=new VideoInfoWritable();
        v3.set(goldsum,watchnumpvsum,followersum,lengthsum);
        context.write(k3,v3);
    }
}

(4)主类(入口类)

这个没什么好说的了,运行jar包程序的入口类,对应代码注释我也标上了

public class VideoInfoJob {
    public static void main(String[] args) {
        try {
            // 运行jar包程序指令输入错误,直接退出程序
            if (args.length != 2) {
                System.exit(100);
            }
            Configuration conf = new Configuration();//job需要的配置参数
            Job job = Job.getInstance(conf);//创建一个job作业
            job.setJarByClass(VideoInfoJob.class);//设置入口类
            FileInputFormat.setInputPaths(job, new Path(args[0]));//指定输入路径(可以是文件,也可以是目录)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));//指定输出路径(只能是指定一个不存在的目录)
            // 指定Mapper阶段的相关类
            job.setMapperClass(VideoInfoMap.class);
            // 指定K2的输出数据类型
            job.setMapOutputKeyClass(Text.class);
            // 指定v2的输出数据类型
            job.setMapOutputValueClass(VideoInfoWritable.class);
            // 指定Reduce阶段的相关类
            job.setReducerClass(VideoInfoReduce.class);
            // 指定K3的输出数据类型
            job.setOutputKeyClass(Text.class);
            // 指定V3的输出数据类型
            job.setOutputValueClass(VideoInfoWritable.class);
            //提交作业job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

四、编译打包jar上传

点击右侧的maven,展开Lifecycle,双击clean清理一下,再双击package生成jar包

 运行结束,在项目目录会生成一个target文件夹,展开可以看到有一个jar包,右键复制jar包并且通过winscp连接虚拟机拷贝到虚拟机里 (不会的看第一篇)

我已经成功拷贝到虚拟机了

五、拷贝数据集

在第一篇文章里,我们已经初步处理好了数据,并且把结果数据集输出到了HDFS文件系统,现在我们将这个结果集拷贝到data目录,并且重新命名

输入指令,将part-m-00000文件拷贝到/zhubo/data/文件夹下,并命名为zhuboClean.log

hadoop fs -cp /zhubo/resultClean/part-m-00000 /zhubo/data/zhuboClean.log

 六、执行Jar包程序

最后我们执行jar包运行一下

zhuboCensus-1.0.jar 要执行的jar包名称
VideoInfoJob  主类(入口类)类名
/zhubo/data/zhuboClean.log 输入路径(数据所在位置)
/zhubo/resultCensus/ 输出路径(结果输出路径,要为不存在的文件夹)
hadoop jar zhuboCensus-1.0.jar VideoInfoJob /zhubo/data/zhuboClean.log /zhubo/resultCensus/

 刷新浏览器,将输出的结果数据集下载下来

以记事本方式打开,可以看到已经统计完毕了,并且自动按照id依次显示

 Gitee仓库Hadoop项目下载地址

Gitee仓库地址(Hadoop实战项目源码集合)

Hadoop实战项目源码集合: https://blog.csdn.net/weixin_47971206CSDN文章教学中的源码汇总集合

其他系列技术教学、实战开发

各大技术基础教学、实战开发教学(最新更新时间2021-11-28)_Bald programmer的博客-CSDN博客

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>