Computing Max/Min, Average, and Top N in Spark 2.0


Compared with the equivalent MapReduce jobs, the Spark versions of these computations are much more concise. The code is as follows:

import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    // Test max/min
    // testMaxMin()

    // Test average
    // testAvg()

    // Test Top N
    testTopN()
  }

  def testMaxMin(): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("App")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
      .getOrCreate()
    val data = sparkSession.sparkContext.parallelize(Array(10, 7, 3, 4, 5, 6, 7, 8, 1001, 6, 2))

    // Method 1: map everything to a single key, group, then scan for min and max
    data.map(x => ("key", x)).groupByKey().map(x => {
      var min = Integer.MAX_VALUE
      var max = Integer.MIN_VALUE
      for (num <- x._2) {
        if (num > max) {
          max = num
        }
        if (num < min) {
          min = num
        }
      }
      (min, max)
    }).collect().foreach(x => {
      println("min : " + x._1)
      println("max : " + x._2)
    })

    // Method 2: reduce the RDD directly
    val max = data.reduce((a, b) => Math.max(a, b))
    val min = data.reduce((a, b) => Math.min(a, b))

    // Just to show the pairwise comparisons reduce performs
    val test = data.reduce {
      (a, b) => {
        println((a, b)) // prints pairs such as (10,7) (10,3)
        Math.max(a, b)
      }
    }

    println("max : " + max + "\n" + "min : " + min)

    sparkSession.stop()
  }

  def testAvg(): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("App")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
      .getOrCreate()
    val foo = sparkSession.sparkContext.parallelize(List(Tuple2("a", 2), Tuple2("a", 3), Tuple2("b", 2), Tuple2("b", 8)))

    // avg(a) = sum of a's values / count of a, so first build (a, (sum(a), count(a)))
    val pairs = foo.map(x => (x, 1))                                        // ((a,2),1)
    val pairSum = pairs.map(x => (x._1._1, (x._1._2, x._2)))                // (a,(2,1))
    val count = pairSum.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))   // (b,(10,2))
    val avg = count.map(x => (x._1, x._2._1 / x._2._2.toDouble))
    avg.collect().foreach(println)
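
    // Simpler equivalent sketch (not in the original post): build the (sum, count)
    // pairs in one pass with mapValues + reduceByKey.
    val avg2 = foo.mapValues(v => (v, 1))
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, cnt) => sum / cnt.toDouble }
    avg2.collect().foreach(println)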


    // combineByKey accumulates a (sum, count) pair per key
    val result = foo.combineByKey(
      (v) => (v, 1),                                        // createCombiner: first value seen for a key
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),     // mergeValue: fold another value into (sum, count)
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
    ).map { case (k, v) => (k, v._1 / v._2.toDouble) }
    result.collect().foreach(println)
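
    // Alternative sketch (not in the original post): aggregateByKey builds the same
    // (sum, count) pair per key from an explicit zero value.
    val avgAgg = foo.aggregateByKey((0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    ).mapValues { case (sum, cnt) => sum / cnt.toDouble }
    avgAgg.collect().foreach(println)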
  }

  def testTopN(): Unit = {
    val sparkSession = SparkSession.builder()
      .appName("App")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
      .getOrCreate()
    val foo = sparkSession.sparkContext.parallelize(Array(
      ("a", 1),
      ("a", 2),
      ("a", 3),
      ("b", 3),
      ("b", 1),
      ("a", 4),
      ("b", 4),
      ("b", 2)
    ))

    // Top 2 per key: group, then sort each key's values in descending order and take 2
    val groupSort = foo.groupByKey().map(x => {
      val key = x._1
      val values = x._2
      val sortValue = values.toList.sortWith(_ > _).take(2)
      (key, sortValue)
    })
    // groupSort.collect().foreach(println) // (a,List(4, 3))
    // Flatten back to (key, value) pairs
    groupSort.flatMap {
      case (key, numbers) => numbers.map { x => (key, x) }
    }.foreach(println)
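
    // Alternative sketch (not in the original post): groupByKey materializes every
    // value of a key in memory; aggregateByKey can keep only the two largest values
    // per key, bounding the per-key state.
    val top2PerKey = foo.aggregateByKey(List.empty[Int])(
      (acc, v) => (v :: acc).sorted(Ordering[Int].reverse).take(2),
      (a, b) => (a ++ b).sorted(Ordering[Int].reverse).take(2)
    )
    top2PerKey.collect().foreach(println)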
  }

}
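
For reference, with the sample data above, testMaxMin should print min : 2 and max : 1001, testAvg should print (a,2.5) and (b,5.0), and testTopN should print (a,4), (a,3), (b,4), (b,3); the exact print order may vary because foreach runs per partition.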
