使用Spark计算PV、UV

浏览: 2902

摘文出处:使用Spark计算PV、UV

日志字段格式:

id,ip,url,ref,cookie,time_stamp

把日志文件放到HDFS。仅取了1000行。

hadoop fs -put 1000_log hdfs://localhost:9000/user/root/input  

直接在Scala Shell中读取文件并计算PV。

scala> val textFile = sc.textFile("hdfs://localhost:9000/user/root/input/1000_log")
scala> val textRDD = textFile.map(_.split("\t")).filter(_.length == 6)
scala> val result = textRDD.map(w => ((new java.net.URL(w(2))).getHost,1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false).map(item => item.swap)
scala> result.saveAsTextFile("hdfs://localhost:9000/user/root/out8.txt")

从HDFS上取回结果:

hadoop fs -get hdfs://localhost:9000/user/root/out8.txt

查看结果:

$ more out8.txt/part-00000 
(www.j1.com,126)
(tieba.baidu.com,65)
(113.105.174.5,60)
(www.baidu.com,54)
(mp.weixin.qq.com,46)
(119.147.106.188,40)
(bbs.caoav.net,31)
(i.ifeng.com,19)
(www.amazon.de,18)
(m.zhiyoula.com,18)
(www.360doc.com,16)
(br.pps.tv.iqiyi.com,14)
(www.1905.com,14)
(xa.btfs.ftn.qq.com,14)

如果是生成.snappy压缩格式的文件,则可以按如下方法重定向到本地文本文件。

hadoop fs -text part-r-00001.snappy > filename.txt

下面对同一日志文件计算UV(Unique View)。

UV一般认为不同cookie的访问则算不同的独立用户。

package org.asiainfo

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

/**
* @author:zhaohf@asiainfo.com
* @date:2015年1月27日 下午5:54:39
* @Description: TODO
*/
object UniqueViewCount {

def main(args: Array[String]): Unit = {
if(args.length < 4){
System.err.println("Usage:<input_file> <url_column_index> <output_file>")
System.exit(1)
}
val conf = new SparkConf().setAppName("UniqueViewApp")
val sc = new SparkContext(conf)
val url_index = args(1).toInt
val cookie_index = args(2).toInt
val textRDD = sc.textFile(args(0))
.map(_.split("\t"))
.map(line => (new java.net.URL(line(url_index)).getHost) + "\t" + line(cookie_index))
.distinct()
.map(line => (line.split("\t")(0),1))
.reduceByKey(_ + _)
.map(item => item.swap)
.sortByKey(false)
.map(item => item.swap)
textRDD.saveAsTextFile(args(3))
}

}

sbt package 编译打包。

生成jar文件,提交spark应用。

spark-submit --class main.UniqueViewCount target/scala-2.11/spark_2.11-1.0.jar hdfs://localhost:9000/user/root/intput/1000_log 2 4 hdfs://localhost:9000/user/root/output

结果:

more result.txt
(bbs.caoav.net,31)
(www.baidu.com,28)
(www.amazon.de,15)
(m.zhiyoula.com,15)
(www.360doc.com,14)
(m.sohu.com,11)
(mp.weixin.qq.com,11)
(www.kaixin001.com,10)
(www.zhiyoula.com,7)
(lolbox.duowan.com,7)

下面用shell来验证正确性:

先用python解析出url中的host:

#!/usr/bin/python
from urlparse import urlparse
import sys
with open(sys.argv[1],'r') as f:
for line in f.readlines():
splits = line.split('\t')
url,cookie = urlparse(splits[2]).netloc,splits[4]
print url + '\t' + cookie
 $ python check.py 1000_log > 1000_log_pre
 $ cat 1000_log_pre | sort | uniq | awk -F '\t' '{print $1}' | sort | uniq -c | sort -nr -k1|  head
31 bbs.caoav.net
28 www.baidu.com
15 www.amazon.de
15 m.zhiyoula.com
14 www.360doc.com
11 m.sohu.com
11 mp.weixin.qq.com
10 www.kaixin001.com
7 www.zhiyoula.com

结果正确!

推荐 1
本文由 GeorgeYao 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册