这篇文章主要介绍“怎么使用sharding-jdbc读写分离”。在日常操作中,相信很多人在怎么使用sharding-jdbc读写分离的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“怎么使用sharding-jdbc读写分离”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
核心概念
主库:添加、更新以及删除数据操作所使用的数据库
从库:查询数据操作所使用的数据库,可支持多从库
读写分离支持一主多从;多主多从需要使用sharding(分片)
源码分析
1.启动入口:
public class JavaConfigurationExample { // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES; // private static ShardingType shardingType = ShardingType.SHARDING_TABLES; // private static ShardingType shardingType = ShardingType.SHARDING_DATABASES_AND_TABLES; private static ShardingType shardingType = ShardingType.MASTER_SLAVE; // private static ShardingType shardingType = ShardingType.SHARDING_MASTER_SLAVE; // private static ShardingType shardingType = ShardingType.SHARDING_VERTICAL; public static void main(final String[] args) throws SQLException { DataSource dataSource = DataSourceFactory.newInstance(shardingType); CommonService commonService = getCommonService(dataSource); commonService.initEnvironment(); commonService.processSuccess(); commonService.cleanEnvironment(); } private static CommonService getCommonService(final DataSource dataSource) { return new CommonServiceImpl(new OrderRepositoryImpl(dataSource), new OrderItemRepositoryImpl(dataSource)); } }
2.以sharding-jdbc为例,配置主从读写分离代码如下:
@Override public DataSource getDataSource() throws SQLException { //主从配置 MasterSlaveRuleConfiguration masterSlaveRuleConfig = new MasterSlaveRuleConfiguration(/*主从命名*/"demo_ds_master_slave", /*主库*/"demo_ds_master", /*从库*/Arrays.asList("demo_ds_slave_0", "demo_ds_slave_1")); //打印sql Properties props = new Properties(); props.put("sql.show", true); //创建MasterSlaveDataSource数据源 return MasterSlaveDataSourceFactory.createDataSource(createDataSourceMap(), masterSlaveRuleConfig, props); } private Map<String, DataSource> createDataSourceMap() { Map<String, DataSource> result = new HashMap<>(); //主库 result.put("demo_ds_master", DataSourceUtil.createDataSource("demo_ds_master")); //两从库 result.put("demo_ds_slave_0", DataSourceUtil.createDataSource("demo_ds_slave_0")); result.put("demo_ds_slave_1", DataSourceUtil.createDataSource("demo_ds_slave_1")); return result; }
创建sharding主从数据源MasterSlaveDataSource
public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException { super(dataSourceMap); //缓存mysql元数据 cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap); //主从规则配置 this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig); //主从sql解析 parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType()); shardingProperties = new ShardingProperties(null == props ? new Properties() : props); }
3.执行insert插入方法
@Override public Long insert(final Order order) throws SQLException { String sql = "INSERT INTO t_order (user_id, status) VALUES (?, ?)"; //获取MasterSlaveDataSource数据源连接,同时创建MasterSlavePreparedStatement //这里有两个Statement分别含义 //1.MasterSlaveStatement:执行sql时候才路由 //2.MasterSlavePreparedStatement:创建Statement时就路由 //Statement.RETURN_GENERATED_KEYS 自动生成主键并返回生成的主键 try (Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) { preparedStatement.setInt(1, order.getUserId()); preparedStatement.setString(2, order.getStatus()); //MasterSlavePreparedStatement执行sql preparedStatement.executeUpdate(); try (ResultSet resultSet = preparedStatement.getGeneratedKeys()) { if (resultSet.next()) { order.setOrderId(resultSet.getLong(1)); } } } return order.getOrderId(); }
获取数据库连接MasterSlaveConnection->AbstractConnectionAdapter#getConnection
/** * Get database connection. * * @param dataSourceName data source name * @return database connection * @throws SQLException SQL exception */ //MEMORY_STRICTLY:Proxy会保持一个数据库中所有被路由到的表的连接,这种方式的好处是利用流式ResultSet来节省内存 // //CONNECTION_STRICTLY:代理在取出ResultSet中的所有数据后会释放连接,同时,内存的消耗将会增加 // public final Connection getConnection(final String dataSourceName) throws SQLException { return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0); } /** * Get database connections. * * @param connectionMode connection mode * @param dataSourceName data source name * @param connectionSize size of connection list to be get * @return database connections * @throws SQLException SQL exception */ public final List<Connection> getConnections(final ConnectionMode connectionMode, final String dataSourceName, final int connectionSize) throws SQLException { //获取数据源 DataSource dataSource = getDataSourceMap().get(dataSourceName); Preconditions.checkState(null != dataSource, "Missing the data source name: '%s'", dataSourceName); Collection<Connection> connections; //并发从cache中获取连接 synchronized (cachedConnections) { connections = cachedConnections.get(dataSourceName); } List<Connection> result; //如果cache中连接数大于指定连接数时,返回指定连接数量 if (connections.size() >= connectionSize) { result = new ArrayList<>(connections).subList(0, connectionSize); } else if (!connections.isEmpty()) { result = new ArrayList<>(connectionSize); result.addAll(connections); //创建缺少的指定连接数 List<Connection> newConnections = createConnections(dataSourceName, connectionMode, dataSource, connectionSize - connections.size()); result.addAll(newConnections); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, newConnections); } } else { result = new ArrayList<>(createConnections(dataSourceName, connectionMode, dataSource, connectionSize)); synchronized (cachedConnections) { cachedConnections.putAll(dataSourceName, result); } } return result; } 
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException { //为1时不存在并发获取连接情况,直接返回单个连接 if (1 == connectionSize) { return Collections.singletonList(createConnection(dataSourceName, dataSource)); } //TODO 不处理并发 if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) { return createConnections(dataSourceName, dataSource, connectionSize); } //并发 synchronized (dataSource) { return createConnections(dataSourceName, dataSource, connectionSize); } } private List<Connection> createConnections(final String dataSourceName, final DataSource dataSource, final int connectionSize) throws SQLException { List<Connection> result = new ArrayList<>(connectionSize); for (int i = 0; i < connectionSize; i++) { try { result.add(createConnection(dataSourceName, dataSource)); } catch (final SQLException ex) { for (Connection each : result) { each.close(); } throw new SQLException(String.format("Could't get %d connections one time, partition succeed connection(%d) have released!", connectionSize, result.size()), ex); } } return result; } private Connection createConnection(final String dataSourceName, final DataSource dataSource) throws SQLException { //判断是否是sharding事物 Connection result = isInShardingTransaction() ? shardingTransactionManager.getConnection(dataSourceName) : dataSource.getConnection(); replayMethodsInvocation(result); return result; }
预准备路由并缓存Statement
public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException { this.connection = connection; //创建router对象 masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(), connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)); //缓存路由后的Statement,useCache缓存解析后的sql Statement for (String each : masterSlaveRouter.route(sql, true)) { //获取数据库连接 PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys); routedStatements.add(preparedStatement); } }
执行MasterSlaveRouter#route方法获取路由库
public Collection<String> route(final String sql, final boolean useCache) { //解析sql,这里不分析sql如何使用antlr4解析 Collection<String> result = route(parseEngine.parse(sql, useCache)); //是否打印sql if (showSQL) { SQLLogger.logSQL(sql, result); } return result; } private Collection<String> route(final SQLStatement sqlStatement) { //判断是否master if (isMasterRoute(sqlStatement)) { //设置当前线程是否允许访问主库 MasterVisitedManager.setMasterVisited(); //返回主库 return Collections.singletonList(masterSlaveRule.getMasterDataSourceName()); } //根据配置的算法获取从库,两种算法: //1、随机 //2、轮询 return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()))); }
执行MasterSlavePreparedStatement#executeUpdate
@Override public int executeUpdate() throws SQLException { int result = 0; //从本地缓存遍历执行 for (PreparedStatement each : routedStatements) { result += each.executeUpdate(); } return result; }
4.获取从库算法策略
随机算法
@Getter @Setter public final class RandomMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm { private Properties properties = new Properties(); @Override public String getType() { return "RANDOM"; } @Override public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) { //从slave.size()中获取一个随机数 return slaveDataSourceNames.get(new Random().nextInt(slaveDataSourceNames.size())); } }
轮询算法
@Getter @Setter public final class RoundRobinMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm { //并发map private static final ConcurrentHashMap<String, AtomicInteger> COUNTS = new ConcurrentHashMap<>(); private Properties properties = new Properties(); @Override public String getType() { return "ROUND_ROBIN"; } @Override public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) { //查看对应名称的计数器,没有则初始化一个 AtomicInteger count = COUNTS.containsKey(name) ? COUNTS.get(name) : new AtomicInteger(0); COUNTS.putIfAbsent(name, count); // 采用cas轮询,如果计数器长到slave.size(),那么归零(防止计数器不断增长下去) count.compareAndSet(slaveDataSourceNames.size(), 0); //绝对值,计数器%slave.size()取模 return slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size()); } }
默认算法
通过SPI扩展机制加载算法,load到的第一个算法作为默认算法;sharding-sphere(文中简称ss)默认是随机算法
到此,关于“怎么使用sharding-jdbc读写分离”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。