这期内容当中小编将会给大家带来有关ShardingContent的功能有哪些,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
ShardingContent主要做了那些功能呢?主要有两部分:
数据源分片元数据
主要根据数据源连接获取对应的url,通过解析url参数来封装数据源分片元数据;数据源分片元数据主要后续SQL路由DCL(比如:授权、创建用户等)操作使用
表分片元数据
主要根据数据节点来获取真实表的元数据;而表分片元数据主要后续SQL解析填充使用
源码分析
1.ShardingContext构造,主要分析ShardingTableMetaData
public ShardingContext(final Map<String, DataSource> dataSourceMap, final ShardingRule shardingRule, final DatabaseType databaseType, final Properties props) throws SQLException { this.shardingRule = shardingRule; //获取数据源原始元数据信息 this.cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); //数据源类型 this.databaseType = databaseType; //sharding 配置参数 //比如:sql打印、线程池大小配置等 shardingProperties = new ShardingProperties(null == props ? new Properties() : props); //Statement、PrepareStatement执行线程池大小 //一个分片数据源将使用独立的线程池,它不会在同一个JVM中共享线程池甚至不同的数据源 //默认无限制 int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE); //执行引擎 executeEngine = new ShardingExecuteEngine(executorSize); //数据源分片元数据 //以mysql为例,建立连接获取mysql url,将解析后的url参数信息封装到ShardingDataSourceMetaData ShardingDataSourceMetaData shardingDataSourceMetaData = new ShardingDataSourceMetaData(getDataSourceURLs(dataSourceMap), shardingRule, databaseType); //表分片元数据 //以mysql为例,会建立连接获取表的元信息(字段、字段类型、索引) ShardingTableMetaData shardingTableMetaData = new ShardingTableMetaData(getTableMetaDataInitializer(dataSourceMap, shardingDataSourceMetaData).load(shardingRule)); //封装数据源分片元数据、表分片元数据 metaData = new ShardingMetaData(shardingDataSourceMetaData, shardingTableMetaData); //解析结果缓存 parsingResultCache = new ParsingResultCache(); } // private TableMetaDataInitializer getTableMetaDataInitializer(final Map<String, DataSource> dataSourceMap, final ShardingDataSourceMetaData shardingDataSourceMetaData) { return new TableMetaDataInitializer(shardingDataSourceMetaData, executeEngine, new JDBCTableMetaDataConnectionManager(dataSourceMap), shardingProperties.<Integer>getValue(ShardingPropertiesConstant.MAX_CONNECTIONS_SIZE_PER_QUERY), shardingProperties.<Boolean>getValue(ShardingPropertiesConstant.CHECK_TABLE_METADATA_ENABLED)); }
2.加载TableMetaDataInitializer#load
public TableMetaDataInitializer(final ShardingDataSourceMetaData shardingDataSourceMetaData, final ShardingExecuteEngine executeEngine, final TableMetaDataConnectionManager connectionManager, final int maxConnectionsSizePerQuery, final boolean isCheckingMetaData) { //数据源分片元数据 this.shardingDataSourceMetaData = shardingDataSourceMetaData; //数据源连接管理器 this.connectionManager = connectionManager; //表元数据加载器 tableMetaDataLoader = new TableMetaDataLoader(shardingDataSourceMetaData, executeEngine, connectionManager, maxConnectionsSizePerQuery, isCheckingMetaData); } /** * Load table meta data. * * @param logicTableName logic table name * @param shardingRule sharding rule * @return table meta data */ @SneakyThrows public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) { return tableMetaDataLoader.load(logicTableName, shardingRule); } /** * Load all table meta data. * * @param shardingRule sharding rule * @return all table meta data */ @SneakyThrows public Map<String, TableMetaData> load(final ShardingRule shardingRule) { Map<String, TableMetaData> result = new HashMap<>(); //加载分片表 result.putAll(loadShardingTables(shardingRule)); //加载未分片表 result.putAll(loadDefaultTables(shardingRule)); return result; } private Map<String, TableMetaData> loadShardingTables(final ShardingRule shardingRule) throws SQLException { Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1); for (TableRule each : shardingRule.getTableRules()) { //加载逻辑表对应真实表的元数据 //逻辑表:表元数据 result.put(each.getLogicTable(), tableMetaDataLoader.load(each.getLogicTable(), shardingRule)); } return result; } private Map<String, TableMetaData> loadDefaultTables(final ShardingRule shardingRule) throws SQLException { Map<String, TableMetaData> result = new HashMap<>(shardingRule.getTableRules().size(), 1); //查询默认数据源,没有则查找主库 Optional<String> actualDefaultDataSourceName = shardingRule.findActualDefaultDataSourceName(); if (actualDefaultDataSourceName.isPresent()) { //获取所有表元数据 //真实表:表元数据 for (String each : getAllTableNames(actualDefaultDataSourceName.get())) { result.put(each, tableMetaDataLoader.load(each, shardingRule)); } } return result; } private Collection<String> getAllTableNames(final String dataSourceName) throws SQLException { Collection<String> result = new LinkedHashSet<>(); DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName); String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName(); try (Connection connection = connectionManager.getConnection(dataSourceName); ResultSet resultSet = connection.getMetaData().getTables(catalog, getCurrentSchemaName(connection), null, new String[]{"TABLE"})) { while (resultSet.next()) { String tableName = resultSet.getString("TABLE_NAME"); if (!tableName.contains("$") && !tableName.contains("/")) { result.add(tableName); } } } return result; } private String getCurrentSchemaName(final Connection connection) throws SQLException { try { return connection.getSchema(); } catch (final AbstractMethodError | SQLFeatureNotSupportedException ignore) { return null; } }
3.加载表元数据TableMetaDataLoader#load
/** * Load table meta data. * * @param logicTableName logic table name * @param shardingRule sharding rule * @return table meta data * @throws SQLException SQL exception */ public TableMetaData load(final String logicTableName, final ShardingRule shardingRule) throws SQLException { //获取表元数据 List<TableMetaData> actualTableMetaDataList = load(getDataNodeGroups(logicTableName, shardingRule), shardingRule.getShardingDataSourceNames()); //检查actualTableMetaDataList的元数据 checkUniformed(logicTableName, actualTableMetaDataList); return actualTableMetaDataList.iterator().next(); } private List<TableMetaData> load(final Map<String, List<DataNode>> dataNodeGroups, final ShardingDataSourceNames shardingDataSourceNames) throws SQLException { //将封装好的数据节点组提交给执行引擎执行 return executeEngine.groupExecute(getDataNodeGroups(dataNodeGroups), new ShardingGroupExecuteCallback<DataNode, TableMetaData>() { @Override public Collection<TableMetaData> execute(final Collection<DataNode> dataNodes, final boolean isTrunkThread, final Map<String, Object> shardingExecuteDataMap) throws SQLException { String dataSourceName = dataNodes.iterator().next().getDataSourceName(); DataSourceMetaData dataSourceMetaData = shardingDataSourceMetaData.getActualDataSourceMetaData(dataSourceName); String catalog = null == dataSourceMetaData ? null : dataSourceMetaData.getSchemaName(); return load(shardingDataSourceNames.getRawMasterDataSourceName(dataSourceName), catalog, dataNodes); } }); } private Collection<TableMetaData> load(final String dataSourceName, final String catalog, final Collection<DataNode> dataNodes) throws SQLException { Collection<TableMetaData> result = new LinkedList<>(); try (Connection connection = connectionManager.getConnection(dataSourceName)) { for (DataNode each : dataNodes) { //获取表元数据 result.add(createTableMetaData(connection, catalog, each.getTableName())); } } return result; } private Map<String, List<DataNode>> getDataNodeGroups(final String logicTableName, final ShardingRule shardingRule) { //根据逻辑表获取对应的数据源:真实表数据节点 //比如: //ds_0 -> [ds_0:t_order_0, ds_0:t_order_1] //ds_1 -> [ds_1.t_order_0, ds_1.t_order_1] Map<String, List<DataNode>> result = shardingRule.getTableRule(logicTableName).getDataNodeGroups(); //默认false,设置为true会处理所有数据节点真实表 if (isCheckingMetaData) { return result; } //返回一个数据节点即可 String firstKey = result.keySet().iterator().next(); return Collections.singletonMap(firstKey, Collections.singletonList(result.get(firstKey).get(0))); } /** * 将数据节点组封装成分片执行组 * * @param dataNodeGroups 数据节点组 * <pre> * ds_0 -> [ds_0:t_order_0, ds_0:t_order_1] * </pre> * @return */ private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final Map<String, List<DataNode>> dataNodeGroups) { Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>(); //遍历对应数据源下的数据节点 for (Entry<String, List<DataNode>> entry : dataNodeGroups.entrySet()) { //封装分片执行组ShardingExecuteGroup result.addAll(getDataNodeGroups(entry.getValue())); } return result; } private Collection<ShardingExecuteGroup<DataNode>> getDataNodeGroups(final List<DataNode> dataNodes) { Collection<ShardingExecuteGroup<DataNode>> result = new LinkedList<>(); //maxConnectionsSizePerQuery最大查询连接数默认为1 //将dataNodes换分Math.max份 for (List<DataNode> each : Lists.partition(dataNodes, Math.max(dataNodes.size() / maxConnectionsSizePerQuery, 1))) { result.add(new ShardingExecuteGroup<>(each)); } return result; } private TableMetaData createTableMetaData(final Connection connection, final String catalog, final String actualTableName) throws SQLException { //判断表是否存在 if (isTableExist(connection, catalog, actualTableName)) { //封装表元数据 return new TableMetaData(getColumnMetaDataList(connection, catalog, actualTableName), getLogicIndexes(connection, catalog, actualTableName)); } return new TableMetaData(Collections.<ColumnMetaData>emptyList(), Collections.<String>emptySet()); } private boolean isTableExist(final Connection connection, final String catalog, final String actualTableName) throws SQLException { try (ResultSet resultSet = connection.getMetaData().getTables(catalog, null, actualTableName, null)) { return resultSet.next(); } } /** * 获取表字段元数据 * * @param connection 连接 * @param catalog schema * @param actualTableName 真实表 * @return * @throws SQLException */ private List<ColumnMetaData> getColumnMetaDataList(final Connection connection, final String catalog, final String actualTableName) throws SQLException { List<ColumnMetaData> result = new LinkedList<>(); Collection<String> primaryKeys = getPrimaryKeys(connection, catalog, actualTableName); try (ResultSet resultSet = connection.getMetaData().getColumns(catalog, null, actualTableName, "%")) { while (resultSet.next()) { String columnName = resultSet.getString("COLUMN_NAME"); String columnType = resultSet.getString("TYPE_NAME"); result.add(new ColumnMetaData(columnName, columnType, primaryKeys.contains(columnName))); } } return result; } /** * 获取表主键 */ private Collection<String> getPrimaryKeys(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection<String> result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getPrimaryKeys(catalog, null, actualTableName)) { while (resultSet.next()) { result.add(resultSet.getString("COLUMN_NAME")); } } return result; } /** * 获取表索引 */ private Collection<String> getLogicIndexes(final Connection connection, final String catalog, final String actualTableName) throws SQLException { Collection<String> result = new HashSet<>(); try (ResultSet resultSet = connection.getMetaData().getIndexInfo(catalog, catalog, actualTableName, false, false)) { while (resultSet.next()) { Optional<String> logicIndex = getLogicIndex(resultSet.getString("INDEX_NAME"), actualTableName); if (logicIndex.isPresent()) { result.add(logicIndex.get()); } } } return result; } private Optional<String> getLogicIndex(final String actualIndexName, final String actualTableName) { //索引要以`_tableName`命名,比如: //idx_t_order String indexNameSuffix = "_" + actualTableName; if (actualIndexName.contains(indexNameSuffix)) { return Optional.of(actualIndexName.replace(indexNameSuffix, "")); } return Optional.absent(); }
4.执行ShardingExecuteEngine#groupExecute
/** * Execute for group. * * @param inputGroups input groups * @param callback sharding execute callback * @param <I> type of input value * @param <O> type of return value * @return execute result * @throws SQLException throw if execute failure */ public <I, O> List<O> groupExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { return groupExecute(inputGroups, null, callback, false); } /** * Execute for group. * * @param inputGroups input groups * @param firstCallback first sharding execute callback * @param callback sharding execute callback * @param serial whether using multi thread execute or not * @param <I> type of input value * @param <O> type of return value * @return execute result * @throws SQLException throw if execute failure */ public <I, O> List<O> groupExecute( final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback, final boolean serial) throws SQLException { if (inputGroups.isEmpty()) { return Collections.emptyList(); } //serial: 串行 //parallel: 并行 return serial ? serialExecute(inputGroups, firstCallback, callback) : parallelExecute(inputGroups, firstCallback, callback); } private <I, O> List<O> serialExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator(); ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next(); //单独执行第一个组 //当firstCallback不为空时使用firstCallback,否则使用callback List<O> result = new LinkedList<>(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback)); //遍历执行 for (ShardingExecuteGroup<I> each : Lists.newArrayList(inputGroupsIterator)) { result.addAll(syncGroupExecute(each, callback)); } return result; } private <I, O> List<O> parallelExecute(final Collection<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> firstCallback, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { Iterator<ShardingExecuteGroup<I>> inputGroupsIterator = inputGroups.iterator(); //获取第一个组 ShardingExecuteGroup<I> firstInputs = inputGroupsIterator.next(); //将剩余组提交到线程池中执行 Collection<ListenableFuture<Collection<O>>> restResultFutures = asyncGroupExecute(Lists.newArrayList(inputGroupsIterator), callback); //执行第一个组,合并同步执行、异步执行结果 return getGroupResults(syncGroupExecute(firstInputs, null == firstCallback ? callback : firstCallback), restResultFutures); } /** * 异步执行 */ private <I, O> Collection<ListenableFuture<Collection<O>>> asyncGroupExecute(final List<ShardingExecuteGroup<I>> inputGroups, final ShardingGroupExecuteCallback<I, O> callback) { Collection<ListenableFuture<Collection<O>>> result = new LinkedList<>(); for (ShardingExecuteGroup<I> each : inputGroups) { result.add(asyncGroupExecute(each, callback)); } return result; } private <I, O> ListenableFuture<Collection<O>> asyncGroupExecute(final ShardingExecuteGroup<I> inputGroup, final ShardingGroupExecuteCallback<I, O> callback) { final Map<String, Object> dataMap = ShardingExecuteDataMap.getDataMap(); //提交到线程池 return executorService.submit(new Callable<Collection<O>>() { @Override public Collection<O> call() throws SQLException { return callback.execute(inputGroup.getInputs(), false, dataMap); } }); } /** * 同步执行 */ private <I, O> Collection<O> syncGroupExecute(final ShardingExecuteGroup<I> executeGroup, final ShardingGroupExecuteCallback<I, O> callback) throws SQLException { return callback.execute(executeGroup.getInputs(), true, ShardingExecuteDataMap.getDataMap()); } private <O> List<O> getGroupResults(final Collection<O> firstResults, final Collection<ListenableFuture<Collection<O>>> restFutures) throws SQLException { List<O> result = new LinkedList<>(firstResults); for (ListenableFuture<Collection<O>> each : restFutures) { try { result.addAll(each.get()); } catch (final InterruptedException | ExecutionException ex) { return throwException(ex); } } return result; }
上述就是小编为大家分享的ShardingContent的功能有哪些了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。