阿里云InfluxDB®之snapshot及其内存优化

简介: 作为阿里在APM和IOT领域的重要布局,时序数据库承载着阿里对于物理网和未来应用监控市场的未来和排头兵,作为业内排名第一的时序数据库InfluxDB,其在国内和国际都拥有了大量的用户,阿里适逢其时,重磅推出了阿里云 InfluxDB®。

简介

作为阿里在APM和IOT领域的重要布局,时序数据库承载着阿里对于物理网和未来应用监控市场的未来和排头兵,作为业内排名第一的时序数据库InfluxDB,其在国内和国际都拥有了大量的用户,阿里适逢其时,重磅推出了阿里云 InfluxDB®。
         限于篇幅,本文仅就InfluxDB的其中一个模块:snapshot,对其机制和内存使用的优化进行分析。

为什么要做snapshot

InfluxDB采用的是TSM引擎,TSM 存储引擎主要由几个部分组成: cache、wal、tsm file、compactor
s1

TSM存储引擎,其核心思想类似于LSM Tree,它会将最近的数据缓存在磁盘中,在达到预设的阈值之后就会触发snapshot,也就是我们常说的快照刷盘。
内存的作用是为了缓存,加速查询。snapshot主要是解决数据持久化落盘问题。

Snapshot的工作机制

由于snapshot是将cache中的数据刷到磁盘,那么首先,我们来看一下Cache的内部结构。

Cache的内部结构

s2

如上图所示,每一个cache内部划分成了16个partition。每1个partition内部包含一个map,所有的map其key为SeriesKey,value为entry。

// Value represents a TSM-encoded value.
type Value interface {
    // UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
    UnixNano() int64

    // Value returns the underlying value.
    Value() interface{}

    // Size returns the number of bytes necessary to represent the value and its timestamp.
    Size() int

    // String returns the string representation of the value and its timestamp.
    String() string

    // internalOnly is unexported to ensure implementations of Value
    // can only originate in this package.
    internalOnly()
}

// Values represents a slice of  values.
type Values []Value

// entry is a set of values and some metadata.
type entry struct {
    mu     sync.RWMutex
    values Values // All stored values.
    vtype byte
}

entry是一个Value类型的数组,Value本身是一个接口,按照值类型的不同分为:FloatValue、StringValue、BooleanValue、IntegerValue、FloatValue、StringValue。 
以FloatValue为例,每一种类型的Value包含了一个int类型的时间戳和具体的值value。

type FloatValue struct {
    unixnano int64
    value    float64
}

Snapshot的流程

从代码层面上来讲,从总体的概要流程如下:
s3

下面让我们来逐个分析下:

Snapshot的入口

if e.enableCompactionsOnOpen {
   e.SetCompactionsEnabled(true)
}

Snapshot的机制

// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {
    t := time.NewTicker(time.Second)
    defer t.Stop()
    for {
        e.mu.RLock()
        quit := e.snapDone
        e.mu.RUnlock()

        select {
        case <-quit:
            tsdb.UpdateCacheSize(e.id, 0, e.logger)
            return

        case <-t.C:
            e.Cache.UpdateAge()
            tsdb.UpdateCacheSize(e.id, e.Cache.Size(), e.logger)
            if e.ShouldCompactCache(time.Now()) {
                start := time.Now()
                e.traceLogger.Info("Compacting cache", zap.String("path", e.path))
                err := e.WriteSnapshot()
                if err != nil && err != errCompactionsDisabled {
                    e.logger.Info("Error writing snapshot", zap.Error(err))
                    atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)
                } else {
                    atomic.AddInt64(&e.stats.CacheCompactions, 1)
                }
                atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())
            }
        }
    }
}

每隔1秒钟检查一次,是否达到snapshot的条件。
snapshot的条件有两个:
是否达到配置的阈值。(默认情况下是25M)
离上次snapshot的间隔是否超越:cache-snapshot-write-cold-duration的配置。(默认情况下是10min)

Snapshot的具体实现

那么,这里涉及两个问题:

1、落盘的文件格式如何?
2、snapshot刷盘的过程本身是如何进行的?
我们先看落盘的文件格式:

/>

TSM文件包括了三个部分:Series Data Section、Series Index Section、 Footer。

1、Series Data Section生成:
Series Data Section有若干个Series Data Block组成。
其中对于Series Data Block,这个是在内存中完成组装的,具体是有cacheKeyIterator.encode函数完成,代码如下:

