spark streaming支持zeromq消息源,看了官方文档只是提供一个Class ZeroMQUtils的介绍,不是很明白,有没有大神有这方面的经验?

0
已邀请:
0

regan - run! run! run! happy runner! 我是奔跑的小米~ 2017-02-21 回答

我只给你说思想:SparkStreaming中会通过ZeroMQUtils启动一个Receiver(或者多个),从ZeroMq消费拉取数据到Spark内存,每一个batchInterval提交Job。按理说不难,之前做过sparkStreaming消费kafka数据,至于zeroMQ应该类似。

val kafkaStream = KafkaUtils.createDirectStream[String, String]( stream, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) val results = kafkaStream.map(record => (record.key, record.value)).map(data=>{ val record = data._2 record }).flatMap(data => data.split(" ")).map(data => (data, 1)).reduceByKey(_ + _).updateStateByKey[Int](updateFunction _)
看看官网例子应该能解决

要回复问题请先登录注册