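I took a look at the source code of the es-hadoop plugin.
Retries during an ES data import turn out to have two sources. The first is the es.batch.write.retry.policy parameter, which is enabled by default: when the es-hadoop plugin sends a bulk write request to the ES cluster and receives a 503 response code, it retries 3 times.
The second is that the execution of the HTTP request itself also retries (hadoop/rest/NetworkClient.java):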
    public Response execute(Request request) {
        Response response = null;
        boolean newNode;
        do {
            SimpleRequest routedRequest = new SimpleRequest(request.method(), null, request.path(), request.params(), request.body());
            newNode = false;
            try {
                response = currentTransport.execute(routedRequest);
                ByteSequence body = routedRequest.body();
                if (body != null) {
                    stats.bytesSent += body.length();
                }
            } catch (Exception ex) {
                // configuration error - including SSL/PKI - bail out
                if (ex instanceof EsHadoopIllegalStateException) {
                    throw (EsHadoopException) ex;
                }
                // issues with the SSL handshake, bail out instead of retry, for security reasons
                if (ex instanceof javax.net.ssl.SSLException) {
                    throw new EsHadoopTransportException(ex);
                }
                // check for fatal, non-recoverable network exceptions
                if (ex instanceof BindException) {
                    throw new EsHadoopTransportException(ex);
                }
                if (log.isTraceEnabled()) {
                    log.trace(
                            String.format(
                                    "Caught exception while performing request [%s][%s] - falling back to the next node in line...",
                                    currentNode, request.path()), ex);
                }
                String failed = currentNode;
                failedNodes.put(failed, ex);
                newNode = selectNextNode();
                log.error(String.format("Node [%s] failed (%s); "
                        + (newNode ? "selected next node [" + currentNode + "]" : "no other nodes left - aborting..."),
                        failed, ex.getMessage()));
                if (!newNode) {
                    throw new EsHadoopNoNodesLeftException(failedNodes);
                }
            }
        } while (newNode);
        return response;
    }
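When a request times out, the es-hadoop plugin sends the same write request to another ES node. In other words, once the current node exceeds the timeout (one minute by default), the plugin treats that node as unavailable and switches to the next one, when in fact ES simply had not finished the insert within that minute.
After increasing the es.http.timeout parameter so that ES has enough time to finish indexing, the problem no longer occurs.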
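To make the failure mode concrete, here is a minimal, self-contained simulation of the failover loop described above. It is only a sketch and does not use es-hadoop: the class name `FailoverSketch`, the method `nodesThatReceiveRequest`, the node names, and the 90-second processing time are all hypothetical, and the only assumption carried over from the text is that a client-side timeout makes the client move on to the next node.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation, not es-hadoop code: models a client that treats
// a timeout as a node failure and resends the same request to the next node.
public class FailoverSketch {

    // Returns every node that was sent the bulk request.
    // serverMillis: how long the cluster needs to process the request.
    // timeoutMillis: how long the client waits before giving up on a node.
    static List<String> nodesThatReceiveRequest(List<String> nodes,
                                                long serverMillis,
                                                long timeoutMillis) {
        List<String> attempted = new ArrayList<>();
        for (String node : nodes) {
            attempted.add(node);
            if (serverMillis <= timeoutMillis) {
                // The response arrived in time, so no failover is needed.
                return attempted;
            }
            // Timed out: the client marks this node as failed and continues,
            // even though the node may still be processing the request.
        }
        // Every node was tried; the real client would abort at this point.
        return attempted;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("es-node-1", "es-node-2", "es-node-3");
        // A 90 s bulk request with a 60 s timeout is sent to every node in turn.
        System.out.println(nodesThatReceiveRequest(nodes, 90_000, 60_000));
        // With a 5 minute timeout, only the first node receives it.
        System.out.println(nodesThatReceiveRequest(nodes, 90_000, 300_000));
    }
}
```

In the first call the same bulk request reaches all three nodes, which is the repeated-write behavior observed during the import; in the second call the longer timeout lets the first node finish.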
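As an illustration of the fix, the setting can be collected as a plain key/value pair before being handed to the job. This is a sketch under assumptions: the value `5m` is an arbitrary example rather than a recommendation, the class name `EsTimeoutSettings` is hypothetical, and `java.util.Properties` only stands in for whatever configuration object the job really uses (how settings reach es-hadoop depends on the integration).

```java
import java.util.Properties;

// Hypothetical holder for es-hadoop settings; Properties is a stand-in
// for the job's real configuration object.
public class EsTimeoutSettings {

    static Properties esSettings() {
        Properties settings = new Properties();
        // Raise the HTTP timeout above the 1 minute default so that a slow
        // bulk request is not mistaken for a failed node. "5m" is an
        // example value chosen for illustration only.
        settings.setProperty("es.http.timeout", "5m");
        return settings;
    }

    public static void main(String[] args) {
        esSettings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The right value depends on how long the cluster needs for the largest bulk request, so it is worth measuring before settling on a number.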