【Hadoop】【MapReduce】实现实例:FriendRecommendation


MapReduce
author:zoxiii


参考地址

1、新建Java工程

1.1、工程名为:Friend,点击Finish

在这里插入图片描述

1.2、为Java工程配置Hadoop的Jar包路径

参考博客的第【2】步

2、新建Java class

在这里插入图片描述
源码地址:代码如下(有很小的修改)

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;

/**
 * Created with IntelliJ IDEA.
 * User: dbtsai
 * Date: 1/16/13
 * Time: 5:32 PM
 */

public class FriendRecommendation {
	 //Friend
    static public class FriendCountWritable implements Writable {
        public Long user;
        public Long mutualFriend;

        public FriendCountWritable(Long user, Long mutualFriend) {
            this.user = user;
            this.mutualFriend = mutualFriend;
        }

        public FriendCountWritable() {
            this(-1L, -1L);
        }

        public void write(DataOutput out) throws IOException {
            out.writeLong(user);
            out.writeLong(mutualFriend);
        }

        public void readFields(DataInput in) throws IOException {
            user = in.readLong();
            mutualFriend = in.readLong();
        }

        @Override
        public String toString() {
            return " toUser: "
                    + Long.toString(user) + " mutualFriend: " + Long.toString(mutualFriend);
        }
    }
    //Map
    public static class Map extends Mapper<LongWritable, Text, LongWritable, FriendCountWritable> {
        @SuppressWarnings("unused")
		private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line[] = value.toString().split("t");
            Long fromUser = Long.parseLong(line[0]);
            List<Long> toUsers = new ArrayList<Long>();

            if (line.length == 2) {
                StringTokenizer tokenizer = new StringTokenizer(line[1], ",");
                while (tokenizer.hasMoreTokens()) {
                    Long toUser = Long.parseLong(tokenizer.nextToken());
                    toUsers.add(toUser);
                    context.write(new LongWritable(fromUser), new FriendCountWritable(toUser, -1L));
                }

                for (int i = 0; i < toUsers.size(); i++) {
                    for (int j = i + 1; j < toUsers.size(); j++) {
                        context.write(new LongWritable(toUsers.get(i)), new FriendCountWritable((toUsers.get(j)), fromUser));
                        context.write(new LongWritable(toUsers.get(j)), new FriendCountWritable((toUsers.get(i)), fromUser));
                    }
                }
            }
        }
    }
    //Reduce
    public static class Reduce extends Reducer<LongWritable, FriendCountWritable, LongWritable, Text> {
        @SuppressWarnings("serial")
		@Override
        public void reduce(LongWritable key, Iterable<FriendCountWritable> values, Context context)
                throws IOException, InterruptedException {

            // key is the recommended friend, and value is the list of mutual friends
            final java.util.Map<Long, List<Long>> mutualFriends = new HashMap<Long, List<Long>>();

            for (FriendCountWritable val : values) {
                final Boolean isAlreadyFriend = (val.mutualFriend == -1);
                final Long toUser = val.user;
                final Long mutualFriend = val.mutualFriend;

                if (mutualFriends.containsKey(toUser)) {
                    if (isAlreadyFriend) {
                        mutualFriends.put(toUser, null);
                    } else if (mutualFriends.get(toUser) != null) {
                        mutualFriends.get(toUser).add(mutualFriend);
                    }
                } else {
                    if (!isAlreadyFriend) {
                        mutualFriends.put(toUser, new ArrayList<Long>() {
                            {
                                add(mutualFriend);
                            }
                        });
                    } else {
                        mutualFriends.put(toUser, null);
                    }
                }
            }

            java.util.SortedMap<Long, List<Long>> sortedMutualFriends = new TreeMap<Long, List<Long>>(new Comparator<Long>() {
                public int compare(Long key1, Long key2) {
                    Integer v1 = mutualFriends.get(key1).size();
                    Integer v2 = mutualFriends.get(key2).size();
                    if (v1 > v2) {
                        return -1;
                    } else if (v1.equals(v2) && key1 < key2) {
                        return -1;
                    } else {
                        return 1;
                    }
                }
            });

            for (java.util.Map.Entry<Long, List<Long>> entry : mutualFriends.entrySet()) {
                if (entry.getValue() != null) {
                    sortedMutualFriends.put(entry.getKey(), entry.getValue());
                }
            }

            Integer i = 0;
            String output = "";
            for (java.util.Map.Entry<Long, List<Long>> entry : sortedMutualFriends.entrySet()) {
                if (i == 0) {
                    output = entry.getKey().toString() + " (" + entry.getValue().size() + ": " + entry.getValue() + ")";
                } else {
                    output += "," + entry.getKey().toString() + " (" + entry.getValue().size() + ": " + entry.getValue() + ")";
                }
                ++i;
            }
            context.write(key, new Text(output));
        }
    }
    //main
    @SuppressWarnings("deprecation")
	public static void main(String[] args) throws Exception {
    	long startTime = System.currentTimeMillis(); //获取开始时间
      Configuration conf = new Configuration();

        
		Job job = new Job(conf, "FriendRecommendation");
      job.setJarByClass(FriendRecommendation.class);
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(FriendCountWritable.class);

      job.setMapperClass(Map.class);
      job.setReducerClass(Reduce.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileSystem outFs = new Path(args[1]).getFileSystem(conf);
      outFs.delete(new Path(args[1]), true);

      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      job.waitForCompletion(true);
      long endTime = System.currentTimeMillis();//获取结束时间
      System.out.println("程序运行时间:" + (endTime - startTime) + "ms"); //输出程序运行时间
    }
}

3、生成JAR包

参考博客的第【4】步

4、上传测试文件到HDFS中

# hadoop fs -put soc-test.txt hdfs://localhost:9000/FriendInput
# hadoop fs -put soc-LiveJournal1Adj.txt hdfs://localhost:9000/FriendInput2

删除文件

# hadoop fs -rm -f hdfs://localhost:9000/FriendInput     # 删除文件夹时把 -f 换成 -r

访问 http://192.168.184.114:50070 （HDFS NameNode 的 Web 界面），查看上传的文件
在这里插入图片描述

5、运行测试

5.1、运行soc-test.txt

# time hadoop jar FriendRecommendation.jar FriendRecommendation /FriendInput /FriendOutput

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
得到结果

# hadoop fs -cat /FriendOutput/part-r-00000                # 查看结果
# hadoop fs -get /FriendOutput/part-r-00000  Output.txt    # 下载文件

在这里插入图片描述

5.2、运行soc-LiveJournal1Adj.txt

# time hadoop jar FriendRecommendation.jar FriendRecommendation /FriendInput2 /FriendOutput2

在这里插入图片描述
在这里插入图片描述

查看HDFS的文件

# hadoop fs -ls hdfs://localhost:9000/

得到结果

# hadoop fs -cat /FriendOutput2/part-r-00000                # 查看结果
# hadoop fs -get /FriendOutput2/part-r-00000  Output2.txt    # 下载文件

在这里插入图片描述
在这里插入图片描述

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
<<上一篇
下一篇>>