Flink-基础计数wordcount

浏览: 2001

flink最简单的程序就是wordcount,即对单词进行计次计算。flink分流处理和批处理,流处理调用DataStream-api进行处理,批处理调用DataSet-api进行处理,DataSet、DataStream都是是flink底层的api。

一、流式处理:

新建一个数据流,使用StreamExecutionEnvironment的方法创建一个数据流:

package flink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author luyufeng
* @date 2019-01-18 14:17
* @description ……
*/
public class WordsData {
public static final String[] WORDS = new String[] {
"To be, or not to be,that is the question:",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks"
};

//DataStream 数据源 流式处理
public static DataStream<String> getDefaultTextLine(StreamExecutionEnvironment env){
return env.fromElements(WORDS);
}
}

写主程序,进行计数

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* @author luyufeng
* @date 2019-01-18 14:19
* @description ……
*/
public class WordsCount {

public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataStream<String> text;

if(params.has("input")){
text = env.readTextFile(params.get("input"));
}else {
text = WordsData.getDefaultTextLine(env);
}

DataStream<Tuple2<String,Integer>> count = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String values, Collector<Tuple2<String, Integer>> out) throws Exception {
String [] words = values.split(" ");
for(String word:words){
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(0).sum(1);

if(params.has("output")){
count.writeAsText(params.get("output"));
}else {

count.print();
}
//流处理打印需要调用execute
env.execute("Word Count");
}
}

主要是调用DataStream的flatMap方法进行数据拆分,然后使用 keyBy(0)进行分组,其中0表示Tuple2的第一列,然后sum(1)对第二列进行求和。

实时处理的结果是不断更新的,所以会存在多条输出结果。

image.png

二、批量处理:

文本数据会批量塞到DataSet里面,这时候的DataSet就是一次性的数据流。

package flink;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
/**
* @author luyufeng
* @date 2019-01-18 14:17
* @description ……
*/
public class WordsData {
public static final String[] WORDS = new String[] {
"To be, or not to be,that is the question:",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,",
"No more; and by a sleep to say we end",
"The heartache, and the thousand natural shocks"
};

//DataSet 数据源 批处理
public static DataSet<String> getDefaultTextLine(ExecutionEnvironment env){
return env.fromElements(WORDS);
}
}

对批量数据进行计数处理:

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
/**
* @author luyufeng
* @date 2019-01-18 14:19
* @description ……
*/
public class WordsCount {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataSet<String> text;

if(params.has("input")){
text = env.readTextFile(params.get("input"));
}else {
text = WordsData.getDefaultTextLine(env);
}
DataSet<Tuple2<String,Integer>> count = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String [] words = s.split(" ");
for(String word:words){
collector.collect(new Tuple2<String,Integer>(word,1));
}
}
}).groupBy(0).sum(1);

if(params.has("output")){
count.writeAsText(params.get("output"));
env.execute("Word Count");
}else {
//批处理print不需要再调用execute
count.print();
}
}
}

同样是调用DataSet的flatMap方法进行数据映射,然后再进行分组,这时候分组使用的是groupBy(0)进行分组,这个是和实时api有差异的地方。然后继续进行sum(1)。

离线处理的结果每个key只有一个结果。

image.png

flink入门的demo非常容易上手,看完瞬间就会开发流式处理和批处理了。

推荐 0
本文由 brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册