前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:bigcache

golang源码分析:bigcache

作者头像
golangLeetcode
发布2023-09-06 19:27:34
3210
发布2023-09-06 19:27:34
举报

github.com/allegro/bigcache 是一个0GC的local cache,bigcache的核心设计有两项:分片(shard),其实通过分片降低访问压力是cache常见的操作;使用BytesQueue避免GC开销。

代码语言:javascript
复制
BigCache relies on optimization presented in 1.5 version of Go (issue-9477). This optimization states that if map without pointers in keys and values is used then GC will omit its content. Therefore BigCache uses map[uint64]uint32 where keys are hashed and values are offsets of entries.

Entries are kept in byte slices, to omit GC again. Byte slices size can grow to gigabytes without impact on performance because GC will only see single pointer to it.

BytesQueue的实现原理利用了上述所说的Go1.5的特性,在map里不产生指针、不触发GC。BytesQueue是bigcache的核心数据结构。为了达到0 GC,BytesQueue使用了一个索引结构map[uint64]uint32以及一个实际存储数据的数组[]byte,[]byte根据bigcache创建时的初始参数(有默认值)进行初始化,当[]byte容量不足的时候会进行2倍扩容。值得注意的是删除缓存元素的时候bigcache只是在map[uint64]uint32中删除了它的索引,byte数组里的空间是不会释放的。

缺点:1.bigcache具备缓存过期的功能,但这是依赖定时扫描以及调用Set时进行判断以及淘汰的,会导致淘汰不及时。并且过期时间只能提前设置,一个实例里只能设置成相同的过期时间;2.hash冲突返回失败,而不会进行兼容;3.不能进行更新操作;4.需要调用者自行进行序列化。

接着我们分析下如何使用它

代码语言:javascript
复制
package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/allegro/bigcache/v3"
)

func main() {

  config := bigcache.Config{
    // number of shards (must be a power of 2)
    Shards: 1024,

    // time after which entry can be evicted
    LifeWindow: 10 * time.Minute,

    // Interval between removing expired entries (clean up).
    // If set to <= 0 then no action is performed.
    // Setting to < 1 second is counterproductive — bigcache has a one second resolution.
    CleanWindow: 5 * time.Minute,

    // rps * lifeWindow, used only in initial memory allocation
    MaxEntriesInWindow: 1000 * 10 * 60,

    // max entry size in bytes, used only in initial memory allocation
    MaxEntrySize: 500,

    // prints information about additional memory allocation
    Verbose: true,

    // cache will not allocate more memory than this limit, value in MB
    // if value is reached then the oldest entries can be overridden for the new ones
    // 0 value means no size limit
    HardMaxCacheSize: 8192,

    // callback fired when the oldest entry is removed because of its expiration time or no space left
    // for the new entry, or because delete was called. A bitmask representing the reason will be returned.
    // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
    OnRemove: nil,

    // OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left
    // for the new entry, or because delete was called. A constant representing the reason will be passed through.
    // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
    // Ignored if OnRemove is specified.
    OnRemoveWithReason: nil,
  }

  cache, initErr := bigcache.New(context.Background(), config)
  if initErr != nil {
    log.Fatal(initErr)
  }

  cache.Set("my-unique-key", []byte("value"))

  if entry, err := cache.Get("my-unique-key"); err == nil {
    fmt.Println(string(entry))
  }
}

大致分为三步:创建配置,初始化cache,然后进行get和set操作。但是它的缺点很明显,hash不能冲突,比如下面的例子

代码语言:javascript
复制
package main

import (
  "context"
  "fmt"
  "log"
  "time"

  "github.com/allegro/bigcache/v3"
)

type Hasher struct{}

func (h *Hasher) Sum64(string) uint64 {
  return 23
}

