spark collect(),当数据量比较大时,卡死怎么解决?

0

初学spark,自己尝试写了个矩阵乘法的小程序。 pair1,pair2分别是两个二元组,记录着一组矩阵的值和编号(矩阵数据是从文件逐行读入, 文件格式是每行有一个数值,要生成行主序的矩阵。 本例中从文件读入100行数据(10*10矩阵),并逐行依次编号0~99, 对应产生100个pair1,2的二元组。 接下来映射成为行号,列号和值的三元组,进一步map,reduce并行的来算举证乘法。 经测试当矩阵很小时,或者是文件很小,像10*10的是没有问题的和串行的矩阵运算结果一致。 但是当计算1000*1000,总共有1000000行数据时,collet()和sortByKey()基本是卡死的。 这个测试程序是放在单机上IDEA上来跑。 同样的问题还发现在搭建的集群上运行其他spark程序,数据量大时, 一回收(collect())也出现类似情况(不是内存溢出。 就这个程序,需要怎么改可以计算1000*1000矩阵,我特意测了个时间发现到reduceByKey()都是 很快的,foreach也可以在终端打印出最后一小部分结果(因为数据量比较大)。 可是一继续后面就会卡死,求解决。 val rdd_pairs1 = sc.parallelize(pairs1) val rdd_pairs2 = sc.parallelize(pairs2) var beg = System.currentTimeMillis() val coordinateAndvalue1 = rdd_pairs1.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value())) val coordinateAndvalue2 = rdd_pairs2.map(a => (a.get_id() / 10, a.get_id() % 10, a.get_value())) val k_v1 = coordinateAndvalue1.map(b => (b._2, b._1, b._3)) val cart = k_v1.cartesian(coordinateAndvalue2).filter(x => (x._1._1 == x._2._1)) val mul = cart.map(y => (y._1._2 + "" + y._2._2, y._1._3 * y._2._3)) val add = mul.reduceByKey((x, y) => x + y) println("time" + (System.currentTimeMillis() - beg)) val sort = add.sortByKey() val col = sort.collect() println("half done") val output_path = "result.txt" val writer = new FileWriter(new File(output_path) , false) for (i <- 0 to 100 - 1) { writer.write(col(i)._1 + " " + col(i)._2 + "\n") } writer.flush() writer.close() println("Finish")
 
已邀请:
1

MarsJ - 大数据玩家~DS 2016-04-08 回答

collect是直接将执行collect的这个RDD是数据加载到内存中执行,如果数据量太大必然会卡死,所以唯一能建议的就是合理使用collect,不要滥用。
0

新叶 2019-09-05 回答

我也是碰到这个问题了

要回复问题请先登录注册