This article shares a practical walkthrough of how Connectors connect to Elasticsearch; work through it along with the code and you should come away with something useful.
We use the Flink DataStream Connectors to connect to an Elasticsearch index and implement both data stream input (reading from the index) and output (writing to it).
Example environment
java.version: 1.8.x
flink.version: 1.11.1
elasticsearch: 6.x
Example data source (download from the project's Gitee repository)
Flink series: setting up the development environment and data
Example module (pom.xml)
Flink series: DataStream Connectors and the example module
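For reference, here is a minimal sketch of the Maven dependencies these examples need, assuming a Scala 2.11 build of Flink 1.11.1 and the ES 6.x connector to match the environment above (the Gson version is an assumption; Gson is used in the code for JSON conversion):
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.11.1</version>
</dependency>
<!-- Gson, used below to convert between POJOs and JSON; version is an assumption -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>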
Data stream input
DataStreamSource.java
package com.flink.examples.elasticsearch;
import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Map;
/**
* @Description Reads data from Elasticsearch and emits it into a DataStream
*/
public class DataStreamSource {
/**
* Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<TUser> dataStream = env.addSource(new RichSourceFunction<TUser>(){
private RestClientBuilder builder = null;
//Called once when the job starts; creates the connection builder for the data source (opens the connection)
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
builder = RestClient.builder(new HttpHost("192.168.110.35", 9200, "http"));
}
//Run the query and wrap each hit into a TUser
@Override
public void run(SourceContext<TUser> ctx) throws Exception {
Gson gson = new Gson();
RestHighLevelClient client = null;
//Match query: documents whose sex field equals 1
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchQuery("sex", 1));
//Specify the target index and mapping type (ES 6.x still uses types)
SearchRequest request = new SearchRequest();
request.types("doc");
request.indices("flink_demo");
request.source(sourceBuilder);
try {
client = new RestHighLevelClient(builder);
SearchResponse response = client.search(request, new Header[]{});
SearchHits hits = response.getHits();
System.out.println("查询结果有" + hits.getTotalHits() + "条");
for (SearchHit searchHits : hits ) {
Map<String,Object> dataMap = searchHits.getSourceAsMap();
TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
ctx.collect(user);
}
//Alternative: look up a single document by ID
// GetRequest request = new GetRequest( "flink_demo","doc","NeMaoXQBElQ9wTD5MOfB");
// client = new RestHighLevelClient(builder);
// GetResponse getResponse = client.get(request, new Header[]{});
// Map<String,Object> dataMap = getResponse.getSourceAsMap();
// TUser user = gson.fromJson(gson.toJson(dataMap), TUser.class);
// ctx.collect(user);
}catch(IOException ioe){
ioe.printStackTrace();
}finally {
if (client != null){
client.close();
}
}
}
//Called when the job is cancelled; release the connection builder
@Override
public void cancel() {
try {
super.close();
} catch (Exception e) {
e.printStackTrace();
}
builder = null;
}
});
dataStream.print();
env.execute("flink es to data job");
}
}
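Note that client.search(request, new Header[]{}) uses the Header-array overload of the high-level client, which works on ES 6.x but is deprecated from client version 6.4 onward. On a 6.4+ client the equivalent call takes a RequestOptions argument instead; a minimal sketch:
import org.elasticsearch.client.RequestOptions;
// Same query as above, using the non-deprecated overload
SearchResponse response = client.search(request, RequestOptions.DEFAULT);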
Data stream output
DataStreamSink.java
package com.flink.examples.elasticsearch;
import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @Description Writes a DataStream out to Elasticsearch
*/
public class DataStreamSink {
/**
* Official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/elasticsearch.html
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setParallelism(2);
//1. Configure the Elasticsearch connection used to create the index data
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("192.168.110.35", 9200, "http"));
//Create the sink: ElasticsearchSink turns each record into an index request
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<String>(httpHosts,
new ElasticsearchSinkFunction<String>() {
@Override
public void process(String user, RuntimeContext ctx, RequestIndexer indexer) {
Gson gson = new Gson();
Map<String,Object> map = gson.fromJson(user, Map.class);
indexer.add(Requests.indexRequest()
.index("flink_demo")
.type("doc")
.source(map));
}
}
);
// Maximum number of actions to buffer per bulk request; the sink flushes once this many have accumulated
esSinkBuilder.setBulkFlushMaxActions(10);
//Maximum size, in MB, of buffered data before a flush
esSinkBuilder.setBulkFlushMaxSizeMb(500);
//Interval, in milliseconds, at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(4000);
//2. Put data on the stream
//Build a sample record
TUser user = new TUser();
user.setId(9);
user.setName("wang1");
user.setAge(23);
user.setSex(1);
user.setAddress("CN");
user.setCreateTimeSeries(System.currentTimeMillis());
DataStream<String> input = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));
//3. Write the stream to Elasticsearch
input.addSink(esSinkBuilder.build());
env.execute("flink data to es job");
}
}
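The builder also accepts an optional failure handler. The sketch below, which is not part of the original example, uses Flink's bundled RetryRejectedExecutionFailureHandler so that bulk requests rejected by a saturated Elasticsearch queue are re-queued instead of failing the job; with checkpointing enabled, the sink also flushes buffered actions on each checkpoint, giving at-least-once delivery:
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
// Optional: configure before calling esSinkBuilder.build()
esSinkBuilder.setFailureHandler(new RetryRejectedExecutionFailureHandler());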
Data display (the original screenshot of the indexed results is omitted here)
That covers how Connectors connect to Elasticsearch. Some of these points are likely to come up in day-to-day work; hopefully this article taught you something new.
Original article: https://my.oschina.net/u/437309/blog/4943797