互联网公司处理的比较多的数据就是日志数据,其中访问日志处理要求就比较多一点,一般来说会有要求按照用户进行分组提取相关数据:
一个比较常见的需求就是,取用户最近的访问记录,例如有两份文件:
1、用户信息文件
用户id 用户名称 设备id
1 kedai 1
1 yinwen 2
2 haha 3
3 zhenhao 4
2、设备访问日志
设备id 访问日期 访问ip
1 2019-01-05 01:01:48 192.168.1.39
3 2019-01-04 02:20:13 192.168.1.79
3 2019-01-04 01:55:16 192.168.1.32
9 2019-01-04 06:16:10 192.168.1.83
1 2019-01-03 23:01:43 192.168.1.70
4 2019-01-04 23:16:42 192.168.1.75
5 2019-01-05 01:49:02 192.168.1.56
6 2019-01-04 05:56:26 192.168.1.37
取每个用户最近一次访问记录信息,一个用户可能有多个设备id。
思路一:
一、只需要一个job进行处理,使用mapjoin进行操作,把文件放到内存里面进行初始化一次
二、使用map-reduce进行处理即可,新建排序bean类,把bean类作为map的输出key,在reduce阶段进行限制输出数。
思路二:
一、使用两个job进行处理,job01阶段 map分别读取两个文件,按照设备id作为key进行输出
二、job01阶段 reduce按照设备id进行合并记录
三、job02 map阶段,使用新建排序bean类,把bean实例作为key输出,用户id作为value。
四、按照value进行过滤获取前20;
思路一适合一大一小两个文件的关联处理(小文件可以整体加载到内存中),思路二更适合一般处理过程。
思路一代码:
package hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
/**
 * Single-job map-side join that emits the most recent visit records of every user.
 *
 * <p>The user file is loaded into memory once per map task and joined against the
 * visit log by device id. Each joined record is emitted with a {@link VisitBean} key,
 * so the shuffle orders records by that key's {@code compareTo}; the reducer then
 * keeps at most a fixed number of records per user.
 */
public class UserVisitSort {

    /**
     * Joins each visit-log line with the in-memory user table and emits
     * {@code (VisitBean, userId)} pairs.
     */
    public static class UserVisitSortMap extends Mapper<LongWritable, Text, VisitBean, Text> {

        /** Lookup table: device id -> "userId\tuserName". */
        private final HashMap<String, String> user = new HashMap<>();

