Flink-流拆分和流合并

浏览: 9307

如果我们有一个kafka-topic写了不同的数据,我们需要根据这个输入数据进行不同的处理逻辑,但是最终结果需要合并在一个流里面,那么这个时候即可用这么一波操作。

1、使用split方法进行数据分流,把分流信息存到新建的OutputSelector对象里面

2、调用split stream的select方法把 split stream 转成 data stream

3、调用data stream的union方法进行流合并,你可以输入多个data stream

按照这几个步骤基本就完成了流的拆分和合并,每个单独的流你可以自定义自己的operator

package flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

/**
* @author luyufeng
* @date 2019-01-23 11:04
* @description ……
*/
public class SplitUnionCount {

public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream text;
//判断输入文件,否则直接获取words类的内容
if(params.has("input")){
text = env.readTextFile(params.get("input"));
}else {
text = env.fromElements(WordsData.WORDS);
}

SplitStream<Tuple2<String,Integer>> split = text.flatMap(new FlatMapFunction<String,Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String,Integer>> collector) throws Exception {
//按照单词进行分割
String [] words = s.split("\\W+");
for(String word:words){
collector.collect(new Tuple2<>(word,1));
}
}
}).split(new OutputSelector<Tuple2<String,Integer>>() {
@Override
public Iterable<String> select(Tuple2<String, Integer> values) {
//使用arraylist存储分流信息
List<String> output = new ArrayList<>();
//按照第一列单词是否包含the进行分组
if(values.f0.equals("the")){
output.add("a");
}else {
output.add("b");
}
return output;
}
});

//把分组a的split流转成data stream流
DataStream<Tuple2<String, Integer>> stream1= split.select("a").keyBy(0).sum(1);
//把分组b的split流转成data stream流
DataStream<Tuple2<String, Integer>> stream2 = split.select("b").keyBy(0).sum(1);

//union合并a、b两个数据流
DataStream<Tuple2<String, Integer>> stream = stream1.union(stream2).keyBy(0).sum(1);

if(params.has("output")){
stream.writeAsText(params.get("output"));
}else {
//输出包含the的统计
stream1.print();
//输出合并流之后的所有统计
stream.print();
}
env.execute("split union stream");
}
}

执行上面的代码,优先输出分流后stream1的数据,然后继续输出合并之后的stream数据

image.png

推荐 0
本文由 brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册