This article covers Elasticsearch reindex and shows how to speed up an index migration in Java by running the scroll query in parallel slices (sliced scroll).
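Reindex copies the documents of one index into another index. It does not copy the source index's mapping, shard count, replica count, or other settings, so the destination index should be created with the desired configuration beforehand.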
A simple example:
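POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200", // address of the remote ES cluster (must be allowed by reindex.remote.whitelist)
      "socket_timeout": "1m",          // socket read timeout
      "connect_timeout": "10s"         // connection timeout
    },
    "index": "my_index_name", // source index name
    "query": {                // only documents matching this query are copied
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest_index_name" // destination index name
  }
}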
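The same REST call can also be issued from plain Java. The sketch below is an illustration, not the author's code: it uses the JDK 11+ `java.net.http` API, targets a local (same-cluster) reindex, and adds the `slices=5` URL parameter so Elasticsearch parallelizes the copy itself. The Elasticsearch documentation states that reindexing from a remote cluster does not support slicing, which is why the `remote` block is left out. The base URL and index names are placeholders, and names are inserted without JSON escaping, so they are assumed to be plain index names.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ReindexRequestSketch {
    private ReindexRequestSketch() {}

    // JSON body for a local _reindex: copy documents matching the query from sourceIndex into destIndex
    static String body(String sourceIndex, String destIndex) {
        return "{\"source\":{\"index\":\"" + sourceIndex + "\","
                + "\"query\":{\"match\":{\"test\":\"data\"}}},"
                + "\"dest\":{\"index\":\"" + destIndex + "\"}}";
    }

    // Builds (but does not send) POST {baseUrl}/_reindex?slices={slices}
    static HttpRequest request(String baseUrl, String sourceIndex, String destIndex, int slices) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/_reindex?slices=" + slices))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(sourceIndex, destIndex)))
                .build();
    }
}
```

To actually send it: `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.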
For detailed usage, see:
ElasticSearch 6.3 Document APIs: Reindex API
Elasticsearch basics: ReIndex
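The author could not find a reindex API in the Java client being used, so the index was migrated by switching aliases, querying the old index, and bulk-inserting the results into the new one.
Copying the data this way was far too slow, so the scroll query was parallelized with slices.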
Multi-threaded reindex
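Choose the number of threads based on the index's shard count, ideally equal to the number of primary shards; the index in this example has five. An alias switch is also used so that moving to the new index is seamless and normal writes and reads are not interrupted.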
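Slicing is what lets the threads run without coordinating: each document matches exactly one slice, so no two scrolls return the same document. The Elasticsearch scroll documentation describes the split as happening on shards first and then, within a shard, by `slice(doc) = floorMod(hashCode(doc._id), max)` (the 6.x docs use `_uid`). The JDK-only sketch below imitates only that hash step, with `String.hashCode()` as a stand-in for Elasticsearch's own hash (an assumption, so real slice assignments will differ), to show that the slices form a disjoint and complete partition.

```java
import java.util.ArrayList;
import java.util.List;

final class SlicePartitionSketch {
    private SlicePartitionSketch() {}

    // Slice a document id falls into; String.hashCode() stands in for Elasticsearch's internal hash
    static int sliceOf(String docId, int maxSlices) {
        return Math.floorMod(docId.hashCode(), maxSlices); // floorMod keeps the result in [0, maxSlices)
    }

    // Groups ids by slice: every id ends up in exactly one of the maxSlices lists
    static List<List<String>> partition(List<String> docIds, int maxSlices) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < maxSlices; i++) {
            slices.add(new ArrayList<>());
        }
        for (String id : docIds) {
            slices.get(sliceOf(id, maxSlices)).add(id);
        }
        return slices;
    }
}
```

Keeping the slice count at or below the number of primary shards lets the split stay at the shard level; the Elasticsearch docs warn that with more slices than shards the slice filter is very slow on the first calls.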
// Create the new index
createUserRecordIndex(newIndexName, typeName);
// Filter by creation time
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")
        .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))
        .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));
boolQueryBuilder.must(rangeQueryBuilder);
int sliceCount = 5; // one slice per primary shard (the example index has 5)
try {
    // Open one sliced scroll per slice and process each slice on its own thread
    List<Future<Void>> list = new ArrayList<>();
    for (int i = 0; i < sliceCount; i++) {
        SliceBuilder sliceBuilder = new SliceBuilder(i, sliceCount);
        SearchResponse response = EsBuildersServiceUtil.getESClient()
                .prepareSearch(userRecordAlias)
                .setTypes(userRecordType)
                .setQuery(boolQueryBuilder)
                .setSize(1000)                   // hits per scroll batch for this slice
                .setScroll(new TimeValue(10000)) // keep the scroll context alive 10 s between batches
                .slice(sliceBuilder)
                .execute()
                .actionGet();
        SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);
        Future<Void> submit = threadPoolTaskExecutor.submit(sliceQuery);
        list.add(submit);
    }
    // Wait for every slice; get() rethrows any failure from a worker thread
    for (Future<Void> future : list) {
        future.get();
    }
} catch (Exception e) {
    log.error("reindex error =", e);
    throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);
}
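The submit-then-`get()` pattern is what makes the main thread wait for every slice and surface the first failure. A JDK-only sketch of the same shape, where `java.util.concurrent.ExecutorService` stands in for the Spring `threadPoolTaskExecutor` and in-memory maps stand in for the per-slice scroll results (both assumptions for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SliceFanOutSketch {
    private SliceFanOutSketch() {}

    // Copies every slice (docId -> source) into one destination map, one task per slice,
    // and waits for all tasks before returning
    static Map<String, String> copyAll(List<Map<String, String>> slices)
            throws InterruptedException, ExecutionException {
        Map<String, String> dest = new ConcurrentHashMap<>(); // thread-safe stand-in for the new index
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, slices.size()));
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (Map<String, String> slice : slices) {
                Callable<Void> task = () -> {
                    dest.putAll(slice); // stands in for the scroll + bulk loop of one slice
                    return null;
                };
                futures.add(pool.submit(task));
            }
            for (Future<Void> future : futures) {
                future.get(); // blocks; rethrows a task failure as ExecutionException
            }
        } finally {
            pool.shutdown();
        }
        return dest;
    }
}
```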
try {
    // Switch the alias: remove and add in one request so Elasticsearch applies both atomically
    EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases()
            .removeAlias(oldIndexName, userRecordAlias)
            .addAlias(newIndexName, userRecordAlias)
            .execute()
            .actionGet();
} catch (Exception e) {
    log.error("convertAlias error =", e);
    throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);
}
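An alias switch is only seamless if the remove and the add are sent as actions of a single `_aliases` request, which Elasticsearch applies atomically; two separate requests leave a brief window in which the alias points to no index. A JDK-only sketch that builds such a request body (the index and alias names in the usage are hypothetical):

```java
final class AliasSwapSketch {
    private AliasSwapSketch() {}

    // Body for POST /_aliases that moves alias from oldIndex to newIndex in one atomic request
    static String swapBody(String oldIndex, String newIndex, String alias) {
        return "{\"actions\":["
                + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
                + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
                + "]}";
    }
}
```

With the Java client used in this article, calling `removeAlias(...)` and `addAlias(...)` on the same `prepareAliases()` builder before `execute()` sends both actions in one request.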
Slice worker thread
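class SliceQuery implements Callable<Void> {
    private final String newIndexName;
    private final String typeName;
    private SearchResponse response;

    private SliceQuery(String newIndexName, String typeName, SearchResponse response) {
        this.newIndexName = newIndexName;
        this.typeName = typeName;
        this.response = response;
    }

    @Override
    public Void call() {
        // Total hits in this slice (not in the whole index)
        long totalCount = response.getHits().getTotalHits();
        // The first batch (up to 1000 hits) is already in response, so totalCount / 1000
        // further scroll rounds are enough to read the rest (the last one may be empty)
        int page = (int) (totalCount / 1000);
        operateRecordList(response, newIndexName, typeName);
        for (int i = 0; i < page; i++) {
            // Fetch the next batch using the scroll id from the previous response
            response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(10000))
                    .execute()
                    .actionGet();
            operateRecordList(response, newIndexName, typeName);
        }
        // Release the scroll context instead of waiting for it to time out
        EsBuildersServiceUtil.getESClient().prepareClearScroll()
                .addScrollId(response.getScrollId())
                .execute()
                .actionGet();
        return null;
    }
}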
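Another way to drive each slice's scroll is to stop when a batch comes back empty, instead of deriving the number of rounds from the total hit count. A JDK-only sketch of that loop, where `ScrollCursor` is a hypothetical in-memory stand-in for the initial sliced search followed by `prepareSearchScroll(...)` calls:

```java
import java.util.ArrayList;
import java.util.List;

final class ScrollLoopSketch {
    private ScrollLoopSketch() {}

    // Hypothetical cursor: each next() returns the following batch, then empty lists once exhausted
    static final class ScrollCursor {
        private final List<String> docs;
        private final int size;
        private int offset;
        int calls; // number of batches requested so far

        ScrollCursor(List<String> docs, int size) {
            this.docs = docs;
            this.size = size;
        }

        List<String> next() {
            calls++;
            int end = Math.min(offset + size, docs.size());
            List<String> batch = new ArrayList<>(docs.subList(offset, end));
            offset = end;
            return batch;
        }
    }

    // Reads batches until one is empty; in the article each batch would go to operateRecordList
    static List<String> drain(ScrollCursor cursor) {
        List<String> copied = new ArrayList<>();
        List<String> batch = cursor.next(); // first batch, like the initial sliced search
        while (!batch.isEmpty()) {
            copied.addAll(batch);
            batch = cursor.next();          // like prepareSearchScroll(scrollId)
        }
        return copied;
    }
}
```

This needs no arithmetic on the total; the cost is one final request that returns an empty batch.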
Bulk insert
/**
 * Extracts the documents from one batch of search results and bulk-inserts them into the index.
 *
 * @param response  one batch of scroll results
 * @param indexName destination index name
 * @param typeName  destination type name
 */
private void operateRecordList(SearchResponse response, String indexName, String typeName) {
    try {
        SearchHits hits = response.getHits();
        List<AddUserRecordRequest> list = new ArrayList<>();
        for (SearchHit hit : hits) {
            // Deserialize each hit's _source into the record DTO
            String sourceAsString = hit.getSourceAsString();
            list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));
        }
        // Bulk insert
        saveBulkRecord(list, indexName, typeName);
    } catch (Exception e) {
        log.error("operateRecordList error =", e);
        throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
    }
}
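/**
 * Bulk-inserts a list of records.
 *
 * @param list      records to insert
 * @param indexName destination index name
 * @param typeName  destination type name
 */
private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) {
    try {
        BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();
        for (AddUserRecordRequest recordRequest : list) {
            JSONObject json = JSONObject.fromObject(recordRequest);
            bulkRequest.add(EsBuildersServiceUtil.getESClient()
                    .prepareIndex(indexName, typeName)
                    .setSource(json));
        }
        // Skip empty batches: a bulk request with no actions fails validation
        if (list.size() > 0) {
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            // The bulk call can succeed overall while individual documents fail
            if (bulkResponse.hasFailures()) {
                throw new IllegalStateException(bulkResponse.buildFailureMessage());
            }
        }
    } catch (Exception e) {
        log.error("saveBulkRecord error =", e);
        throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
    }
}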
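One thing this bulk code does not carry over is the document `_id`: `prepareIndex(indexName, typeName)` without an id lets Elasticsearch generate a new one, so running the migration twice would duplicate documents, whereas `_reindex` keeps the original ids. Keeping them would mean passing `hit.getId()` along with each record and calling `prepareIndex(indexName, typeName, id)`. At the REST level, each `_bulk` action line carries the `_id`; the JDK-only sketch below builds such a newline-delimited body. The index and type names in the usage are hypothetical, and each source is assumed to be already-serialized, single-line JSON.

```java
import java.util.Map;

final class BulkBodySketch {
    private BulkBodySketch() {}

    // NDJSON body for POST /_bulk: one "index" action line (with _id) plus one source line per document
    static String bulkBody(String index, String type, Map<String, String> idToSource) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> doc : idToSource.entrySet()) {
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_type\":\"").append(type)
              .append("\",\"_id\":\"").append(doc.getKey()).append("\"}}\n");
            sb.append(doc.getValue()).append('\n'); // every line, including the last, ends with a newline
        }
        return sb.toString();
    }
}
```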
Original article: https://my.oschina.net/u/4054187/blog/3114436