        /**
         * Loads the tab-separated user file (userId, userName, deviceId) into memory.
         *
         * @param context the task context (unused)
         * @throws IOException if the user file cannot be opened or read
         */
        @Override
        protected void setup(Context context) throws IOException {
            String filename = "/mapreduce/src/main/resources/joinfiles/user.txt";
            // try-with-resources closes the reader even when readLine() throws.
            try (BufferedReader br = new BufferedReader(new FileReader(filename))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] strings = line.split("\t");
                    if (strings.length < 3) {
                        // Skip blank or malformed lines instead of failing the whole task.
                        continue;
                    }
                    user.put(strings[2], strings[0] + "\t" + strings[1]);
                }
            }
        }

        /**
         * Parses one tab-separated visit-log line (deviceId, datetime, ip) and, when the
         * device belongs to a known user, emits the joined record.
         *
         * @param key     input key supplied by the input format (unused)
         * @param values  one line of the visit log
         * @param context sink for the {@code (VisitBean, userId)} output pair
         */
        @Override
        protected void map(LongWritable key, Text values, Context context)
                throws IOException, InterruptedException {
            String[] fields = values.toString().split("\t");
            if (fields.length < 3) {
                // Not a complete visit record.
                return;
            }
            String joined = user.get(fields[0]);
            if (joined == null) {
                // Device is not bound to any known user.
                return;
            }
            String[] userInfo = joined.split("\t");
            if (userInfo.length < 2) {
                return;
            }
            // VisitBean's constructor order is (userId, deviceId, datetime, ip, name):
            // fields[2] is the visitor IP and userInfo[1] is the user name.
            VisitBean visitBean = new VisitBean(
                    Integer.valueOf(userInfo[0]),
                    Integer.valueOf(fields[0]),
                    fields[1],
                    fields[2],
                    userInfo[1]);
            context.write(visitBean, new Text(userInfo[0]));
        }
    }

    /**
     * Emits at most {@link #MAX_RECORDS_PER_USER} records per user id, in the order the
     * sorted keys arrive.
     */
    public static class UserVisitSortReduce extends Reducer<VisitBean, Text, Text, VisitBean> {

        /** Upper bound on the number of records written for a single user. */
        private static final int MAX_RECORDS_PER_USER = 10;

        /** Number of records already written, keyed by user id. */
        private final HashMap<String, Integer> mapCount = new HashMap<>();

        /**
         * Writes {@code (userId, bean)} for every value until the user's quota is used up.
         *
         * @param key     the visit record
         * @param values  user ids attached to the key
         * @param context sink for the output pairs
         */
        @Override
        protected void reduce(VisitBean key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                String userId = v.toString();
                Integer seen = mapCount.get(userId);
                int written = (seen == null) ? 0 : seen;
                if (written < MAX_RECORDS_PER_USER) {
                    context.write(v, key);
                    mapCount.put(userId, written + 1);
                }
            }
        }
    }

    /**
     * Configures and runs the job; the process exits with 0 on success and 1 on failure.
     *
     * @param args command-line arguments (unused; input and output paths are fixed)
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        Path inPath = new Path("/mapreduce/src/main/resources/joinfiles/visit_log.txt");
        Path outPath = new Path("/mapreduce/src/main/resources/out");

        // The job refuses to start when the output path already exists, whether it is
        // a directory or a stray file, so remove anything that is there.
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        job.setJarByClass(UserVisitSort.class);
        job.setMapperClass(UserVisitSortMap.class);
        job.setReducerClass(UserVisitSortReduce.class);

        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapOutputKeyClass(VisitBean.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(VisitBean.class);

        // The reducer counts records per user in memory, which is only correct when one
        // reducer sees every record of a user; pin the job to a single reduce task.
        job.setNumReduceTasks(1);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
bean类
package hadoop;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Writable key describing one device visit joined with the user that owns the device.
 *
 * <p>Natural ordering is user id ascending, then visit datetime descending; the
 * remaining fields are used only as tie-breakers so that the ordering is a proper
 * total order that agrees with {@link #equals(Object)}.
 */
public class VisitBean implements WritableComparable<VisitBean> {
    private Integer userId;
    private Integer deviceId;
    private String datetime;
    private String name;
    private String ip;

    /** No-argument constructor, required so the framework can create an instance before {@link #readFields}. */
    public VisitBean() {}

    /**
     * Creates a fully populated record. Note the parameter order: {@code ip} comes
     * before {@code name}.
     *
     * @param userId   id of the user owning the device
     * @param deviceId id of the visiting device
     * @param datetime visit timestamp as text
     * @param ip       visitor IP address
     * @param name     user name
     */
    public VisitBean(Integer userId, Integer deviceId, String datetime, String ip, String name) {
        this.userId = userId;
        this.deviceId = deviceId;
        this.datetime = datetime;
        this.ip = ip;
        this.name = name;
    }

    /**
     * Orders by user id ascending, then by datetime descending (latest first).
     *
     * <p>Values are compared with {@code compareTo}, never with {@code ==}, because
     * {@code ==} on boxed {@code Integer} compares references. Datetimes are compared as
     * strings; this assumes a zero-padded, most-significant-first format such as
     * {@code yyyy-MM-dd HH:mm:ss} so that text order matches chronological order.
     *
     * @param o the record to compare against
     * @return a negative, zero or positive number per the {@link Comparable} contract
     */
    @Override
    public int compareTo(VisitBean o) {
        int result = compareNullsFirst(this.userId, o.userId);
        if (result != 0) {
            return result;
        }
        // Operands are swapped to obtain descending order.
        result = compareNullsFirst(o.datetime, this.datetime);
        if (result != 0) {
            return result;
        }
        // Tie-breakers: return 0 only for records that are equal in every field.
        result = compareNullsFirst(this.deviceId, o.deviceId);
        if (result != 0) {
            return result;
        }
        result = compareNullsFirst(this.ip, o.ip);
        if (result != 0) {
            return result;
        }
        return compareNullsFirst(this.name, o.name);
    }