func (c *cacheKeyIterator) encode() {
    concurrency := runtime.GOMAXPROCS(0)
    n := len(c.ready)

    // Divide the keyset across each CPU
    chunkSize := 1
    idx := uint64(0)

    for i := 0; i < concurrency; i++ {
        // Run one goroutine per CPU and encode a section of the key space concurrently
        go func() {
            tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
            fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
            benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
            uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
            senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
            ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)

            defer putTimeEncoder(tenc)
            defer putFloatEncoder(fenc)
            defer putBooleanEncoder(benc)
            defer putUnsignedEncoder(uenc)
            defer putStringEncoder(senc)
            defer putIntegerEncoder(ienc)

            for {
                i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

                if i >= n {
                    break
                }

                key := c.order[i]
                values := c.cache.values(key)

                for len(values) > 0 {

                    end := len(values)
                    if end > c.size {
                        end = c.size
                    }

                    minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
                    var b []byte
                    var err error

                    switch values[0].(type) {
                    case FloatValue:
                        b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
                    case IntegerValue:
                        b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
                    case UnsignedValue:
                        b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
                    case BooleanValue:
                        b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
                    case StringValue:
                        b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
                    default:
                        b, err = Values(values[:end]).Encode(nil)
                    }

                    values = values[end:]

                    c.blocks[i] = append(c.blocks[i], cacheBlock{
                        k:       key,
                        minTime: minTime,
                        maxTime: maxTime,
                        b:       b,
                        err:     err,
                    })

                    if err != nil {
                        c.err = err
                    }
                }
                // Notify this key is fully encoded
                c.ready[i] <- struct{}{}
            }
        }()
    }
}

其中针对几个不同的数据类型分别通过不同Encoder来进行组装,最后是形成了一个cacheBlock的二维数组,并保存在iter当中。
接下来的问题就是如何将这些二维数组刷盘。

2、依次刷盘
前面我们已知iter中保留有这些cacheBlock,我们只需要遍历迭代器就可以将这些数据刷盘。但还有一个问题,Series Index Section如何生成?
因为Series Index Section最终由IndexEntry构成,而IndexEntry中minTime和maxTime、Size都可以由cacheBlock的数据得到,关键是Offset。
其实Offset的计算是随着迭代的过程,不断地往前走,就像在一个Buffer中,填满了一个Series Data Block,就会更新一次Offset。

关键的代码如下:

    n, err := t.w.Write(block) // Write的过程中,会更新t.n,也就是offset
    if err != nil {
        return err
    }
    n += len(checksum)

    // Record this block in index
    t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n)) //t.n 就是offset

总结以上所述,大体的过程是,encode生层Series Data Block, 在迭代过程中,生成了Series Index Section,最终,将Series Index Section Append到Series Data Block 就生成了TSM文件。

那么问题来了Series Index Section的保存时需要空间的,如果是Series Index Section占用的内存过大,则可能会因此加大了程序OOME的风险。

snapshot内存使用的优化

如果有n个cache,同时做snapshot。 则耗费的内存为: n IndexSize。 
例如:5db
4 retention, IndexSize = 50m。 则节省:5 4 50 = 1G的内存使用量。

所以,我们可以想到的一个优化点是:在snapshot过程当中利用文件来做Series Index Section的暂存区,从而节省这一部分内存。

验证

当我们利用磁盘来做Index的缓冲区时,系统在snapshot的过程中,会生成1个临时的索引文件,如下图所示。
s5

而不用磁盘来做Index缓冲区的时候,则不会生成这个文件,如下图所示。
s6

经过我们的长时间的稳定性测试,证实,在利用磁盘来做Index的缓冲区时,能有效降低系统大压力下的OOME概率。

商业化

