您好,登錄后才能下訂單哦!
本篇內容介紹了“go緩存庫freecache怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
go開發緩存場景一般使用map或者緩存框架,為了線程安全會使用sync.Map或線程安全的緩存框架。
緩存場景中如果數據量大于百萬級別,需要特別考慮數據類型對于gc的影響(注意string類型底層是指針+Len+Cap,因此也算是指針類型),如果緩存key和value都是非指針類型的話就無需多慮了。但實際應用場景中,key和value是(包含)指針類型數據是很常見的,因此使用緩存框架需要特別注意其對gc影響,從是否對GC影響角度來看緩存框架大致分為2類:
零GC開銷:比如freecache或bigcache這種,底層基于ringbuf,減小指針個數;
有GC開銷:直接基于Map來實現的緩存框架。
對于map而言,gc時會掃描所有key/value鍵值對,如果其都是基本類型,那么gc便不會再掃描。下面以freecache為例分析下其實現原理,代碼示例如下:
func main() { cacheSize := 100 * 1024 * 1024 cache := freecache.NewCache(cacheSize) for i := 0; i < N; i++ { str := strconv.Itoa(i) _ = cache.Set([]byte(str), []byte(str), 1) } now := time.Now() runtime.GC() fmt.Printf("freecache, GC took: %s\n", time.Since(now)) _, _ = cache.Get([]byte("aa")) now = time.Now() for i := 0; i < N; i++ { str := strconv.Itoa(i) _, _ = cache.Get([]byte(str)) } fmt.Printf("freecache, Get took: %s\n\n", time.Since(now)) }
freecache.NewCache會初始化本地緩存,size表示存儲空間大小,freecache會初始化256個segment,每個segment是獨立的存儲單元,freecache加鎖維度也是基于segment的,每個segment有一個ringbuf,初始大小為size/256。freecache號稱零GC的來源就是其指針是固定的,只有512個,每個segment有2個,分別是rb和slotData(注意切片為指針類型)。
type segment struct { rb RingBuf // ring buffer that stores data segId int _ uint32 // 占位 missCount int64 hitCount int64 entryCount int64 totalCount int64 // number of entries in ring buffer, including deleted entries. totalTime int64 // used to calculate least recent used entry. timer Timer // Timer giving current time totalEvacuate int64 // used for debug totalExpired int64 // used for debug overwrites int64 // used for debug touched int64 // used for debug vacuumLen int64 // up to vacuumLen, new data can be written without overwriting old data. slotLens [256]int32 // The actual length for every slot. slotCap int32 // max number of entry pointers a slot can hold. slotsData []entryPtr // 索引指針 } func NewCacheCustomTimer(size int, timer Timer) (cache *Cache) { cache = new(Cache) for i := 0; i < segmentCount; i++ { cache.segments[i] = newSegment(size/segmentCount, i, timer) } } func newSegment(bufSize int, segId int, timer Timer) (seg segment) { seg.rb = NewRingBuf(bufSize, 0) seg.segId = segId seg.timer = timer seg.vacuumLen = int64(bufSize) seg.slotCap = 1 seg.slotsData = make([]entryPtr, 256*seg.slotCap) // 每個slotData初始化256個單位大小 }
freecache的key和value都是[]byte數組,使用時需要自行序列化和反序列化,如果緩存復雜對象不可忽略其序列化和反序列化帶來的影響,首先看下Set流程:
_ = cache.Set([]byte(str), []byte(str), 1)
Set流程首先對key進行hash,hashVal類型uint64,其低8位segID對應segment數組,低8-15位表示slotId對應slotsData下標,高16位表示slotsData下標對應的[]entryPtr某個數據,這里需要查找操作。注意[]entryPtr數組大小為slotCap(初始為1),當擴容時會slotCap倍增。
每個segment對應一個lock(sync.Mutex),因此其能夠支持較大并發量,而不像sync.Map只有一個鎖。
func (cache *Cache) Set(key, value []byte, expireSeconds int) (err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal // 低8位 cache.locks[segID].Lock() // 加鎖 err = cache.segments[segID].set(key, value, hashVal, expireSeconds) cache.locks[segID].Unlock() } func (seg *segment) set(key, value []byte, hashVal uint64, expireSeconds int) (err error) { slotId := uint8(hashVal >> 8) hash26 := uint16(hashVal >> 16) slot := seg.getSlot(slotId) idx, match := seg.lookup(slot, hash26, key) var hdrBuf [ENTRY_HDR_SIZE]byte hdr := (*entryHdr)(unsafe.Pointer(&hdrBuf[0])) if match { // 有數據更新操作 matchedPtr := &slot[idx] seg.rb.ReadAt(hdrBuf[:], matchedPtr.offset) hdr.slotId = slotId hdr.hash26 = hash26 hdr.keyLen = uint16(len(key)) originAccessTime := hdr.accessTime hdr.accessTime = now hdr.expireAt = expireAt hdr.valLen = uint32(len(value)) if hdr.valCap >= hdr.valLen { // 已存在數據value空間能存下此次value大小 atomic.AddInt64(&seg.totalTime, int64(hdr.accessTime)-int64(originAccessTime)) seg.rb.WriteAt(hdrBuf[:], matchedPtr.offset) seg.rb.WriteAt(value, matchedPtr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) atomic.AddInt64(&seg.overwrites, 1) return } // 刪除對應entryPtr,涉及到slotsData內存copy,ringbug中只是標記刪除 seg.delEntryPtr(slotId, slot, idx) match = false // increase capacity and limit entry len. for hdr.valCap < hdr.valLen { hdr.valCap *= 2 } if hdr.valCap > uint32(maxKeyValLen-len(key)) { hdr.valCap = uint32(maxKeyValLen - len(key)) } } else { // 無數據 hdr.slotId = slotId hdr.hash26 = hash26 hdr.keyLen = uint16(len(key)) hdr.accessTime = now hdr.expireAt = expireAt hdr.valLen = uint32(len(value)) hdr.valCap = uint32(len(value)) if hdr.valCap == 0 { // avoid infinite loop when increasing capacity. hdr.valCap = 1 } } // 數據實際長度為 ENTRY_HDR_SIZE=24 + key和value的長度 entryLen := ENTRY_HDR_SIZE + int64(len(key)) + int64(hdr.valCap) slotModified := seg.evacuate(entryLen, slotId, now) if slotModified { // the slot has been modified during evacuation, we need to looked up for the 'idx' again. // otherwise there would be index out of bound error. slot = seg.getSlot(slotId) idx, match = seg.lookup(slot, hash26, key) // assert(match == false) } newOff := seg.rb.End() seg.insertEntryPtr(slotId, hash26, newOff, idx, hdr.keyLen) seg.rb.Write(hdrBuf[:]) seg.rb.Write(key) seg.rb.Write(value) seg.rb.Skip(int64(hdr.valCap - hdr.valLen)) atomic.AddInt64(&seg.totalTime, int64(now)) atomic.AddInt64(&seg.totalCount, 1) seg.vacuumLen -= entryLen return }
seg.evacuate會評估ringbuf是否有足夠空間存儲key/value,如果空間不夠,其會從空閑空間尾部后一位(也就是待淘汰數據的開始位置)開始掃描(oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size()),如果對應數據已被邏輯deleted或者已過期,那么該塊內存可以直接回收,如果不滿足回收條件,則將entry從環頭調換到環尾,再更新entry的索引,如果這樣循環5次還是不行,那么需要將當前oldHdrBuf回收以滿足內存需要。
執行完seg.evacuate所需空間肯定是能滿足的,然后就是寫入索引和數據了,insertEntryPtr就是寫入索引操作,當[]entryPtr中元素個數大于seg.slotCap(初始1)時,需要擴容操作,對應方法見seg.expand,這里不再贅述。
寫入ringbuf就是執行rb.Write即可。
func (seg *segment) evacuate(entryLen int64, slotId uint8, now uint32) (slotModified bool) { var oldHdrBuf [ENTRY_HDR_SIZE]byte consecutiveEvacuate := 0 for seg.vacuumLen < entryLen { oldOff := seg.rb.End() + seg.vacuumLen - seg.rb.Size() seg.rb.ReadAt(oldHdrBuf[:], oldOff) oldHdr := (*entryHdr)(unsafe.Pointer(&oldHdrBuf[0])) oldEntryLen := ENTRY_HDR_SIZE + int64(oldHdr.keyLen) + int64(oldHdr.valCap) if oldHdr.deleted { // 已刪除 consecutiveEvacuate = 0 atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen continue } expired := oldHdr.expireAt != 0 && oldHdr.expireAt < now leastRecentUsed := int64(oldHdr.accessTime)*atomic.LoadInt64(&seg.totalCount) <= atomic.LoadInt64(&seg.totalTime) if expired || leastRecentUsed || consecutiveEvacuate > 5 { // 可以回收 seg.delEntryPtrByOffset(oldHdr.slotId, oldHdr.hash26, oldOff) if oldHdr.slotId == slotId { slotModified = true } consecutiveEvacuate = 0 atomic.AddInt64(&seg.totalTime, -int64(oldHdr.accessTime)) atomic.AddInt64(&seg.totalCount, -1) seg.vacuumLen += oldEntryLen if expired { atomic.AddInt64(&seg.totalExpired, 1) } else { atomic.AddInt64(&seg.totalEvacuate, 1) } } else { // evacuate an old entry that has been accessed recently for better cache hit rate. newOff := seg.rb.Evacuate(oldOff, int(oldEntryLen)) seg.updateEntryPtr(oldHdr.slotId, oldHdr.hash26, oldOff, newOff) consecutiveEvacuate++ atomic.AddInt64(&seg.totalEvacuate, 1) } } }
freecache的Get流程相對來說簡單點,通過hash找到對應segment,通過slotId找到對應索引slot,然后通過二分+遍歷尋找數據,如果找不到直接返回ErrNotFound,否則更新一些time指標。Get流程還會更新緩存命中率相關指標。
func (cache *Cache) Get(key []byte) (value []byte, err error) { hashVal := hashFunc(key) segID := hashVal & segmentAndOpVal cache.locks[segID].Lock() value, _, err = cache.segments[segID].get(key, nil, hashVal, false) cache.locks[segID].Unlock() return } func (seg *segment) get(key, buf []byte, hashVal uint64, peek bool) (value []byte, expireAt uint32, err error) { hdr, ptr, err := seg.locate(key, hashVal, peek) // hash+定位查找 if err != nil { return } expireAt = hdr.expireAt if cap(buf) >= int(hdr.valLen) { value = buf[:hdr.valLen] } else { value = make([]byte, hdr.valLen) } seg.rb.ReadAt(value, ptr.offset+ENTRY_HDR_SIZE+int64(hdr.keyLen)) }
定位到數據之后,讀取ringbuf即可,注意一般來說讀取到的value是新創建的內存空間,因此涉及到[]byte數據的復制操作。
“go緩存庫freecache怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。