您好,登錄后才能下訂單哦!
本篇內容介紹了“怎么使用StampLock”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
主要成員變量
public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage { // 實際存儲數據的位置 private final EntryLogger entryLogger; // ----------------- // index 相關 // ----------------- // 記錄fence,exist,masterKey等信息 private final LedgerMetadataIndex ledgerIndex; // 關于位置的index private final EntryLocationIndex entryLocationIndex; // 臨時的ledgerCache private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache; // ----------------- // 寫入相關 // ----------------- // 用來寫入的memtable,2個互相swap private final StampedLock writeCacheRotationLock = new StampedLock(); // Write cache where all new entries are inserted into protected volatile WriteCache writeCache; // Write cache that is used to swap with writeCache during flushes protected volatile WriteCache writeCacheBeingFlushed; // Cache where we insert entries for speculative reading private final ReadCache readCache; // checkpoint 相關 private final CheckpointSource checkpointSource; private Checkpoint lastCheckpoint = Checkpoint.MIN; }
可以讀寫ledger,維護ledger的位置(index)
保存ledger相關的metadata
支持checkpoint
寫入會直接寫入到WriteCache
里面,這里面使用了StampLock,將swap cache的操作進行了保護,StampLock是一個樂觀讀的讀寫鎖,并發更高。
public long addEntry(ByteBuf entry) throws IOException, BookieException { long startTime = MathUtils.nowInNano(); long ledgerId = entry.getLong(entry.readerIndex()); long entryId = entry.getLong(entry.readerIndex() + 8); long lac = entry.getLong(entry.readerIndex() + 16); // 這里的模板是StampLock樂觀讀取的通用模板 // 相對的互斥操作實際上是swap cache的操作 // First we try to do an optimistic locking to get access to the current write cache. // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the // rest of the time, we can have multiple thread using the optimistic lock here without interfering. // 樂觀讀鎖 long stamp = writeCacheRotationLock.tryOptimisticRead(); boolean inserted = false; inserted = writeCache.put(ledgerId, entryId, entry); // 如果插入過程中發生了cache swap 則再次插入 if (!writeCacheRotationLock.validate(stamp)) { // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat // the operation because we might have inserted in a write cache that was already being flushed and cleared, // without being sure about this last entry being flushed or not. // 說明插入到被swap的那個cache里面了 // 如果insert是true TODO // 如果是false的話沒啥影響 stamp = writeCacheRotationLock.readLock(); try { inserted = writeCache.put(ledgerId, entryId, entry); } finally { writeCacheRotationLock.unlockRead(stamp); } } // 如果這里寫入到writeCache失敗的話,觸發Flush WriteCache // 走到這里說明可能2個buffer都滿了? if (!inserted) { triggerFlushAndAddEntry(ledgerId, entryId, entry); } // 更新LAC的緩存 // after successfully insert the entry, update LAC and notify the watchers updateCachedLacIfNeeded(ledgerId, lac); return entryId; }
這里的邏輯比較容易,一直不斷循環插入到writeCache 里面,如果超時的話就跳出循環標記,這個寫入失敗。
如果沒有觸發flush動作的話,會提交一個flush task。
private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry) throws IOException, BookieException { // metric 打點 dbLedgerStorageStats.getThrottledWriteRequests().inc(); ... // 最大等待寫入時間,超時之前不斷重試 while (System.nanoTime() < absoluteTimeoutNanos) { // Write cache is full, we need to trigger a flush so that it gets rotated // If the flush has already been triggered or flush has already switched the // cache, we don't need to trigger another flush // 提交一個flush任務,如果之前有了就不提交了 if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) { // Trigger an early flush in background log.info("Write cache is full, triggering flush"); executor.execute(() -> { try { flush(); } catch (IOException e) { log.error("Error during flush", e); } }); } long stamp = writeCacheRotationLock.readLock(); try { if (writeCache.put(ledgerId, entryId, entry)) { // We succeeded in putting the entry in write cache in the return; } } finally { writeCacheRotationLock.unlockRead(stamp); } // Wait some time and try again try { Thread.sleep(1); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId); } } // Timeout expired and we weren't able to insert in write cache dbLedgerStorageStats.getRejectedWriteRequests().inc(); throw new OperationRejectedException(); }
實際上flush流程是觸發checkpoint的邏輯,
主要動作
交換2個writeCache,正在寫入的cache會被交換成flush的batch
遍歷writeCache,將內容寫到EntryLogger里面
sync EntryLogger將上一步寫入的內容落盤
更新ledgerLocationIndex,同時flush這個index到rocksDb里面
public void flush() throws IOException { // journal Checkpoint cp = checkpointSource.newCheckpoint(); checkpoint(cp); checkpointSource.checkpointComplete(cp, true); } public void checkpoint(Checkpoint checkpoint) throws IOException { // journal Checkpoint thisCheckpoint = checkpointSource.newCheckpoint(); // 這里檢查是否在這個點之前做過checkpoint了 if (lastCheckpoint.compareTo(checkpoint) > 0) { return; } long startTime = MathUtils.nowInNano(); // Only a single flush operation can happen at a time flushMutex.lock(); try { // Swap the write cache so that writes can continue to happen while the flush is // ongoing // 這里邏輯比較容易,交換當前的writeCache和后備的writeCache // 獲取的是StampLock的writeLock swapWriteCache(); long sizeToFlush = writeCacheBeingFlushed.size(); // Write all the pending entries into the entry logger and collect the offset // position for each entry // 刷cache到實際的保存位置上、 // 構建一個rocksDb的batch Batch batch = entryLocationIndex.newBatch(); writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> { try { // 把寫入的entry刷到entryLogger里面 // 這里返回的這個entry的offset long location = entryLogger.addEntry(ledgerId, entry, true); // 這里的邏輯實際上就是把3個long 拆分成k/v 寫入到RocksDb的batch 里面 entryLocationIndex.addLocation(batch, ledgerId, entryId, location); } catch (IOException e) { throw new RuntimeException(e); } }); // 這里不展開說了,實際上會把剛才寫入的entryLogger進行flush && fsync 到磁盤上。 entryLogger.flush(); // 這里觸發RocksDb的batch flush // 這個寫入是sync的 long batchFlushStarTime = System.nanoTime(); batch.flush(); batch.close(); // flush ledgerIndex // 這里的內容變化比較少,因為記錄的是metadata ledgerIndex.flush(); // 調度一個cleanUp的邏輯 cleanupExecutor.execute(() -> { // There can only be one single cleanup task running because the cleanupExecutor // is single-threaded try { if (log.isDebugEnabled()) { log.debug("Removing deleted ledgers from db indexes"); } entryLocationIndex.removeOffsetFromDeletedLedgers(); ledgerIndex.removeDeletedLedgers(); } catch (Throwable t) { log.warn("Failed to cleanup db indexes", t); } }); // 保存checkpoint lastCheckpoint = thisCheckpoint; // 清空這個cache // Discard all the entry from the write cache, since they're now persisted writeCacheBeingFlushed.clear(); } catch (IOException e) { // Leave IOExecption as it is throw e; } catch (RuntimeException e) { // Wrap unchecked exceptions throw new IOException(e); } finally { try { isFlushOngoing.set(false); } finally { flushMutex.unlock(); } } }
這樣寫入就完成了
這里會從3個位置開始讀取
writeCache,包括正在刷新的和正在寫入的
readCache,預讀的緩存
entryLogger,讀文件,這部分已經落盤了
讀取成功之后會嘗試增加預讀的buffer
如果正在flush這個時候有觸發讀取會怎么樣?
上面的flush流程是在所有內容已經落盤之后才把刷新的writeCache 清空的
即使有并發讀,如果最后還是落到了讀文件這一步,那怎么都能讀到
還有個問題就是這個先后順序,不確定是否有相同ledgerId,entry,但是內容不同的請求出現。
這樣的話感覺可能有問題
public ByteBuf getEntry(long ledgerId, long entryId) throws IOException { long startTime = MathUtils.nowInNano(); // 讀LAC的情況 if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) { return getLastEntry(ledgerId); } // We need to try to read from both write caches, since recent entries could be found in either of the two. The // write caches are already thread safe on their own, here we just need to make sure we get references to both // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches. long stamp = writeCacheRotationLock.tryOptimisticRead(); WriteCache localWriteCache = writeCache; WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed; if (!writeCacheRotationLock.validate(stamp)) { // Fallback to regular read lock approach stamp = writeCacheRotationLock.readLock(); try { localWriteCache = writeCache; localWriteCacheBeingFlushed = writeCacheBeingFlushed; } finally { writeCacheRotationLock.unlockRead(stamp); } } // First try to read from the write cache of recent entries ByteBuf entry = localWriteCache.get(ledgerId, entryId); if (entry != null) { recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime); recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); return entry; } // If there's a flush going on, the entry might be in the flush buffer entry = localWriteCacheBeingFlushed.get(ledgerId, entryId); if (entry != null) { recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime); recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); return entry; } // Try reading from read-ahead cache entry = readCache.get(ledgerId, entryId); if (entry != null) { recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime); recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); return entry; } // Read from main storage long entryLocation; try { entryLocation = entryLocationIndex.getLocation(ledgerId, entryId); if (entryLocation == 0) { throw new NoEntryException(ledgerId, entryId); } entry = entryLogger.readEntry(ledgerId, entryId, entryLocation); } catch (NoEntryException e) { recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); throw e; } readCache.put(ledgerId, entryId, entry); // Try to read more entries long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes(); fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation); recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime); recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime); return entry; }
“怎么使用StampLock”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。