spark学习13(spark RDD)

浏览: 1772

RDD及其特点

1)RDD(Resillient Distributed Dataset)弹性分布式数据集,是spark提供的核心抽象。它代表一个不可变、可分区、里面的元素可并行计算的集合

2)RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作(分布式数据集)

3)RDD通常通过hadoop上的文件,即hdfs文件或者hive表来进行创建,有时也可以通过应用程序中的集合来创建。

4)RDD最重要的特性就是提供了容错性,可以自动从节点失败中恢复过来,即某节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己数据来源重新计算该partition,这一切对使用者是透明的

5)RDD的数据默认情况下存在内存中,但是在内存资源不足是,spark会自动将RDD数据写入磁盘(弹性)

注意:RDD的每个partition,在spark节点上存储时,默认都是放在内存中的,但是如果说内存放不下这么多数据,比如每个节点最多放5w数据,结果每个partition市10w数据,那么就会把partition中的部分数据写入磁盘,进行保存。而上述这一切,对于用户来说,都是完全透明的,也就是不用去管RDD的数据存放在内存还是磁盘,只要关注你针对RDD来进行计算和处理等操作即可。所以说RDD这种自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特点所在

RDD的属性

1)
一组分片(patition)。即数据集的基本单位。用户可在创建RDD时指定RDD的分片个数,若没指定,则使用默认值,为程序所分配到的CPU Core的数目

2)一个计算每个分区的函数。spark中RDD的计算是以分片为单位,每个RDD都会实现compute函数以达到这个目的。

3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间会形成类似流水线一样的前后依赖关系。在分区数丢失时,spark通过这个依赖关系重新计算丢失的分区数据,而不是对RDD所有分区进行重新计算

4)一个partitioner。即RDD的分片函数。当前spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于于key-value的RDD才会有partitioner,非key-value的RDD的partitioner的值是None

5)一个列表,存储存取每个partition的优先位置。对于一个hdfs文件来说,这个列表保存的就是每个partition所在的块位置

创建RDD

进行spark核心编程时,先要创建一个初始的RDD,该RDD中,通常就代表和包含了spark应用程序的输入源数据,然后在创建了初识的RDD之后,才可以通过spark core提供的transformation算子,对RDD进行转换,来获取其他的RDD。

spark core提供了二种创建RDD的方式,包括

1)由已经存在的Scala集合创建RDD

先要启动spark-shell
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24
求和
scala> val sum = rdd1.reduce(_+_)
sum: Int = 55

2)由外部存储系统的数据集创建

使用本地文件创建RDD  

使用HDFS文件创建RDD
scala> val rdd3 = sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt")
rdd3: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/student/2016113012/spark/words.txt MapPartitionsRDD[46] at textFile at :24
统计文本字数
scala> val count = rdd3.map(lines => lines.length()).reduce(_+_)
count: Int = 48

文件内容如下
hello scala
hello java
hello python
hello wujiadong

spark默认会为hdfs的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比他更少

两种RDD操作

spark支持两种RDD操作,transformation和action。transformation操作会针对已有的RDD创建一个新的RDD,而action则主要是对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序

例如:
map是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。

reduce就是一种action操作,它用于对RDD中所有元素尽心聚合操作,并获取一个最终的结果,然后返回给Driver程序

transformation的特点:lazy特性
如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行,即transformation是不会触发spark程序的执行的,只是记录了对RDD所做的操作,但是不会自发的执行,只有当transformation之后,接着执行了了一个action操作,那么所有的transformation才会执行。

spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。

action的特性
action的执行会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,

Transformation

map(func) :返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])) :
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

Action

reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement, num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)

RDD及其特点

1)RDD(Resillient Distributed Dataset)弹性分布式数据集,是spark提供的核心抽象。它代表一个不可变、可分区、里面的元素可并行计算的集合

2)RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作(分布式数据集)

3)RDD通常通过hadoop上的文件,即hdfs文件或者hive表来进行创建,有时也可以通过应用程序中的集合来创建。

