本篇内容介绍了“flinksql env的定义”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
1、编写 pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>flinksqldemo</artifactId> <version>1.0-SNAPSHOT</version> <properties> <!-- Encoding --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <scala.binary.version>2.11</scala.binary.version> <scala.version>2.11.8</scala.version> <kafka.version>0.10.2.1</kafka.version> <flink.version>1.12.0</flink.version> <hadoop.version>2.7.3</hadoop.version> <!-- scope 本地调试时注销 设定为默认的 compile 打包时设定为 provided --> <setting.scope>compile</setting.scope> </properties> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--flink start--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!--<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>--> <!-- flink end--> <!-- kafka start --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> <scope>${setting.scope}</scope> </dependency> <!-- kafka end--> <!-- hadoop start --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> <scope>${setting.scope}</scope> </dependency> <!-- hadoop end --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.72</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.3</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>29.0-jre</version> </dependency> </dependencies> </project>
2、编写代码
package com.jd.data; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.BatchTableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class FlinkTableApiDemo { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<String> stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt"); // 1、创建表执行环节 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // ============================================== // 1.1 老版本planner的流式查询 EnvironmentSettings set = EnvironmentSettings.newInstance() .useOldPlanner() //用老版本 .inStreamingMode() //流式处理 .build(); // 老版本的流式处理执行环境 StreamTableEnvironment oldStreamingEnv = StreamTableEnvironment.create(env, set); // 1.2 老版本批处理环境 ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment); // ========================================================= // 1.3 blink 版本的流式查询 EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings); // 1.4 blink 版本的批处理查询 EnvironmentSettings bsettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build(); TableEnvironment blinkBatchTableEnvironment = TableEnvironment.create(settings); } }
“flinksql env的定义”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。