Spark SQL Notes (Data Sources: Parquet)


Parquet is a columnar storage format designed for analytical workloads.

Loading data programmatically

Code example

package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2017/2/3.
  */
object ParquetLoadData {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ParquetLoadData")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Load the Parquet file into a DataFrame
    val usersDF = sqlContext.read.parquet("hdfs://master:9000/student/2016113012/spark/users.parquet")
    usersDF.registerTempTable("t_users")
    // Query the name column
    val usersNameDF = sqlContext.sql("select name from t_users")
    // Convert to an RDD and print each name
    usersNameDF.rdd.map(row => "Name:" + row(0)).collect().foreach(username => println(username))
  }
}

Output

hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.ParquetLoadData  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/02/03 14:36:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/03 14:36:02 INFO Slf4jLogger: Slf4jLogger started
17/02/03 14:36:03 INFO Remoting: Starting remoting
17/02/03 14:36:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:40895]
17/02/03 14:36:07 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/02/03 14:36:20 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
17/02/03 14:36:21 INFO CodecPool: Got brand-new decompressor [.snappy]
Name:Alyssa
Name:Ben
17/02/03 14:36:21 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/02/03 14:36:21 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Automatic partition discovery

hadoop@master:~$ hadoop fs -mkdir /student/2016113012/spark/users
hadoop@master:~$ hadoop fs -mkdir /student/2016113012/spark/users/gender=male/
hadoop@master:~$ hadoop fs -mkdir /student/2016113012/spark/users/gender=male/country=us
hadoop@master:~/wujiadong$ hadoop fs -put users.parquet /student/2016113012/spark/users/gender=male/country=us
hadoop@master:~/wujiadong$ spark-submit --class wujiadong_sparkSQL.ParquetPartitionTest  --executor-memory 500m --total-executor-cores 2 /home/hadoop/wujiadong/wujiadong.spark.jar
17/02/03 15:13:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/02/03 15:13:43 INFO Slf4jLogger: Slf4jLogger started
17/02/03 15:13:43 INFO Remoting: Starting remoting
17/02/03 15:13:44 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.131:37709]
17/02/03 15:13:46 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
17/02/03 15:13:59 INFO deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
17/02/03 15:13:59 INFO CodecPool: Got brand-new decompressor [.snappy]
+------+--------------+----------------+------+-------+
| name|favorite_color|favorite_numbers|gender|country|
+------+--------------+----------------+------+-------+
|Alyssa| null| [3, 9, 15, 20]| male| us|
| Ben| red| []| male| us|
+------+--------------+----------------+------+-------+

17/02/03 15:14:00 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
17/02/03 15:14:00 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Spark automatically inferred the gender and country partition columns from the directory names.
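The spark-submit command above references a ParquetPartitionTest class whose source is not shown in this post. A minimal sketch consistent with the output might look like the following; the structure mirrors the ParquetLoadData example, and only the table-root path comes from the commands above:

package wujiadong_sparkSQL

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ParquetPartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ParquetPartitionTest")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // Point the reader at the table root rather than at a single file;
    // Spark walks the gender=male/country=us subdirectories and adds
    // gender and country to the schema as partition columns.
    val usersDF = sqlContext.read.parquet("hdfs://master:9000/student/2016113012/spark/users")
    usersDF.show()
  }
}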

Schema merging

There are two ways to enable schema merging, as shown in the snippet below:
1) When reading Parquet files, set the data source option mergeSchema to true.
2) Call SQLContext.setConf() to set the spark.sql.parquet.mergeSchema parameter to true.
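In code, the two switches look like this (a minimal sketch; the path is a placeholder, not one used elsewhere in this post):

// Option 1: enable merging for a single read via the data source option
val df = sqlContext.read.option("mergeSchema", "true").parquet("hdfs://master:9000/some/parquet/dir")

// Option 2: enable merging globally for this SQLContext
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")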

Case study: merging the schemas of students' basic information and their grades, sketched below.
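The post ends before giving the code for this case, so here is a minimal sketch, assuming the same Spark 1.x SQLContext API used above; the sample data, the ParquetMergeSchema class name, and the .../spark/students output path are all invented for illustration:

package wujiadong_sparkSQL

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

object ParquetMergeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ParquetMergeSchema")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Basic info: name and age
    val nameAgeDF = sc.parallelize(Seq(("leo", 23), ("jack", 25)), 2).toDF("name", "age")
    nameAgeDF.write.mode(SaveMode.Append).parquet("hdfs://master:9000/student/2016113012/spark/students")

    // Grades: name and grade -- a second schema appended to the same directory
    val nameGradeDF = sc.parallelize(Seq(("marry", "A"), ("tom", "B")), 2).toDF("name", "grade")
    nameGradeDF.write.mode(SaveMode.Append).parquet("hdfs://master:9000/student/2016113012/spark/students")

    // Read back with mergeSchema enabled: the merged schema has name, age
    // and grade, with null where a file lacks a column
    val studentsDF = sqlContext.read.option("mergeSchema", "true")
      .parquet("hdfs://master:9000/student/2016113012/spark/students")
    studentsDF.printSchema()
    studentsDF.show()
  }
}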
