Storing Spark RDDs in HBase

1. Converting a DataFrame to an RDD and writing it to HBase
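
All of the examples below write to an HBase table named sparkHbase with a single column family info, so that table has to exist before the job runs. A minimal sketch for creating it with the HBase client Admin API (an assumption on my side, using the HBase 1.x-style client and hbase-site.xml on the classpath):

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

// Create the target table once, outside the Spark job.
val conf = HBaseConfiguration.create() // picks up hbase-site.xml from the classpath
val connection = ConnectionFactory.createConnection(conf)
val admin = connection.getAdmin
val tableName = TableName.valueOf("sparkHbase")
if (!admin.tableExists(tableName)) {
  val descriptor = new HTableDescriptor(tableName)
  descriptor.addFamily(new HColumnDescriptor("info")) // column family used by the Put calls below
  admin.createTable(descriptor)
}
admin.close()
connection.close()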

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

def runOldSaveDataFrame(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase job configuration (old mapred API)
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass)   // org.apache.hadoop.mapred.JobConf
  jobConf.setOutputFormat(classOf[TableOutputFormat])   // org.apache.hadoop.hbase.mapred.TableOutputFormat
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")

  // Approach 1: read the data with the DataFrame API
  val inputData = sparkSession.read.json("D:/code/spark_code/course/data/data.json")
  // inputData.printSchema()

  // Drop rows containing null or NaN
  // inputData.na.drop()
  // Drop rows where the given column is null
  // inputData.na.drop(Array("age"))
  // Fill nulls in the given column
  // inputData.na.fill(0L, cols = Array("age"))
  // Fill several columns at once
  // inputData.na.fill(Map("age" -> 0L, "name" -> "N"))

  // For implicit conversions like converting RDDs to DataFrames
  // import sparkSession.implicits._

  val filterData = inputData.filter("age is not null and name is not null")
  val saveData = filterData.rdd.map { x =>
    // the inferred JSON schema is sorted alphabetically: column 0 = age, column 1 = name
    val age = x.get(0)
    val name = x.get(1)
    // println(age.toString() + ":" + name.toString())
    val rowKey = name + "_" + age
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name.toString()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age.toString()))
    (new ImmutableBytesWritable, put)
  }

  // filterData.saveAsHadoopDataset(jobConf) // a Dataset has no such method; it is an RDD method
  saveData.saveAsHadoopDataset(jobConf)

  sparkSession.stop()
}
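
To check what was written, the table can be read back with the new-API TableInputFormat. A minimal sketch, reusing the sparkSession from the method above:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Read the sparkHbase table back to verify the write.
val readConf = HBaseConfiguration.create()
readConf.set(TableInputFormat.INPUT_TABLE, "sparkHbase")
val hbaseRDD = sparkSession.sparkContext.newAPIHadoopRDD(
  readConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
hbaseRDD.map { case (_, result) =>
  val name = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
  val age  = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
  (name, age)
}.collect().foreach(println)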

2. Reading the JSON file as a plain RDD, which requires parsing the JSON manually

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import java.util.regex.Pattern
import org.apache.commons.lang3.StringUtils

def runOldSaveRDD(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase job configuration; without core-site.xml and hdfs-site.xml on the classpath this fails with an UnknownHost error
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass)   // org.apache.hadoop.mapred.JobConf
  jobConf.setOutputFormat(classOf[TableOutputFormat])   // org.apache.hadoop.hbase.mapred.TableOutputFormat
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")

  // Read the raw JSON file as plain text; awkward to parse by hand
  val inputData = sparkSession.sparkContext.textFile("file:///D:/code/spark_code/course/data/data.json")

  val regex = "[^{}]+"
  val pattern = Pattern.compile(regex)

  val saveData = inputData.map { x =>
    var name: String = null
    var age: String = null
    val m = pattern.matcher(x)
    if (m.find()) {
      val strs = m.group()
      val datas = strs.split(",")
      if (datas.length <= 1) {
        val str = datas(0)
        if (str.contains("name")) {
          name = str.split(":")(1).replaceAll("\"", "")
        }
        if (str.contains("age")) {
          age = str.split(":")(1).replaceAll("\"", "")
        }
      } else {
        name = datas(0).split(":")(1).replaceAll("\"", "")
        age = datas(1).split(":")(1).replaceAll("\"", "")
      }
    }
    val rowKey = name + "_" + age
    println(rowKey)
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    if (StringUtils.isNotBlank(name)) {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
    }
    if (StringUtils.isNotBlank(age)) {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
    }
    (new ImmutableBytesWritable, put)
  }
  saveData.saveAsHadoopDataset(jobConf)

  sparkSession.stop()
}
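
The regex-based extraction above is fragile. A JSON parser handles this more robustly; a minimal sketch using Jackson (normally already on the Spark classpath), creating one ObjectMapper per partition because it is not serializable. It replaces only the name/age extraction; the Put construction stays the same:

import com.fasterxml.jackson.databind.ObjectMapper

// Parse each JSON line with Jackson instead of a regex (reuses inputData from above).
val parsed = inputData.mapPartitions { lines =>
  val mapper = new ObjectMapper() // one mapper per partition; ObjectMapper is not serializable
  lines.map { line =>
    val node = mapper.readTree(line)
    val name = if (node.has("name")) node.get("name").asText() else null
    val age  = if (node.has("age"))  node.get("age").asText()  else null
    (name, age)
  }
}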

The examples above rely on hbase-site.xml being on the classpath and mainly use the old saveAsHadoopDataset API. There is also a newer interface, saveAsNewAPIHadoopDataset:

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job

def runNewSaveRDD(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase job configuration (new mapreduce API)
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass)
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")
  val job = Job.getInstance(jobConf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Put]) // the value written to TableOutputFormat is a Put (a Mutation), not a Result
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  val inputData = sparkSession.read.json("D:/code/spark_code/course/data/data.json")
  val filterData = inputData.filter("age is not null and name is not null")
  val saveData = filterData.rdd.map { x =>
    val age = x.get(0)
    val name = x.get(1)
    // println(age.toString() + ":" + name.toString())
    val rowKey = name + "_" + age
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name.toString()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age.toString()))
    (new ImmutableBytesWritable, put)
  }

  saveData.saveAsNewAPIHadoopDataset(job.getConfiguration)

  sparkSession.stop()
}
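
If hbase-site.xml is not on the classpath (the UnknownHost error noted in the comment above), the ZooKeeper connection details can also be set on the configuration in code. A minimal sketch, where the quorum hosts and port are placeholders for your own cluster:

import org.apache.hadoop.hbase.HBaseConfiguration

// Set the HBase connection details in code when hbase-site.xml is not on the classpath.
// "zk-host-1,zk-host-2,zk-host-3" and "2181" are placeholders for your own cluster.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")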
This article was written by 平常心 and is licensed under the Creative Commons Attribution-ShareAlike 3.0 China Mainland license. Contact the author before reposting or quoting, and credit the author and the original source.