Flink开发前基础准备

浏览: 2610

一、基础开发环境准备:

idea、maven、kafka本地环境安装配置好了即可。

idea配置pom.xml文件,flink的主要依赖包加上

      <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink-version}</version>
<!--<scope>provided</scope>-->
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink-version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink-version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.9 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.9_2.11</artifactId>
<version>${flink-version}</version>
</dependency>

<!--CEP-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>${flink-version}</version>
</dependency>

二、flink程序的基础组成部分

1.创建或者设置运行环境:

StreamExecutionEnvironment类对应3个方法都可以设置运行环境

getExecutionEnvironment()

createLocalEnvironment()

createRemoteEnvironment(String host, int port, String... jarFiles)

使用getExecutionEnvironment()即可,如果你在本地ide内部运行一个flink程序,它会创建本地环境,如果是在集群上运行flink程序,它会设置集群运行模式。

2.加载、创建初始化的数据,或者引用数据源:

addSource、readTextFile 等方法读取文件、消息队列

3.特定此数据的转换:

使用map、reduce、join等操作数据,进行数据转换

4.指定放置计算结果的位置:

将处理好的数据存储至指定位置 writeAsText(path: String) 或者 addSink() 到指定数据库 

5.触发执行程序:

datastream.print() 或者 env.execute();

三、熟悉source、Transformation、sink对应api

1.Sources:

基于文件:

readCsvFile(path) - 读取CSV文件(仅dataSet)
readTextFile(path) - 读取文本文件
readFile(fileInputFormat, path) - 根据指定的文件输入格式读取文件(一次)

基于集合:

fromCollection(Collection) - 从Java的Java.util.Collection创建数据流
fromCollection(Iterator, Class) - 从一个迭代器中创建数据流。Class指定了该迭代器返回元素的类型。
fromElements(T ...) - 从给定的对象序列中创建数据流。所有对象类型必须相同。
fromParallelCollection(SplittableIterator, Class) - 从一个迭代器中创建并行数据流。Class指定了该迭代器返回元素的类型
generateSequence(from, to) -创建一个生成指定区间范围内的数字序列的并行数据流。

基于 Socket:

socketTextStream() - 从socket读取。元素可以用分隔符切分(仅dataStream)

自定义源:

addSource() -添加一个新数据流源(仅dataStream)

2.Transformation:

transform过程有很多操作,主要是看业务操作需求,以下是DataStream的一些api,对应DataSet也有相应的api但不一定与DataStream的一致:

一般使用到比较多的是:FilterMapReduceKeyBy

Map :DataStream → DataStream

读入一个元素,返回转换后的一个元素。一个把输入流转换中的数值翻倍的map function:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});

FlatMap :DataStream → DataStream

读入一个元素,返回转换后的0个、1个或者多个元素。一个将句子切分成单词的flatmap function:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});

Filter :DataStream → DataStream

对读入的每个元素执行boolean函数,并保留返回true的元素。一个过滤掉零值的filter:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});

KeyBy :DataStream → KeyedStream

逻辑上将流分区为不相交的分区,每个分区包含相同key的元素。在内部通过hash分区来实现。
该transformation返回一个KeyedDataStream。与dataSet映射的分组api为:groupBy
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple

Reduce:KeyedStream → DataStream

在一个KeyedStream上不断进行reduce操作。将当前元素与上一个reduce后的值进行合并,再返回新合并的值。 
一个构造局部求和流的reduce function:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});

Fold:KeyedStream → DataStream

在一个KeyedStream上基于初始值不断进行变换操作,将当前值与上一个变换后的值进行变换,再返回新变换的值。 
在序列(1,2,3,4,5)上应用如下的fold function,返回的序列依次是“start-1”,“start-1-2”,“start-1-2-3”, ...:
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});

Aggregations:KeyedStream → DataStream

在一个KeyedStream上不断聚合。min和minBy的区别是min返回最小值,而minBy返回在该字段上值为最小值的所有元素(对于max和maxBy相同)。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

