对比 MR,用 Spark 编写计算要简洁很多,代码如下:
import org.apache.spark.sql.SparkSession
object App {

  /**
   * Demo entry point. Exactly one demo runs at a time; uncomment the
   * corresponding call to try the others.
   */
  def main(args: Array[String]): Unit = {
    // Max/min demo
    // testMaxMin()
    // Average demo
    // testAvg()
    // Top-N demo
    testTopN()
  }

  /** Builds the local SparkSession used by every demo (was duplicated three times). */
  private def newSession(): SparkSession =
    SparkSession.builder()
      .appName("App")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///D:/code/spark_code/course/")
      .getOrCreate()

  /**
   * Finds the maximum and minimum of a small integer RDD two ways:
   * (1) MR-style: shuffle everything under one key and scan the group;
   * (2) direct `reduce` calls — no single giant group.
   */
  def testMaxMin(): Unit = {
    val sparkSession = newSession()
    val data = sparkSession.sparkContext.parallelize(Array(10, 7, 3, 4, 5, 6, 7, 8, 1001, 6, 2))

    // Approach 1: group all values under a single key, then take min/max of the group.
    // Iterable.min/max replaces the original manual var-based loop.
    data.map(x => ("key", x))
      .groupByKey()
      .map { case (_, nums) => (nums.min, nums.max) }
      .collect()
      .foreach { case (min, max) =>
        println("min : " + min)
        println("max : " + max)
      }

    // Approach 2: a pair of reduces over the raw RDD.
    val max = data.reduce((a, b) => Math.max(a, b))
    val min = data.reduce((a, b) => Math.min(a, b))
    println("max : " + max + "\n" + "min : " + min)

    sparkSession.stop()
  }

  /**
   * Per-key average: avg(k) = sum(values of k) / count(k).
   * Shown twice: via reduceByKey over (sum, count) pairs, and via combineByKey.
   */
  def testAvg(): Unit = {
    val sparkSession = newSession()
    val foo = sparkSession.sparkContext.parallelize(List(("a", 2), ("a", 3), ("b", 2), ("b", 8)))

    // Approach 1: build (key, (sum, count)) and reduce component-wise.
    val pairSum = foo.map { case (k, v) => (k, (v, 1)) }                  // (a,(2,1))
    val count = pairSum.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)) // (b,(10,2))
    val avg = count.map { case (k, (sum, n)) => (k, sum / n.toDouble) }
    avg.collect().foreach(println)

    // Approach 2: combineByKey keeps the (sum, count) accumulator explicit.
    val result = foo.combineByKey(
      (v: Int) => (v, 1),
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    ).map { case (k, (sum, n)) => (k, sum / n.toDouble) }
    result.collect().foreach(println)

    sparkSession.stop() // was missing: the session leaked
  }

  /**
   * Per-key top-2 values, flattened back to (key, value) pairs.
   */
  def testTopN(): Unit = {
    val sparkSession = newSession()
    val foo = sparkSession.sparkContext.parallelize(Array(
      ("a", 1),
      ("a", 2),
      ("a", 3),
      ("b", 3),
      ("b", 1),
      ("a", 4),
      ("b", 4),
      ("b", 2)
    ))

    // top 2: sort each key's values descending, keep the first two.
    val groupSort = foo.groupByKey().map { case (key, values) =>
      (key, values.toList.sortWith(_ > _).take(2))
    }
    // groupSort.collect().foreach(println) //(a,List(4, 3))

    // Flatten (key, List(v...)) back to (key, v) pairs.
    // collect() first so println runs on the driver, not on executors.
    groupSort.flatMap { case (key, numbers) => numbers.map(x => (key, x)) }
      .collect()
      .foreach(println)

    sparkSession.stop() // was missing: the session leaked
  }
}