func main() {

  config := bigcache.Config{
    // number of shards (must be a power of 2)
    Shards: 1024,

    // time after which entry can be evicted
    LifeWindow: 10 * time.Minute,

    // Interval between removing expired entries (clean up).
    // If set to <= 0 then no action is performed.
    // Setting to < 1 second is counterproductive — bigcache has a one second resolution.
    CleanWindow: 5 * time.Minute,

    // rps * lifeWindow, used only in initial memory allocation
    MaxEntriesInWindow: 1000 * 10 * 60,

    // max entry size in bytes, used only in initial memory allocation
    MaxEntrySize: 500,

    // prints information about additional memory allocation
    Verbose: true,

    // cache will not allocate more memory than this limit, value in MB
    // if value is reached then the oldest entries can be overridden for the new ones
    // 0 value means no size limit
    HardMaxCacheSize: 8192,

    // callback fired when the oldest entry is removed because of its expiration time or no space left
    // for the new entry, or because delete was called. A bitmask representing the reason will be returned.
    // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
    OnRemove: nil,

    // OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left
    // for the new entry, or because delete was called. A constant representing the reason will be passed through.
    // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
    // Ignored if OnRemove is specified.
    OnRemoveWithReason: nil,
    Hasher:             &Hasher{},
  }

  cache, initErr := bigcache.New(context.Background(), config)
  if initErr != nil {
    log.Fatal(initErr)
  }

  cache.Set("my-unique-key", []byte("value"))
  cache.Set("my-unique-key2", []byte("value2"))
  if entry, err := cache.Get("my-unique-key"); err == nil {
    fmt.Println(string(entry))
  } else {
    fmt.Println(entry)
  }
}

我们自定义了hash函数,返回的值一样,人为造成hash冲突,运行结果如下:

代码语言:javascript
复制
% go run ./test/cache/exp3/main.go
2023/05/14 11:38:02 Collision detected. Both "my-unique-key" and "my-unique-key2" have the same hash 17
[]

为什么会这样呢?因为它的原理是map里面值是hash和存储数据在byte数组里的偏移量的映射,如果hash冲突了,会造成缓存写脏,所以是不允许hash冲突的。接着,我们通过源码分析下它是如何实现的。

它的配置定义在onfig.go

