4.安装Spark并配置
由于也是从头开始学习,从Spark的sampl开始,使用Python做1、2个简单例子,完成示例中的wordcount和SparkStream的count
具体参考 http://spark.apache.org/docs/latest/spark-standalone.html
(1)下载并配置JAVA环境
下载 Spark 1.5.1,注意选择使用集成hadoop的版本,不然默认启动不了,不需要安装预先安装scala
机器名为sparktest
解压到$HOME/spark-1.5.1-bin-hadoop2.6目录
cd $HOME/spark-1.5.1-bin-hadoop2.6
cp conf/spark-env.sh.template conf/spark-env.sh
增加
export JAVA_HOME=/home/biuser/jdk1.7.0_17
(2)启动spark
cd $HOME/spark-1.5.1-bin-hadoop2.6
./sbin/start-master.sh
./sbin/start-slave.sh spark://sparktest:7077
打开 http://sparknode:8080/检测
(3)启动python shell执行woorcount smaple
cd $HOME/spark-1.5.1-bin-hadoop2.6
./bin/pyspark
由于我的机器上Python是2.7的版本,存在问题,改成使用python3
修改./bin/pyspark
if hash python2.7 2>/dev/null; then
# Attempt to use Python 2.7, if installed:
#DEFAULT_PYTHON="python2.7"
DEFAULT_PYTHON="python3"
else
DEFAULT_PYTHON="python"
fi
根据 QuickStart编辑一个test/wordcount.py
内容如下:
import sys
from pyspark import SparkContext
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
sc = SparkContext(appName="Rongwordcount")
textFile = sc.textFile("README.md")
filecount=textFile.count()
print("start-----xxxxxxxxxxxxxxxxxxxx-------------------------")
print(filecount)
print("end------xxxxxxxxxxxxxxxxxxx-----------------------------")
textFile.first()
linesWithSpark = textFile.filter(lambda line: "Spark" in line)
textFile.filter(lambda line: "Spark" in line).count()
print("-----------------------------------------------")
sc.stop()
bin/spark-submit test/wordcount.py sparktest:9092 test1
5.使用Spark Stream连接Kafka
使用已有的examples/src/main/python/streaming/kafka_wordcount
调用的命令如下,其中sparktest:9092为Kafka的broker地址 test为subject名字
cd $HOME/spark-1.5.1-bin-hadoop2.6
bin/spark-submit --jars \
external/kafka-assembly/target/spark-streaming-kafka-assembly_2.10-1.5.1.jar \
examples/src/main/python/streaming/kafka_wordcount.py \
sparktest:9092 test
其中spark-streaming-kafka-assembly_2.10-1.5.1.jar 在如下地址需要下载
http://search.maven.org/#search|ga|1|spark-streaming-kafka-assembly
注意选择2.10版本的,目前的spark版本使用scala2.10版本编译,不能使用2.11对应的jar包。
在console会有很多信息,其中有些信息就将Kafka的信息打印出来了。
我修改了conf/log4j.properties将日志级别从INFO改成WARN,就可以看得比较清楚了
#log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console