    /** Null-safe natural-order comparison that sorts {@code null} before any value. */
    private static <T extends Comparable<T>> int compareNullsFirst(T a, T b) {
        if (a == b) {
            // Same reference, or both null.
            return 0;
        }
        if (a == null) {
            return -1;
        }
        if (b == null) {
            return 1;
        }
        return a.compareTo(b);
    }

    /** Two records are equal when every field is equal, consistent with {@link #compareTo}. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof VisitBean)) {
            return false;
        }
        return compareTo((VisitBean) obj) == 0;
    }

    /**
     * Hashes on the user id only, so a hash-based partitioner keeps all records of one
     * user in the same partition.
     */
    @Override
    public int hashCode() {
        return userId == null ? 0 : userId.hashCode();
    }

    /** Serializes the fields; the order must match {@link #readFields}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.userId);
        dataOutput.writeInt(this.deviceId);
        dataOutput.writeUTF(this.datetime);
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.name);
    }

    /** Deserializes the fields in the same order {@link #write} emitted them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readInt();
        this.deviceId = dataInput.readInt();
        this.datetime = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.name = dataInput.readUTF();
    }

    /** Returns the JSON form of this record as produced by fastjson. */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(Integer deviceId) {
        this.deviceId = deviceId;
    }

    public String getDatetime() {
        return datetime;
    }

    public void setDatetime(String datetime) {
        this.datetime = datetime;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
注意reducetask数量需要设置成1,如果需要分组文件需要设置Partitioner
执行结果如下:
1 {"datetime":"2019-01-05 11:31:21","deviceId":1,"ip":"kedai","name":"192.168.1.94","userId":1}
1 {"datetime":"2019-01-05 11:15:52","deviceId":1,"ip":"kedai","name":"192.168.1.23","userId":1}
1 {"datetime":"2019-01-05 11:05:06","deviceId":1,"ip":"kedai","name":"192.168.1.40","userId":1}
1 {"datetime":"2019-01-05 09:12:39","deviceId":1,"ip":"kedai","name":"192.168.1.52","userId":1}
1 {"datetime":"2019-01-05 09:01:30","deviceId":1,"ip":"kedai","name":"192.168.1.11","userId":1}
1 {"datetime":"2019-01-05 08:51:52","deviceId":1,"ip":"kedai","name":"192.168.1.33","userId":1}
1 {"datetime":"2019-01-05 08:47:36","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}
1 {"datetime":"2019-01-05 08:43:59","deviceId":1,"ip":"kedai","name":"192.168.1.38","userId":1}
1 {"datetime":"2019-01-05 05:39:51","deviceId":1,"ip":"kedai","name":"192.168.1.51","userId":1}
1 {"datetime":"2019-01-05 02:23:49","deviceId":1,"ip":"kedai","name":"192.168.1.89","userId":1}
2 {"datetime":"2019-01-05 11:49:04","deviceId":3,"ip":"haha","name":"192.168.1.33","userId":2}
2 {"datetime":"2019-01-05 10:44:18","deviceId":3,"ip":"haha","name":"192.168.1.95","userId":2}
2 {"datetime":"2019-01-05 09:35:54","deviceId":3,"ip":"haha","name":"192.168.1.74","userId":2}
2 {"datetime":"2019-01-05 08:56:08","deviceId":3,"ip":"haha","name":"192.168.1.84","userId":2}
2 {"datetime":"2019-01-05 08:24:55","deviceId":3,"ip":"haha","name":"192.168.1.91","userId":2}
2 {"datetime":"2019-01-05 06:46:20","deviceId":3,"ip":"haha","name":"192.168.1.32","userId":2}
2 {"datetime":"2019-01-05 06:34:02","deviceId":3,"ip":"haha","name":"192.168.1.81","userId":2}
2 {"datetime":"2019-01-05 06:02:43","deviceId":3,"ip":"haha","name":"192.168.1.73","userId":2}
2 {"datetime":"2019-01-05 04:08:02","deviceId":3,"ip":"haha","name":"192.168.1.26","userId":2}
2 {"datetime":"2019-01-05 03:01:34","deviceId":3,"ip":"haha","name":"192.168.1.92","userId":2}
测试数据见附件
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。