spark集群中的节点可以只处理自身独立数据库里的数据,然后汇总吗?

0
我将spark搭建在两台机器上,其中一台既是master又是slave,另一台是slave,两台机器上均装有独立的mongodb数据库。我是否可以让它们只统计自身数据库的内容,然后将结果汇总到一台服务器上的数据库里?目前我的代码如下,但是最终只统计了master里的数据,另一个worker没有统计上。

val config = new Configuration() //以下代码表示只统计本机数据库上的数据,猜测问题可能出在这里 config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/local.test") //统计结果输出到服务器上 config.set("mongo.output.uri", "mongodb://103.25.23.80:60013/test_hao.result") val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) // Input contains tuples of (ObjectId, BSONObject) val countsRDD = mongoRDD.flatMap(arg => { var str = arg._2.get("type").toString str = str.toLowerCase().replaceAll("[.,!?\n]", " ") str.split(" ") }) .map(word => (word, 1)) .reduceByKey((a, b) => a + b) // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null val saveRDD = countsRDD.map((tuple) => { var bson = new BasicBSONObject() bson.put("word", tuple._1) bson.put("count", tuple._2.toString() ) (null, bson) }) // Only MongoOutputFormat and config are relevant saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
 
已邀请:

要回复问题请先登录注册