kettle将一行数据处理为多行数据

浏览: 3099

需求

现有数据如表A,username是用户名,products是购买的商品名,如第一行数据表示为用户name1购买了apple,milk,eggs这三样物品。物品之间的分隔符为多样,这里有;,、这三种形式。

Clipboard Image.png

现要将该表进行处理,处理为表B形式,即一个用户名对应一件商品为一条

Clipboard Image.png

处理过程

1.新建一个转换

Clipboard Image.png

2.表输入

配置好数据库连接,并写上sql语句

select username,products from test.A 

3.脚本处理

采用java代码处理


import java.io.UnsupportedEncodingException;

//kettle中已定义好的行处理方法,每行记录都会执行一次
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{

//(1)获取到上一个步骤的输入行
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());

//(2)读取出参数变量值
String username= get(Fields.In,"username").getString(r);
String products= get(Fields.In,"products").getString(r);

//(3)处理数据
if(products!=null){
String[] productsArr=products.split("[,;、]"); //根据存储内容来分析按什么分割
for(int i=0;i<productsArr.length;i++){
Object[] clonedRow=getInputRowMeta().cloneRow(r); //复制输入行数据
clonedRow=createOutputRow(clonedRow,data.outputRowMeta.size());
products=productsArr[i];
//(4)把处理好的值放入到输出记录中
get(Fields.Out,"products").setValue(clonedRow,products);
putRow(data.outputRowMeta,clonedRow);
}

}



//(5)输出到下一个节点做处理
else putRow(data.outputRowMeta, r);
return true;
}

4.表输出

输出到指定表B。

表B的结构应该与A一致。


CREATE TABLE `B` (
`id` int(5) NOT NULL AUTO_INCREMENT,
`username` varchar(20) DEFAULT NULL,
`products` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=10 DEFAULT CHARSET=utf8



附上表A表结构和数据:

CREATE TABLE `A` (
`id` int(5) NOT NULL AUTO_INCREMENT,
`username` varchar(20) DEFAULT NULL,
`products` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

insert into `A`(`id`,`username`,`products`) values (1,'name1','apple;milk;eggs;'),(2,'name2','potato,banana,milk;'),(3,'name3','shoes'),(4,'name4','clothes、hats');
推荐 3
本文由 _缘君_ 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

3 个评论

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
import java.io.UnsupportedEncodingException;

//kettle中已定义好的行处理方法,每行记录都会执行一次
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{

//(1)获取到上一个步骤的输入行
Object[] r = getRow();
if (r == null) {
setOutputDone();
return false;
}
r = createOutputRow(r, data.outputRowMeta.size());

//(2)读取出参数变量值
String username= get(Fields.In,"username").getString(r);
String products= get(Fields.In,"products").getString(r);
String LeaveBegin = get(Fields.In, "LeaveBegin").getString(r);
String LeaveEnd = get(Fields.In, "LeaveEnd").getString(r);
String LeaveHours = get(Fields.In, "LeaveHours").getString(r);
String LeaveDay = get(Fields.In, "LeaveDay").getString(r);

//(3)处理数据
if(LeaveBegin!=null){
//获取请假开始到请假结束间隔多少天,精确到以天为单位
SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
Calendar cal = Calendar.getInstance();
long begintime = 0;
long endtime = 0;

try{
cal.setTime(sdf.parse(LeaveBegin));
begintime = cal.getTimeInMillis();
cal.setTime(sdf.parse(LeaveEnd));
endtime = cal.getTimeInMillis();
}catch(Exception e){
e.printStackTrace();
}
long between_days=((endtime-begintime)/(1000*3600*24))+1;

//强制转换为INT类型
Float DateNum=Float.parseFloat(String.valueOf(between_days));

//处理请假小时数
Float TempLeaveHours=Float.parseFloat(LeaveHours)/DateNum;
LeaveHours=String.valueOf(TempLeaveHours);

//处理请假天数
Float TempLeaveDay=Float.parseFloat(LeaveDay)/DateNum;
//Float TempLeaveDay=DateNum;
LeaveDay=String.valueOf(TempLeaveDay);

for(int i=0;i<DateNum;i++){
Object[] clonedRow=getInputRowMeta().cloneRow(r); //复制输入行数据
clonedRow=createOutputRow(clonedRow,data.outputRowMeta.size());

//处理请假日期
try{
SimpleDateFormat ldf=new SimpleDateFormat("yyyy-MM-dd");
Date LeaveNewDate = ldf.parse(LeaveBegin);
Calendar dal = Calendar.getInstance();
dal.setTime(LeaveNewDate); //注意此处的日期绑定字段
dal.add(Calendar.DATE,i); //在请假日期的基础上加一天,直至请假结束日期

//更新请假开始日期和请假结束日期
LeaveBegin=ldf.format(dal.getTime());
LeaveEnd=ldf.format(dal.getTime());

}catch(Exception d){
d.printStackTrace();
}

//(4)把处理好的值放入到输出记录中
get(Fields.Out,"username").setValue(clonedRow,username);
get(Fields.Out,"products").setValue(clonedRow,products);
get(Fields.Out,"LeaveBegin").setValue(clonedRow,LeaveBegin);
get(Fields.Out,"LeaveEnd").setValue(clonedRow,LeaveEnd);
get(Fields.Out,"LeaveHours").setValue(clonedRow,LeaveHours);
get(Fields.Out,"LeaveDay").setValue(clonedRow,LeaveDay);

putRow(data.outputRowMeta,clonedRow);
}

}

//(5)输出到下一个节点做处理
else putRow(data.outputRowMeta, r);
return true;
}
在博主代码的基础上改了一下,想要根据开始日期和结束日期拆分请假多日的单据拆分为以天为单位的请假单,可惜还是不行,在拆分日期的时候,出来的数据会跳过中间一些日期,还找不到原因。。。。
https://ask.hellobi.com/question/23920
大神,求帮忙~

要回复文章请先登录注册