浅谈如何控制Mapreduce的map个数

浏览: 3470
1.Block 块
文件上传到HDFS中,第一步就是数据的划分,这个是真实物理上的划分,数据文件上传到HDFS后,要把文件划分成一块一块,每块的大小按照hdfs-site.xml里配置选项进行划分,数据一般需要备份,默认的dfs.replication为3。  
namenode中配置文件路径为/etc/hadoop/ 
hdfs-site.xml中没有设置dfs.block.size,则默认大小为64M
   
    若要进行配置修改(当前集群不够大,所以不需要调整默认块大小),需要如下:<property> 
   <name>dfs.block.size</name> 
   <value>67108864</value> 
   <description>The defaultblock sizefornew files.</description>
</property>  <property> 
   <name>dfs.replication</name> 
   <value>3</value> 
   <description>
     Defaultblock replication.The actual number ofreplications can be specified
     whenthe file iscreated.The defaultisused if replication isnotspecified increatetime. 
   </description>
</property>
  getSpits方法中通过如下方式获得:long blockSize = file.getBlockSize();

2.Split分块文件上传至HDFS之前,InputFormat接口的getSplits方法会将文件划分切割成为若干个可序列化的split


1.png



一个大数据的文件一般要划分为若干个split,因此,处理一个split的时间远远小于处理整个大数据文件的时间,

根据木桶效应,整个Map处理的速度则是由群集中所有运行map节点的最慢的那个节点决定,如果将splits分成

较为细粒度的数据大小,而同时对不同的节点计算机根据其速度分配splits个数,可以获得更好的负载均衡。




(1) 一般大多数的split与HDFS中的block大小相同,都为64M,这样做的好处是使得Map可以在存储有当前数据的

节点上运行本地的任务,而不需要通过网络进行跨节点的任务调度。

(2) 如果一个Map中所需要的数据大于一个block的大小64M的时候,那么部分数据很可能存储到别的节点上,处理

的时候必然会通过网络跨节点数据传输,则Map无疑是增加了等待时间从而降低了Map处理效率

(3)如果一个Map中所需要的数据小于block的大小64M的时候,那么会对当前节点block容量的浪费,同时是增加了split的个数

,Map对split进行计算并且上报结果,关闭当前计算打开新的split均需要耗费资源,这样无疑也降低了Map处理效率

3.InputFormat计算Split
调整Split思路如下图:


2.png


疑问:hive表强制分桶后现在有80个约120M的HDFS文件,可能偏大或者偏小,假如当前split分片为128M。
         当前HDFS文件在不足128M的情况下,分片会不会跨两个文件?
         当前HDFS文件在大大超过128M的情况下,分片会分是否会分成多个文件?

下面由InputFileFormat接口进行确认:
 a.获取最小设置
   getSplits方法:long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
   //从配置文件和job设置中寻找最大的数据作为minSize
 
3.png


   默认大小为1
   可以在mr主类中进行设置,设置每个task处理的文件大小,但是这个大小只有在大于block_size的时候才会生效。
   FileInputFormat.setMinInputSplitSize(conf, 1048576); //1M
  b.获取最大设置  
  getSplits方法: long maxSize = getMaxSplitSize(job);
 
4.png


   默认大小为long类型数据的MAX_VALUE-- 9223372036854775807
   可以在mr主类中进行设置,暂无具体优化作用
   FileInputFormat.setMaxInputSplitSize(conf, 1048576); //1M
  c.计算分片大小
   下例为FileInputFormat.class中的computeSplitSize
   
   
5.png


10000000
134217728
  D.文件的Split切分 
  getSplits方法:中的如下代码说明,输入的文件路径下的文件对象作循环后,按照单个文件内的状态等因素进行Split处理的。
  在处理这个文件之前,需要判断该文件是否大小为0,文件状态是否为LocatedFileStatus的实例,同时还需要判断该文件是否可以切块。
  


6.png



  
   getSplits方法:中的如下代码说明,先通过computeSplitSize方法计算splitsize。在通过循环,如果当剩余文件大小/splitsize>SPLIT_SLOP
   (private static final double SPLIT_SLOP = 1.1;   // 10% slop),则从当前文件上切分出split大小为splitsize的分块。当若干次循环后,
   剩余文件大小/splitsize<1.1 则将剩余的部分作为一个split
    
7.png

 

4.结论和优化方案
  关于split的简单的结论
  (1)一个split大于等于1的整数个Block
  (2) 一个split不会包含两个File的Block,不会跨越File边
  (3) split和Block的关系是一对多的关系,默认一对一
  (4) maptasks的个数最终决定于splits的个数
  
  
  初步的设想:
  现在hive T表表分桶,共有80个hdfs文件,每个大小120M左右,我们可以暂且将Splitsize设置为128M,并控制分桶个数进一步细化,
  比如分100个桶,也就是100个hdfs文件,进一步减少每个文件大大小,为以后文件数据量的扩展打好基础。这样我们就保证每个
  HDFS文件的大小小于一个Splitsize,这样就可以保证每个HDFS文件可以交给一个maptask来执行。
  
现将桶数增加到100/160/200,桶字段为分支机构号+商户号+卡号 均发现有部分分桶文件数目较大,是一般文件的两倍。
大小在118M~89M左右,考虑到现在block_size = 64M ,splitsize要远远大于file_szie
11.png

说明split的速度大增


12.png




商户号先行进行处理,进行字段统一
先按照机构号+商户号+卡号做顺序排列,并过滤掉一分部数据
按照机构号+商户号+卡号做分桶
保证一个机构号、一个商户号、一个卡号的数据在一个分桶,也就是一个hdfs文件当中。

分桶的个数,需要参考实际数据量的大小和map中计算存在内存中的大小进行操作。
同时也要考虑块大小和map的个数。
最好做到一个map处理一个hdfs文件,也就是一个分桶文件,这样所有的map中,
因为分桶设计的原因,不会出现一个机构号,一个商户号,一个卡号的数据在两个桶中,
自然也不会出现在两个数据文件中,也不会丢给2个map文件进行处理,故可以直接针对
该商户下的卡号进行count.

然后在map函数中设置变量
在buket中设置变量,若卡号和之前的变量有差距,则替换改变量,
在每次做切换的时候,将卡号数据量记录1,若不变换的时候,则记录为0


在1800W数据一天的情况下,数据处理的时间约为25秒~~

 
推荐 2
本文由 曾力 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册