这篇文章主要讲解了“flink连接消费kafka实例”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“flink连接消费kafka实例”吧!
package flink.streaming import java.util.Properties import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.api.windowing.time.Time object StreamingTest { def main(args: Array[String]): Unit = { val kafkaProps = new Properties() //kafka的一些属性 kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092") //所在的消费组 kafkaProps.setProperty("group.id", "group1") //获取当前的执行环境 val evn = StreamExecutionEnvironment.getExecutionEnvironment //kafka的consumer,test1是要消费的topic val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema,kafkaProps) //设置从最新的offset开始消费 kafkaSource.setStartFromLatest() //自动提交offset kafkaSource.setCommitOffsetsOnCheckpoints(true) //flink的checkpoint的时间间隔 evn.enableCheckpointing(5000) //添加consumer val stream = evn.addSource(kafkaSource) stream.setParallelism(3) val text = stream.flatMap{ _.toLowerCase().split("\\W+")filter { _.nonEmpty} } .map{(_,1)} .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) text.print() //启动执行 evn.execute("kafkawd") } }
//
pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>hgs</groupId> <artifactId>flink_lesson</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>flink_lesson</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.1</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.7.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.7.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.7.1</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.7.1</version> </dependency> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.32.Final</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <archive> <manifest> <!-- 我运行这个jar所运行的主类 --> <mainClass>hgs.flink_lesson.WordCount</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef> <!-- 必须是这样写 --> jar-with-dependencies </descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.0</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <!-- <arg>-make:transitive</arg> --> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <!-- If you have classpath issue like NoDefClassError,... --> <!-- useManifestOnlyJar>false</useManifestOnlyJar --> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> </plugins> </build> </project>
感谢各位的阅读,以上就是“flink连接消费kafka实例”的内容了,经过本文的学习后,相信大家对flink连接消费kafka实例这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。