Hadoop MapReduce: multi-table joins and a table self-join, implemented with multiple Mapper inputs


We start with the following two tables:

SQL> select * from emp;

     EMPNO ENAME      JOB              MGR HIREDATE         SAL       COMM     DEPTNO
---------- ---------- --------- ---------- --------- ---------- ---------- ----------
      7369 SMITH      CLERK           7902 17-DEC-80        800                    20
      7499 ALLEN      SALESMAN        7698 20-FEB-81       1600        300         30
      7521 WARD       SALESMAN        7698 22-FEB-81       1250        500         30
      7566 JONES      MANAGER         7839 02-APR-81       2975                    20
      7654 MARTIN     SALESMAN        7698 28-SEP-81       1250       1400         30
      7698 BLAKE      MANAGER         7839 01-MAY-81       2850                    30
      7782 CLARK      MANAGER         7839 09-JUN-81       2450                    10
      7839 KING       PRESIDENT            17-NOV-81       5000                    10
      7844 TURNER     SALESMAN        7698 08-SEP-81       1500          0         30
      7900 JAMES      CLERK           7698 03-DEC-81        950                    30
      7902 FORD       ANALYST         7566 03-DEC-81       3000                    20
      7934 MILLER     CLERK           7782 23-JAN-82       1300                    10

12 rows selected.

SQL> select * from dept;

    DEPTNO DNAME          LOC
---------- -------------- -------------
        10 ACCOUNTING     NEW YORK
        20 RESEARCH       DALLAS
        30 SALES          CHICAGO
        40 OPERATIONS     BOSTON

We now solve the following two problems against these tables:

1) Compute the total salary of each department.

2) List the names and salaries of employees who earn more than their managers.
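For reference, the two problems correspond roughly to the following SQL over these tables (the MapReduce version of problem 1 below additionally emits departments that have no employees):

select d.dname, sum(e.sal)
  from emp e join dept d on e.deptno = d.deptno
 group by d.dname;

select e.ename, e.sal
  from emp e join emp m on e.mgr = m.empno
 where e.sal > m.sal;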


The implementation of each follows.

1) Total salary of each department

Approach: the job takes two Mapper inputs, both emitting a custom value type that carries a flag. EmpMapper tags its records with flag 0 (a salary) and DeptMapper with flag 1 (a department name); the Reducer uses the flag to tell which table each value came from.
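For example, once both inputs are mapped, the Reducer's call for key 20 sees a value list equivalent to the following (the order of values within a key is not guaranteed):

20 -> [ (0, "800"), (0, "2975"), (0, "3000"), (1, "RESEARCH") ]

Summing the flag-0 values and taking the flag-1 value as the department name produces the output line RESEARCH 6775.0. The full implementation: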

package com.jack.hadoop.tablelink;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeptSalay extends Configured implements Tool {

    // Reads the emp file; emits (deptno, flag 0 + salary).
    public static class EmpMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
        private Logger log = LoggerFactory.getLogger(EmpMapper.class);
        private String delimiter = null; // default is comma

        @Override
        public void setup(Context cxt) {
            delimiter = cxt.getConfiguration().get("delimiter", ",");
            log.info("This is the begin of EmpMapper");
        }

        @Override
        public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
            // Text.getBytes() returns the backing array, which may be longer than
            // the record, so decode only the first getLength() bytes.
            String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
            String[] values = info.split(delimiter);
            for (int i = 0; i < values.length; i++) {
                values[i] = values[i].trim();
            }
            log.info("key-->" + values[7] + "=========value-->[0," + values[5] + "]");
            cxt.write(new Text(values[7]), new FlagStringDataType(0, values[5]));
        }
    }

    // Reads the dept file; emits (deptno, flag 1 + department name).
    public static class DeptMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
        private Logger log = LoggerFactory.getLogger(DeptMapper.class);
        private String delimiter = null; // default is comma

        @Override
        public void setup(Context cxt) {
            delimiter = cxt.getConfiguration().get("delimiter", ",");
            log.info("This is the begin of DeptMapper");
        }

        @Override
        public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
            String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
            String[] values = info.split(delimiter);
            for (int i = 0; i < values.length; i++) {
                values[i] = values[i].trim();
            }
            log.info("key-->" + values[0] + "=========value-->[1," + values[1] + "]");
            cxt.write(new Text(values[0]), new FlagStringDataType(1, values[1]));
        }
    }

    public static class LinkReducer extends Reducer<Text, FlagStringDataType, Text, DoubleWritable> {
        private Logger log = LoggerFactory.getLogger(LinkReducer.class);

        @Override
        public void reduce(Text key, Iterable<FlagStringDataType> values, Context cxt)
                throws IOException, InterruptedException {
            String deptName = "";
            double salarySum = 0d;
            // Values with flag 0 carry a salary (from EmpMapper); the value
            // with flag 1 carries the department name (from DeptMapper).
            for (FlagStringDataType v : values) {
                if (v.getFlag() == 1) {
                    deptName = v.get();
                } else if (v.getFlag() == 0) {
                    salarySum += Double.valueOf(v.get());
                }
                log.info("flag:" + v.getFlag() + "-->value:" + v.get());
            }
            cxt.write(new Text(deptName), new DoubleWritable(salarySum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String input = "/tmp/demo.txt";
        String input1 = "/tmp/demo1.txt";
        String output = "/tmp/deptSalaryOut";

        Configuration conf = getConf();
        // Delete any previous output via the FileSystem API, which works for
        // both local and HDFS paths, so the job can be rerun.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(output), true);

        Job job = new Job(conf, "DeptSalay"); // job name
        job.setJarByClass(DeptSalay.class);

        FileOutputFormat.setOutputPath(job, new Path(output));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setReducerClass(LinkReducer.class);
        // Two mapper inputs: EmpMapper tags its records 0, DeptMapper tags
        // them 1; the Reducer tells them apart by the flag.
        MultipleInputs.addInputPath(job, new Path(input), TextInputFormat.class, EmpMapper.class);
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, DeptMapper.class);

        job.setOutputKeyClass(Text.class);      // output key type
        job.setOutputValueClass(DoubleWritable.class); // output value type

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            ToolRunner.run(conf, new DeptSalay(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

demo.txt, the EMP table as a comma-delimited file:

7369, SMITH , CLERK     , 7902, 17-DEC-80,  800,     , 20
7499, ALLEN , SALESMAN  , 7698, 20-FEB-81, 1600,  300, 30
7521, WARD  , SALESMAN  , 7698, 22-FEB-81, 1250,  500, 30
7566, JONES , MANAGER   , 7839, 02-APR-81, 2975,     , 20
7654, MARTIN, SALESMAN  , 7698, 28-SEP-81, 1250, 1400, 30
7698, BLAKE , MANAGER   , 7839, 01-MAY-81, 2850,     , 30
7782, CLARK , MANAGER   , 7839, 09-JUN-81, 2450,     , 10
7839, KING  , PRESIDENT ,     , 17-NOV-81, 5000,     , 10
7844, TURNER, SALESMAN  , 7698, 08-SEP-81, 1500,    0, 30
7900, JAMES , CLERK     , 7698, 03-DEC-81,  950,     , 30
7902, FORD  , ANALYST   , 7566, 03-DEC-81, 3000,     , 20
7934, MILLER, CLERK     , 7782, 23-JAN-82, 1300,     , 10

demo1.txt, the DEPT table:

10, ACCOUNTING, NEW YORK
20, RESEARCH  , DALLAS
30, SALES     , CHICAGO
40, OPERATIONS, BOSTON
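Run over these two files, the job should produce one tab-separated line per department key, in key order (DEPTNO 40 has no employees, so its sum stays 0):

ACCOUNTING	8750.0
RESEARCH	6775.0
SALES	9400.0
OPERATIONS	0.0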

2) List the names and salaries of employees who earn more than their managers

Approach:

Both Mappers read the EMP table. The first one keys each record on the employee's own number (EMPNO), with the salary as the value.

The second one keys each record on MGR, the manager's employee number, with the subordinate's number, name, and salary as the value.

The Reducer therefore receives, under each employee number, that employee's own salary together with all of his subordinates' salaries; comparing them yields the employees who out-earn their manager, as in the worked example below.

Note: the two Mappers must read two identical copies of the file; they cannot both read the same file.
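For example, for key 7839 (KING) the Reducer sees something like:

7839 -> [ (0, "5000"), (1, "7566,JONES,2975"), (1, "7698,BLAKE,2850"), (1, "7782,CLARK,2450") ]

None of KING's three direct reports earns more than 5000, so nothing is emitted for this key. The full implementation: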

package com.jack.hadoop.tablelink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SalaryMoreThanMgr extends Configured implements Tool {

    // Emits (empno, flag 0 + salary): each employee keyed by his own number.
    public static class EmpSalaryMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
        private Logger log = LoggerFactory.getLogger(EmpSalaryMapper.class);
        private String delimiter = null; // default is comma

        @Override
        public void setup(Context cxt) {
            delimiter = cxt.getConfiguration().get("delimiter", ",");
            log.info("This is the begin of EmpSalaryMapper");
        }

        @Override
        public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
            // Decode only the first getLength() bytes of the backing array.
            String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
            String[] values = info.split(delimiter);
            for (int i = 0; i < values.length; i++) {
                values[i] = values[i].trim();
            }
            log.info("emp-->" + values[0] + "=========value-->[0," + values[5] + "]");
            cxt.write(new Text(values[0]), new FlagStringDataType(0, values[5]));
        }
    }

    // Emits (mgr, flag 1 + "empno,ename,sal"): each employee keyed by his
    // manager's number.
    public static class MgrMapper extends Mapper<LongWritable, Text, Text, FlagStringDataType> {
        private Logger log = LoggerFactory.getLogger(MgrMapper.class);
        private String delimiter = null; // default is comma

        @Override
        public void setup(Context cxt) {
            delimiter = cxt.getConfiguration().get("delimiter", ",");
            log.info("This is the begin of MgrMapper");
        }

        @Override
        public void map(LongWritable key, Text value, Context cxt) throws IOException, InterruptedException {
            String info = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
            String[] values = info.split(delimiter);
            for (int i = 0; i < values.length; i++) {
                values[i] = values[i].trim();
            }
            log.info("mgr-->" + values[3] + "=========value-->[1," + values[0] + "," + values[1] + "," + values[5] + "]");
            cxt.write(new Text(values[3]),
                    new FlagStringDataType(1, values[0] + "," + values[1] + "," + values[5]));
        }
    }

    public static class LinkReducer extends Reducer<Text, FlagStringDataType, Text, DoubleWritable> {
        private Logger log = LoggerFactory.getLogger(LinkReducer.class);

        @Override
        public void reduce(Text key, Iterable<FlagStringDataType> values, Context cxt)
                throws IOException, InterruptedException {
            Double mgrSalary = null;                           // this key's own salary
            List<String> childList = new ArrayList<String>(); // this key's subordinates

            // The iteration order of the values is not guaranteed, so buffer the
            // subordinates and compare only after the manager's salary is known.
            for (FlagStringDataType v : values) {
                if (v.getFlag() == 0) {
                    mgrSalary = Double.parseDouble(v.get());
                } else if (v.getFlag() == 1) {
                    childList.add(v.get());
                }
            }
            log.info(key + ":" + mgrSalary + ";child:" + childList);

            if (mgrSalary == null) {
                return; // key came from an empty MGR field (e.g. KING's row)
            }
            // Emit every subordinate who earns more than this manager.
            for (String c : childList) {
                String[] s = c.split(",");
                double empSalary = Double.parseDouble(s[2]);
                if (empSalary > mgrSalary) {
                    cxt.write(new Text(s[1]), new DoubleWritable(empSalary));
                }
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        String input = "/tmp/demo.txt";
        String input1 = "/tmp/demoSame.txt";
        String output = "/tmp/SalaryMoreThanMgrOut";

        Configuration conf = getConf();
        // Delete any previous output via the FileSystem API.
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(output), true);

        Job job = new Job(conf, "SalaryMoreThanMgr"); // job name
        job.setJarByClass(SalaryMoreThanMgr.class);

        FileOutputFormat.setOutputPath(job, new Path(output));
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlagStringDataType.class);
        job.setReducerClass(LinkReducer.class);
        // Two mappers over two identical copies of the emp file.
        MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, MgrMapper.class);
        MultipleInputs.addInputPath(job, new Path(input), TextInputFormat.class, EmpSalaryMapper.class);

        job.setOutputKeyClass(Text.class);      // output key type
        job.setOutputValueClass(DoubleWritable.class); // output value type

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            ToolRunner.run(conf, new SalaryMoreThanMgr(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

demoSame.txt is an exact copy of demo.txt; as noted above, the two Mappers cannot share a single input file.
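On the sample data only FORD (salary 3000) earns more than his manager JONES (2975), so the job's output should be the single line:

FORD	3000.0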


Finally, the custom Mapper output value type used by both jobs:

package com.jack.hadoop.tablelink;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Map output value: a String payload plus an int flag that tells the Reducer
// which mapper (i.e. which table) the record came from.
public class FlagStringDataType implements WritableComparable<FlagStringDataType> {
    private String value;
    private int flag;

    public FlagStringDataType() {
    }

    public FlagStringDataType(int flag, String value) {
        this.value = value;
        this.flag = flag;
    }

    public String get() {
        return value;
    }

    public void set(String value) {
        this.value = value;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof FlagStringDataType)) {
            return false;
        }
        FlagStringDataType o = (FlagStringDataType) other;
        // Compare the payloads with equals(), not ==.
        return flag == o.flag && (value == null ? o.value == null : value.equals(o.value));
    }

    @Override
    public int hashCode() {
        return 31 * flag + (value == null ? 0 : value.hashCode());
    }

    @Override
    public int compareTo(FlagStringDataType other) {
        // Order by flag first, then by the payload.
        if (flag != other.flag) {
            return flag < other.flag ? -1 : 1;
        }
        return value.compareTo(other.value);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(flag);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in the same order they were written.
        flag = in.readInt();
        value = in.readUTF();
    }

    @Override
    public String toString() {
        return flag + ":" + value;
    }
}