MapReduce数据处理[经典面试题01]

浏览: 2837

最近有了解到一些数据开发的面试提会要求开发进行手动编写map-reduce;记录一下使用map-reduce进行数据处理的过程及逻辑;

现在版本的map-reduce过程基本都依赖新的jar包,之前的mapred相关的类后面不被推荐使用,使用mapreduce相关的类进行开发:

题目:

有一个好友列表,其中第一列是用户,后面的列是用户关注的好友。要求 找出每两个好友的共同好友。

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J,K

代码及注释:

思路

1、先map遍历列表,获取 用户->好友 的映射关系 输出 好友->用户 一对一关系

2、reduce循环遍历,获取 好友(共同的好友)->用户list

3、再进行一次map,对用户列表遍历,获取两两用户的key 和 好友value 例如 A-B (key) : E (value) 输出到reduce

4、reduce进行迭代组合根据两两用户(A-B)合并 好友(E)的列表.

package hadoop;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;


public class CommonFriends {

/*
* @param LongWritable map根据输入的偏移量作为key
* @param Text 第一个是 文件的记录
* @param Text 第二个是 map输出的key类型
* @param Text 第三个是 map输出的value类型
* */
public static class commonFriendMap01 extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] per_friend = line.split(":");
String per = per_friend[0];
String[] friends = per_friend[1].split(",");
for(String f:friends){
context.write(new Text(f),new Text(per));
}
}
}

/*
* @param Text 第一个是 reduce输入的key类型
* @param Text 第二个是 reduce输入的value类型
* @param Text 第三个是 reduce输出的key类型
* @param Text 第四个是 reduce输出的value类型
* */
public static class commonFriendReduce01 extends Reducer<Text,Text,Text,Text>{

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
Set<String> set = new HashSet<>();
//使用set进行去重操作
for(Text f:values){
if(!set.contains(f.toString())){
set.add(f.toString());
}
}
for(String f:set){
sb.append(f).append(",");
}
sb.deleteCharAt(sb.length()-1);
context.write(key,new Text(sb.toString()));
}
}

/*
* @param LongWritable map根据输入的偏移量作为key
* @param Text 第一个是 文件的记录
* @param Text 第二个是 map输出的key类型
* @param Text 第三个是 map输出的value类型
* */
public static class commonFriendMap02 extends Mapper<LongWritable,Text,Text,Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String [] per_fr = value.toString().split("\t");
String friend = per_fr[0];
String[] per = per_fr[1].split(",");
//如果存在乱序,对用户进行排序
Arrays.sort(per);
for(int i=0;i<per.length;i++){
for(int j=i+1;j<per.length;j++){
context.write(new Text(per[i]+"-"+per[j]+":"),new Text(friend));
}
}
}
}

/*
* @param Text 第一个是 reduce输入的key类型
* @param Text 第二个是 reduce输入的value类型
* @param Text 第三个是 reduce输出的key类型
* @param Text 第四个是 reduce输出的value类型
* */
public static class commonFriendReduce02 extends Reducer<Text,Text,Text,Text>{

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for(Text v:values){
sb.append(v).append(",");
}
sb.deleteCharAt(sb.length()-1);
context.write(key,new Text(sb.toString()));
}
}


public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();

// 设置job相关参数
Job job01 = Job.getInstance(conf);

// 设置jar、map、reduce相关类
job01.setJarByClass(CommonFriends.class);
job01.setMapperClass(commonFriendMap01.class);
job01.setReducerClass(commonFriendReduce01.class);

// 设置业务逻辑Reducer类输出key、value的数据类型
job01.setOutputKeyClass(Text.class);
job01.setOutputValueClass(Text.class);

// 指定输入输出路径
FileInputFormat.setInputPaths(job01,new Path("/mapreduce/src/main/resources/source"));
FileOutputFormat.setOutputPath(job01,new Path("/mapreduce/src/main/resources/output"));

Job job02 = Job.getInstance(conf);
job02.setJarByClass(CommonFriends.class);
job02.setMapperClass(commonFriendMap02.class);
job02.setReducerClass(commonFriendReduce02.class);


job02.setOutputKeyClass(Text.class);
job02.setOutputValueClass(Text.class);

//设定ReduceTask数量 一般等于Partitioner数量
job02.setNumReduceTasks(1);

// 指定输入输出路径
FileInputFormat.setInputPaths(job02,new Path("/mapreduce/src/main/resources/output"));
FileOutputFormat.setOutputPath(job02,new Path("/mapreduce/src/main/resources/final"));

//job提交
boolean res01 = job01.waitForCompletion(true);
boolean res02 = job02.waitForCompletion(true);
System.exit(res02&&res01?0:1);
}
}

最后的输出结果:

A-B:    E,C
A-C: D,F
A-D: E,F
A-E: B,C,D
A-F: C,E,O,D,B
A-G: E,F,C,D
A-H: C,D,E,O
A-I: O
A-J: O,B
A-K: C,D
A-L: F,D,E
A-M: F,E
B-C: A
B-D: A,E
B-E: C
B-F: C,A,E
B-G: E,C,A
B-H: E,C,A
B-I: A
B-K: A,C
B-L: E
B-M: E
B-O: A,K
C-D: A,F
C-E: D
C-F: A,D
C-G: A,D,F
C-H: D,A
C-I: A
C-K: A,D
C-L: D,F
C-M: F
C-O: I,A
D-E: L
D-F: A,E
D-G: E,A,F
D-H: A,E
D-I: A
D-K: A
D-L: E,F
D-M: F,E
D-O: A
E-F: D,M,C,B
E-G: C,D
E-H: C,D
E-J: B
E-K: C,D
E-L: D
F-G: D,C,A,E
F-H: A,D,O,E,C
F-I: O,A
F-J: B,O
F-K: D,C,A
F-L: E,D
F-M: E
F-O: A
G-H: D,C,E,A
G-I: A
G-K: D,A,C
G-L: D,F,E
G-M: E,F
G-O: A
H-I: O,A
H-J: O
H-K: A,C,D
H-L: D,E
H-M: E
H-O: A
I-J: O
I-K: A
I-O: A
K-L: D
K-O: A
L-M: E,F
推荐 0
本文由 brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册