This article looks at how StampedLock is used in practice, following the write and read paths of Apache BookKeeper's SingleDirectoryDbLedgerStorage.
Main member variables
```java
public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {
    // Where the entry data is actually stored
    private final EntryLogger entryLogger;

    // -----------------
    // Index-related
    // -----------------

    // Records per-ledger metadata such as fenced, exists and masterKey
    private final LedgerMetadataIndex ledgerIndex;
    // Index of entry locations
    private final EntryLocationIndex entryLocationIndex;
    // Cache of transient ledger info
    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;

    // -----------------
    // Write-related
    // -----------------

    // Guards the swap of the two write caches (memtables) below
    private final StampedLock writeCacheRotationLock = new StampedLock();

    // Write cache where all new entries are inserted into
    protected volatile WriteCache writeCache;

    // Write cache that is used to swap with writeCache during flushes
    protected volatile WriteCache writeCacheBeingFlushed;

    // Cache where we insert entries for speculative reading
    private final ReadCache readCache;

    // Checkpoint-related
    private final CheckpointSource checkpointSource;
    private Checkpoint lastCheckpoint = Checkpoint.MIN;
}
```
It reads and writes ledgers and maintains the entry location index.
It stores ledger-related metadata.
It supports checkpoints.
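Writes go directly into the WriteCache. The swap between the two write caches is protected by a StampedLock, a read-write lock that also supports optimistic reads. An optimistic read only records a stamp instead of acquiring the lock, so it allows more concurrency than a plain read-write lock when writes (here, cache swaps) are rare.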
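A minimal sketch of the general StampedLock pattern, assuming two buffers represented by plain String labels (SwappableBuffers, active and flushing are hypothetical names, not BookKeeper code): readers take an optimistic stamp, read both references, and fall back to a real read lock only if validate fails; the swap is the only path that takes the exclusive write lock.

```java
import java.util.concurrent.locks.StampedLock;

// Hypothetical illustration of optimistic reads guarding a rare swap.
final class SwappableBuffers {
    private final StampedLock rotationLock = new StampedLock();
    private volatile String active = "A";
    private volatile String flushing = "B";

    // Reader side: get a consistent pair of references.
    String[] snapshot() {
        long stamp = rotationLock.tryOptimisticRead(); // just a version stamp, no blocking
        String a = active;
        String f = flushing;
        if (!rotationLock.validate(stamp)) {           // a swap happened in between
            stamp = rotationLock.readLock();           // fall back to a real read lock
            try {
                a = active;
                f = flushing;
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }
        return new String[] {a, f};
    }

    // Writer side: the swap is the only operation taking the write lock.
    void swap() {
        long stamp = rotationLock.writeLock();
        try {
            String tmp = flushing;
            flushing = active;
            active = tmp;
        } finally {
            rotationLock.unlockWrite(stamp);
        }
    }
}
```

Because swaps are rare (the addEntry comment mentions roughly once a minute), validate almost always succeeds, so readers never block one another.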
```java
public long addEntry(ByteBuf entry) throws IOException, BookieException {
    long startTime = MathUtils.nowInNano();

    long ledgerId = entry.getLong(entry.readerIndex());
    long entryId = entry.getLong(entry.readerIndex() + 8);
    long lac = entry.getLong(entry.readerIndex() + 16);

    // This follows the standard StampedLock optimistic-read template.
    // The operation it is mutually exclusive with is the cache swap.

    // First we try to do an optimistic locking to get access to the current write cache.
    // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
    // rest of the time, we can have multiple threads using the optimistic lock here without interfering.

    // Optimistic read
    long stamp = writeCacheRotationLock.tryOptimisticRead();
    boolean inserted = false;

    inserted = writeCache.put(ledgerId, entryId, entry);
    // If a cache swap happened during the insert, insert again
    if (!writeCacheRotationLock.validate(stamp)) {
        // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
        // the operation because we might have inserted in a write cache that was already being flushed and cleared,
        // without being sure about this last entry being flushed or not.
        // i.e. the entry may have gone into the cache that was just swapped out.
        // If inserted was true: TODO
        // If it was false: no harm done
        stamp = writeCacheRotationLock.readLock();
        try {
            inserted = writeCache.put(ledgerId, entryId, entry);
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }
    }

    // If the insert into writeCache failed, trigger a write cache flush.
    // Reaching this point means the current write cache is full.
    if (!inserted) {
        triggerFlushAndAddEntry(ledgerId, entryId, entry);
    }

    // Update the cached LAC
    // after successfully insert the entry, update LAC and notify the watchers
    updateCachedLacIfNeeded(ledgerId, lac);

    return entryId;
}
```
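The insert-then-validate shape of addEntry can be reduced to the following sketch. OptimisticInsertDemo and its BoundedCache (whose put returns false when full, like WriteCache) are hypothetical. As in addEntry, if a rotation races with the insert, the value may land in both the old and the new cache; the sketch does not try to deduplicate.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo of the addEntry() pattern; not BookKeeper code.
final class OptimisticInsertDemo {
    // Stand-in for WriteCache: put() returns false once the cache is full.
    static final class BoundedCache {
        private final int capacity;
        private final AtomicInteger size = new AtomicInteger();
        private final ConcurrentLinkedQueue<Long> items = new ConcurrentLinkedQueue<>();

        BoundedCache(int capacity) {
            this.capacity = capacity;
        }

        boolean put(long value) {
            // Reserve a slot first so concurrent callers cannot overfill the cache.
            if (size.incrementAndGet() > capacity) {
                size.decrementAndGet();
                return false;
            }
            items.add(value);
            return true;
        }
    }

    private final StampedLock rotationLock = new StampedLock();
    private volatile BoundedCache writeCache = new BoundedCache(2);

    // Same shape as addEntry(): insert under an optimistic stamp and, if a
    // rotation happened meanwhile, repeat the insert under the read lock.
    boolean add(long value) {
        long stamp = rotationLock.tryOptimisticRead();
        boolean inserted = writeCache.put(value);
        if (!rotationLock.validate(stamp)) {
            stamp = rotationLock.readLock();
            try {
                inserted = writeCache.put(value);
            } finally {
                rotationLock.unlockRead(stamp);
            }
        }
        return inserted; // false: cache full, the caller must trigger a flush and retry
    }

    // The rotation is the only operation that takes the exclusive write lock.
    void rotate() {
        long stamp = rotationLock.writeLock();
        try {
            writeCache = new BoundedCache(2);
        } finally {
            rotationLock.unlockWrite(stamp);
        }
    }
}
```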
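When the write cache is full, triggerFlushAndAddEntry takes over. Its logic is simple: it keeps retrying the insert into writeCache in a loop, and if the timeout expires it leaves the loop and rejects the write.
If no flush has been triggered yet (and none is in progress), it first submits a flush task.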
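The retry loop can be sketched independently of BookKeeper. ThrottledInsert, tryPut and triggerFlush are hypothetical names; the sketch keeps only the CAS-guarded one-time flush trigger, the 1 ms back-off and the deadline. Unlike the original, it runs the flush callback inline instead of on an executor, omits the isFlushOngoing check, and compares times with nanoTime() - deadline < 0, the overflow-safe form recommended by the System.nanoTime documentation.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the throttled-write loop; not BookKeeper code.
final class ThrottledInsert {
    // Mirrors hasFlushBeenTriggered: set once a flush has been requested.
    final AtomicBoolean flushTriggered = new AtomicBoolean(false);

    void insertOrReject(BooleanSupplier tryPut, Runnable triggerFlush, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        // Overflow-safe comparison of nanoTime values
        while (System.nanoTime() - deadline < 0) {
            // Only the thread that wins the CAS requests a flush
            if (flushTriggered.compareAndSet(false, true)) {
                triggerFlush.run(); // the original submits flush() to an executor instead
            }
            if (tryPut.getAsBoolean()) {
                return; // inserted
            }
            try {
                Thread.sleep(1); // back off briefly, then retry
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("interrupted while waiting for the write cache", e);
            }
        }
        // Deadline passed without finding room: reject the write
        throw new IllegalStateException("write rejected: cache still full after timeout");
    }
}
```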
```java
private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
        throws IOException, BookieException {
    // Metrics
    dbLedgerStorageStats.getThrottledWriteRequests().inc();
    ...
    // Keep retrying until the maximum write wait time expires
    while (System.nanoTime() < absoluteTimeoutNanos) {
        // Write cache is full, we need to trigger a flush so that it gets rotated
        // If the flush has already been triggered or flush has already switched the
        // cache, we don't need to trigger another flush
        // Submit a flush task, unless one has already been submitted
        if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
            // Trigger an early flush in background
            log.info("Write cache is full, triggering flush");
            executor.execute(() -> {
                try {
                    flush();
                } catch (IOException e) {
                    log.error("Error during flush", e);
                }
            });
        }

        long stamp = writeCacheRotationLock.readLock();
        try {
            if (writeCache.put(ledgerId, entryId, entry)) {
                // We succeeded in putting the entry in the write cache
                return;
            }
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }

        // Wait some time and try again
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
        }
    }

    // Timeout expired and we weren't able to insert in write cache
    dbLedgerStorageStats.getRejectedWriteRequests().inc();
    throw new OperationRejectedException();
}
```
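The flush process is really the checkpoint logic.
Main steps:
Swap the two write caches: the cache currently receiving writes becomes the batch to be flushed.
Iterate over that batch and write its entries into the EntryLogger.
Sync the EntryLogger so the data written in the previous step is persisted to disk.
Update the entryLocationIndex and flush it to RocksDB.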
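The steps above can be modeled with in-memory stand-ins. Everything here is hypothetical (FlushSketch, a byte stream as the entry log, a TreeMap as the location index, one ledger only), and the maps are not thread-safe, so this is a single-threaded illustration. The point is the ordering: swap under the write lock, append entries and collect offsets, make the data durable, publish the index, and only then clear the flushed cache.

```java
import java.io.ByteArrayOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.StampedLock;

// Hypothetical single-threaded model of the flush steps; not BookKeeper code.
final class FlushSketch {
    private final StampedLock rotationLock = new StampedLock();
    // entryId -> payload, for a single ledger (plain maps: not thread-safe)
    volatile Map<Long, byte[]> writeCache = new LinkedHashMap<>();
    volatile Map<Long, byte[]> writeCacheBeingFlushed = new LinkedHashMap<>();
    // Append-only "entry log" and the entryId -> offset "location index"
    final ByteArrayOutputStream entryLog = new ByteArrayOutputStream();
    final Map<Long, Long> locationIndex = new TreeMap<>();

    // synchronized plays the role of flushMutex: one flush at a time
    synchronized void flush() {
        // 1. Swap the caches under the write lock; new writes go to the empty cache
        long stamp = rotationLock.writeLock();
        try {
            Map<Long, byte[]> tmp = writeCacheBeingFlushed;
            writeCacheBeingFlushed = writeCache;
            writeCache = tmp;
        } finally {
            rotationLock.unlockWrite(stamp);
        }
        // 2. Append each entry to the log and collect its offset in a batch
        Map<Long, Long> batch = new TreeMap<>();
        for (Map.Entry<Long, byte[]> e : writeCacheBeingFlushed.entrySet()) {
            long offset = entryLog.size();
            entryLog.write(e.getValue(), 0, e.getValue().length);
            batch.put(e.getKey(), offset);
        }
        // 3. The real code fsyncs the entry log here; an in-memory stream has nothing to sync
        // 4. Publish the batch to the index only after the data is durable
        locationIndex.putAll(batch);
        // 5. Clear the flushed cache last, so a reader can always find the entry somewhere
        writeCacheBeingFlushed.clear();
    }
}
```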
```java
public void flush() throws IOException {
    // journal
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(cp, true);
}

public void checkpoint(Checkpoint checkpoint) throws IOException {
    // journal
    Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
    // Skip if a checkpoint newer than the requested one has already been completed
    if (lastCheckpoint.compareTo(checkpoint) > 0) {
        return;
    }

    long startTime = MathUtils.nowInNano();

    // Only a single flush operation can happen at a time
    flushMutex.lock();

    try {
        // Swap the write cache so that writes can continue to happen while the flush is
        // ongoing
        // This simply swaps the current writeCache with the standby one;
        // it acquires the StampedLock's write lock
        swapWriteCache();

        long sizeToFlush = writeCacheBeingFlushed.size();

        // Write all the pending entries into the entry logger and collect the offset
        // position for each entry
        // i.e. flush the cache to its real storage location,
        // building a RocksDB batch along the way
        Batch batch = entryLocationIndex.newBatch();
        writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
            try {
                // Append the entry to entryLogger; this returns the entry's offset (location)
                long location = entryLogger.addEntry(ledgerId, entry, true);
                // Turns the three longs into a key/value pair
                // ((ledgerId, entryId) -> location) and adds it to the RocksDB batch
                entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

        // Not covered in detail here: this flushes and fsyncs the data
        // just written to entryLogger to disk
        entryLogger.flush();

        // Flush the RocksDB batch; this write is synchronous
        long batchFlushStarTime = System.nanoTime();
        batch.flush();
        batch.close();

        // flush ledgerIndex
        // This changes rarely, since it only holds metadata
        ledgerIndex.flush();

        // Schedule a cleanup task
        cleanupExecutor.execute(() -> {
            // There can only be one single cleanup task running because the cleanupExecutor
            // is single-threaded
            try {
                if (log.isDebugEnabled()) {
                    log.debug("Removing deleted ledgers from db indexes");
                }

                entryLocationIndex.removeOffsetFromDeletedLedgers();
                ledgerIndex.removeDeletedLedgers();
            } catch (Throwable t) {
                log.warn("Failed to cleanup db indexes", t);
            }
        });

        // Save the checkpoint
        lastCheckpoint = thisCheckpoint;

        // Clear the flushed cache
        // Discard all the entry from the write cache, since they're now persisted
        writeCacheBeingFlushed.clear();
    } catch (IOException e) {
        // Leave IOException as it is
        throw e;
    } catch (RuntimeException e) {
        // Wrap unchecked exceptions
        throw new IOException(e);
    } finally {
        try {
            isFlushOngoing.set(false);
        } finally {
            flushMutex.unlock();
        }
    }
}
```
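The comment about turning three longs into a key/value pair can be illustrated as follows. The byte layout is an assumption made for illustration (a 16-byte key of big-endian ledgerId and entryId, and an 8-byte big-endian location as the value); check EntryLocationIndex for the exact format. LocationKeyCodec is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical encoding of (ledgerId, entryId) -> location; layout is an assumption.
final class LocationKeyCodec {
    // Key: ledgerId and entryId as two big-endian longs (ByteBuffer's default order)
    static byte[] encodeKey(long ledgerId, long entryId) {
        return ByteBuffer.allocate(16).putLong(ledgerId).putLong(entryId).array();
    }

    // Value: the entry's location in the entry log as one big-endian long
    static byte[] encodeValue(long location) {
        return ByteBuffer.allocate(8).putLong(location).array();
    }

    static long[] decodeKey(byte[] key) {
        ByteBuffer buf = ByteBuffer.wrap(key);
        return new long[] {buf.getLong(), buf.getLong()};
    }

    static long decodeValue(byte[] value) {
        return ByteBuffer.wrap(value).getLong();
    }

    // RocksDB's default comparator orders keys as unsigned bytes; with big-endian
    // encoding this matches numeric order for non-negative ids.
    static int compareKeys(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }
}
```

With this layout, all entries of one ledger sort next to each other and in entryId order.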
That completes the write path.
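A read checks three places:
the write caches, both the one being flushed and the one receiving writes
the readCache, which holds read-ahead entries
the entryLogger, i.e. the log files, whose data is already on disk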
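The lookup order can be sketched with plain maps. TieredReader and its maps are hypothetical stand-ins for the two write caches, the ReadCache, and the index plus entry log; the real code first snapshots both write-cache references with the optimistic lock, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical tiered lookup in the same order as getEntry(); not BookKeeper code.
final class TieredReader {
    final Map<Long, String> writeCache = new HashMap<>();
    final Map<Long, String> writeCacheBeingFlushed = new HashMap<>();
    final Map<Long, String> readCache = new HashMap<>();
    final Map<Long, String> storage = new HashMap<>(); // stands in for index + entry log

    Optional<String> get(long entryId) {
        String v = writeCache.get(entryId);                     // newest writes
        if (v == null) v = writeCacheBeingFlushed.get(entryId); // batch being flushed
        if (v == null) v = readCache.get(entryId);              // read-ahead cache
        if (v == null) {
            v = storage.get(entryId);                           // persisted data
            if (v != null) readCache.put(entryId, v);           // cache it after a miss
        }
        return Optional.ofNullable(v);
    }
}
```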
When a read has to go to the entryLogger, it also tries to fill the read-ahead cache with the entries that follow.
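The read-ahead step relies on entries being stored back to back, each preceded by a 4-byte size header, which is why getEntry computes entryLocation + 4 + entry.readableBytes(). The sketch below assumes exactly that simplified layout; ReadAheadSketch is hypothetical, and it ignores ledger boundaries and the size limits a real read-ahead needs.

```java
import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical read-ahead over a size-prefixed log; not BookKeeper code.
final class ReadAheadSketch {
    // Reads the entry at `location`: a 4-byte length followed by the payload.
    static byte[] readAt(ByteBuffer log, int location) {
        int size = log.getInt(location);
        byte[] payload = new byte[size];
        for (int i = 0; i < size; i++) {
            payload[i] = log.get(location + 4 + i);
        }
        return payload;
    }

    // After reading the entry at `location`, read up to maxEntries that follow it.
    static Map<Integer, byte[]> readAhead(ByteBuffer log, int location, int maxEntries) {
        Map<Integer, byte[]> cache = new LinkedHashMap<>();
        int next = location + 4 + readAt(log, location).length; // skip header + payload
        while (cache.size() < maxEntries && next + 4 <= log.limit()) {
            byte[] e = readAt(log, next);
            cache.put(next, e); // keyed by location for the demo
            next += 4 + e.length;
        }
        return cache;
    }
}
```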
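What happens if a read arrives while a flush is in progress?
The flush process above clears the flushed writeCache only after all of its contents have been persisted to disk and indexed.
So even with concurrent reads, a read that falls all the way through to the entry log files will still find the entry.
One remaining question is ordering: I am not sure whether two requests with the same ledgerId and entryId but different content can ever occur.
If they can, this could be a problem.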
```java
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
    long startTime = MathUtils.nowInNano();

    // Reading the LAC (last add confirmed)
    if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
        return getLastEntry(ledgerId);
    }

    // We need to try to read from both write caches, since recent entries could be found in either of the two. The
    // write caches are already thread safe on their own, here we just need to make sure we get references to both
    // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
    long stamp = writeCacheRotationLock.tryOptimisticRead();
    WriteCache localWriteCache = writeCache;
    WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
    if (!writeCacheRotationLock.validate(stamp)) {
        // Fallback to regular read lock approach
        stamp = writeCacheRotationLock.readLock();
        try {
            localWriteCache = writeCache;
            localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        } finally {
            writeCacheRotationLock.unlockRead(stamp);
        }
    }

    // First try to read from the write cache of recent entries
    ByteBuf entry = localWriteCache.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // If there's a flush going on, the entry might be in the flush buffer
    entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // Try reading from read-ahead cache
    entry = readCache.get(ledgerId, entryId);
    if (entry != null) {
        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
    }

    // Read from main storage
    long entryLocation;
    try {
        entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
        if (entryLocation == 0) {
            throw new NoEntryException(ledgerId, entryId);
        }
        entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
    } catch (NoEntryException e) {
        recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        throw e;
    }

    readCache.put(ledgerId, entryId, entry);

    // Try to read more entries
    long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
    fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

    recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
    recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
    return entry;
}
```