合理设置Spark数据分区

浏览: 2599

在Hadoop的map-reduce编程模型中,框架要做的第一步事情,也是对数据进行分块切分,然后对每块数据调用mapper程序进行处理,mapper处理完将结果交给reducer进行第二阶段的处理。

在Spark中,也是同样的逻辑。Spark会先对数据进行分区处理,然后在每个分区数据上调用一个task执行线程来执行相应的计算。

在使用Python调用Spark的API的时候,首先就是加载数据,使用sc.textFile()方法来调用。sc.textFile()除了文件路径外,还有另外两个非常实用的参数:

minPartitions: 最少分区数目 
use_unicode: 文件使用的编码

尤其是minPartitions这个参数,可以强行指定数据最少的分区数目,增加这个数值,就会降低每个分区的数据量。对于小型数据,但大运算量的程序(机器学习中常用的算法挖掘),如果不手动指定最少分区数,系统会根据默认的数据分区策略,可能只会将数据分成很少的一个或者几个分区,此时尽管还有大量空闲可用CPU资源,系统也会根据相应的分区数目来调用相应的一个或者几个task来执行。

此时,并没有达到真正将计算并行的效果。就像现在有一块非常大的石头,要从A地搬运到B地,老板派了几十个人来做这件事情,可是按照默认的分块机制,只将当前的大石头分成了两个小块,由两个人来搬运(其它人也帮不上忙)。剩下的几十个人只能干望着,使不上力。而那两个人却一直非常累。如果将大石头分成二十个小块,由二十个人同时搬运,此时每个人估计就只有非常小的一块了,基本上可以说在路上飞起来了,自然总的搬运时间就会快很多。

在yarn模式下,Spark一般会根据Hadoop文件的块数据大小(fs.local.block.size)来划分默认的分区数目,关于详细的实验,请参考文章:http://www.bigsynapse.com/spark-input-output 中的实验。

除在在读取文件的时候指定分区数目,还可以在RDD数据集的基础上调用方法来设置分区数目,对于从Hive或者其它数据源中读取的数据,可以使用这种方式。

Spark提供了两个修改分区的方法:

coalesce: 合并分区 
repartition: 重新分配分区数目

coalesce在Hive和SparkSQL中也出现过,相当于MySQL中ifnull的判断,取出第一个非null的值,但应用在RDD上,却是一个合并分区的transofrm。从名字上可知,合并分区主要是降低分区数目。一般会用在filter方法之后,因为filter之后,以前一些分区数据可能已经很少了,此时执行合并有利用资源的利用。

除此之外,两者还有几点异同:

  1. 两者都能用于增加或者减少分区数目,coalesce要增加分区数目,必须设置其第二个参数shuffle=True。
  2. repartition(num)实际上就相当于coalesce(num, shuffle=True)。
  3. 因为要shuffle数据,开销比较大,所以如果只是减少分区,最好用coalesce,这样避免数据shuffle操作。


公众号支持改名了,改成“全栈数据”了。以前的名字“云戒云”太小众化了。这次把Logo也换了,欢迎各位继续关注,后面尽可能会时常更新一些简短的内容,慢慢积累。

主题还是没有变,全栈数据技术,涉及:Linux、Python、SQL、大数据、Hadoop、Spark、数据分析、数据挖掘、机器学习、深度学习、Mac、Emacs。

qr_code_new.jpg

推荐 0
本文由 云戒 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册