1.DataFrame转化为RDD
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
/**
 * Reads a JSON file as a DataFrame and writes it to the HBase table
 * "sparkHbase" via the legacy mapred API (`saveAsHadoopDataset`).
 *
 * Rows where `name` or `age` is null are dropped before writing. Each
 * surviving row becomes one `Put` keyed by the hash of "name_age", with the
 * two values stored under column family "info".
 */
def runOldSaveDataFrame(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase output configuration for the old (mapred) API.
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass) // org.apache.hadoop.mapred.JobConf
  jobConf.setOutputFormat(classOf[TableOutputFormat]) // org.apache.hadoop.hbase.mapred.TableOutputFormat
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")

  val inputData = sparkSession.read.json("D:/code/spark_code/course/data/data.json")
  // Drop rows with a null name or age so Bytes.toBytes never sees null.
  // (Alternatives: inputData.na.drop(...) / inputData.na.fill(...).)
  val filterData = inputData.filter("age is not null and name is not null")

  val saveData = filterData.rdd.map { row =>
    // fixed: look fields up by name instead of position. spark.read.json
    // orders columns alphabetically, so row.get(0)/row.get(1) silently
    // breaks if the JSON schema ever gains another field.
    val age = row.get(row.fieldIndex("age"))
    val name = row.get(row.fieldIndex("name"))
    val rowKey = s"${name}_$age"
    // NOTE(review): hashCode-based row keys can collide and overwrite rows —
    // confirm that is acceptable for this table.
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name.toString))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age.toString))
    (new ImmutableBytesWritable, put)
  }
  // Dataset has no saveAsHadoopDataset; it is an RDD method, hence the .rdd above.
  saveData.saveAsHadoopDataset(jobConf)
  sparkSession.stop()
}
}
2.使用RDD存储json文件,要解析json文件
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import java.util.regex.Pattern
import org.apache.commons.lang3.StringUtils
/**
 * Reads the JSON file as plain text lines, parses each line by hand with a
 * regex (rather than the DataFrame reader), and writes the extracted
 * name/age pairs to the HBase table "sparkHbase" via the legacy mapred API.
 *
 * Each line is expected to be a single flat JSON object such as
 * {"name":"x","age":1}; nested objects are not supported by this parser.
 */
def runOldSaveRDD(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase config; without core-site.xml/hdfs-site.xml on the classpath this
  // fails with an "unknown host" error.
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass) // org.apache.hadoop.mapred.JobConf
  jobConf.setOutputFormat(classOf[TableOutputFormat]) // org.apache.hadoop.hbase.mapred.TableOutputFormat
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")

  // Raw text read — JSON is awkward to parse this way; see runOldSaveDataFrame
  // for the DataFrame alternative.
  val inputData = sparkSession.sparkContext.textFile("file:///D:/code/spark_code/course/data/data.json")
  // Captures everything between the braces of one flat JSON object.
  val pattern = Pattern.compile("[^{}]+")

  val saveData = inputData.map { line =>
    var name: String = null
    var age: String = null
    val matcher = pattern.matcher(line)
    if (matcher.find()) {
      val fields = matcher.group().split(",")
      if (fields.length <= 1) {
        // Single field: decide whether it is the name or the age.
        val field = fields(0)
        if (field.contains("name")) name = field.split(":")(1).replaceAll("\"", "")
        if (field.contains("age")) age = field.split(":")(1).replaceAll("\"", "")
      } else {
        // Two or more fields: assume "name" comes first, then "age".
        name = fields(0).split(":")(1).replaceAll("\"", "")
        age = fields(1).split(":")(1).replaceAll("\"", "")
      }
    }
    // NOTE(review): if parsing fails the key degrades to "null_null" and the
    // Put below carries no columns — consider filtering such lines out.
    val rowKey = s"${name}_$age"
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    // fixed: use isNotBlank consistently; the original called the varargs
    // isNoneBlank here, which is intended for checking several arguments.
    if (StringUtils.isNotBlank(name)) {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
    }
    if (StringUtils.isNotBlank(age)) {
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age))
    }
    (new ImmutableBytesWritable, put)
  }
  saveData.saveAsHadoopDataset(jobConf)
  sparkSession.stop() // fixed: the original leaked the SparkSession
}
这里将hbase-site.xml配置文件引入到classpath,主要使用的是旧的saveAsHadoopDataset,目前有新接口saveAsNewAPIHadoopDataset:
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HConstants, HBaseConfiguration}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
/**
 * Writes the filtered JSON data to the HBase table "sparkHbase" using the
 * new mapreduce API (`saveAsNewAPIHadoopDataset` with
 * org.apache.hadoop.hbase.mapreduce.TableOutputFormat).
 *
 * Mirrors runOldSaveDataFrame but drives the output through a
 * mapreduce `Job` instead of a mapred `JobConf`.
 */
def runNewSaveRDD(): Unit = {
  val sparkSession = SparkSession.builder()
    .appName("App")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
    .getOrCreate()

  // HBase output configuration for the new (mapreduce) API.
  val hbaseConf = HBaseConfiguration.create()
  val jobConf = new JobConf(hbaseConf, this.getClass)
  jobConf.set(TableOutputFormat.OUTPUT_TABLE, "sparkHbase")
  val job = Job.getInstance(jobConf)
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  // fixed: the values emitted below are Puts, not Results — TableOutputFormat
  // writes Mutations, so declare the value class accordingly.
  job.setOutputValueClass(classOf[Put])
  job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

  val inputData = sparkSession.read.json("D:/code/spark_code/course/data/data.json")
  // Drop rows with a null name or age so Bytes.toBytes never sees null.
  val filterData = inputData.filter("age is not null and name is not null")

  val saveData = filterData.rdd.map { row =>
    // Look fields up by name: spark.read.json orders columns alphabetically,
    // so positional access is fragile against schema changes.
    val age = row.get(row.fieldIndex("age"))
    val name = row.get(row.fieldIndex("name"))
    val rowKey = s"${name}_$age"
    // NOTE(review): hashCode row keys can collide — confirm acceptable.
    val put = new Put(Bytes.toBytes(rowKey.hashCode()))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name.toString))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age.toString))
    (new ImmutableBytesWritable, put)
  }
  saveData.saveAsNewAPIHadoopDataset(job.getConfiguration)
  sparkSession.stop() // fixed: the original leaked the SparkSession
}