代码语言:javascript
复制
type Config struct {
  // Number of cache shards, value must be a power of two
  Shards int
  // Time after which entry can be evicted
  LifeWindow time.Duration
  // Interval between removing expired entries (clean up).
  // If set to <= 0 then no action is performed. Setting to < 1 second is counterproductive — bigcache has a one second resolution.
  CleanWindow time.Duration
  // Max number of entries in life window. Used only to calculate initial size for cache shards.
  // When proper value is set then additional memory allocation does not occur.
  MaxEntriesInWindow int
  // Max size of entry in bytes. Used only to calculate initial size for cache shards.
  MaxEntrySize int
  // StatsEnabled if true calculate the number of times a cached resource was requested.
  StatsEnabled bool
  // Verbose mode prints information about new memory allocation
  Verbose bool
  // Hasher used to map between string keys and unsigned 64bit integers, by default fnv64 hashing is used.
  Hasher Hasher
  // HardMaxCacheSize is a limit for BytesQueue size in MB.
  // It can protect application from consuming all available memory on machine, therefore from running OOM Killer.
  // Default value is 0 which means unlimited size. When the limit is higher than 0 and reached then
  // the oldest entries are overridden for the new ones. The max memory consumption will be bigger than
  // HardMaxCacheSize due to Shards' s additional memory. Every Shard consumes additional memory for map of keys
  // and statistics (map[uint64]uint32) the size of this map is equal to number of entries in
  // cache ~ 2×(64+32)×n bits + overhead or map itself.
  HardMaxCacheSize int
  // OnRemove is a callback fired when the oldest entry is removed because of its expiration time or no space left
  // for the new entry, or because delete was called.
  // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
  // ignored if OnRemoveWithMetadata is specified.
  OnRemove func(key string, entry []byte)
  // OnRemoveWithMetadata is a callback fired when the oldest entry is removed because of its expiration time or no space left
  // for the new entry, or because delete was called. A structure representing details about that specific entry.
  // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
  OnRemoveWithMetadata func(key string, entry []byte, keyMetadata Metadata)
  // OnRemoveWithReason is a callback fired when the oldest entry is removed because of its expiration time or no space left
  // for the new entry, or because delete was called. A constant representing the reason will be passed through.
  // Default value is nil which means no callback and it prevents from unwrapping the oldest entry.
  // Ignored if OnRemove is specified.
  OnRemoveWithReason func(key string, entry []byte, reason RemoveReason)


  onRemoveFilter int


  // Logger is a logging interface and used in combination with `Verbose`
  // Defaults to `DefaultLogger()`
  Logger Logger
}
代码语言:javascript
复制
func DefaultConfig(eviction time.Duration) Config {

我们可以配置内存占用的硬限和过期时间等参数。

cache结构体定义在bigcache.go,它用一个数组来存所有的分片,来减少锁冲突。

代码语言:javascript
复制
type BigCache struct {
  shards     []*cacheShard
  lifeWindow uint64
  clock      clock
  hash       Hasher
  config     Config
  shardMask  uint64
  close      chan struct{}
}  
代码语言:javascript
复制
func NewBigCache(config Config) (*BigCache, error) {
  return newBigCache(context.Background(), config, &systemClock{})
}

初始化的时候会初始化每一个分片的entry,然后通过定时器来实现过期的清理操作:

代码语言:javascript
复制
func newBigCache(ctx context.Context, config Config, clock clock) (*BigCache, error) {
      cache := &BigCache{
      for i := 0; i < config.Shards; i++ {
    cache.shards[i] = initNewShard(config, onRemove, clock)
  }


go func() {
ticker := time.NewTicker(config.CleanWindow)
      defer ticker.Stop()
      for {
        select {
        case <-ctx.Done():
          fmt.Println("ctx done, shutting down bigcache cleanup routine")
          return
        case t := <-ticker.C:
          cache.cleanUp(uint64(t.Unix()))
代码语言:javascript
复制
func (c *BigCache) cleanUp(currentTimestamp uint64) {
  for _, shard := range c.shards {
    shard.cleanUp(currentTimestamp)
  }
}

接着我们看下存储操作的执行过程:

代码语言:javascript
复制
func (c *BigCache) Set(key string, entry []byte) error {
        hashedKey := c.hash.Sum64(key)
  shard := c.getShard(hashedKey)
  return shard.set(key, hashedKey, entry)

先计算出hash值,然后根据hash值找到分片,最后做分片内部的存储操作,读取的过程是类似的

代码语言:javascript
复制
  func (c *BigCache) Get(key string) ([]byte, error) {
  hashedKey := c.hash.Sum64(key)
  shard := c.getShard(hashedKey)
  return shard.get(key, hashedKey)
}

首先根据key获取到entry,然后从entry中解析出请求的key和数据,对比请求key和entry里面的key一致才返回数据,每个shard的操作代码位于shard.go

代码语言:javascript
复制
func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) {
  s.lock.RLock()
  wrappedEntry, err := s.getWrappedEntry(hashedKey)
  if err != nil {
    s.lock.RUnlock()
    return nil, err
  }
  if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey {
    s.lock.RUnlock()
    s.collision()
    if s.isVerbose {
      s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey)
    }
    return nil, ErrEntryNotFound
  }
  entry := readEntry(wrappedEntry)
  s.lock.RUnlock()
  s.hit(hashedKey)


  return entry, nil
}

shard定义如下:

代码语言:javascript
复制
type cacheShard struct {
  hashmap     map[uint64]uint32
  entries     queue.BytesQueue
  lock        sync.RWMutex
  entryBuffer []byte
  onRemove    onRemoveCallback


  isVerbose    bool
  statsEnabled bool
  logger       Logger
  clock        clock
  lifeWindow   uint64


  hashmapStats map[uint64]uint32
  stats        Stats
  cleanEnabled bool
}

清理过程中会从entry中取出数据,然后执行相应操作。

代码语言:javascript
复制
func (s *cacheShard) cleanUp(currentTimestamp uint64) {
  s.lock.Lock()
  for {
    if oldestEntry, err := s.entries.Peek(); err != nil {
      break
    } else if evicted := s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry); !evicted {
      break
    }
  }
  s.lock.Unlock()
}
代码语言:javascript
复制
func (s *cacheShard) onEvict(oldestEntry []byte, currentTimestamp uint64, evict func(reason RemoveReason) error) bool {
  if s.isExpired(oldestEntry, currentTimestamp) {
    evict(Expired)
    return true
  }
  return false
}
代码语言:javascript
复制
func (s *cacheShard) removeOldestEntry(reason RemoveReason) error {
代码语言:javascript
复制
func initNewShard(config Config, callback onRemoveCallback, clock clock) *cacheShard {
      return &cacheShard{
代码语言:javascript
复制
func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error {
  currentTimestamp := uint64(s.clock.Epoch())


  s.lock.Lock()


  if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 {
    if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil {
      resetKeyFromEntry(previousEntry)
      //remove hashkey
      delete(s.hashmap, hashedKey)
    }
  }


  if !s.cleanEnabled {
    if oldestEntry, err := s.entries.Peek(); err == nil {
      s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry)
    }
  }


  w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer)


  for {
    if index, err := s.entries.Push(w); err == nil {
      s.hashmap[hashedKey] = uint32(index)
      s.lock.Unlock()
      return nil
    }
    if s.removeOldestEntry(NoSpace) != nil {
      s.lock.Unlock()
      return fmt.Errorf("entry is bigger than max shard size")
    }
  }
}
代码语言:javascript
复制
func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) {
  itemIndex := s.hashmap[hashedKey]


  if itemIndex == 0 {
    s.miss()
    return nil, ErrEntryNotFound
  }


  wrappedEntry, err := s.entries.Get(int(itemIndex))
  if err != nil {
    s.miss()
    return nil, err
  }


  return wrappedEntry, err
}

具体数据存在entries queue.BytesQueue中,它的定义位于queue/bytes_queue.go

代码语言:javascript
复制
type BytesQueue struct {
  full         bool
  array        []byte
  capacity     int
  maxCapacity  int
  head         int
  tail         int
  count        int
  rightMargin  int
  headerBuffer []byte
  verbose      bool
}
代码语言:javascript
复制
func NewBytesQueue(capacity int, maxCapacity int, verbose bool) *BytesQueue {

实现对应的push和pop操作

代码语言:javascript
复制
func (q *BytesQueue) Push(data []byte) (int, error) {
      q.push(data, neededSize)
代码语言:javascript
复制
func (q *BytesQueue) push(data []byte, len int) {
代码语言:javascript
复制
func (q *BytesQueue) Pop() ([]byte, error) {
代码语言:javascript
复制
func (q *BytesQueue) Get(index int) ([]byte, error) {
  data, _, err := q.peek(index)
  return data, err
}    
代码语言:javascript
复制
func (q *BytesQueue) peek(index int) ([]byte, int, error) {
  err := q.peekCheckErr(index)
  if err != nil {
    return nil, 0, err
  }


  blockSize, n := binary.Uvarint(q.array[index:])
  return q.array[index+n : index+int(blockSize)], int(blockSize), nil
}

entry的编码位于encoding.go

代码语言:javascript
复制
func readEntry(data []byte) []byte {
  length := binary.LittleEndian.Uint16(data[timestampSizeInBytes+hashSizeInBytes:])


  // copy on read
  dst := make([]byte, len(data)-int(headersSizeInBytes+length))
  copy(dst, data[headersSizeInBytes+length:])


  return dst
}

除了核心功能,还定义了一个服务器,用于提供cache的监控和统计数据的功能,源码位于server/server.go

代码语言:javascript
复制
func main() {
      f, err := os.OpenFile(logfile, os.O_APPEND|os.O_WRONLY, 0600)
      cache, err = bigcache.New(context.Background(), config)
        if err != nil {
        http.Handle(cachePath, serviceLoader(cacheIndexHandler(), requestMetrics(logger)))
      http.Handle(statsPath, serviceLoader(statsIndexHandler(), requestMetrics(logger)))
      log.Fatal("ListenAndServe: ", http.ListenAndServe(strPort, nil))
}

server/middleware.go

代码语言:javascript
复制
func serviceLoader(h http.Handler, svcs ...service) http.Handler {
  for _, svc := range svcs {
    h = svc(h)
  }
  return h
}

server/stats_handler.go

代码语言:javascript
复制
func statsIndexHandler() http.Handler {
  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    switch r.Method {
    case http.MethodGet:
      getCacheStatsHandler(w, r)
    default:
      w.WriteHeader(http.StatusMethodNotAllowed)
    }
  })
}
本文参与?腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-05-17,如有侵权请联系?cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与?腾讯云自媒体分享计划? ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
http://www.vxiaotou.com