【Hadoop】【MapReduce】实现实例:FriendRecommendation
MapReduce
author:zoxiii
FriendRecommendation
1、新建Java工程
1.1、工程名为:Friend,点击Finish
1.2、为Java工程配置HadoopJar包路径
参考博客的第【2】步
2、新建Java class
源码地址:代码如下(有很小的修改)
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
/**
* Created with IntelliJ IDEA.
* User: dbtsai
* Date: 1/16/13
* Time: 5:32 PM
*/
public class FriendRecommendation {
//Friend
static public class FriendCountWritable implements Writable {
public Long user;
public Long mutualFriend;
public FriendCountWritable(Long user, Long mutualFriend) {
this.user = user;
this.mutualFriend = mutualFriend;
}
public FriendCountWritable() {
this(-1L, -1L);
}
public void write(DataOutput out) throws IOException {
out.writeLong(user);
out.writeLong(mutualFriend);
}
public void readFields(DataInput in) throws IOException {
user = in.readLong();
mutualFriend = in.readLong();
}
@Override
public String toString() {
return " toUser: "
+ Long.toString(user) + " mutualFriend: " + Long.toString(mutualFriend);
}
}
//Map
/**
 * Mapper that turns one adjacency line into direct-friend markers and
 * mutual-friend candidates.
 *
 * <p>Each input line is split on a tab into a user id and a comma-separated
 * list of friend ids. Lines that do not split into exactly two parts produce
 * no output.
 */
public static class Map extends Mapper<LongWritable, Text, LongWritable, FriendCountWritable> {
    /**
     * Emits, for the user on this line:
     * <ul>
     *   <li>one {@code (user, (friend, -1))} record per listed friend, and</li>
     *   <li>for every unordered pair of listed friends {@code (a, b)}, the two
     *       records {@code (a, (b, user))} and {@code (b, (a, user))}.</li>
     * </ul>
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the adjacency list
     * @param context sink for the emitted records
     * @throws NumberFormatException if an id on a non-blank line is not a valid long
     */
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String record = value.toString();
        // A blank line carries no user id; skip it instead of failing in parseLong.
        if (record.trim().isEmpty()) {
            return;
        }
        // The separator is the TAB character, not the letter 't'.
        String[] line = record.split("\t");
        Long fromUser = Long.parseLong(line[0]);
        List<Long> toUsers = new ArrayList<Long>();
        if (line.length == 2) {
            StringTokenizer tokenizer = new StringTokenizer(line[1], ",");
            while (tokenizer.hasMoreTokens()) {
                Long toUser = Long.parseLong(tokenizer.nextToken());
                toUsers.add(toUser);
                // -1 marks an existing direct friendship.
                context.write(new LongWritable(fromUser), new FriendCountWritable(toUser, -1L));
            }
            for (int i = 0; i < toUsers.size(); i++) {
                Long first = toUsers.get(i);
                for (int j = i + 1; j < toUsers.size(); j++) {
                    Long second = toUsers.get(j);
                    // Both friends share fromUser, so each is a candidate for the other.
                    context.write(new LongWritable(first), new FriendCountWritable(second, fromUser));
                    context.write(new LongWritable(second), new FriendCountWritable(first, fromUser));
                }
            }
        }
    }
}
//Reduce
/**
 * Reducer that builds, for one user, the list of recommended users ordered by
 * the number of mutual friends.
 */
