这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
通过使用Flink DataStream Connectors 数据流连接器连接到RabbitMq消息队列中间件,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSource; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 从MQ中获取数据并输出到DataStream流中 */ public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); final DataStream<String> stream = env .addSource(new RMQSource<String>( connectionConfig, "test", true, new SimpleStringSchema())) .setParallelism(1); stream.print(); env.execute("flink rabbitMq source"); } }
数据流输出
DataStreamSink.java
package com.flink.examples.rabbitmq; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.rabbitmq.RMQSink; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; /** * @Description 将DataStream流中的数据输出到rabbitMq队列中 */ public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder() .setHost("127.0.0.1") .setPort(5672) .setUserName("admin") .setPassword("admin") .setVirtualHost("datastream") .build(); String [] words = new String[]{"props","student","build","name","execute"}; final DataStream<String> stream = env.fromElements(words); stream.addSink(new RMQSink<String>(connectionConfig,"test",new SimpleStringSchema())); env.execute("flink rabbitMq sink"); } }
数据展示
感谢各位的阅读!关于“Flink中Connectors如何连接RabbitMq”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。