下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

VictorialMetrics存储原理之索引存储格式

作者:匿名     来源: 系统运维点击数:852发布时间: 2022-06-18 23:43:57

标签: 持久化数据存储原理索引存储格式

  索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?本节我们将对此进行详细讲解。

  ​​前文我们介绍了当插入数据的时候会先去添加索引数据​​,索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?本节我们将对此进行详细讲解。

  添加索引数据

  索引构建完成后会调用 AddItems​ 函数将索引添加到 Table 中去:

  复制

  1. // lib/mergeset/table.go

  2. // AddItems 添加指定的 items 到 table 中去

  3. func (tb *Table) AddItems(items [][]byte) error {

  4. if err := tb.rawItems.addItems(tb, items); err != nil {

  5. return fmt.Errorf("cannot insert data into %q: %w", tb.path, err)

  6. }

  7. return nil

  8. }

  Table 的结构如下所示:

  复制

  1. // lib/mergeset/table.go

  2. // Table 代表 mergeset table.

  3. type Table struct {

  4. activeMerges uint64

  5. mergesCount uint64

  6. itemsMerged uint64

  7. assistedMerges uint64

  8. // merge 索引

  9. mergeIdx uint64

  10. // 路径

  11. path string

  12. // flush回调

  13. flushCallback func()

  14. flushCallbackWorkerWG sync.WaitGroup

  15. needFlushCallbackCall uint32

  16. // 在将指定项的整个块刷新到持久存储之前,在合并期间调用的回调

  17. prepareBlock PrepareBlockCallback

  18. // parts 列表

  19. partsLock sync.Mutex

  20. parts []*partWrapper

  21. // rawItems 包含最近添加的尚未转换为 parts 的数据

  22. // 出于性能原因,未在搜索中使用 rawItems

  23. rawItems rawItemsShards

  24. snapshotLock sync.RWMutex

  25. flockF *os.File

  26. stopCh chan struct{}

  27. partMergersWG syncwg.WaitGroup

  28. rawItemsFlusherWG sync.WaitGroup

  29. convertersWG sync.WaitGroup

  30. rawItemsPendingFlushesWG syncwg.WaitGroup

  31. }

  一个索引 Table​ 就对应着一个 indexDB,也就是数据目录 indexdb​ 下面的文件夹:

  其中核心的是 parts​ 和 rawItems 两个属性。

  parts​ 主要是存储 merge 后的 blocks,一个part​ 与文件系统上的一个目录对应,比如上图中的24_1_16F4A862471C1DC9​ 目录就是一个part。

  rawItems​ 是用于预处理Items​ 的,是一个rawItemsShards 对象。

  rawItemsShards 结构体定义如下所示:

  复制

  1. // lib/mergeset/table.go

  2. type rawItemsShards struct {

  3. shardIdx uint32

  4. // 在多 cpu 系统上添加 rows 数据时,shards 分片可以减少锁竞争

  5. shards []rawItemsShard

  6. }

  7. // 每个 table 的 rawItems 分片数

  8. var rawItemsShardsPerTable = cgroup.AvailableCPUs()

  9. // 每个分片最大的Block数

  10. const maxBlocksPerShard = 512

  11. // 当在打开Table的时候就会调用该函数进行初始化

  12. func (riss *rawItemsShards) init() {

  13. riss.shards = make([]rawItemsShard, rawItemsShardsPerTable)

  14. }

  15. // 添加 items 元素

  16. func (riss *rawItemsShards) addItems(tb *Table, items [][]byte) error {

  17. n := atomic.AddUint32(&riss.shardIdx, 1)

  18. shards := riss.shards

  19. idx := n % uint32(len(shards))

  20. shard := &shards[idx]

  21. return shard.addItems(tb, items)

  22. }

  rawItemsShards​ 其实就是加了一个分片功能用于保存索引数据,addItems​ 函数就是将要添加的数据添加到对应的分片上去,最终执行的逻辑是 shard.addItems。

  复制

  1. // lib/mergeset/table.go

  2. type rawItemsShard struct {

  3. mu sync.Mutex

  4. ibs []*inmemoryBlock

  5. lastFlushTime uint64

  6. }

  7. // 添加items元素

  8. func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) error {

  9. var err error

  10. var blocksToFlush []*inmemoryBlock

  11. ris.mu.Lock()

  12. ibs := ris.ibs

  13. if len(ibs) == 0 {

  14. ib := getInmemoryBlock()

  15. ibs = append(ibs, ib)

  16. ris.ibs = ibs

  17. }

  18. // 取最后一个内存块

  19. ib := ibs[len(ibs)-1]

  20. for _, item := range items {

  21. // 添加索引item到内存块

  22. if !ib.Add(item) { // 超过了内存块大小

  23. // 重新获取一个内存块,此时肯定为空

  24. ib = getInmemoryBlock()

  25. // 重新添加

  26. if !ib.Add(item) {

  27. putInmemoryBlock(ib)

  28. err = fmt.Errorf("cannot insert an item %q into an empty inmemoryBlock; it looks like the item is too large? len(item)=%d", item, len(item))

  29. break

  30. }

  31. ibs = append(ibs, ib)

  32. ris.ibs = ibs

  33. }

  34. }

  35. // 超过了每个分片的最大内存块的数量

  36. if len(ibs) >= maxBlocksPerShard {

  37. // 将内存块放到待刷新的内存块列表中去

  38. blocksToFlush = append(blocksToFlush, ibs...)

  39. // 释放前面的内存块资源

  40. for i := range ibs {

  41. ibs[i] = nil

  42. }

  43. ris.ibs = ibs[:0]

  44. ris.lastFlushTime = fasttime.UnixTimestamp()

  45. }

  46. ris.mu.Unlock()

  47. // 执行merge合并操作

  48. tb.mergeRawItemsBlocks(blocksToFlush, false)

  49. return err

  50. }

  51. // lib/mergeset/encoding.go

  52. // 内存中的一个Block块结构

  53. type inmemoryBlock struct {

  54. commonPrefix []byte

  55. data []byte // 用来存储数据

  56. items []Item // 用来存储每个item数据的起始偏移量

  57. }

  58. // Item 表示用于存储在 mergeset 中的单个 item 数据

  59. type Item struct {

  60. // 数据的开始偏移量

  61. Start uint32

  62. // 数据的结束偏移量

  63. End uint32

  64. }

  65. // maxInmemoryBlockSize 是 memoryblock.data 的最大值。

  66. //

  67. // 它必须适合 CPU 缓存大小,即当前 CPU 的缓存大小为64kb。

  68. const maxInmemoryBlockSize = 64 * 1024

  69. // Add 将 x 添加到内存卡 ib 的末尾

  70. //

  71. // 如果由于块大小限制,x 未添加到 ib,则返回 false

  72. func (ib *inmemoryBlock) Add(x []byte) bool {

  73. data := ib.data

  74. // 操过块大小限制了

  75. if len(x)+len(data) > maxInmemoryBlockSize {

  76. return false

  77. }

  78. if cap(data) == 0 {

  79. // 预分配 data 和 items 以减少内存分配

  80. data = make([]byte, 0, maxInmemoryBlockSize)

  81. ib.items = make([]Item, 0, 512)

  82. }

  83. dataLen := len(data)

  84. data = append(data, x...) // 将 x 添加到 data

  85. ib.items = append(ib.items, Item{ // 更新 items

  86. Start: uint32(dataLen),

  87. End: uint32(len(data)),

  88. })

  89. ib.data = data

  90. return true

  91. }

  rawItemsShard​ 表示保存索引数据的一个分片,里面其实就是一个 inmemoryBlock​ 的内存块切片,每个分片最多有 512 个内存块,每个内存块占用 64KB 的容量,当每个分片中的内存块数量超过最大数量(512)会去将内存块数据刷新为 Part。

  如果分片中的内存块数量没超过上限,则会通过一个任务去定时(1s)将 rawItem 数据刷新(转换)为 Part,以便它们对搜索可见。

  复制

  1. // lib/mergeset/table.go

  2. // 将最近的 rawItem 刷新(转换)为 Part,以便它们对搜索可见。

  3. const rawItemsFlushInterval = time.Second

  4. // 启动 rawItems Flusher 任务

  5. func (tb *Table) startRawItemsFlusher() {

  6. tb.rawItemsFlusherWG.Add(1)

  7. go func() {

  8. tb.rawItemsFlusher()

  9. tb.rawItemsFlusherWG.Done()

  10. }()

  11. }

  12. func (tb *Table) rawItemsFlusher() {

  13. ticker := time.NewTicker(rawItemsFlushInterval)

  14. defer ticker.Stop()

  15. for {

  16. select {

  17. case <-tb.stopCh:

  18. return

  19. case <-ticker.C:

  20. tb.flushRawItems(false)

  21. }

  22. }

  23. }

  合并内存数据

  将内存块数据转换为 Part​ 都是通过 mergeRawItemsBlocks 函数去实现的。

  复制

  1. // lib/mergeset/table.go

  2. // 一次合并的默认 parts 数

  3. //

  4. // 这个数字是根据经验得出的,它提供了尽可能低的开销

  5. // 有关详细信息,请参阅 appendPartsToMerge test

  6. const defaultPartsToMerge = 15

  7. // merge 内存块数据

  8. func (tb *Table) mergeRawItemsBlocks(ibs []*inmemoryBlock, isFinal bool) {

  9. if len(ibs) == 0 {

  10. return

  11. }

  12. tb.partMergersWG.Add(1)

  13. defer tb.partMergersWG.Done()

  14. pws := make([]*partWrapper, 0, (len(ibs)+defaultPartsToMerge-1)/defaultPartsToMerge)

  15. var pwsLock sync.Mutex

  16. var wg sync.WaitGroup

  17. for len(ibs) > 0 {

  18. // 一次最大合并的内存块数量

  19. n := defaultPartsToMerge

  20. if n > len(ibs) {

  21. n = len(ibs)

  22. }

  23. wg.Add(1)

  24. go func(ibsPart []*inmemoryBlock) {

  25. defer wg.Done()

  26. // merge inmemoryBlock

  27. pw := tb.mergeInmemoryBlocks(ibsPart)

  28. if pw == nil {

  29. return

  30. }

  31. pw.isInMerge = true

  32. pwsLock.Lock()

  33. pws = append(pws, pw)

  34. pwsLock.Unlock()

  35. }(ibs[:n])

  36. ibs = ibs[n:]

  37. }

  38. wg.Wait()

  39. if len(pws) > 0 {

  40. if err := tb.mergeParts(pws, nil, true); err != nil {

  41. logger.Panicf("FATAL: cannot merge raw parts: %s", err)

  42. }

  43. if tb.flushCallback != nil {

  44. if isFinal {

  45. tb.flushCallback()

  46. } else {

  47. atomic.CompareAndSwapUint32(&tb.needFlushCallbackCall, 0, 1)

  48. }

  49. }

  50. }

  51. for {

  52. tb.partsLock.Lock()

  53. ok := len(tb.parts) <= maxParts

  54. tb.partsLock.Unlock()

  55. if ok {

  56. return

  57. }

  58. // The added part exceeds maxParts count. Assist with merging other parts.

  59. //

  60. // Prioritize assisted merges over searches.

  61. storagepacelimiter.Search.Inc()

  62. err := tb.mergeExistingParts(false)

  63. storagepacelimiter.Search.Dec()

  64. if err == nil {

  65. atomic.AddUint64(&tb.assistedMerges, 1)

  66. continue

  67. }

  68. if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {

  69. return

  70. }

  71. logger.Panicf("FATAL: cannot merge small parts: %s", err)

  72. }

  73. }

  mergeRawItemsBlocks​ 函数将指定的内存块进行 merge 合并操作,一次合并最大的内存块数量为 15,然后在独立的 goroutine 中去进行合并操作,使用 mergeInmemoryBlocks 函数。

  复制

  1. // lib/mergeset/table.go

  2. // merge InmemoryBlocks

  3. func (tb *Table) mergeInmemoryBlocks(ibs []*inmemoryBlock) *partWrapper {

  4. // 将 InmemoryBlock 列表转换成 inmemoryPart 列表

  5. // inmemoryPart 表示内存中的Part

  6. mps := make([]*inmemoryPart, 0, len(ibs))

  7. for _, ib := range ibs {

  8. if len(ib.items) == 0 {

  9. continue

  10. }

  11. mp := getInmemoryPart()

  12. mp.Init(ib) // 将inmemoryBlock转换为inmemoryPart

  13. putInmemoryBlock(ib)

  14. mps = append(mps, mp)

  15. }

  16. if len(mps) == 0 {

  17. return nil

  18. }

  19. if len(mps) == 1 {

  20. // 没有要合并的内容。只需返回单个 inmemory part。

  21. mp := mps[0]

  22. p := mp.NewPart()

  23. return &partWrapper{

  24. p: p,

  25. mp: mp,

  26. refCount: 1,

  27. }

  28. }

  29. defer func() {

  30. for _, mp := range mps {

  31. putInmemoryPart(mp)

  32. }

  33. }()

  34. atomic.AddUint64(&tb.mergesCount, 1)

  35. atomic.AddUint64(&tb.activeMerges, 1)

  36. defer atomic.AddUint64(&tb.activeMerges, ^uint64(0))

  37. // 为每个 `inmemoryPart` 构造 `blockStreamReader`, 用于迭代读取 items

  38. bsrs := make([]*blockStreamReader, 0, len(mps))

  39. for _, mp := range mps {

  40. bsr := getBlockStreamReader()

  41. bsr.InitFromInmemoryPart(mp)

  42. bsrs = append(bsrs, bsr)

  43. }

  44. // 准备一个 blockStreamWriter 用于合并写入的 part

  45. bsw := getBlockStreamWriter()

  46. // 不要通过 getInmemoryPart() 获取 mpDst,因为与池中的其他条目相比,它的大小可能太大。

  47. // 这可能会导致内存使用量增加,因为存在大量的碎片。

  48. // 创建一个新的 inmemoryPart,接收合并的数据

  49. mpDst := &inmemoryPart{}

  50. bsw.InitFromInmemoryPart(mpDst)

  51. // 开始 merge 数据

  52. // 该 merge 不应该被 stopCh 中断,因为它可能是 stopCh 关闭后的最终结果

  53. err := mergeBlockStreams(&mpDst.ph, bsw, bsrs, tb.prepareBlock, nil, &tb.itemsMerged)

  54. if err != nil {

  55. logger.Panicf("FATAL: cannot merge inmemoryBlocks: %s", err)

  56. }

  57. putBlockStreamWriter(bsw)

  58. for _, bsr := range bsrs {

  59. putBlockStreamReader(bsr)

  60. }

  61. p := mpDst.NewPart()

  62. return &partWrapper{

  63. p: p,

  64. mp: mpDst,

  65. refCount: 1,

  66. }

  67. }

  上面的函数会将指定的内存块转换成 partWrapper​,该结构就是一个包含 part​ 和 inmemoryPart 的包装器。

  复制

  1. // lib/mergeset/table.go

  2. type partWrapper struct {

  3. p *part

  4. mp *inmemoryPart

  5. refCount uint64

  6. isInMerge bool

  7. }

  part 的结构如下所示:

  复制

  1. // lib/mergeset/part.go

  2. type part struct {

  3. ph partHeader

  4. path string

  5. size uint64

  6. mrs []metaindexRow

  7. indexFile fs.MustReadAtCloser

  8. itemsFile fs.MustReadAtCloser

  9. lensFile fs.MustReadAtCloser

  10. }

  一个 part​ 就是 Table 下面的一个数据目录。

  part​ 中包含一个 partHeader​,该属性中包含当前 part​ 的一些 Meta 信息,一共有多少个 items、有多少 blocks、第一个和最后一个 item,对应着 part​ 目录下面的 metadata.json 文件。

  复制

  1. // lib/mergeset/part_header.go

  2. type partHeader struct {

  3. // part 包含的 items 数

  4. itemsCount uint64

  5. // part 包含的 blocks 数

  6. blocksCount uint64

  7. // part 中的第一个 item

  8. firstItem []byte

  9. // part 中的最后一个 item

  10. lastItem []byte

  11. }

  part​ 中另外的属性 path​ 表示当前 part​ 的路径,size​ 表示大小,另外三个属性 indexFile​、itemsFile​、lensFile​ 对应中 part​ 目录下面的三个文件:index.bin​、items.bin​、lens.bin​。此外 part​ 结构中还有最后一个 mrs​ 属性,是一个 []metaindexRow。

  复制

  1. // lib/mergeset/metaindex_row.go

  2. // metaindexRow 描述了一个 blockHeaders 即索引块。

  3. type metaindexRow struct {

  4. // 第一个 block 中的第一个 item 元素

  5. // 它用于快速查找所需的索引块

  6. firstItem []byte

  7. // 块包含的 blockHeaders 的数量

  8. blockHeadersCount uint32

  9. // 索引文件中块的偏移量

  10. indexBlockOffset uint64

  11. // 索引文件中块的大小

  12. indexBlockSize uint32

  13. }

  除了 part​ 之外还有一个内存中的 inmemoryPart​ 结构,其基本结构和 part​ 类似,不同的是几个相关的属性不是文件对象,而是 ByteBuffer,因为是内存中的结构。

  复制

  1. // lib/mergeset/inmemory_part.go

  2. // 在内存中的 Part 结构

  3. type inmemoryPart struct {

  4. // partHeader 记录 itemsCount, blocksCount, firstItem, lastItem 信息, 最后会序列化到 metadata.json

  5. ph partHeader

  6. // 当前 block 的 header 信息,有 commonPrefix, firstItem, marshalType, itemsCount, itemsBlockOffset, lenBlockOffset, itemsBlockSize, lenBlockSize

  7. bh blockHeader

  8. // 当前 block 的 metaindex 信息,存储了当前 blockHeader 的 firstItem, blockHeaderCount, indexBlockOffset, indexBlockSize

  9. mr metaindexRow

  10. // 用于序列化后写入内存/磁盘文件使用

  11. metaindexData bytesutil.ByteBuffer // -> metaindex.bin

  12. indexData bytesutil.ByteBuffer // -> index.bin

  13. itemsData bytesutil.ByteBuffer // -> items.bin

  14. lensData bytesutil.ByteBuffer // -> lens.bin

  15. }

  其他几个属性上面介绍过,blockHeader 结构如下所示,用于记录 block 头信息:

  复制

  1. // lib/mergeset/block_header.go

  2. type blockHeader struct {

  3. // 块中所有 items 的公用前缀

  4. commonPrefix []byte

  5. // 第一个 item

  6. firstItem []byte

  7. // 用于块压缩的 Marshal 类型

  8. marshalType marshalType

  9. // 块中的 items 数,不包括第一个 item

  10. itemsCount uint32

  11. // items block 的偏移量

  12. itemsBlockOffset uint64

  13. // lens block 的偏移量

  14. lensBlockOffset uint64

  15. // items block 的大小

  16. itemsBlockSize uint32

  17. // lens block 的大小

  18. lensBlockSize uint32

  19. }

  整个 part 的结构看上去确实比较复杂,为什么需要设计这些属性?核心肯定就是为了快速索引,我们先往下分析,待会再回过头来看。

  inmemoryPart​ 是 part​ 读入内存中的结构, 在 inmemoryBlock​ merge 之前,每个 inmemoryBlock​ 都会先通过 mp.Init​ 转换成一个 inmemoryPart​ 的结构,inmemoryPart​ 中 metaindexData​、indexData​、itemsData​、lensData 数据结构与磁盘对应的文件内容一致。

  序列化数据

  现在我们再回到上面的 mergeInmemoryBlocks 函数,流程如下所示:

  1.将所有的inmemoryBlock​ 转换为inmemoryPart 结构。

  2.为每个inmemoryPart​ 构造blockStreamReader,用于迭代读取 items。

  3.创建一个新的inmemoryPart​,并构造一个blockSteamWriter 用于合并写入的数据。

  4.然后调用mergeBlockStreams 函数执行真正的merge操作。

  首先通过 Init​ 函数将 inmemoryBlock​ 转换为 inmemoryPart 结构。

  复制

  1. // lib/mergeset/inmemory_part.go

  2. // Init 初始化 mp 从 ib.

  3. func (mp *inmemoryPart) Init(ib *inmemoryBlock) {

  4. mp.Reset()

  5. sb := &storageBlock{}

  6. sb.itemsData = mp.itemsData.B[:0]

  7. sb.lensData = mp.lensData.B[:0]

  8. // 使用尽可能小的压缩等级来压缩 inmemoryPart,因为它很快就会被合并到文件 part 去。

  9. compressLevel := -5

  10. // 序列化乱序的数据

  11. mp.bh.firstItem, mp.bh.commonPrefix, mp.bh.itemsCount, mp.bh.marshalType = ib.MarshalUnsortedData(sb, mp.bh.firstItem[:0], mp.bh.commonPrefix[:0], compressLevel)

  12. // 获取 partHeader 值

  13. mp.ph.itemsCount = uint64(len(ib.items))

  14. mp.ph.blocksCount = 1

  15. mp.ph.firstItem = append(mp.ph.firstItem[:0], ib.items[0].String(ib.data)...)

  16. mp.ph.lastItem = append(mp.ph.lastItem[:0], ib.items[len(ib.items)-1].String(ib.data)...)

  17. // 获取itemsData,更新blockHeader的items偏移和数量

  18. mp.itemsData.B = sb.itemsData

  19. mp.bh.itemsBlockOffset = 0

  20. mp.bh.itemsBlockSize = uint32(len(mp.itemsData.B))

  21. // 获取lensData,更新blockHeader的lens偏移和数量

  22. mp.lensData.B = sb.lensData

  23. mp.bh.lensBlockOffset = 0

  24. mp.bh.lensBlockSize = uint32(len(mp.lensData.B))

  25. // 获取 indexData,blockHeader序列化的值

  26. bb := inmemoryPartBytePool.Get()

  27. bb.B = mp.bh.Marshal(bb.B[:0])

  28. mp.indexData.B = encoding.CompressZSTDLevel(mp.indexData.B[:0], bb.B, 0)

  29. // 获取 metaindexData,metaindexRow序列化的值

  30. mp.mr.firstItem = append(mp.mr.firstItem[:0], mp.bh.firstItem...)

  31. mp.mr.blockHeadersCount = 1

  32. mp.mr.indexBlockOffset = 0

  33. mp.mr.indexBlockSize = uint32(len(mp.indexData.B))

  34. bb.B = mp.mr.Marshal(bb.B[:0])

  35. mp.metaindexData.B = encoding.CompressZSTDLevel(mp.metaindexData.B[:0], bb.B, 0)

  36. inmemoryPartBytePool.Put(bb)

  37. }

  上面的函数将 inmemoryBlock​ 转换成 inmemoryPart​,首先会通过一个 MarshalUnsortedData 函数来序列化未排序的数据。

  复制

  1. // MarshalUnsortedData 序列化未排序的 items 从 ib 到 sb.

  2. //

  3. // It also:

  4. // - 将第一个 item 追加到 firstItemDst 并返回结果

  5. // - 将所有 item 的公共前缀附加到 commonPrefixDst 并返回结果

  6. // - 返回包含第一个 item 的编码项的数量

  7. // - 返回用于编码的 marshal 类型

  8. func (ib *inmemoryBlock) MarshalUnsortedData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {

  9. if !ib.isSorted() {

  10. sort.Sort(ib) // 排序

  11. }

  12. // 更新内存块的公共前缀

  13. ib.updateCommonPrefix()

  14. // 序列化数据

  15. return ib.marshalData(sb, firstItemDst, commonPrefixDst, compressLevel)

  16. }

  上面的序列化函数中首先会对未排序的数据进行排序,然后更新内存块的公共前缀:

  复制

  1. // lib/mergeset/encoding.go

  2. // 更新公共前缀

  3. func (ib *inmemoryBlock) updateCommonPrefix() {

  4. ib.commonPrefix = ib.commonPrefix[:0] // 公共前缀

  5. if len(ib.items) == 0 {

  6. return

  7. }

  8. items := ib.items // 数据前后位置

  9. data := ib.data // 数据

  10. cp := items[0].Bytes(data) // 第一段数据

  11. if len(cp) == 0 {

  12. return

  13. }

  14. for _, it := range items[1:] { // 后面的数据

  15. // 计算公共前缀的长度

  16. cpLen := commonPrefixLen(cp, it.Bytes(data))

  17. if cpLen == 0 {

  18. return

  19. }

  20. // 截取公共前缀数据

  21. cp = cp[:cpLen]

  22. }

  23. // 设置内存块的公共前缀

  24. ib.commonPrefix = append(ib.commonPrefix[:0], cp...)

  25. }

  公共前缀就是把每段数据包含的共同前缀提取出来,这样存储的时候后面就可以不需要存储共同的部分了,减少存储空间。

  公共前缀提取出来后,接下来调用 marshalData 函数去序列化数据。

  复制

  1. // lib/mergeset/encoding.go

  2. // 前提条件:

  3. // - ib.items 必须排序

  4. // - updateCommonPrefix 必须被调用

  5. // 序列化数据

  6. func (ib *inmemoryBlock) marshalData(sb *storageBlock, firstItemDst, commonPrefixDst []byte, compressLevel int) ([]byte, []byte, uint32, marshalType) {

  7. ......

  8. // 拷贝 inmemoryBlock 数据块的 firstItem(排序后的第一条数据)

  9. data := ib.data // 内存块数据

  10. firstItem := ib.items[0].Bytes(data) // 第一条数据

  11. firstItemDst = append(firstItemDst, firstItem...)

  12. // 最大公共前缀

  13. commonPrefixDst = append(commonPrefixDst, ib.commonPrefix...)

  14. // 内存块数据小于2段或(数据大小-公共前缀长度*数据段大小 < 64) 则定义为小块

  15. if len(data)-len(ib.commonPrefix)*len(ib.items) < 64 || len(ib.items) < 2 {

  16. // 对small block使用普通序列化,因为它更便宜

  17. ib.marshalDataPlain(sb)

  18. return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain

  19. }

  20. bbItems := bbPool.Get()

  21. bItems := bbItems.B[:0] // 保存目的 items 数据的内存 buffer

  22. bbLens := bbPool.Get()

  23. bLens := bbLens.B[:0] // 保存目的 lens 数据的内存buffer

  24. // 序列化 items 数据

  25. // 第一项数据不需要存储,所以获取的 Uint64s 大小要减1

  26. xs := encoding.GetUint64s(len(ib.items) - 1)

  27. defer encoding.PutUint64s(xs)

  28. cpLen := len(ib.commonPrefix) // 公共前缀的长度

  29. prevItem := firstItem[cpLen:] // 第一项数据(排除公共前缀)

  30. prevPrefixLen := uint64(0)

  31. // 从第二个元素开始遍历(第一个 firstItem 单独存储)

  32. for i, it := range ib.items[1:] {

  33. // 偏移到公共前缀之后的位置

  34. it.Start += uint32(cpLen)

  35. // Bytes(data) 得到的数据不包含公共前缀的部分

  36. item := it.Bytes(data)

  37. // 计算第 N 项和 N-1 项的公共前缀长度

  38. prefixLen := uint64(commonPrefixLen(prevItem, item))

  39. // 仅仅只把差异的部分拷贝到目的buffer

  40. bItems = append(bItems, item[prefixLen:]...)

  41. // 第一次,与0异或,还是等于原值。异或后,两个整数值前面相同的部分都为0了,数值变得更短,能够便于压缩。

  42. xLen := prefixLen ^ prevPrefixLen

  43. // 上次的除去公共前缀的item

  44. prevItem = item

  45. // 上次计算得到的公共前缀长度

  46. prevPrefixLen = prefixLen

  47. xs.A[i] = xLen // 异或后的公共前缀值

  48. }

  49. // 对N-1个长度进行序列化(将uint64数组序列化成byte数组)

  50. bLens = encoding.MarshalVarUint64s(bLens, xs.A)

  51. // 将items数据(只有差异的部分)ZSTD压缩后,写入storageBlock

  52. sb.itemsData = encoding.CompressZSTDLevel(sb.itemsData[:0], bItems, compressLevel)

  53. bbItems.B = bItems

  54. bbPool.Put(bbItems)

  55. // 序列化 lens 数据

  56. // 第一项数据大小(排除公共前缀)

  57. prevItemLen := uint64(len(firstItem) - cpLen)

  58. for i, it := range ib.items[1:] { // 从第二个元素开始遍历

  59. // item长度 = End-Start-公共前缀大小

  60. itemLen := uint64(int(it.End-it.Start) - cpLen)

  61. // 与前面一个元素长度异或

  62. xLen := itemLen ^ prevItemLen

  63. // 上次去除公共前缀的长度

  64. prevItemLen = itemLen

  65. xs.A[i] = xLen // 异或后的元素长度

  66. }

  67. // 前面记录的是两两相对的长度,这里记录的是数据的真实长度

  68. // 长度信息包含两种,相对长度和总长度

  69. bLens = encoding.MarshalVarUint64s(bLens, xs.A)

  70. // 将lens数据进行ZSTD压缩后,写入storageBlock

  71. sb.lensData = encoding.CompressZSTDLevel(sb.lensData[:0], bLens, compressLevel)

  72. bbLens.B = bLens

  73. bbPool.Put(bbLens)

  74. // 如果压缩不到90%则选择不压缩

  75. if float64(len(sb.itemsData)) > 0.9*float64(len(data)-len(ib.commonPrefix)*len(ib.items)) {

  76. // 压缩率不高的时候,选择不压缩

  77. ib.marshalDataPlain(sb)

  78. return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypePlain

  79. }

  80. // 很好的压缩率

  81. return firstItemDst, commonPrefixDst, uint32(len(ib.items)), marshalTypeZSTD

  82. }

  上面的序列化函数看上去比较复杂,实际上核心的一点就是想办法尽可能减少存储空间。首先将数据块的第一个数据拷贝出来放入 firstItemDst​,然后后面就从第二个元素开始去循环处理,首先计算第 N​ 项和 N-1​ 项的公共前缀长度,然后将差异的数据部分保存起来,为了能够反序列化回数据,还需要将两两之间公共前缀的长度保存下来,为了能够便于压缩,使用异或的方式来计算两两之间的公共前缀长度值。

  循环计算后,将保存的两两之间的公共前缀长度进行序列化,下面的函数将一个 uint64 类型的切片转换成字节切片,如果数据小于 128 直接转换即可,如果大于 127 则用一个 7bit 来表示数值的内容,最高位后面的一个字节用来表示长度,这样就可以用变长长度来序列化数值,而不是每个数值都占用固定的长度。

  复制

  1. // lib/encoding/int.go

  2. // 将uint64切片转成字节切片

  3. func MarshalVarUint64s(dst []byte, us []uint64) []byte {

  4. for _, u := range us {

  5. if u < 0x80 { // 小于128,直接加入到 dst,能直接存到 byte 中去

  6. // Fast path

  7. dst = append(dst, byte(u))

  8. continue

  9. }

  10. for u > 0x7f { // 大于127,则超过的部分保留为 0x80,低位右移7位继续计算

  11. dst = append(dst, 0x80|byte(u))

  12. u >>= 7

  13. }

  14. dst = append(dst, byte(u))

  15. }

  16. return dst

  17. }

  长度数据序列化后,将 items 数据(只有差异的部分)进行 ZSTD 压缩后,写入 storageBlock。

  只记录两两之间的公共前缀长度还不够,还需要记录数据的真实长度,最后同样再将 lens 数据进行 ZSTD 压缩后,写入 storageBlock。

  如果最后的结果压缩不到 90% 则选择不压缩,不压缩则使用 marshalDataPlain 函数进行序列化:

  复制

  1. // lib/mergeset/encoding.go

  2. // 普通序列化数据

  3. func (ib *inmemoryBlock) marshalDataPlain(sb *storageBlock) {

  4. data := ib.data

  5. // 序列化 items 数据

  6. // 不需要序列化第一项数据,因为它会在 marshalData 中返回给调用者。

  7. cpLen := len(ib.commonPrefix) // 公共前缀长度

  8. b := sb.itemsData[:0]

  9. for _, it := range ib.items[1:] { // 第一项之后的数据

  10. it.Start += uint32(cpLen) // 跳过公共前缀

  11. b = append(b, it.String(data)...) // 添加移出公共前缀的数据

  12. }

  13. sb.itemsData = b // itemsData数据

  14. // 序列化 lens 数据

  15. b = sb.lensData[:0]

  16. for _, it := range ib.items[1:] { // 第一项之后的数据

  17. // 原始的End-Start-公共前缀长度

  18. b = encoding.MarshalUint64(b, uint64(int(it.End-it.Start)-cpLen))

  19. }

  20. sb.lensData = b

  21. }

  经过上面的序列化过后就可以得到第一个数据、公共前缀、items 个数以及序列化类型,然后将这些数据存入 blockHeader 中去,后面就是一些比较简单的常规操作。

  转换成 inmemoryPart​ 后,再包装成 blockStreamReader​,创建一个新的 inmemoryPart​,并构造一个 blockSteamWriter​ 用于合并写入的数据,然后调用 mergeBlockStreams 函数执行真正的 merge 操作。

  复制

  1. // lib/mergeset/merge.go

  2. // mergeBlockStreams 合并 bsrs 并将结果写入 bsw

  3. //

  4. // 也填充了 ph

  5. //

  6. // prepareBlock 是可选的

  7. //

  8. // 当 stopCh 关闭时,该函数立即返回

  9. //

  10. // 它还以原子方式将合并的 items 添加到 itemsMerged

  11. func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStreamReader, prepareBlock PrepareBlockCallback, stopCh <-chan struct{},

  12. itemsMerged *uint64) error {

  13. // 将多个 blockStreamReader 构造成一个 blockStreamMerger 结构

  14. bsm := bsmPool.Get().(*blockStreamMerger)

  15. if err := bsm.Init(bsrs, prepareBlock); err != nil {

  16. return fmt.Errorf("cannot initialize blockStreamMerger: %w", err)

  17. }

  18. err := bsm.Merge(bsw, ph, stopCh, itemsMerged)

  19. bsm.reset()

  20. bsmPool.Put(bsm)

  21. bsw.MustClose()

  22. if err == nil {

  23. return nil

  24. }

  25. return fmt.Errorf("cannot merge %d block streams: %s: %w", len(bsrs), bsrs, err)

  26. }

  首先把多个 blockStreamReader​ 构造成一个 blockStreamMerger​ 结构, merger 里面主要是一个 bsrHeap​ 堆用于维护 bsrs​,用于 merge 数据时的排序。首先通过 merger 的 Init​ 函数构造堆排序的结构,然后核心是调用 merger 的 Merge 函数进行处理。

  复制

  1. // lib/mergeset/merge.go

  2. func (bsm *blockStreamMerger) Merge(bsw *blockStreamWriter, ph *partHeader, stopCh <-chan struct{}, itemsMerged *uint64) error {

  3. again:

  4. if len(bsm.bsrHeap) == 0 {

  5. // 将最后的 inmemoryBlock(可能不完整)写入 bsw

  6. bsm.flushIB(bsw, ph, itemsMerged)

  7. return nil

  8. }

  9. select {

  10. case <-stopCh:

  11. return errForciblyStopped

  12. default:

  13. }

  14. // 取出 blockStreamReader

  15. bsr := heap.Pop(&bsm.bsrHeap).(*blockStreamReader)

  16. var nextItem []byte // 下一个 blockStreamReader

  17. hasNextItem := false

  18. if len(bsm.bsrHeap) > 0 {

  19. nextItem = bsm.bsrHeap[0].bh.firstItem

  20. hasNextItem = true

  21. }

  22. items := bsr.Block.items

  23. data := bsr.Block.data

  24. // 循环所有的 items

  25. for bsr.blockItemIdx < len(bsr.Block.items) {

  26. item := items[bsr.blockItemIdx].Bytes(data)

  27. if hasNextItem && string(item) > string(nextItem) {

  28. break

  29. }

  30. // 添加元素

  31. if !bsm.ib.Add(item) {

  32. // bsm.ib 已满,将其刷新到 bsw 并继续

  33. bsm.flushIB(bsw, ph, itemsMerged)

  34. continue

  35. }

  36. bsr.blockItemIdx++

  37. }

  38. if bsr.blockItemIdx == len(bsr.Block.items) {

  39. // bsr.Block 已完全读取,处理下一个 block

  40. if bsr.Next() {

  41. heap.Push(&bsm.bsrHeap, bsr)

  42. goto again

  43. }

  44. if err := bsr.Error(); err != nil {

  45. return fmt.Errorf("cannot read storageBlock: %w", err)

  46. }

  47. goto again

  48. }

  49. // bsr.Block 中的下一个 item 超过了 nextItem

  50. // 调整 bsr.bh.firstItem 并将 bsr 返回到堆

  51. bsr.bh.firstItem = append(bsr.bh.firstItem[:0], bsr.Block.items[bsr.blockItemIdx].String(bsr.Block.data)...)

  52. heap.Push(&bsm.bsrHeap, bsr)

  53. goto again

  54. }

  这里主要解决的问题是多个有序的字节数组(inmemoryPart),按照字节序排序,合成一个 inmemoryPart​ 的过程,在 merge 的过程中,每 64KB 会单独创建一个 blockHeader,用于快速索引该 block 里面的 Items。

  持久化数据

  最后重复上面的过程,将 n​ 个 inmemoryBlock​ 合并成 (n-1)/defaultPartsToMerge+1​ 个 inmemoryPart​,最后再调用 mergeParts​ 函数完成索引持久化操作,持久化后生成的索引 part,主要包含 metaindex.bin​、index.bin​、lens.bin​、items.bin​、metadata.json​ 等 5 个文件。

 

  这几个文件的关系如下图所示, metaindex.bin​ 文件通过 metaindexRow​ 索引 index.bin​ 文件,index.bin​ 文件通过 indexBlock​ 中的 blockHeader​ 同时索引 items.bin​ 文件和 items.bin​ 文件。

  metaindex.bin:文件包含一系列的 metaindexRow​ 数据,每个 metaindexRow​ 中包含第一条数据 firstItem​、索引块包含的块头部数 blockHeadersCount​、索引块偏移 indexBlockOffset​ 以及索引块大小 indexBlockSize。

  metaindexRow​ 在文件中按照firstItem 的大小的字典序排序存储,以支持二分查找。

  metaindex.bin 文件使用 ZSTD 进行压缩。

  metaindex.bin 文件中的内容在 part 打开时,会全部读出加载至内存中,以加速查询过滤。

  metaindexRow​ 包含的firstItem​ 为其索引的indexBlock​ 中所有blockHeader​ 中字典序最小的firstItem。

  查找时根据firstItem 进行二分检索。

  index.bin:文件中包含一系列的 indexBlock​, 每个 indexBlock​ 又包含一系列 blockHeader​,每个 blockHeader​ 包含 item 的公共前缀 commonPrefix​、第一项数据 firstItem​、itemsData​ 的序列化类型 marshalType​、itemsData​ 包含的 item 数、item 块的偏移 itemsBlockOffset​ 等内容,就是前面使用将 inmemoryBlock​ 转换为 inmemoryPart​ 结构的 Init 函数得到的。

  每个indexBlock​ 使用ZSTD 压缩算法进行压缩。

  在indexBlock​ 中查找时,根据firstItem​ 进行二分检索blockHeader。

  items.bin 文件中,包含一系列的 itemsData​, 每个 itemsData 又包含一系列的 Item。

  itemsData​ 会视情况而定来是否使用 ZTSD 压缩,当 item 个数小于 2 时,或者itemsData​ 的长度小于 64 字节时,不压缩;当itemsData 使用 ZSTD 压缩后的压缩率大于90%的时候也不压缩。

  每个 item 在存储时,去掉了blockHeader​ 中的公共前缀commonPrefix 以提高压缩率。

  lens.bin 文件中,包含一系列的 lensData​, 每个 lensData​ 又包含一系列 8 字节的长度 len, 长度 len 标识 items.bin​ 文件中对应 item 的长度。在读取或者需要解析 itemsData​ 中的 item 时,先要读取对应的 lensData​ 中对应的长度 len。 当 itemsData​ 进行压缩时,lensData 会先使用异或算法进行压缩,然后再使用 ZSTD 算法进一步压缩。

  到这里我们就了解了索引数据是实现和存储原理了,那么真正的指标数据又是如何去存储的呢?

  来源: k8s技术圈

  >>>>>>点击进入系统运维专题

赞(8)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程