Window:KeyedStream → WindowedStream

Windows可定义在已分区的KeyedStreams上。Windows会在每个key对应的数据上根据一些特征(例如,在最近5秒内到达的数据)进行分组。 有关Windows的完整说明请参阅windows。
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data

Window也会包含reduce、fold等函数就不列出来了。

Union:DataStream* → DataStream

联合(Union)两个或多个数据流,创建一个包含来自所有流的所有元素的新的数据流。注意:如果DataStream和自身联合,那么在结果流中每个元素你会拿到两份。
dataStream.union(otherStream1, otherStream2, ...);

Window Join:DataStream,DataStream → DataStream

在给定的key和公共窗口上连接(Join)两个DataStream。
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});

Window CoGroup:DataStream,DataStream → DataStream

在给定的key和公共窗口上CoGroup两个DataStream。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});

Connect:DataStream,DataStream → ConnectedStreams

“串联”(Connect)两个DataStream并保留各自类型。串联允许两个流之间共享状态。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);

Split:DataStream → SplitStream

根据一些标准将流分成两个或更多个流。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});

Select:SplitStream → DataStream

在一个SplitStream上选择一个或多个流。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");

Extract Timestamps:DataStream → DataStream

对于按照事件时间进行统计比较关键的一个api,从记录中提取时间戳,以便在窗口中使用事件时间语义。请参阅Event Time。
stream.assignTimestamps (new TimeStampExtractor() {...});

Iterate:DataStream → IterativeStream → DataStream

通过将一个operator的输出重定向到某个先前的operator,在流中创建“反馈”循环。这对于需要不断更新模型的算法特别有用。 以下代码以流开始,并持续应用迭代体。大于0的元素将回送到反馈通道,其余元素发往下游。相关完整描述请参阅iterations。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Integer value) throws Exception {
return value <= 0;
}
});

这些都是DataStream的相关接口和函数,如果要了解批处理的相关函数,可以再去了解一下DataSet

3.Sink

数据最后进行持久化,sink 消费 DataStream 的数据,并将结果写入文件、socket、外部系统或进行打印,这些都在DataStream之后进行操作:

writeAsText() / TextOutputFormat - 将元素以字符串形式按行写入。通过调用每个元素的 toString() 方法获得字符串。
writeAsCsv(...) / CsvOutputFormat - 将元组写入逗号分隔的csv文件。行和字段隔符均可配置。通过调用每个元素的 toString() 方法获得每个字段的字符串。
print() / printToErr() - 打印每个元素的 toString() 值到标准输出/错误输出流。可以配置前缀信息添加到输出,以区分不同 print 的结果。如果并行度大于1,则 task id 也会添加到输出结果的前缀上。
writeUsingOutputFormat() / FileOutputFormat - 自定义文件输出的方法/基类。支持自定义的对象到字节的转换。
writeToSocket - 根据 SerializationSchema 把元素写到 socket 。
addSink - 调用自定义 sink function 。
Flink自带了很多连接其他系统的 connectors(如 Apache Kafka ),这些connectors都实现了 sink function 。

最后物理分区:

在一个transformation之后,可以对DataStream进行流分区
使用一个用户自定义Partitioner确定每个元素对应的目标task。
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
按照均匀分布以随机的方式对元素进行分区。
dataStream.shuffle();
以round-robin方式对元素进行分区,使得每个分区负载均衡。在数据倾斜的情况下进行性能优化(全局)
dataStream.rebalance();
以round-robin方式对元素分区到下游operations。如果你想从source的每个并行实例分散到若干个mappers以负载均衡(局部本地操作)
dataStream.rescale();
广播元素到每个分区。
dataStream.broadcast();

以上了解差不多的话,可以开始准备第一个flink程序的开发了(要具备scala或者java的基本功)

推荐 0
本文由 brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。
转载、引用前需联系作者,并署名作者且注明文章出处。
本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。

0 个评论

要回复文章请先登录注册