Spark Learning 12 (WordCount program in spark-shell)


There is a file words.txt in the directory /home/hadoop/2016113012:

hello scala
hello java
hello python
hello wujiadong

Upload the file to HDFS:

hadoop@slave01:~/2016113012$ hadoop fs -put /home/hadoop/2016113012/words.txt /student/2016113012/spark
hadoop@slave01:~/2016113012$ hadoop fs -lsr /student/2016113012

Start the spark shell


Modes 1 and 2 run Spark in local mode, because no master address is specified.

Mode 1: no parameters
hadoop@master:~$ spark-shell
Mode 2: set the relevant parameters
hadoop@master:~$ spark-shell --executor-memory 2g --total-executor-cores 2 --executor-cores 1
Mode 3: specify the master address (I haven't needed this yet; I'll write it up when I do)
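For reference, a minimal sketch of mode 3, assuming a standalone master at spark://master:7077 (the host name is only an assumption, chosen to match the hdfs://master:9000 namenode used below; adjust it to your cluster):

hadoop@master:~$ spark-shell --master spark://master:7077 --executor-memory 2g --total-executor-cores 2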


Notes:
--executor-memory 2g: sets the memory available to each executor to 2 GB
--total-executor-cores 2: sets the total number of CPU cores used across the cluster to 2
--executor-cores: the number of CPU cores used by each executor


The spark shell initializes the SparkContext class as the object sc by default; user code that needs it can use sc directly.

Writing the Spark program in Scala in the spark shell

scala> val fileRDD = sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt")
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:15

scala> val wordRDD = fileRDD.flatMap(_.split(" "))
wordRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at flatMap at <console>:17

scala> val wordPair = wordRDD.map((_,1))
wordPair: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:19

scala> val result = wordPair.reduceByKey(_+_)
17/03/04 21:08:37 INFO FileInputFormat: Total input paths to process : 1
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:21

scala> result.sortBy(_._2,false)
res1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[9] at sortBy at <console>:24

scala> result.sortBy(_._2,false).collect()
17/03/04 21:09:49 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/03/04 21:09:49 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/03/04 21:09:49 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/03/04 21:09:49 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/03/04 21:09:49 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
res2: Array[(String, Int)] = Array((hello,4), (scala,1), (wujiadong,1), (python,1), (java,1))
scala> result.sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/wordcount_out")
17/03/04 21:11:03 INFO FileOutputCommitter: Saved output of task 'attempt_201703042111_0005_m_000000_4' to hdfs://master:9000/wordcount_out/_temporary/0/task_201703042111_0005_m_000000


Check the output:
hadoop@master:~$ hadoop fs -ls hdfs://master:9000/wordcount_out
17/03/04 21:12:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2017-03-04 21:11 hdfs://master:9000/wordcount_out/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 54 2017-03-04 21:11 hdfs://master:9000/wordcount_out/part-00000
hadoop@master:~$ hadoop fs -text hdfs://master:9000/wordcount_out/part-00000
17/03/04 21:14:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(hello,4)
(scala,1)
(wujiadong,1)
(python,1)
(java,1)


Write it all in one line:
scala> sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res9: Array[(String, Int)] = Array((scala,1), (wujiadong,1), (python,1), (hello,4), (java,1))
// or write the output to HDFS
scala> sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://master:9000/spark_out")


Explanation:
sc is the SparkContext object, the entry point for submitting a Spark program. The spark shell has already initialized the SparkContext class as the object sc, so it can be used directly.
textFile() reads the data from HDFS.
flatMap(_.split(" ")) maps each line to its words, then flattens the results.
map((_,1)) turns each word into a (word, 1) tuple.
reduceByKey(_+_) reduces by key, summing the values for each word.
sortBy(_._2,false) sorts by the count, in descending order.
saveAsTextFile() writes the result to HDFS.
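For comparison, the same job can be packaged as a standalone application and submitted with spark-submit. The following is only a rough sketch under assumptions: the object name WordCount, the app name, and the output path /wordcount_app_out are hypothetical and not part of the session above.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone version of the shell session above
object WordCount {
  def main(args: Array[String]): Unit = {
    // The master URL is supplied by spark-submit (or defaults to local mode)
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt")
      .flatMap(_.split(" "))   // split each line into words
      .map((_, 1))             // pair each word with an initial count of 1
      .reduceByKey(_ + _)      // sum the counts per word
      .sortBy(_._2, false)     // sort by count, descending
      .saveAsTextFile("hdfs://master:9000/wordcount_app_out") // hypothetical output path

    sc.stop()
  }
}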
