These past few days I've been testing Spark Streaming, and connecting to Kafka kept failing with this error:
18/08/30 21:09:00 ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:122)
at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:163)
at org.apache.spark.serializer.SerializerManager.wrapStream(SerializerManager.scala:124)
at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:50)
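Consuming messages that a colleague produced with Java worked fine, but messages written through the Kafka client or NiFi always triggered this error.
At first I suspected a jar version problem, but even the latest spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar failed the same way.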
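To test the jar-version suspicion directly, you can look for jars that carry their own copy of the net.jpountz.lz4 package named in the stack trace. The script below is a minimal standard-library sketch. It assumes, without confirmation from the error alone, that the conflicting lz4 classes come from a jar on the classpath such as one passed via --jars. The file name find_lz4.py in the usage comment is hypothetical. Spark's own lz4 jar will also show up when you scan $SPARK_HOME/jars, so a conflict is only suggested when two or more jars contain these classes.

```python
import sys
import zipfile
from pathlib import Path

LZ4_PREFIX = "net/jpountz/lz4/"  # package from the NoSuchMethodError


def bundled_lz4_classes(jar_path):
    """Return the net.jpountz.lz4 class entries packed inside one jar (zip) file."""
    with zipfile.ZipFile(jar_path) as jar:
        return [name for name in jar.namelist()
                if name.startswith(LZ4_PREFIX) and name.endswith(".class")]


def scan(paths):
    """Map every jar (given directly or found in a given directory) that
    contains lz4 classes to the number of such classes it holds."""
    hits = {}
    for p in map(Path, paths):
        jars = sorted(p.glob("*.jar")) if p.is_dir() else [p]
        for jar in jars:
            classes = bundled_lz4_classes(jar)
            if classes:
                hits[str(jar)] = len(classes)
    return hits


if __name__ == "__main__":
    # Hypothetical usage: python find_lz4.py /pythontest/scripts $SPARK_HOME/jars
    for jar, n in scan(sys.argv[1:]).items():
        print(f"{jar}: {n} net.jpountz.lz4 classes")
```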
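Eventually I found a document saying that setting this parameter solves the problem. The NoSuchMethodError means the loaded net.jpountz.lz4.LZ4BlockInputStream class has no (InputStream, boolean) constructor, which typically points to an older lz4 library on the classpath. Switching Spark's I/O compression codec to snappy sidesteps that lz4 code path: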
.config("spark.io.compression.codec", "snappy")
After modifying the wordcount example accordingly, it worked:
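from __future__ import print_function  # print(..., file=...) on Python 2
import sys
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        sys.exit(-1)
    #sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    # Use snappy instead of the default lz4 codec to avoid the NoSuchMethodError above
    conf = SparkConf().setAppName("PythonStreamingKafkaWordCount").set('spark.io.compression.codec', 'snappy')
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 20)  # 20-second batch interval
    # The rest is unchanged from Spark's stock kafka_wordcount.py example
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})  # (key, message) pairs
    counts = kvs.map(lambda x: x[1]).flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()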
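For each 20-second batch, Spark's stock kafka_wordcount.py takes the message value of every (key, message) record with map(lambda x: x[1]). It then splits lines into words with flatMap, pairs each word with 1, and sums the pairs with reduceByKey. The plain-Python sketch below is only an illustration of what that pipeline computes for a single batch and makes no claim about how Spark executes it. The function name count_batch is hypothetical.

```python
from collections import Counter


def count_batch(records):
    """Word counts for one micro-batch of Kafka (key, value) records."""
    counts = Counter()
    for _key, value in records:        # map(lambda x: x[1]): keep only the message
        for word in value.split(" "):  # flatMap(lambda line: line.split(" "))
            counts[word] += 1          # map((word, 1)) + reduceByKey(a + b)
    return dict(counts)


batch = [(None, "hello spark"), (None, "hello kafka")]
print(count_batch(batch))  # {'hello': 2, 'spark': 1, 'kafka': 1}
```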
Submit command:
bin/spark-submit --jars /pythontest/scripts/spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar examples/src/main/python/streaming/kafka_wordcount30.py testnode:2181 sengtest
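If you would rather not hardcode the codec in the script, spark-submit also accepts configuration on the command line as --conf key=value. Be aware that values set directly on SparkConf in the code take precedence over --conf flags. The helper below is a hypothetical sketch that builds the same submit command as an argument list with the codec passed via --conf. It only prints the command and does not run Spark.

```python
import shlex


def spark_submit_cmd(script, args, jars=(), conf=None, submit="bin/spark-submit"):
    """Build a spark-submit argv list; each conf entry becomes a --conf key=value flag."""
    cmd = [submit]
    if jars:
        cmd += ["--jars", ",".join(jars)]  # --jars takes a comma-separated list
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    return cmd + [script, *args]


cmd = spark_submit_cmd(
    "examples/src/main/python/streaming/kafka_wordcount30.py",
    ["testnode:2181", "sengtest"],
    jars=["/pythontest/scripts/spark-streaming-kafka-0-8-assembly_2.10-2.2.2.jar"],
    conf={"spark.io.compression.codec": "snappy"},
)
print(" ".join(shlex.quote(c) for c in cmd))
# To actually submit: subprocess.run(cmd, check=True)
```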