阿里云InfluxDB®现已正式商业化,欢迎访问购买页面(https://common-buy.aliyun.com/?commodityCode=hitsdb_influxdb_pre#/buy)与文档(https://help.aliyun.com/document_detail/113093.html?spm=a2c4e.11153940.0.0.57b04a02biWzGa)。

目录
相关文章
|
29天前
|
存储 分布式计算 网络协议
阿里云服务器内存型r7、r8a、r8y实例区别参考
在阿里云目前的活动中,属于内存型实例规格的云服务器有内存型r7、内存型r8a、内存型r8y这几个实例规格,相比于活动内的经济型e、通用算力型u1实例来说,这些实例规格等性能更强,与计算型和通用型相比,它的内存更大,因此这些内存型实例规格主要适用于数据库、中间件和数据分析与挖掘,Hadoop、Spark集群等场景,本文为大家介绍内存型r7、r8a、r8y实例区别及最新活动价格,以供参考。
阿里云服务器内存型r7、r8a、r8y实例区别参考
|
2月前
|
弹性计算
2024阿里云幻兽帕鲁/Palworld服务器价格表(CPU/内存/带宽/磁盘收费标准)
2024年阿里云幻兽帕鲁专用服务器的价格根据不同的配置有所不同。 • 4核16G配置的价格为32元/月,如果选择购买3个月,则价格为96元。 • 8核32G配置的价格为90元/月,如果选择购买3个月,则价格为271元。 另外,还有配置为4核16G10M带宽的服务器,其价格为26元/月起。而8核32G10M带宽的价格也是90元/月。
93 1
|
7月前
|
弹性计算 人工智能 测试技术
阿里云服务器租用价格表(最新CPU/内存/带宽/磁盘收费标准)
阿里云服务器租用价格表(最新CPU/内存/带宽/磁盘收费标准)阿里云轻量应用服务器2核2G3M带宽轻量服务器一年108元,2核4G4M带宽轻量服务器一年297.98元12个月;ECS云服务器e系列2核2G配置182元一年
2324 0
|
19天前
|
存储 缓存 PHP
阿里云服务器实例、CPU内存、带宽、操作系统选择参考
对于使用阿里云服务器的用户来说,云服务器的选择和使用非常重要,如果实例、内存、CPU、带宽等配置选择错误,可能会影响到自己业务在云服务器上的计算性能及后期运营状况,本文为大家介绍一下阿里云服务器实例、CPU内存、带宽、操作系统的选择注意事项,以供参考。
阿里云服务器实例、CPU内存、带宽、操作系统选择参考
|
1月前
|
弹性计算 固态存储 Linux
2024年阿里云服务器租用详细价格表(CPU/内存/带宽/系统盘)
2024阿里云服务器租用优惠价格表,轻量服务器2核2G3M带宽轻量服务器一年61元,2核4G4M带宽轻量服务器一年165元12个月,ECS云服务器e系列2核2G配置、3M固定带宽、40G ESSD Entry云盘,99元一年、2核4G服务器30元3个月、2核4G配置365元一年、2核8G配置522元一年,云服务器u1、云服务器c7、g7和r7优惠价格表,CPU内存带宽系统盘配置详细报价:
736 3
|
1月前
|
弹性计算 固态存储 调度
阿里云服务器部署配置选择全攻略,ECS实例规格、CPU内存配置
阿里云服务器部署配置选择全攻略,ECS实例规格、CPU内存配置,CPU内存、公网带宽和系统盘怎么选择?个人用户选择轻量应用服务器或ECS通用算力型u1云服务器,企业用户选择ECS计算型c7、通用型g7云服务器,阿里云百科分享阿里云服务器配置选择方法
|
1月前
|
弹性计算 固态存储 调度
阿里云服务器选购指南_2024新版CPU内存带宽系统盘选择攻略
阿里云服务器选购指南_2024新版CPU内存带宽系统盘选择攻略,CPU内存、公网带宽和系统盘怎么选择?个人用户选择轻量应用服务器或ECS通用算力型u1云服务器,企业用户选择ECS计算型c7、通用型g7云服务器,阿里云百科分享阿里云服务器配置选择方法
|
1月前
|
弹性计算 固态存储 调度
阿里云配置服务器详细指南_2024新版CPU内存带宽系统盘选择
阿里云配置服务器详细指南_2024新版CPU内存带宽系统盘选择,阿里云服务器配置怎么选择?CPU内存、公网带宽和系统盘怎么选择?个人用户选择轻量应用服务器或ECS通用算力型u1云服务器,企业用户选择ECS计算型c7、通用型g7云服务器,阿里云百科分享阿里云服务器配置选择方法
|
2月前
|
弹性计算 大数据 测试技术
阿里服务器租用多少钱一年?阿里云服务器租用价格表(最新CPU/内存/带宽/磁盘收费标准)
阿里服务器租用多少钱一年?阿里云服务器租用价格表(最新CPU/内存/带宽/磁盘收费标准)。阿里云服务器的租用费用因实例类型、地域、配置等因素而有所不同,价格范围可以从几百元到几千元不等。2024年阿里云服务器租用费用价格表更新,云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月,幻兽帕鲁4核16G和8核32G服务器配置,云服务器ECS可以选择经济型e实例、通用算力u1实
|
2月前
|
弹性计算 大数据 测试技术
2024阿里云服务器租用价格表(CPU/内存/带宽/磁盘收费标准)
阿里云服务器分为轻量应用服务器和云服务器ECS,轻量适合个人开发者使用,搭建轻量级的网站、测试环境使用;专业级如大数据、科学计算、高并发网站等需要使用云服务器ECS。2024年阿里云服务器租用价格表出炉!云服务器ECS经济型e实例2核2G、3M固定带宽99元一年、ECS u1实例2核4G、5M固定带宽、80G ESSD Entry盘优惠价格199元一年,轻量应用服务器2核2G3M带宽轻量服务器一年61元、2核4G4M带宽轻量服务器一年165元12个月、2核4G服务器30元3个月,幻兽帕鲁4核16G和8核32G服务器配置,云服务器ECS可以选择经济型e实例、通用算力u1实例、ECS计算型c7、通
457 1

热门文章

最新文章