[Apache Spark][Basic Architecture] RDD Features (2) - Transformations and Actions



Spark's architecture is not actually that big, but every piece really does interlock with the next: pulling it apart to explain raises a lot of questions, yet once you understand each link, the whole design suddenly feels very reasonable (a rather convoluted way to put it). This series of basic-architecture posts is broken out from the original Spark paper, and tries to share Spark's core ideas with you.

Spark processes data mainly through RDDs, and operations on an RDD fall roughly into two categories: transformations and actions. A transformation turns one RDD into another, for example map (applies the same function to every element of the RDD) or filter (keeps only the elements that satisfy a predicate); its defining trait is that both the input and the output are RDDs. The other kind of operation instead "pulls" results out of an RDD, for example count (returns the number of elements in the RDD) or reduce (aggregates the elements of the RDD with a combining function); here the input is an RDD, but the output may be a list, an integer or other value, or data saved as HDFS files. (The complete set of transformations and actions is shown in the table below.)

[Image: table of Spark transformations and actions]
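As a quick sketch of the difference (assuming a spark-shell session, so the SparkContext sc already exists; the sample words are made up for illustration), the transformations below only produce new RDDs, while the actions at the end return plain values to the driver:

// Transformations: each call only returns a new RDD, nothing is computed yet
val words  = sc.parallelize(Seq("spark", "rdd", "spark", "action"))
val pairs  = words.map(w => (w, 1))       // RDD[(String, Int)]
val counts = pairs.reduceByKey(_ + _)     // still an RDD, still lazy

// Actions: these trigger execution and return ordinary values to the driver
val total  = words.count()                // Long = 4
val result = counts.collect()             // Array[(String, Int)]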

Besides the difference in inputs and outputs, transformations and actions also determine how Spark schedules work at runtime. If you run Spark through the REPL (the shell you get after typing spark-shell), you will notice that RDD operations do not take effect immediately: when the command is a transformation, Spark only records the dependency between the RDDs before and after the transformation, and only once an action occurs does Spark take every step reachable by working backwards from that action (including reading files from HDFS), plan them together, and execute them together, as in the following example:

scala> val data = sc.parallelize(1 to 100)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> val data_map = data.map(_ + 1)
data_map: org.apache.spark.rdd.RDD[Int] = MappedRDD[1] at map at <console>:14
scala> val data_filter = data_map.filter(_ < 50)
data_filter: org.apache.spark.rdd.RDD[Int] = FilteredRDD[2] at filter at <console>:16

Here we start by creating an RDD holding the numbers 1 to 100; at this point Spark only tells us that it created a ParallelCollectionRDD (Spark basically decides which kind of RDD to build for us). We then build a MappedRDD via map, and finally a FilteredRDD via filter. These three RDDs depend on one another, but none of them has actually been materialized yet, until we issue the reduce command:

scala> val data_result = data_filter.reduce(_ + _)

14/12/02 00:23:03 INFO SparkContext: Starting job: reduce at <console>:18
14/12/02 00:23:03 INFO DAGScheduler: Got job 0 (reduce at <console>:18) with 4 output partitions (allowLocal=false)
14/12/02 00:23:03 INFO DAGScheduler: Final stage: Stage 0(reduce at <console>:18)
14/12/02 00:23:03 INFO DAGScheduler: Parents of final stage: List()
14/12/02 00:23:03 INFO DAGScheduler: Missing parents: List()
14/12/02 00:23:03 INFO DAGScheduler: Submitting Stage 0 (FilteredRDD[2] at filter at <console>:16), which has no missing parents
14/12/02 00:23:03 INFO MemoryStore: ensureFreeSpace(1936) called with curMem=0, maxMem=278019440
14/12/02 00:23:03 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1936.0 B, free 265.1 MB)
14/12/02 00:23:03 INFO DAGScheduler: Submitting 4 missing tasks from Stage 0 (FilteredRDD[2] at filter at <console>:16)
14/12/02 00:23:03 INFO TaskSchedulerImpl: Adding task set 0.0 with 4 tasks
14/12/02 00:23:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, localhost, PROCESS_LOCAL, 1152 bytes)
14/12/02 00:23:03 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
14/12/02 00:23:03 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
14/12/02 00:23:03 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/12/02 00:23:03 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
14/12/02 00:23:03 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 600 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 701 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 600 bytes result sent to driver
14/12/02 00:23:03 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 701 bytes result sent to driver
14/12/02 00:23:03 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 49 ms on localhost (1/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 55 ms on localhost (2/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 57 ms on localhost (3/4)
14/12/02 00:23:04 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 69 ms on localhost (4/4)
14/12/02 00:23:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/12/02 00:23:04 INFO DAGScheduler: Stage 0 (reduce at <console>:18) finished in 0.083 s
14/12/02 00:23:04 INFO SparkContext: Job finished: reduce at <console>:18, took 0.373505684 s

data_result: Int = 1224

In the output above, the first and last lines are the reduce command and its result; everything in between is Spark's runtime log, whose details this article will not go into yet. The key point is that Spark treats all the preceding RDD transformations as lazy operations, and only starts executing the whole program once an action is reached. So a Spark program, from reading input files to producing a result, can build different RDDs from several file sources and join, map, or filter them against one another, but each job ends in a single action; as soon as Spark encounters the action, it processes all the related RDD transformations in one go.
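To make the "several sources, one action" point concrete, here is a minimal sketch (the HDFS paths and the column layout are hypothetical, used only for illustration): two RDDs are built from different files, keyed, joined, and filtered, and nothing actually runs until the single action at the end:

// Two independent sources, each a chain of lazy transformations
val users = sc.textFile("hdfs:///data/users.csv")            // hypothetical path
  .map(_.split(","))
  .map(cols => (cols(0), cols(1)))                           // (userId, name)

val orders = sc.textFile("hdfs:///data/orders.csv")          // hypothetical path
  .map(_.split(","))
  .map(cols => (cols(0), cols(2).toDouble))                  // (userId, amount)

// join and filter are still only dependency bookkeeping; no file has been read yet
val bigSpenders = users.join(orders)
  .filter { case (_, (_, amount)) => amount > 100.0 }

// The single action: only now does Spark read both files and run the whole chain
bigSpenders.saveAsTextFile("hdfs:///data/big_spenders")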

Spark adopts this lazy model so that it can allocate system resources more sensibly: data that can be processed in place is processed in place as much as possible, reducing the time spent moving files around. That is why execution only begins once the whole RDD pipeline (the transformation part) has been laid out, that is, once an action is invoked.
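You can also watch this bookkeeping directly: toDebugString is a standard RDD method that prints the lineage Spark has recorded without launching any job. A minimal sketch, reusing the RDDs from the spark-shell example above:

// No job is started by this call; it only prints the recorded dependency chain
println(data_filter.toDebugString)
// Prints something like the following (exact format varies by Spark version):
//   FilteredRDD[2] at filter at <console>:16
//     MappedRDD[1] at map at <console>:14
//       ParallelCollectionRDD[0] at parallelize at <console>:12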
