小编给大家分享一下Hive Metastore客户端自动重连机制的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
本文基于Hive2.1.0的Apache社区版,目的是为了探究Metastore和底层RDBMS和底层服务变更(例如版本升级、服务迁移等运维操作)对客户端和用户的影响。Hive提供了在客户端对Metastore连接超时自动重连的容错机制,允许我们通过调整参数配置调整停服时间限制,在规定时间内重启服务对用户无显著影响。由于Metastore底层RDBMS我们采用的是业内通用的Mysql,因此后面以Mysql来替代RDBMS进行描述和验证
参数 | 默认值 | 说明 | 配置范围 |
---|---|---|---|
hive.metastore.connect.retries | 3 | 客户端建立与metastore连接时的重试次数 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.failure.retries | 1 | 客户端访问metastore的失败重试次数 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.connect.retry.delay | 1s | Metastore客户端重连/重试等待的时间 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.timeout | 600s | Metastore客户端socket超时时间,传递给底层Socket,超时之后底层Socket会自动断开 | Metastore客户端,如CLI、Hiveserver2等 |
hive.metastore.client.socket.lifetime | 0 | socket存活时间,超时之后客户端在下一次访问Metastore时会主动断开现有连接并重新建立连接,0表示不主动断开 | Metastore客户端,如CLI、Hiveserver2等 |
hive.hmshandler.retry.attempts | 10 | 在JDO数据存储出现错误后尝试连接的次数 | Metastore |
hive.hmshandler.retry.interval | 2000ms | JDO连接尝试间隔,单位:ms | Metastore |
hive.server2.thrift.client.connect.retry.limit | 1 | 客户端建立与Hiveserver2连接的重试次数 | Hiveserver2的客户端,如Beeline等 |
hive.server2.thrift.client.retry.limit | 1 | 客户端访问Hiveserver2的失败重试次数 | Hiveserver2的客户端,如Beeline等 |
hive.server2.thrift.client.retry.delay.seconds | 1s | Hiveserver2客户端重连/重试等待的时间 | Hiveserver2的客户端,如Beeline等 |
为了弄清这两个参数的区别,让我们通过源码来确认一下,ps:为了方便阅读后面会用......省略掉无关的代码逻辑
CLI和Hiveserver2都是通过org.apache.hadoop.hive.ql.metadata.Hive类与Metastore的交互的。首先让我们以createDatabase(Database, boolean)方法为例来看看具体的交互过程
/** * Create a database * @param db * @param ifNotExist if true, will ignore AlreadyExistsException exception * @throws AlreadyExistsException * @throws HiveException */ public void createDatabase(Database db, boolean ifNotExist) throws AlreadyExistsException, HiveException { try { getMSC().createDatabase(db); } catch (AlreadyExistsException e) { if (!ifNotExist) { throw e; } } catch (Exception e) { throw new HiveException(e); } } /** * @return the metastore client for the current thread * @throws MetaException */ @LimitedPrivate(value = {"Hive"}) @Unstable public synchronized IMetaStoreClient getMSC( boolean allowEmbedded, boolean forceCreate) throws MetaException { if (metaStoreClient == null || forceCreate) { ...... try { metaStoreClient = createMetaStoreClient(allowEmbedded); } catch (RuntimeException ex) { ...... } ...... } return metaStoreClient; }
Hive类维护了一个IMetaStoreClient对象,通过getMSC()方法获取,getMSC()方法在这里采用了懒汉模式去创建,接下来看下Hive是如何创建一个IMetaStoreClient对象的
// org.apache.hadoop.hive.ql.metadata.Hive.java private IMetaStoreClient createMetaStoreClient(boolean allowEmbedded) throws MetaException { ...... if (conf.getBoolVar(ConfVars.METASTORE_FASTPATH)) { return new SessionHiveMetaStoreClient(conf, hookLoader, allowEmbedded); } else { return RetryingMetaStoreClient.getProxy(conf, hookLoader, metaCallTimeMap, SessionHiveMetaStoreClient.class.getName(), allowEmbedded); } }
if后面的分支用于创建客户端内置的本地Metastore,这主要用于开发调试阶段,因此我们只关注else后面的逻辑,即通过RetryingMetaStoreClient.getProxy方法创建一个IMetaStoreClient对象。RetryingMetaStoreClient.getProxy方法通过几次简单地调用重载函数,最终来到下面的方法
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public static IMetaStoreClient getProxy(HiveConf hiveConf, Class<?>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, String mscClassName) throws MetaException { @SuppressWarnings("unchecked") Class<? extends IMetaStoreClient> baseClass = (Class<? extends IMetaStoreClient>)MetaStoreUtils.getClass(mscClassName); RetryingMetaStoreClient handler = new RetryingMetaStoreClient(hiveConf, constructorArgTypes, constructorArgs, metaCallTimeMap, baseClass); return (IMetaStoreClient) Proxy.newProxyInstance( RetryingMetaStoreClient.class.getClassLoader(), baseClass.getInterfaces(), handler); }
可以看到,这里利用Java代理机制创建并返回了一个IMetaStoreClient的代理——RetryingMetaStoreClient,此后对IMetaStoreClient对象的调用都委托给RetryingMetaStoreClient.invoke 处理,接下来让我们看下RetryingMetaStoreClient.invoke方法是如何处理用户对IMetastoreClient对象的操作的
// org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.java public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object ret = null; int retriesMade = 0; TException caughtException = null; while (true) { try { reloginExpiringKeytabUser(); // 1. 检查是否重连,重连的场景包括: // a) 上一次循环访问Metastore异常,且异常类型支持自动重试访问 // b) 底层socket超时,超时参数:hive.metastore.client.socket.lifetime if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) { base.reconnect(); lastConnectionTime = System.currentTimeMillis(); } if (metaCallTimeMap == null) { ret = method.invoke(base, args); } else { // need to capture the timing long startTime = System.currentTimeMillis(); ret = method.invoke(base, args); long timeTaken = System.currentTimeMillis() - startTime; addMethodTime(method, timeTaken); } // 2. 访问Metastore正常,返回结果给上层调用并结束循环,用户不主动结束的情况下底层与Metastore的连接持续保持着 break; // 3. 处理访问Metastore过程中出现的异常,异常主要分三类: // a) 用户操作异常或元数据异常,将异常抛给用户处理并结束循环 // b) 底层连接异常,例如网络问题、Metastore服务异常(停服、连接超限等)等支持自动重连,进入步骤4 // c) 其他未知异常,抛给用户处理并结束循环 } catch (UndeclaredThrowableException e) { throw e.getCause(); } catch (InvocationTargetException e) { Throwable t = e.getCause(); if (t instanceof TApplicationException) { TApplicationException tae = (TApplicationException)t; switch (tae.getType()) { case TApplicationException.UNSUPPORTED_CLIENT_TYPE: case TApplicationException.UNKNOWN_METHOD: case TApplicationException.WRONG_METHOD_NAME: case TApplicationException.INVALID_PROTOCOL: throw t; default: caughtException = tae; } } else if ((t instanceof TProtocolException) || (t instanceof TTransportException)) { caughtException = (TException)t; } else if ((t instanceof MetaException) && t.getMessage().matches( "(?s).*(JDO[a-zA-Z]*|TProtocol|TTransport)Exception.*") && !t.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = (MetaException)t; } else { throw t; } } catch (MetaException e) { if (e.getMessage().matches("(?s).*(IO|TTransport)Exception.*") && !e.getMessage().contains("java.sql.SQLIntegrityConstraintViolationException")) { caughtException = e; } else { throw e; } } // 4. 对于支持自动重试的异常,会记录重试次数并验证次数是否超限,是则返回异常并结束循环,否则将以warn形式输出异常日志提醒并等等一段时间后开始下一次循环自动重试访问Metastore。这里用到的重试次数参数和等待时间参数分别是 hive.metastore.failure.retries,hive.metastore.client.connect.retry.delay if (retriesMade >= retryLimit) { throw caughtException; } retriesMade++; Thread.sleep(retryDelaySeconds * 1000); } return ret; } protected RetryingMetaStoreClient(HiveConf hiveConf, Class<?>[] constructorArgTypes, Object[] constructorArgs, ConcurrentHashMap<String, Long> metaCallTimeMap, Class<? extends IMetaStoreClient> msClientClass) throws MetaException { this.retryLimit = hiveConf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES); this.retryDelaySeconds = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); this.metaCallTimeMap = metaCallTimeMap; this.connectionLifeTimeInMillis = hiveConf.getTimeVar( HiveConf.ConfVars.METASTORE_CLIENT_SOCKET_LIFETIME, TimeUnit.MILLISECONDS); ...... this.base = (IMetaStoreClient) MetaStoreUtils.newInstance( msClientClass, constructorArgTypes, constructorArgs); }
从 RetryingMetaStoreClient 的构造函数中可以发现,RetryingMetaStoreClient 维护了一个 HiveMetaStoreClient 对象,用户在上层调用一次 RetryingMetaStoreClient 对象操作,例如第一步的 createDatabase 方法,会经过 RetryingMetaStoreClient.invoke 的封装最终调用HiveMetaStoreClient类中的同名方法进行操作。在 RetryingMetaStoreClient.invoke 中封装了自动重试的逻辑,在底层与Metastore的连接过程中出现异常的情况下会自动重试而不影响上层用户的操作。
这里我们在注释中标注了 invoke 方法中主要的操作步骤,可以看到,重试次数由参数hive.metastore.failure.retries控制,两次重试之间的等待时间由hive.metastore.client.connect.retry.delay控制。
注意,这里我们说的是“重试”,而不是“重连”,一次重试中与Metastore的交互有两步:1. 建立与Metastore的会话 2. 执行用户请求。我们继续看下客户端是怎么建立与Metastore的会话的
// org.apache.hadoop.hive.metastore.HiveMetaStoreClient.java @Override public void reconnect() throws MetaException { ...... close(); // 当配置了多个Metastore时,会随机调整Metastore顺序 promoteRandomMetaStoreURI(); open(); } private void open() throws MetaException { isConnected = false; ...... // hive.metastore.client.socket.timeout int clientSocketTimeout = (int) conf.getTimeVar( ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS); for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { for (URI store : metastoreUris) { try { transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout); ...... try { transport.open(); isConnected = true; } catch (TTransportException e) { ...... } ...... } catch (MetaException e) { ...... } if (isConnected) { break; } } // Wait before launching the next round of connection retries. if (!isConnected && retryDelaySeconds > 0) { try { Thread.sleep(retryDelaySeconds * 1000); } catch (InterruptedException ignore) {} } } if (!isConnected) { throw new MetaException("Could not connect to meta store using any of the URIs provided." + " Most recent failure: " + StringUtils.stringifyException(tte)); } ...... } public HiveMetaStoreClient(HiveConf conf, HiveMetaHookLoader hookLoader, Boolean allowEmbedded) throws MetaException { ...... // hive.metastore.connect.retries retries = HiveConf.getIntVar(conf, HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES); // hive.metastore.client.connect.retry.delay retryDelaySeconds = conf.getTimeVar( ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS); ...... // 初始化一个HiveMetaStoreClient对象时会尝试建立与Metastore的长会话 open(); }
同上一步的重试逻辑类似,与Metastore的连接支持自动重连,由 hive.metastore.connect.retries 控制重连次数,hive.metastore.client.connect.retry.delay 控制重连等待时间,底层利用Thrift提供的RPC通信服务。
如果配置了多个Metastore地址,每一次重连的时候会按顺序遍历所有的Metastore并尝试与之建立会话,直到有一个会话建立成功为止。
此外,初始化一个HiveMetaStoreClient对象时会调用open()方法尝试建立一个与Metastore的长会话,供后面的用户请求使用
HiveMetaStoreClient.open() 方法建立一个与Metastore的会话,该方法中会在连接失败的情况下自动重连,重连次数、重连等待时间分别由参数 hive.metastore.connect.retries 、 hive.metastore.client.connect.retry.delay 控制。且每次重连时会遍历用户配置的所有的Metastore直到成功建立一个会话
用户新建一个Metastore客户端(例如启动一个CLI、Hiveserver2进程)时,会初始化并维护一个IMetaStoreClient对象,在初始化时调用 *HiveMetaStoreClient.open()*方法建立一个与Metastore的长会话
用户每次调用IMetaStoreClient中的方法进行业务操作,实际上委托给 RetryingMetaStoreClient.invoke 方法操作,在遇到与Metastore连接等异常时会进行自动重试,重试次数、重试等待时间分别由参数 hive.metastore.failure.retries 、 hive.metastore.client.connect.retry.delay 控制
RetryingMetaStoreClient.invoke 中每次重试会尝试调用 HiveMetaStoreClient.reconnect() 方法重连Metastore,HiveMetaStoreClient.reconnect() 方法内会调用 HiveMetaStoreClient.open() 去连接Metastore。因此,invoke方法实际上在重试循环中嵌套了循环重连Metastore的操作
所以 hive.metastore.failure.retries 参数实际上仅用于在已经建立了Metastore的会话的基础上进行正常的业务访问过程中遇到连接异常等问题时的重试次数限制,而 hive.metastore.connect.retries 则是更底层自动重连Metastore的次数限制
此外,hive.server2.thrift.client.connect.retry.limit 同 hive.server2.thrift.client.retry.limit 的区别也与hive.metastore.connect.retries 和 hive.metastore.failure.retries的区别类似,这里就不再赘述,有兴趣的同学可以参照本篇文档去研究下源码
看完了这篇文章,相信你对“Hive Metastore客户端自动重连机制的示例分析”有了一定的了解,如果想了解更多相关知识,欢迎关注亿速云行业资讯频道,感谢各位的阅读!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。