MapReduce Data Processing [Classic Interview Question 02]


Log data is one of the most common kinds of data that internet companies process. Access logs in particular come with extra processing requirements; a typical one is to group records by user and extract the relevant data.

A common requirement is to fetch each user's most recent visit records. For example, suppose we have two files:

1. User information file

user_id user_name device_id
1 kedai 1
1 yinwen 2
2 haha 3
3 zhenhao 4

2. Device access log

device_id visit_time visit_ip
1 2019-01-05 01:01:48 192.168.1.39
3 2019-01-04 02:20:13 192.168.1.79
3 2019-01-04 01:55:16 192.168.1.32
9 2019-01-04 06:16:10 192.168.1.83
1 2019-01-03 23:01:43 192.168.1.70
4 2019-01-04 23:16:42 192.168.1.75
5 2019-01-05 01:49:02 192.168.1.56
6 2019-01-04 05:56:26 192.168.1.37

Extract each user's most recent visit records; note that one user may have more than one device id.

Approach 1:

1. Only one job is needed: perform a map-side join by loading the user file into memory once in the mapper's setup.

2. A single map-reduce pass then does the rest: define a sortable bean class, use it as the map output key, and cap the number of records emitted per user in the reduce phase.

Approach 2:

1. Use two jobs. In job01, the map phase reads both files and emits each record with the device id as the key.

2. In job01, the reduce phase merges the records that share the same device id.

3. In job02's map phase, reuse the sortable bean class: emit the bean instance as the key and the user id as the value.

4. Filter on the value and take the first 20 records (a sketch of job01 follows this list).
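
As a rough illustration of job01, here is a minimal reduce-side-join sketch keyed by device id. The driver is omitted; the class names, the file-name check in setup(), and the "U#"/"V#" source tags are illustrative assumptions, not part of the original code. It assumes the same tab-separated inputs shown above.

package hadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DeviceJoinJob {

    public static class JoinMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String tag;

        @Override
        protected void setup(Context context) {
            // tag each record with the file it came from
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            tag = fileName.startsWith("user") ? "U#" : "V#";
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            if ("U#".equals(tag)) {
                // user file: userId \t userName \t deviceId -> key = deviceId
                context.write(new Text(fields[2]), new Text(tag + fields[0] + "\t" + fields[1]));
            } else {
                // visit log: deviceId \t datetime \t ip -> key = deviceId
                context.write(new Text(fields[0]), new Text(tag + fields[1] + "\t" + fields[2]));
            }
        }
    }

    public static class JoinReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String userInfo = null;
            List<String> visits = new ArrayList<>();
            for (Text v : values) {
                String s = v.toString();
                if (s.startsWith("U#")) {
                    userInfo = s.substring(2);
                } else {
                    visits.add(s.substring(2));
                }
            }
            if (userInfo == null) {
                return; // device id has no matching user record
            }
            // output: userId \t userName \t deviceId \t datetime \t ip
            for (String visit : visits) {
                context.write(new Text(userInfo + "\t" + key.toString()), new Text(visit));
            }
        }
    }
}

job02 would then read this joined output, wrap each line in the same kind of sortable bean used in Approach 1, and cap the number of records emitted per user in its reducer.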

Approach 1 fits the case where one file is small enough to cache in memory and the other is large; Approach 2 is the more general pattern.

Code for Approach 1:

package hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;


public class UserVisitSort {

    public static class UserVisitSortMap extends Mapper<LongWritable, Text, VisitBean, Text> {

        // device id -> "userId \t userName", loaded once per map task (map-side join)
        private HashMap<String, String> user = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException {
            String filename = "/mapreduce/src/main/resources/joinfiles/user.txt";

            BufferedReader br = new BufferedReader(new FileReader(filename));
            String line;
            while ((line = br.readLine()) != null) {
                String[] strings = line.split("\t");
                user.put(strings[2], strings[0] + "\t" + strings[1]);
            }
            br.close();
        }

        @Override
        protected void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] fields = values.toString().split("\t");
            String userRecord = user.get(fields[0]);
            // only emit visit records whose device id has a matching user
            if (userRecord != null && !"".equals(userRecord)) {
                String[] userInfo = userRecord.split("\t");
                // constructor order is (userId, deviceId, datetime, ip, name)
                VisitBean visitBean = new VisitBean(Integer.parseInt(userInfo[0]), Integer.parseInt(fields[0]),
                        fields[1], fields[2], userInfo[1]);
                context.write(visitBean, new Text(userInfo[0]));
            }
        }
    }

    public static class UserVisitSortReduce extends Reducer<VisitBean, Text, Text, VisitBean> {

        // user id -> number of records already written for that user
        private HashMap<String, Integer> mapCount = new HashMap<>();

        @Override
        protected void reduce(VisitBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // keys arrive sorted by user id ascending and datetime descending,
            // so the first 10 records seen per user are that user's most recent visits
            for (Text v : values) {
                if (mapCount.get(v.toString()) == null) {
                    context.write(v, key);
                    mapCount.put(v.toString(), 1);
                } else if (mapCount.get(v.toString()) < 10) {
                    context.write(v, key);
                    mapCount.put(v.toString(), mapCount.get(v.toString()) + 1);
                }
            }
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");

        FileSystem fileSystem = FileSystem.get(conf);

        // remove the output directory if it already exists
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        job.setJarByClass(UserVisitSort.class);

        job.setMapperClass(UserVisitSortMap.class);
        job.setReducerClass(UserVisitSortReduce.class);

        // a single reduce task so the per-user output cap sees every record
        job.setNumReduceTasks(1);

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);
    }

}
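
Note that setup() above reads user.txt from a hard-coded local path, which only works if every map task can see that file on its local filesystem. A more portable variant of the same map-side join ships the small file through Hadoop's distributed cache. Below is a minimal sketch of just the two pieces that would change; the HDFS path is illustrative, and java.net.URI must also be imported.

// In the driver, before submitting the job: register the small user file with
// the distributed cache; the "#user.txt" fragment creates a symlink with that
// name in each task's working directory.
job.addCacheFile(URI.create("/mapreduce/joinfiles/user.txt#user.txt"));

// In the mapper, setup() then reads the cached symlink instead of an absolute path:
@Override
protected void setup(Context context) throws IOException {
    BufferedReader br = new BufferedReader(new FileReader("user.txt"));
    String line;
    while ((line = br.readLine()) != null) {
        String[] strings = line.split("\t");
        user.put(strings[2], strings[0] + "\t" + strings[1]);
    }
    br.close();
}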

The bean class:

package hadoop;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class VisitBean implements WritableComparable<VisitBean> {

    private Integer userId;
    private Integer deviceId;
    private String datetime;
    private String name;
    private String ip;

    // Hadoop serialization requires a no-arg constructor
    public VisitBean() {}

    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip, String name) {
        this.userId = userId;
        this.deviceId = deviceId;
        this.datetime = datetime;
        this.ip = ip;
        this.name = name;
    }

    @Override
    public int compareTo(VisitBean o) {
        if (this.userId.equals(o.getUserId())) {
            // same user: sort by datetime descending
            return o.getDatetime().compareTo(this.datetime);
        } else {
            // different users: sort by user id ascending
            return this.userId.compareTo(o.getUserId());
        }
    }

    // write and readFields must serialize the fields in the same order
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.userId);
        dataOutput.writeInt(this.deviceId);
        dataOutput.writeUTF(this.datetime);
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.name);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readInt();
        this.deviceId = dataInput.readInt();
        this.datetime = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.name = dataInput.readUTF();
    }

    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(Integer deviceId) {
        this.deviceId = deviceId;
    }

    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Note: the number of reduce tasks must be set to 1 for the per-user cap to work; if you want the output split into separate files per group, configure a Partitioner (a sketch follows below).
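
If you do want more than one reduce task, all records for the same user must still land on the same reducer so the per-user cap sees every record. Here is a minimal sketch of such a Partitioner; the class name and the driver wiring are illustrative, not from the original post.

package hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Route every record with the same user id to the same reduce task.
public class UserIdPartitioner extends Partitioner<VisitBean, Text> {

    @Override
    public int getPartition(VisitBean key, Text value, int numPartitions) {
        // hash only on the user id, not the whole bean
        return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Driver wiring (illustrative):
// job.setPartitionerClass(UserIdPartitioner.class);
// job.setNumReduceTasks(4);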

Sample output:

1	{"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"192.168.1.94","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"192.168.1.23","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"192.168.1.40","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"192.168.1.52","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"192.168.1.11","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"192.168.1.33","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"192.168.1.89","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"192.168.1.38","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"192.168.1.51","name":"kedai","userId":1}
1	{"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"192.168.1.89","name":"kedai","userId":1}
2	{"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"192.168.1.33","name":"haha","userId":2}
2	{"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"192.168.1.95","name":"haha","userId":2}
2	{"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"192.168.1.74","name":"haha","userId":2}
2	{"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"192.168.1.84","name":"haha","userId":2}
2	{"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"192.168.1.91","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"192.168.1.32","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"192.168.1.81","name":"haha","userId":2}
2	{"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"192.168.1.73","name":"haha","userId":2}
2	{"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"192.168.1.26","name":"haha","userId":2}
2	{"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"192.168.1.92","name":"haha","userId":2}

The test data is provided in the attachment.

This article was written by brucelu and is licensed under the Creative Commons Attribution-ShareAlike 3.0 China Mainland license. Please contact the author before republishing or quoting, and credit the author and the original source.