基于Flume -> Kafka -> Stream(Spark)的架构日志收集demo搭建(part2)

浏览: 5174

4.安装Spark并配置

 由于也是从头开始学习,从Spark的sampl开始,使用Python做1、2个简单例子,完成示例中的wordcount和SparkStream的count

 具体参考 http://spark.apache.org/docs/latest/spark-standalone.html

(1)下载并配置JAVA环境

 下载 Spark 1.5.1,注意选择使用集成hadoop的版本,不然默认启动不了,不需要安装预先安装scala

 机器名为sparktest

 

 解压到$HOME/spark-1.5.1-bin-hadoop2.6目录
 cd $HOME/spark-1.5.1-bin-hadoop2.6
 cp conf/spark-env.sh.template conf/spark-env.sh
  增加
 export JAVA_HOME=/home/biuser/jdk1.7.0_17

(2)启动spark

cd $HOME/spark-1.5.1-bin-hadoop2.6
./sbin/start-master.sh
./sbin/start-slave.sh spark://sparktest:7077
打开 http://sparknode:8080/检测

(3)启动python shell执行woorcount smaple

  cd $HOME/spark-1.5.1-bin-hadoop2.6
  ./bin/pyspark

  由于我的机器上Python是2.7的版本,存在问题,改成使用python3

修改./bin/pyspark
if hash python2.7 2>/dev/null; then
  # Attempt to use Python 2.7, if installed:
  #DEFAULT_PYTHON="python2.7"
  DEFAULT_PYTHON="python3"
else
      DEFAULT_PYTHON="python"
fi

根据 QuickStart编辑一个test/wordcount.py

内容如下:

import sys
from pyspark import SparkContext
if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    sc = SparkContext(appName="Rongwordcount")
    textFile = sc.textFile("README.md")
    filecount=textFile.count()
    print("start-----xxxxxxxxxxxxxxxxxxxx-------------------------")
    print(filecount)
    print("end------xxxxxxxxxxxxxxxxxxx-----------------------------")
    textFile.first()
    linesWithSpark = textFile.filter(lambda line: "Spark" in line)
    textFile.filter(lambda line: "Spark" in line).count()
    print("-----------------------------------------------")
    sc.stop()
bin/spark-submit test/wordcount.py sparktest:9092 test1
      

5.使用Spark Stream连接Kafka

使用已有的examples/src/main/python/streaming/kafka_wordcount

调用的命令如下,其中sparktest:9092为Kafka的broker地址 test为subject名字

cd $HOME/spark-1.5.1-bin-hadoop2.6
bin/spark-submit --jars \
      external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.5.1.jar \
      examples/src/main/python/streaming/kafka_wordcount.py \
      sparktest:9092 test

      

      

其中spark-streaming-kafka-assembly_2.10-1.5.1.jar 在如下地址需要下载

http://search.maven.org/#search|ga|1|spark-streaming-kafka-assembly

注意选择2.10版本的,目前的spark版本使用scala2.10版本编译,不能使用2.11对应的jar包。

在console会有很多信息,其中有些信息就将Kafka的信息打印出来了。

我修改了conf/log4j.properties将日志级别从INFO改成WARN,就可以看得比较清楚了

#log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console
推荐 1
本文由 seng 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册