datastream是flink提供给用户使用的用于进行流计算和批处理的api,是对底层流式计算模型的api封装,便于用户编程。
一个完整的datastream运行模型一般由三部分组成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取(也就是从数据源读取,可以批数据源,也可以是流式数据数据源),Transformation主要负责对属于的转换操作(也就是正常的业务处逻辑),Sink负责最终数据的输出(计算结果的导出)。
一般来说,使用datastream api编写flink程序,包括以下流程:
1、获得一个执行环境;(Execution Environment)
2、加载/创建初始数据;(Source)
3、指定转换这些数据;(Transformation)
4、指定放置计算结果的位置;(Sink)
5、触发程序执行(这是流式计算必须的操作,如果是批处理则不需要)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>SparkDemo</groupId>
<artifactId>SparkDemoTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
<!--flink-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<!--下面这是maven打包scala的插件,一定要,否则直接忽略scala代码-->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
有三种类型的执行环境:
1、StreamExecutionEnvironment.getExecutionEnvironment()
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
2、StreamExecutionEnvironment.createLocalEnvironment()
返回本地执行环境,需要在调用时指定默认的并行度。
3、StreamExecutionEnvironment.createRemoteEnvironment()
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
1、env.readTextFile(path)
一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取文件作为数据源
DataStreamSource<String> fileSource = env.readTextFile("/tmp/test.txt");
//3、打印数据
fileSource.print();
//4、启动任务执行
env.execute("test file source");
}
}
2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式来读取文件。这里的fileinputformat可以自定义类
package flinktest;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取文件作为数据源
DataStreamSource<String> fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");
//3、打印数据
fileSource.print();
//4、启动任务执行
env.execute("test file source");
}
}
socketTextStream(host,port)
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、创建环境对象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、读取socket作为数据源
DataStreamSource<String> sourceSocket = env.socketTextStream("127.0.0.1", 1000);
//3、打印数据
sourceSocket.print();
//4、启动任务执行
env.execute("test socket source");
}
}
1、fromCollection(Collection)
从集合中创建一个数据流,集合中所有元素的类型是一致的。
List<String> list = new ArrayList<>();
DataStreamSource<String> sourceCollection = env.fromCollection(list);
2、fromCollection(Iterator)
从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。
3、fromElements(Object)
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的
4、generateSequence(from, to)
从给定的间隔中并行地产生一个数字序列。读取一定范围的sequnce对象
env.addSource(SourceFuntion)
自定义一个数据源实现类,然后 addSource 到到env中。比如场景的从kafka读取数据,从mysql读取数据
Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。Flink有许多封装在DataStream操作里的内置输出格式。
1、 writeAsText
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。
2、WriteAsCsv
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。
3、print/printToErr
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。
4、 writeUsingOutputFormat
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。
5、writeToSocket
根据SerializationSchema 将元素写入到socket中。
6、stream.addSink(SinkFunction)
使用自定义的sink类
DataStream → DataStream:输入一个参数经过处理产生一个新的参数
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
//这里将每个参数 * 2,然后返回
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
//切割字符串,将处理之后的数据放到 collector 中。
for(String word: value.split(" ")){
out.collect(word);
}
}
});
DataStream → DataStream:计算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
DataStream → KeyedStream:要求输入是tuple,或者是一个复合对象,里面有多个属性(例如student类,里面有name、age等2个以上的属性),反正就是必须有作为key和value的数据。根据key进行分区,相同key的在同一个分区,在内部使用hash实现。
//有不同方式指定key
dataStream.keyBy("someKey") // 指定key的字段名称,常用于复合对象中
dataStream.keyBy(0) // 指定key的位置,常用于tuple中
KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果,也就是每一次聚合的结果都会返回,直到最后一次聚合结束,所以不是只返回最后一个聚合结果。
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
KeyedStream → DataStream
一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
运行结果为:
假设数据源为 (1,2,3,4,5)
结果为:start-1,start-1-2......
KeyedStream →DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。
1、connect:
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
2、coMap、coFlatMap
ConnectedStreams → DataStream:专门用于connect之后的stream操作的map和flatmap算子。
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
split:
DataStream → SplitStream:将一个数据流拆分成两个或者多个数据流.并且会给每个数据流起一个别名
select:SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
split.select("even").print();
split.select("odd").print();
DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。这和connect不一样,connect并没有合并多个stream
dataStream.union(otherStream1, otherStream2, ...);
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。