下载安卓APP箭头
箭头给我发消息

客服QQ:3315713922

VictorialMetrics存储原理之索引

作者:匿名     来源: 系统运维点击数:1333发布时间: 2022-06-19 17:29:38

标签: vmstorage监控

  我们来分析下当 vmstorage 接收到数据后是如何保存监控指标的。

  ​​前文我们介绍了 VictorialMetrics 中是如何接收和传输数据的​​​,接下来我们来分析下当 vmstorage 接收到数据后是如何保存监控指标的。

  现在我们使用 csv 来导入一行指标数据,直接使用下面的请求即可:

  复制

  1. curl -d "GOOG,1.23,4.56,NYSE" 'http://127.0.0.1:8480/insert/0/prometheus/api/v1/import/csv?format=2:metric:ask,3:metric:bid,1:label:ticker,4:label:market'

  执行上面的请求后,在 vmstorage 组件下面会收到如下所示的一些日志信息:

  同时在数据目录 vmstorage-data 下面也多了一个 cache 目录,而且 data 下面的 small 目录和 indexdb 目录下面也生成了一些文件,这些文件就是用来存储指标数据的。

  接下来我们就来仔细分析下这些文件是干什么的,以及这些文件的存储格式是怎样的。

  要想弄明白 vmstorage 是如何去存储数据的,首先我们要先弄明白几个概念。

  存储格式

  下图是 VictoriaMetrics 支持的 Prometheus 协议的一个写入示例。

  VM 在收到写入请求时,会对请求中包含的时序数据做转换处理。首先根据包含 metric 和 labels 的 MetricName 生成一个唯一标识 TSID,然后 metric(指标名称__name__) + labels + TSID 作为索引 index,TSID + timestamp + value 作为数据 data,最后索引 index 和数据 data 分别进行存储和检索。

  因此 VM 的数据整体上分成索引和数据两个部分,因此文件格式整体上会有两个部分,其中索引部分主要是用于支持按照 label 或者 tag 进行多维检索,数据存储时,先将数据按 TSID 进行分组,然后每个 TSID 包含的数据点各自使用列式压缩存储。

  TSID

  VictoriaMetrics 的 MetricName 的结构如下所示,包含 MetricGroup(指标名称 __name__) 和 Tag 数组,其中,Tags 是可选的,每个 Tag 由 Key 和 Value 等字节数组构成。

  为了规范,Tags 必须按标签 Key 排序,使用 sortTags 方法。

  VictoriaMetrics 的 TSID 的结构如下所示,包含 MetricGroupID、JobID、InstanceID、MetricID 等几个字段,其中除了 MetricID 外,其他字段都是可选的。这个几个 ID 的生成方法如下:

  MetricGroupID​ 是根据MetricName​ 中的MetricGroup​ 使用xxhash 的 sum64 算法生成。

  JobID​ 和InstanceID​ 分别由MetricName​ 中的第一个 tag 和第二个 tag 使用xxhash 的 sum64 算法生成。为什么使用第一个 tag 和第二个 tag?这是因为 VictoriaMetrics 在写入时,将写入请求中的 JobID 和 InstanceID 放在了 Tag 数组的第一个和第二个位置。

  MetricID,使用 VictoriaMetrics 进程启动时的系统纳秒时间戳自增生成。

  复制

  1. // lib/storage/tsid.go

  2. // TSID 是一个时间序列的唯一 ID,实际上就是唯一标识一个时间序列的结构体。

  3. //

  4. // 时间序列会根据 TSID 进行排序。

  5. //

  6. // 除了 MetricID 之外其他属性都是可选的。 它们的存在仅仅是为了更好地对相关指标进行分组。

  7. // 如果它们的含义与它们的命名不同,那也没关系。

  8. type TSID struct {

  9. AccountID uint32

  10. ProjectID uint32 // 下面分析的时候可以暂时忽略这两个属性,用于多租户标识的属性

      11. 

  12. // MetricGroupID(指标组ID)对于指定的(AccountID, ProjectID)必须是唯一的。

  13. //

  14. // Metric Group 包含具有相同名称的指标,例如 “memory_usage”、“http_requests”,但具有不同的标签。

  15. // 例如,下面的这些指标属于 memory_usage 这个指标组:

  16. //

  17. // memory_usage{datacenter="foo1", job="bar1", instance="baz1:1234"}

  18. // memory_usage{datacenter="foo1", job="bar1", instance="baz2:1234"}

  19. // memory_usage{datacenter="foo1", job="bar2", instance="baz1:1234"}

  20. // memory_usage{datacenter="foo2", job="bar1", instance="baz2:1234"}

  21. MetricGroupID uint64

  22. // JobID 是给定项目的单个作业(又名服务)的 ID。

  23. //

  24. // JobID 对于指定的(AccountID, ProjectID)必须是唯一的。

  25. //

  26. // 一个 Job 任务可能由多个实例组成。

  27. // See https://prometheus.io/docs/concepts/jobs_instances/ for details.

  28. JobID uint32

  29. // InstanceID 是实例(进程)ID,对于特定的(AccountID, ProjectID)必须是唯一的。

  30. InstanceID uint32

  31. // MetricID 是指标(时间序列)的唯一ID。

  32. //

  33. // 其他所有的 TSID 字段都可以通过 MetricID 获取。

  34. MetricID uint64

  35. }

  因为 TSID 中除了 MetricID 外,其他字段都是可选的,因此 TSID 中可以始终作为有效信息的只有 MetricID,因此 VictoriaMetrics 的在构建 tag 到 TSID 的字典过程中,是直接存储的 tag 到 MetricID 的字典。

  以写入 http_requests_total{status="200", method="GET"} 为例,则 MetricName 为 http_requests_total{status="200", method="GET"},假设生成的 TSID 为 {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286},则 VictoriaMetrics 在写入时就构建了如下几种类型的索引 item,其他类型的索引 item 是在后台或者查询时构建的。

  metricName -> TSID​, 即http_requests_total{status="200", method="GET"} -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}。

  metricID -> metricName​,即51106185174286 -> http_requests_total{status="200", method="GET"}。

  metricID -> TSID​,即51106185174286 -> {metricGroupID=0, jobID=0, instanceID=0, metricID=51106185174286}。

  tag -> metricID​,即status="200" -> 51106185174286​、method="GET" -> 51106185174286​、"__name__" = http_requests_total -> 51106185174286(其实还有一个联合索引)。

  有了这些索引的 item 后,就可以支持基于 tag 的多维检索了,在当给定查询条件 http_requests_total{status="200"} 时,VictoriaMetrics 先根据给定的 tag 条件,找出每个 tag 的 metricID 列表,然后计算所有 tag 的 metricID 列表的交集,然后根据交集中的 metricID,再到索引文件中检索出 TSID,根据 TSID 就可以到数据文件中查询数据了,在返回结果之前,再根据 TSID 中的 metricID,到索引文件中检索出对应的写入时的原始 MetircName。

  但是由于 VictoriaMetrics 的 tag 到 metricID 的字典,没有将相同 tag 的所有 metricID 放在一起存储,在检索时,一个 tag 可能需要查询多次才能得到完整的 metricID 列表。另外查询出 metricID 后,还要再到索引文件中去检索 TSID 才能去数据文件查询数据,又增加了一次 IO 开销。这样来看的话,VictoriaMetrics 的索引文件在检索时,如果命中的时间线比较多的情况下,其 IO 开销会比较大,查询延迟也会比较高。

  这里我们了解了 TSID 这个非常重要的概念,还有几个结构体需要我们了解下,比如 rawRow 表示一个原始的时间序列行,MetricRow 表示插入到存储中的指标数据:

  复制

  1. // lib/storage/raw_row.go

  2. // rawRow 表示一个原始的时间序列行

  3. type rawRow struct {

  4. TSID TSID // 时间序列ID

  5. Timestamp int64 // 时间戳

  6. Value float64 // 给定时间戳的时间序列值

  7. // PrecisionBits是要存储的值中的有效位数,可能值为 [1..64]

  8. // 1 表示最大. 50% error, 2 - 25%, 3 - 12.5%, 64 没有错误, i.e.

  9. // 存储的值不会丢失精度

  10. PrecisionBits uint8

  11. }

  12. // libe/storage/storage.go

  13. // MetricRow 插入到存储中的指标

  14. type MetricRow struct {

  15. // MetricNameRaw 包含原始的指标名称,必须使用 metricne.UnmarshalRaw 对其进行解码。

  16. MetricNameRaw []byte

  17. Timestamp int64

  18. Value float64

  19. }

  插入指标

  有了上面几个概念的认识,现在我们回过头再去看下 vmstorage 中对 vminsert 请求的处理:

  复制

  1. // app/vmstorage/transport/server.go

  2. func (s *Server) processVMInsertConn(bc *handshake.BufferedConn) error {

  3. return clusternative.ParseStream(bc, func(rows []storage.MetricRow) error {

  4. vminsertMetricsRead.Add(len(rows))

  5. return s.storage.AddRows(rows, uint8(*precisionBits))

  6. }, s.storage.IsReadOnly)

  7. }

  当 vmstorage 节点接收到数据后,最后会通过回调执行 s.storage.AddRows(rows, uint8(*precisionBits)),该函数将数据添加到底层存储去:

  复制

  1. // lib/storage/storage.go

  2. // AddRows 添加 mrs 集合到存储 s

  3. func (s *Storage) AddRows(mrs []MetricRow, precisionBits uint8) error {

  4. if len(mrs) == 0 {

  5. return nil

  6. }

  7. // 限制可能向存储添加行的并发 goroutine 数量

  8. // 当太多的 goroutine 调用 AddRows 时,这应该可以防止内存不足错误和 CPU 抖动。

  9. select {

  10. // 如果写入 channel 成功,说明并发小于 CPU 最大核数,然后就可以走插入逻辑

  11. // 如果没写入成功(也就是满了),则执行default case

  12. case addRowsConcurrencyCh <- struct{}{}:

  13. default: // 如果插入 channel 失败,说明某个 insert 操作的协程被阻塞,这时需要通知 select 协程去让出。

  14. atomic.AddUint64(&s.addRowsConcurrencyLimitReached, 1)

  15. t := timerpool.Get(addRowsTimeout) // 获取一个30s超时的timer

      16.

  17. // 数据摄取优先级高于并发搜索

      18.

  19. // pacelimiter(步长限制器)中有个原子累加的变量,表示有多少个 insert 操作在等待

  20. // 走到这里证明有一个 insert 操作被阻塞了,调用 Inc,表示需要(Search操作)等待

  21. storagepacelimiter.Search.Inc()

  22.

      23. select { // 写入不成功或者还未超时就会阻塞在这里了

  24. // 在超时的30s时间内,尝试去写入 channel 队列

  25. case addRowsConcurrencyCh <- struct{}{}:

  26. timerpool.Put(t) // 把 timer 放回对象池,减少 GC

  27. // 可以成功写入 channel 了,那么可以执行 insert 操作了,则执行限制器的 Dec 操作,减一

  28. storagepacelimiter.Search.Dec()

  29. // 当限制器的等待数量为0的时候,会调用 cond.Broadcast() 去通知 select 协程开始工作。

  30. case <-t.C: // 到30s超时时间了

  31. // 把 timer 放回对象池,减少 GC timerpool.Put(t)

  32. // 超时了那么当前的 insert 就报错了,等待的数量就可以减一了

  33. storagepacelimiter.Search.Dec()

  34. atomic.AddUint64(&s.addRowsConcurrencyLimitTimeout, 1) // 记录下超时次数

  35. atomic.AddUint64(&s.addRowsConcurrencyDroppedRows, uint64(len(mrs))) // 记录没有被插入成功的 mr 数量

  36. // 等待了30秒仍然没有CPU资源,只能报错

  37. return fmt.Errorf("cannot add %d rows to storage in %s, since it is overloaded with %d concurrent writers; add more CPUs or reduce load",

  38. len(mrs), addRowsTimeout, cap(addRowsConcurrencyCh))

  39. }

  40. }

      41.

  42. // 下面是插入逻辑

  43. // 一次插入不要太大

  44. var firstErr error

  45. ic := getMetricRowsInsertCtx()

  46. maxBlockLen := len(ic.rrs)

  47. for len(mrs) > 0 {

  48. mrsBlock := mrs

  49. // 如果要插入的 mrs 超过了最大长度

  50. if len(mrs) > maxBlockLen {

  51. // 则先插入最大长度的 mrs mrsBlock = mrs[:maxBlockLen]

  52. // 剩下的 mrs 下次循环去处理

  53. mrs = mrs[maxBlockLen:]

  54. } else {

  55. mrs = nil

  56. }

  57. // 执行真正的 add 操作

  58. if err := s.add(ic.rrs, ic.tmpMrs, mrsBlock, precisionBits); err != nil {

  59. if firstErr == nil {

  60. firstErr = err

  61. }

  62. continue

  63. }

  64. // 记录下插入成功的 mrs 数量

  65. atomic.AddUint64(&rowsAddedTotal, uint64(len(mrsBlock)))

  66. }

  67. // 放回对象池

  68. putMetricRowsInsertCtx(ic)

      69.

  70. <-addRowsConcurrencyCh // insert 逻辑执行完成后,出队

      71.

  72. return firstErr

  73. }

  该函数的实现非常经典,会限制可能向存储添加数据的并发 goroutine 数量,当太多的 goroutine 调用 AddRows 时,可以防止内存不足错误和 CPU 抖动。这里实现了插入比查询更高的优先级,当资源不足时,查询操作会挂起让出资源给到插入操作使用。

  获取 TSID

  真正实现添加数据是下面的 add 函数,其中 rawRow 是原始的时序数据行,MetricRow 是要插入到存储中的行数据,该函数的核心就是要生成指标序列的 TSID 数据,如下所示:

  复制

  1. // lib/storage/storage.go

  2. func (s *Storage) add(rows []rawRow, dstMrs []*MetricRow, mrs []MetricRow, precisionBits uint8) error {

  3. // 当前使用的索引

  4. idb := s.idb()

  5. j := 0

  6. var (

  7. // 这些变量用于加速同一 metricName 的多个相邻行的批量导入。

  8. prevTSID TSID

  9. prevMetricNameRaw []byte

  10. )

  11. var pmrs *pendingMetricRows

  12. // 获取该数据块的最小时间和最大时间

  13. minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps()

      14.

  15. // 带有第几代索引信息的 TSID 对象

  16. var genTSID generationTSID

      17.

  18. // 只返回第一个错误,因为它返回所有错误没有意义

  19. var firstWarn error

  20. // 循环数据行,其实就是填充 rawRow 中的 TSID 数据

  21. for i := range mrs {

  22. mr := &mrs[i]

  23. if math.IsNaN(mr.Value) { // 值为 NaN

  24. if !decimal.IsStaleNaN(mr.Value) {

  25. // 跳过 Prometheus staleness 标记以外的 NaN

  26. // 因为底层编码不知道如何使用它们。

  27. continue

  28. }

  29. }

  30. // 如果指标的时间戳小于最小的时间戳

  31. // 则跳过保留期外时间戳过小的行

  32. if mr.Timestamp < minTimestamp {

  33. ......

  34. continue

  35. }

  36. // 同样跳过超过最大时间戳的数据

  37. if mr.Timestamp > maxTimestamp {

  38. ......

  39. continue

  40. }

  41. dstMrs[j] = mr

  42. r := &rows[j]

  43. j++

  44. r.Timestamp = mr.Timestamp

  45. r.Value = mr.Value

  46. r.PrecisionBits = precisionBits

  47. // 快速路径 - 当前 mr 包含与前一 mr 相同的指标名称,因此它包含相同的 TSID。

  48. if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {

  49. // 当许多行包含相同的 MetricNameRaw 时,应在批量导入时触发此路径。

  50. r.TSID = prevTSID

  51. continue

  52. }

  53. // 判断 TSID 是否在缓存中(命中缓存)

  54. if s.getTSIDFromCache(&genTSID, mr.MetricNameRaw) {

  55. r.TSID = genTSID.TSID

  56. // 跳过该行,因为已超出唯一序列数的限制。

  57. if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {

  58. j--

  59. continue

  60. }

  61. // 快速路径 - 给定 MetricNameRaw 的 TSID 已在缓存中找到,并且未删除。

  62. // 不需要检查 r.TSID.MetricID 是否已删除,因为 tsidCache 不包含已删除时间序列的 MetricName -> TSID 条目,可以查看 Storage.DeleteMetrics 的代码

  63. prevTSID = r.TSID // 设置前一个 TSID 的值

  64. prevMetricNameRaw = mr.MetricNameRaw // 设置前一个 MetricNameRaw 的值

  65.

      66. // 找到的TSID不是当前代的索引(来自上一代缓存下来的索引)

  67. if genTSID.generation != idb.generation {

  68. // 索引需要尝试使用该 TSID 重新填充当前代的索引数据

  69. // https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1401

  70. created, err := idb.maybeCreateIndexes(&genTSID.TSID, mr.MetricNameRaw)

  71. if err != nil {

  72. return fmt.Errorf("cannot create indexes in the current indexdb: %w", err)

  73. }

  74. if created {

  75. // 如果填充成功,则将当前的 TSID 设置为当前代索引

  76. genTSID.generation = idb.generation

  77. // 重新将该 TSID -> MetricNameRaw 数据放回缓存,方便后面的序列处理

  78. s.putTSIDToCache(&genTSID, mr.MetricNameRaw)

  79. }

  80. }

  81. continue

  82. }

  83.

      84. // 慢速路径 - 缓存中缺少TSID

  85. // 在下面的循环中推迟搜索

  86. j--

  87. if pmrs == nil {

  88. // 初始化 pendingMetricRows

  89. pmrs = getPendingMetricRows()

  90. }

  91. // 将 mr 数据添加到 pendingMetricRows 中去待处理

  92. if err := pmrs.addRow(mr); err != nil {

  93. // 错误时不要停止添加数据 - 只需跳过无效行即可。

  94. // 这保证了无效行不会阻止将有效行添加到存储中去。

  95. if firstWarn == nil {

  96. firstWarn = err

  97. }

  98. continue

  99. }

  100. }

  101. // 有指标的 TSID 没有在缓存中(上面的慢速路径)

  102. if pmrs != nil {

  103. // 按指标名称对 pendingMetricRows 进行排序,以便通过下面循环中的 “is” 加快搜索速度。

  104. pendingMetricRows := pmrs.pmrs

  105. sort.Slice(pendingMetricRows, func(i, j int) bool {

  106. return string(pendingMetricRows[i].MetricName) < string(pendingMetricRows[j].MetricName)

  107. })

  108. //

  109. is := idb.getIndexSearch(0, 0, noDeadline)

  110. prevMetricNameRaw = nil // 接收前一个 MetricNameRaw

  111. var slowInsertsCount uint64

  112. for i := range pendingMetricRows {

  113. pmr := &pendingMetricRows[i]

  114. mr := pmr.mr // MetricRaw

  115. dstMrs[j] = mr

  116. r := &rows[j]

  117. j++

  118. r.Timestamp = mr.Timestamp

  119. r.Value = mr.Value

  120. r.PrecisionBits = precisionBits

  121. // 快速路径 - 当前 mr 包含与前一个 mr 相同的指标名称,因此它包含相同的 TSID。

  122. if string(mr.MetricNameRaw) == string(prevMetricNameRaw) {

  123. // 当许多行包含相同的 MetricNameRaw 时,在批量导入时会触发该路径。

  124. r.TSID = prevTSID

  125. if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {

  126. // 跳过该行,因为已超出唯一序列数的限制

  127. j--

  128. continue

  129. }

  130. continue

  131. }

  132. // 慢速路径

  133. slowInsertsCount++ // 记录慢插入次数

  134. // 通过 MetricName 去获取(没有就创建)TSID 数据

  135. if err := is.GetOrCreateTSIDByName(&r.TSID, pmr.MetricName); err != nil {

  136. if firstWarn == nil {

  137. firstWarn = fmt.Errorf("cannot obtain or create TSID for MetricName %q: %w", pmr.MetricName, err)

  138. }

  139. j--

  140. continue

  141. }

  142. // 设置 genTSID 为当前生成的 TSID

  143. genTSID.generation = idb.generation

  144. genTSID.TSID = r.TSID

  145. // 返回缓存

  146. s.putTSIDToCache(&genTSID, mr.MetricNameRaw)

  147. // 缓存当前的 TSID 和 MetricNameRaw,方便下一条序列快速处理

  148. prevTSID = r.TSID

  149. prevMetricNameRaw = mr.MetricNameRaw

  150. if s.isSeriesCardinalityExceeded(r.TSID.MetricID, mr.MetricNameRaw) {

  151. // 跳过该行,因为已超出唯一序列数的限制

  152. j--

  153. continue

  154. }

  155. }

  156. // 回收对象

  157. idb.putIndexSearch(is)

  158. putPendingMetricRows(pmrs)

  159. atomic.AddUint64(&s.slowRowInserts, slowInsertsCount)

  160. }

  161. // 提示错误信息

  162. if firstWarn != nil {

  163. logger.WithThrottler("storageAddRows", 5*time.Second).Warnf("warn occurred during rows addition: %s", firstWarn)

  164. }

  165. dstMrs = dstMrs[:j]

  166. rows = rows[:j]

      167.

  168. // TSID 填充完成,可以插入数据了

  169. var firstError error

  170. if err := s.tb.AddRows(rows); err != nil {

  171. firstError = fmt.Errorf("cannot add rows to table: %w", err)

  172. }

  173. if err := s.updatePerDateData(rows, dstMrs); err != nil && firstError == nil {

  174. firstError = fmt.Errorf("cannot update per-date data: %w", err)

  175. }

  176. if firstError != nil {

  177. return fmt.Errorf("error occurred during rows addition: %w", firstError)

  178. }

  179. return nil

  180. }

  首先循环数据,把时间戳过小或过大的都过滤掉,然后就是想办法尽可能快地获取到指标的 TSID:

  快速路径- 当前 MetricRow 包含与前一 MetricRow 相同的指标名称,因此它们具有相同的 TSID,所以直接将当前对象的 TSID 设置成前一个 TSID,这是最快的方式。

  如果和前一个指标名称不一样,则去查看 genTSID 是否在缓存中(命中缓存)。

  如果命中缓存则 genTSID 中的 TSID 就是我们需要的,同时也将其设置为前一个 prevTSID。如果该 TSID 不是当代的索引(来自上一代缓存下来的索引),则需要尝试使用该 TSID 重新填充当代的索引数据,这和索引轮换有关,后面会详细说明。

  如果没有命中缓存,则属于慢速路径,将当前数据添加到pendingMetricRows 中去待处理。

  循环了所有指标数据后,接下来需要处理pendingMetricRows 中的数据,也就是缓存中没有对应的 TSID,此时就需要我们去生成对应的 TSID 数据。

  快速路径- 同样是当前 MetricRow 与前一个 MetricRow 的指标名称相同,因此它包含相同的 TSID,直接设置成前一个 TSID 即可。

  慢速路径- 走到这个分支则只能去创建 TSID 了,通过 MetricName 去获取(没有就创建)TSID 数据,也就是上面GetOrCreateTSIDByName 函数。获取后记得放到缓存中去。

  上面费了很大的功夫就是为了获取时间序列对应的 TSID 数据的,这也是插入数据过程中最可能出现慢插入的地方,因为该过程涉及到索引,比较耗时间,如果你插入的数据出现大量的高基数序列(比如包含一些随机生成的 ID 作为标签),则会大大降低 vmstorage 的插入性能。

  我们可以去查看下 GetOrCreateTSIDByName 函数的实现。

  复制

  1. // lib/storage/index_db.go

  2. // GetOrCreateTSIDByName 使用指定 metricName 的 TSID 填充 dst。

  3. func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName []byte) error {

  4. // hack:在多次连续未命中后跳过 TSID 的搜索

  5. // 这将提高大批量新时间序列的插入性能。

  6. if is.tsidByNameMisses < 100 {

  7. err := is.getTSIDByMetricName(dst, metricName)

  8. if err == nil {

  9. is.tsidByNameMisses = 0

  10. return nil

  11. }

  12. if err != io.EOF {

  13. return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)

  14. }

  15. is.tsidByNameMisses++

  16. } else {

  17. is.tsidByNameSkips++

  18. if is.tsidByNameSkips > 10000 {

  19. is.tsidByNameSkips = 0

  20. is.tsidByNameMisses = 0

  21. }

  22. }

  23. // 找不到给定名称的 TSID,创建它。

  24. // 如果 mn 的重复 TSID 是由并发 goroutines 创建的,那么这也是可以的。

  25. // 指标结果将在表搜索 TableSearch 后由 mn 合并。

  26. if err := is.db.createTSIDByName(dst, metricName); err != nil {

  27. return fmt.Errorf("cannot create TSID by MetricName %q: %w", metricName, err)

  28. }

  29. return nil

  30. }

  31. // 根据 metricName 去搜索 TSID

  32. func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {

  33. dmis := is.db.s.getDeletedMetricIDs()

  34. ts := &is.ts // TableSearch

  35. kb := &is.kb

  36. kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID) // MetricName -> TSID 的前缀

  37. kb.B = append(kb.B, metricName...)

  38. kb.B = append(kb.B, kvSeparatorChar)

  39. ts.Seek(kb.B) // Seek 查找 ts 中大于或等于 k 的第一项

  40. for ts.NextItem() { // 循环查找

  41. if !bytes.HasPrefix(ts.Item, kb.B) { // ts.Item 不是以 kb.B 为前缀

  42. // 没找到

  43. return io.EOF

  44. }

  45. v := ts.Item[len(kb.B):] // 获得尾部的值

  46. tail, err := dst.Unmarshal(v) // 填充dst

  47. if err != nil {

  48. return fmt.Errorf("cannot unmarshal TSID: %w", err)

  49. }

  50. if len(tail) > 0 { // 尾部还有值

  51. return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)

  52. }

  53. if dmis.Len() > 0 { // 有标记删除的 MetricID 列表

  54. // 验证 dst 是否标记为已删除。

  55. if dmis.Has(dst.MetricID) {

  56. // dst 被删除了,继续搜索。

  57. continue

  58. }

  59. }

  60. // 找到了有效的 dst

  61. return nil

  62. }

  63. if err := ts.Error(); err != nil {

  64. return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)

  65. }

  66. // 什么都没发现

  67. return io.EOF

  68. }

  该函数会获取 metricName 对应的 TSID,但是可能会出现多次连续未命中的情况,为了提高性能,这里做了一点 hack,如果连续未查询到 TSID 100 次则跳过搜索,就只能去创建 TSID 了,如果跳过了 10000 次则又重置可以重新去搜索。

  搜索 TSID 是通过下面的 getTSIDByMetricName 函数来实现的,创建 TSID 是通过 createTSIDByName 函数实现的。

  TSID 的生成方法如下所示:

  复制

  1. // lib/storage/index_db.go

  2. // 根据指定的 metricName 创建 TSID

  3. func (db *indexDB) createTSIDByName(dst *TSID, metricName []byte) error {

  4. mn := GetMetricName()

  5. defer PutMetricName(mn)

  6. if err := mn.Unmarshal(metricName); err != nil {

  7. return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)

  8. }

  9. // 创建 TSID

  10. created, err := db.getOrCreateTSID(dst, metricName, mn)

  11. if err != nil {

  12. return fmt.Errorf("cannot generate TSID: %w", err)

  13. }

  14. // TSID 创建后要创建索引,这一步是最耗时的

  15. if err := db.createIndexes(dst, mn); err != nil {

  16. return fmt.Errorf("cannot create indexes: %w", err)

  17. }

  18. // 不需要使 tag 缓存无效,因为它在 db 上无效,tb 通过传递给 OpenTable 的invalidateTagFiltersCache flushCallback 刷新。

  19. if created {

  20. // 仅当 indexDB 中未找到 tsid 时,才增加 newTimeseriesCreated 计数器

  21. atomic.AddUint64(&db.newTimeseriesCreated, 1)

  22. if logNewSeries {

  23. logger.Infof("new series created: %s", mn.String())

  24. }

  25. }

  26. return nil

  27. }

  28. // getOrCreateTSID 在 db.extDB 中查找指定 metricName 的 TSID

  29. // 如果找不到任何内容,则创建新的 TSID

  30. //

  31. // 如果 TSID 已创建,则返回 true;如果 TSID 在 extDB 中,则返回 false

  32. func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {

  33. // 在外部存储中搜索 TSID

  34. // 这个 db 通常来自上一个时期

  35. var err error

  36. // 相当于去上一个索引 db 中查找 TSID

  37. if db.doExtDB(func(extDB *indexDB) {

  38. err = extDB.getTSIDByNameNoCreate(dst, metricName)

  39. }) {

  40. if err == nil {

  41. // 已在外部存储中找到 TSID

  42. return false, nil

  43. }

  44. if err != io.EOF {

  45. return false, fmt.Errorf("external search failed: %w", err)

  46. }

  47. }

  48. // 在外部存储中找不到 TSID,在本地生成。

  49. generateTSID(dst, mn)

  50. return true, nil

  51. }

  52. // 生成 TSID 数据

  53. func generateTSID(dst *TSID, mn *MetricName) {

  54. dst.AccountID = mn.AccountID

  55. dst.ProjectID = mn.ProjectID

  56. // 根据 MetricName 中的 MetricGroup 使用 xxhash 的 sum64 算法生成。

  57. dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)

  58. // 假设 job-like metric 放在 mn.Tags[0],而 instance-like metric 放在 mn.Tags[1]

  59. // 这个假设是正确的,因为 mn.Tags 必须在调用 generateTSID() 函数之前使用 mn.sortTags() 进行排序。

  60. // 这允许对磁盘上彼此靠近的相同(job、instance)的数据块进行分组。

  61. // 当从磁盘读取相同 job 和/或 instance 的数据块时,这会减少磁盘寻道和磁盘读取 IO。

  62. // 例如,与 `process_resident_memory_bytes{job="vmstorage"}` 匹配的时间序列的数据块在磁盘上是物理相邻的。

  63. if len(mn.Tags) > 0 {

  64. dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value)) // 第一个Tag规定为 JobID

  65. }

  66. if len(mn.Tags) > 1 {

  67. dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value)) // 第二个Tag规定为 InstanceID

  68. }

  69. dst.MetricID = generateUniqueMetricID() // 生成唯一的指标ID

  70. }

  MetricID 通过 generateUniqueMetricID() 生成, 在重启时, nextUniqueMetricID 被赋值为当时的时间戳, 随后每次新的 TSID 的创建都会在此基础之上+1。

  复制

  1. // lib/storage/index_db.go

  2. // 生成唯一的 MetricID

  3. func generateUniqueMetricID() uint64 {

  4. // 期望的是从此函数返回的 metricID 必须是密集的。

  5. // 如果它们是稀疏的,那么这可能会损害 metric_ids 与 uint64set.Set 的交集性能。

  6. return atomic.AddUint64(&nextUniqueMetricID, 1)

  7. }

  8. // 该数在重新启动时不能倒退,否则可能会发生 metricID 冲突。

  9. // 所以不要在 VictoriaMetrics 重新启动期间更改服务器上的时间。

  10. var nextUniqueMetricID = uint64(time.Now().UnixNano())

  但是我们可能在这里看不懂 TSID 是如何去搜索或者创建的,这就需要我们去了解下 VM 中的倒排索引了。

  倒排索引

  当创建完 TSID 后, 需要建立一系列的索引供查找时使用。在 VM 中不同类型的索引都是通过 KV 关系来描述,在代码中称为 Item , Item 的结构如下:

  在 VM 中 Item 的整体上是一个 KV 结构的字节数组,共计有 7 种类型,每种类型的 Item 通过固定前缀来区分,前缀类型如下图所示。

  在 storage/index_db.go: createIndexes 函数中,去分别建立了各个索引,生成 Items,代码如下所示:

  复制

  1. // lib/storage/index_db.go

  2. // 创建索引

  3. func (db *indexDB) createIndexes(tsid *TSID, mn *MetricName) error {

  4. // 索引 items 的顺序很重要,它保证了索引的一致性。

  5. ii := getIndexItems()

  6. defer putIndexItems(ii)

  7. // 创建 MetricName -> TSID 的索引。

  8. ii.B = append(ii.B, nsPrefixMetricNameToTSID) // 前缀

  9. ii.B = mn.Marshal(ii.B)

  10. ii.B = append(ii.B, kvSeparatorChar) // 分隔符

  11. ii.B = tsid.Marshal(ii.B)

  12. ii.Next()

  13. // 创建 MetricID -> MetricName 索引。

  14. ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName, mn.AccountID, mn.ProjectID)

  15. ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)

  16. ii.B = mn.Marshal(ii.B)

  17. ii.Next()

  18. // 创建 MetricID -> TSID 索引

  19. ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID, mn.AccountID, mn.ProjectID)

  20. ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)

  21. ii.B = tsid.Marshal(ii.B)

  22. ii.Next()

  23. // 创建 Tag -> MetricID 索引

  24. prefix := kbPool.Get()

  25. prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)

  26. ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)

  27. kbPool.Put(prefix)

  28. // 将 Items 添加到 Table 中去

  29. return db.tb.AddItems(ii.Items)

  30. }

  对于 ask{market="NYSE",ticker="GOOG"} 1.23 的时序指标,对应的 MetricName 为 AccountID=0, ProjectID=0, ask{market="NYSE",ticker="GOOG"},假设生成的 TSID 为:

  复制

  1. {

  2. AccountID: 0

  3. ProjectID: 0

  4. MetricGroupID: 6661248876682682060

  5. JobID: 3817370224

  6. InstanceID: 4166188337

  7. MetricID: 1654132102944898001

  8. }

  则生成的索引 Item 逻辑结构如下图所示:

  上图为构建的 MetricName -> TSID 的索引,前缀为 nsPrefixMetricNameToTSID=0,整个索引项就是一个 key: value 的形式,key 为 MetricName 编码后的值,value 为 TSID 编码后的值,中间通过一个 kvSeparator 的分隔符进行连接,当然这些值真正的存储形式都是 []byte。除了上图的这个索引之外还有几个其他的索引:MetricID -> MetricName、MetricID -> TSID、Tag -> MetricID,方式都是一样的,只是要注意每种索引的前缀是不一样的。最后得到的索引就是上面构建的几种索引的集合数组。

  索引构建完成后又是如何去持久化数据的呢?保存的数据又是怎样的格式呢?

  来源: k8s技术圈

  >>>>>>点击进入系统运维专题

赞(11)
踩(0)
分享到:
华为认证网络工程师 HCIE直播课视频教程