public static class Reduce extends Reducer<LongWritable, FriendCountWritable, LongWritable, Text> {
    /**
     * Groups the incoming records by candidate user, drops candidates that are
     * already direct friends (any record whose {@code mutualFriend} is -1), and
     * writes one line of the form
     * {@code candidate (count: [mutual, ...]),candidate (count: [...])}.
     * Candidates are ordered by mutual-friend count descending, then by id
     * ascending. An empty text is written when there is no candidate.
     *
     * @param key     the user the recommendations are for
     * @param values  candidate records emitted by the mapper for this user
     * @param context sink for the output line
     */
    @Override
    public void reduce(LongWritable key, Iterable<FriendCountWritable> values, Context context)
            throws IOException, InterruptedException {
        // Candidate id -> mutual friends; a null value marks "already a direct friend".
        final java.util.Map<Long, List<Long>> mutualFriends = new HashMap<Long, List<Long>>();
        for (FriendCountWritable val : values) {
            final boolean isAlreadyFriend = (val.mutualFriend == -1);
            final Long toUser = val.user;
            final Long mutualFriend = val.mutualFriend;
            if (isAlreadyFriend) {
                // Overrides anything collected so far and blocks later additions.
                mutualFriends.put(toUser, null);
            } else if (!mutualFriends.containsKey(toUser)) {
                List<Long> shared = new ArrayList<Long>();
                shared.add(mutualFriend);
                mutualFriends.put(toUser, shared);
            } else if (mutualFriends.get(toUser) != null) {
                mutualFriends.get(toUser).add(mutualFriend);
            }
        }

        // Only keys with a non-null list are ever inserted below, so the size lookups are safe.
        java.util.SortedMap<Long, List<Long>> sortedMutualFriends =
                new TreeMap<Long, List<Long>>(new Comparator<Long>() {
                    @Override
                    public int compare(Long key1, Long key2) {
                        int size1 = mutualFriends.get(key1).size();
                        int size2 = mutualFriends.get(key2).size();
                        if (size1 != size2) {
                            // More mutual friends sorts first.
                            return size1 > size2 ? -1 : 1;
                        }
                        // Tie-break on id; returns 0 for the same id, as the contract requires.
                        return key1.compareTo(key2);
                    }
                });
        for (java.util.Map.Entry<Long, List<Long>> entry : mutualFriends.entrySet()) {
            if (entry.getValue() != null) {
                sortedMutualFriends.put(entry.getKey(), entry.getValue());
            }
        }

        StringBuilder output = new StringBuilder();
        for (java.util.Map.Entry<Long, List<Long>> entry : sortedMutualFriends.entrySet()) {
            if (output.length() > 0) {
                output.append(",");
            }
            output.append(entry.getKey().toString())
                    .append(" (").append(entry.getValue().size())
                    .append(": ").append(entry.getValue()).append(")");
        }
        context.write(key, new Text(output.toString()));
    }
}
//main
/**
 * Configures and runs the FriendRecommendation job, then prints the elapsed
 * wall-clock time.
 *
 * <p>Any existing output path is deleted recursively before the job starts.
 * The process exits with status 0 when the job succeeds, 1 when it fails, and
 * 2 when the arguments are missing.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the output path
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
    if (args.length < 2) {
        System.err.println("Usage: FriendRecommendation <input path> <output path>");
        System.exit(2);
    }
    long startTime = System.currentTimeMillis(); // start of timing
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "FriendRecommendation");
    job.setJarByClass(FriendRecommendation.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    // Mapper and reducer emit different value types, so declare both stages explicitly.
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(FriendCountWritable.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);
    // Remove any previous output so the job does not refuse to start on an existing directory.
    FileSystem outFs = outputPath.getFileSystem(conf);
    outFs.delete(outputPath, true);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    boolean succeeded = job.waitForCompletion(true);
    long endTime = System.currentTimeMillis(); // end of timing
    System.out.println("程序运行时间:" + (endTime - startTime) + "ms"); // print elapsed time
    System.exit(succeeded ? 0 : 1);
}
}
3、生成JAR包
参考博客的第【4】步
4、上传测试文件到HDFS中
# hadoop fs -put soc-test.txt hdfs://localhost:9000/FriendInput
# hadoop fs -put soc-LiveJournal1Adj.txt hdfs://localhost:9000/FriendInput2
删除文件
# hadoop fs -rm -f hdfs://localhost:9000/FriendInput # 删除文件夹时,将 -f 换成 -r
访问http://192.168.184.114:50070
5、运行测试
5.1、运行soc-test.txt
# time hadoop jar FriendRecommendation.jar FriendRecommendation /FriendInput /FriendOutput
得到结果
# hadoop fs -cat /FriendOutput/part-r-00000 # 查看结果
# hadoop fs -get /FriendOutput/part-r-00000 Output.txt # 下载文件
5.2、运行soc-LiveJournal1Adj.txt
# time hadoop jar FriendRecommendation.jar FriendRecommendation /FriendInput2 /FriendOutput2
查看HDFS的文件
# hadoop fs -ls hdfs://localhost:9000/
得到结果
# hadoop fs -cat /FriendOutput2/part-r-00000 # 查看结果
# hadoop fs -get /FriendOutput2/part-r-00000 Output2.txt # 下载文件
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
二维码