4)RDD最重要的特性就是提供了容错性,可以自动从节点失败中恢复过来,即某节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己数据来源重新计算该partition,这一切对使用者是透明的

5)RDD的数据默认情况下存在内存中,但是在内存资源不足是,spark会自动将RDD数据写入磁盘(弹性)

注意:RDD的每个partition,在spark节点上存储时,默认都是放在内存中的,但是如果说内存放不下这么多数据,比如每个节点最多放5w数据,结果每个partition市10w数据,那么就会把partition中的部分数据写入磁盘,进行保存。而上述这一切,对于用户来说,都是完全透明的,也就是不用去管RDD的数据存放在内存还是磁盘,只要关注你针对RDD来进行计算和处理等操作即可。所以说RDD这种自动进行内存和磁盘之间权衡和切换的机制,就是RDD的弹性的特点所在

RDD的属性

1)
一组分片(patition)。即数据集的基本单位。用户可在创建RDD时指定RDD的分片个数,若没指定,则使用默认值,为程序所分配到的CPU Core的数目

2)一个计算每个分区的函数。spark中RDD的计算是以分片为单位,每个RDD都会实现compute函数以达到这个目的。

3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间会形成类似流水线一样的前后依赖关系。在分区数丢失时,spark通过这个依赖关系重新计算丢失的分区数据,而不是对RDD所有分区进行重新计算

4)一个partitioner。即RDD的分片函数。当前spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另一个是基于范围的RangePartitioner。只有对于于key-value的RDD才会有partitioner,非key-value的RDD的partitioner的值是None

5)一个列表,存储存取每个partition的优先位置。对于一个hdfs文件来说,这个列表保存的就是每个partition所在的块位置

创建RDD

进行spark核心编程时,先要创建一个初始的RDD,该RDD中,通常就代表和包含了spark应用程序的输入源数据,然后在创建了初识的RDD之后,才可以通过spark core提供的transformation算子,对RDD进行转换,来获取其他的RDD。

spark core提供了二种创建RDD的方式,包括

1)由已经存在的Scala集合创建RDD

先要启动spark-shell
scala> val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24
求和
scala> val sum = rdd1.reduce(_+_)
sum: Int = 55

2)由外部存储系统的数据集创建

使用本地文件创建RDD  

使用HDFS文件创建RDD
scala> val rdd3 = sc.textFile("hdfs://master:9000/student/2016113012/spark/words.txt")
rdd3: org.apache.spark.rdd.RDD[String] = hdfs://master:9000/student/2016113012/spark/words.txt MapPartitionsRDD[46] at textFile at :24
统计文本字数
scala> val count = rdd3.map(lines => lines.length()).reduce(_+_)
count: Int = 48

文件内容如下
hello scala
hello java
hello python
hello wujiadong

spark默认会为hdfs的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比他更少

两种RDD操作

spark支持两种RDD操作,transformation和action。transformation操作会针对已有的RDD创建一个新的RDD,而action则主要是对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序

例如:
map是一种transformation操作,它用于将已有RDD的每个元素传入一个自定义函数,并获取一个新的元素,然后将所有的新元素组成一个新的RDD。

reduce就是一种action操作,它用于对RDD中所有元素尽心聚合操作,并获取一个最终的结果,然后返回给Driver程序

transformation的特点:lazy特性
如果一个spark应用中只定义了transformation操作,那么即使你执行该应用,这些操作也不会执行,即transformation是不会触发spark程序的执行的,只是记录了对RDD所做的操作,但是不会自发的执行,只有当transformation之后,接着执行了了一个action操作,那么所有的transformation才会执行。

spark通过这种lazy特性,来进行底层的spark应用执行的优化,避免产生过多中间结果。

action的特性
action的执行会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,

Transformation

map(func) :返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func):返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func)
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed)
union(otherDataset)
intersection(otherDataset)
distinct([numTasks])) :
groupByKey([numTasks])
reduceByKey(func, [numTasks])
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)

Action

reduce(func)
collect()
count()
first()
take(n)
takeSample(withReplacement, num, [seed])
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)
推荐 0
本文由 邬家栋 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册