温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Elasticsearch reindex及Java使用sliceScorll优化查询的方法

发布时间:2021-06-28 16:17:56 来源:亿速云 阅读:348 作者:chen 栏目:大数据

这篇文章主要讲解了“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”吧!

Reindex会将一个索引的数据复制到另一个已存在的索引,但是并不会复制原索引的mapping(映射)、shard(分片)、replicas(副本)等配置信息。

简单实例如下

POST _reindex
{
  "source": {
    "remote": {
      "host": "http://otherhost:9200", // 远程es的ip和port列表
      "socket_timeout": "1m",
      "connect_timeout": "10s"  // 超时时间设置
    },
    "index": "my_index_name", // 源索引名称
    "query": {         // 满足条件的数据
      "match": {
        "test": "data"
      }
    }
  },
  "dest": {
    "index": "dest_index_name"  // 目标索引名称
  }
}

具体详细的使用参考

ElasticSearch 6.3版本 Document APIs之Reindex API

elasticsearch 基础 —— ReIndex

在java中对于reindexapi没有找到,于是作者采用了别名转换和全Index查询加上bulk插入的方式对于索引进行迁移。

但是转移数据实在太慢,所以使用了slice对scorll查询进行优化

Java slice scorll reindex 基于5.6版本

多线程reindex

具体开启线程数根据Index分片数进行调整,最好和主分片数相同,本例子为五个分片,同时还使用了别名转换对索引进行无缝衔接避免数据正常插入读取

 //建新索引
        createUserRecordIndex(newIndexName, typeName);

        //筛选时间
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        RangeQueryBuilder rangeQueryBuilder;
        rangeQueryBuilder = QueryBuilders.rangeQuery("createTime")
                .gte(DateUtil.format(DateUtil.parse(createBeginDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT))
                .lte(DateUtil.format(DateUtil.parse(createEndDate, DateUtil.LONG_WEB_FORMAT), DateUtil.LONG_WEB_FORMAT));
        boolQueryBuilder.must(rangeQueryBuilder);

        try {
        
           //多线程处理查询请求
           List<Future> list = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                SliceBuilder sliceBuilder = new SliceBuilder(i, 5);
                SearchResponse response = EsBuildersServiceUtil.getESClient()
                        .prepareSearch(userRecordAlias)
                        .setTypes(userRecordType)
                        .setQuery(boolQueryBuilder)
                        .setSize(1000).setScroll(new TimeValue(10000))
                        .slice(sliceBuilder)
                        .execute()
                        .actionGet();
                SliceQuery sliceQuery = new SliceQuery(newIndexName, typeName, response);
                Future submit = threadPoolTaskExecutor.submit(sliceQuery);
                list.add(submit);
            }
            for (Future future : list) {
                future.get();
            }

        } catch (Exception e) {
            log.error("reindex error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_INDEX_CONVERT_ERROR);
        }


        try {
            //别名转换
            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().removeAlias(oldIndexName, userRecordAlias).execute().actionGet();
            EsBuildersServiceUtil.getESClient().admin().indices().prepareAliases().addAlias(newIndexName, userRecordAlias).execute().actionGet();
        } catch (Exception e) {
            log.error(" convertAlias error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_ALIASES_CONVERT_ERROR);
        }

slice线程

class SliceQuery implements Callable {
        private String newIndexName;
        private String typeName;

        private SearchResponse response;

        private SliceQuery(String newIndexName, String typeName, SearchResponse response) {
            this.newIndexName = newIndexName;
            this.typeName = typeName;
            this.response = response;
        }

        @Override
        public Void call() {
            //获取总数量
            long totalCount = response.getHits().getTotalHits();
            //计算总次数,每次搜索数量为分片数*设置的size大小
            int page = (int) totalCount / 1000;
            operateRecordList(response, newIndexName, typeName);
            for (int i = 0; i < page; i++) {
                //再次发送请求,并使用上次搜索结果的ScrollId
                response = EsBuildersServiceUtil.getESClient().prepareSearchScroll(response.getScrollId())
                        .setScroll(new TimeValue(10000)).execute()
                        .actionGet();
                operateRecordList(response, newIndexName, typeName);
            }
            return null;
        }
    }

批量插入

  /**
     * 从查询数据中获取并批量插入Index
     *
     * @param response
     * @param indexName
     * @param typeName
     */
    private void operateRecordList(SearchResponse response, String indexName, String typeName) {
        try {
            SearchHits hits = response.getHits();
            List<AddUserRecordRequest> list = new ArrayList<>();
            for (SearchHit hit : hits) {
                String sourceAsString = hit.getSourceAsString();
                list.add(JSON.parseObject(sourceAsString, AddUserRecordRequest.class));
            }
            //批量插入
            saveBulkRecord(list, indexName, typeName);
        } catch (Exception e) {
            log.error("operateRecordList error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
        }
    }

    /**
     * 批量插入
     *
     * @param list
     * @param indexName
     * @param typeName
     */
    private void saveBulkRecord(List<AddUserRecordRequest> list, String indexName, String typeName) {
        try {
            BulkRequestBuilder bulkRequest = EsBuildersServiceUtil.getESClient().prepareBulk();
            for (AddUserRecordRequest recordRequest : list) {
                JSONObject json = JSONObject.fromObject(recordRequest);
                bulkRequest.add(EsBuildersServiceUtil.getESClient()
                        .prepareIndex(indexName, typeName)
                        .setSource(json));
            }
            if (list.size() > 0) {
                bulkRequest.execute().actionGet();
            }
        } catch (Exception e) {
            log.error("saveBulkRecord error =", e);
            throw new MembershipDataException(MembershipDataErrorCode.ES_DATA_ADD_ERROR);
        }
    }

感谢各位的阅读,以上就是“Elasticsearch reindex及Java使用sliceScorll优化查询的方法”的内容了,经过本文的学习后,相信大家对Elasticsearch reindex及Java使用sliceScorll优化查询的方法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI