Learn Influxdb the hard way (4) - Services in Influxdb II

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介:

前言

在上篇文章中我们讲解了Influxdb中MonitorService的工作原理,在本篇文章中会继续讲解PrecreatorService,从名字上来看,这个Service的作用是预先创建一些资源,那么具体Influxdb会预先初始化哪些资源,以及这些资源原先创建的优势是什么呢,让我们今天一起来从代码里面找答案。

PrecreatorService解析

首先我们来看下PrecreatorService的创建过程,从这个Service的结构中我们可以看到它包含了两个时间的属性checkInterval和advancePeriod,此外包含了一个MetaClient接口的引用以及一个waitgroup,由此可以猜测,这个Service的主要实现方式是通过周期性的通过多个并发的goroutines实现创建资源,然后再讲meta信息通过MetaClient的实例应用写入到meta.db中。

//cmd/influxd/run/server.go 320行
func (s *Server) appendPrecreatorService(c precreator.Config) error {
    if !c.Enabled {
        return nil
    }
    srv := precreator.NewService(c)
    srv.MetaClient = s.MetaClient
    s.Services = append(s.Services, srv)
    return nil
}

// services/precreator/service.go 12行
// Service manages the shard precreation service.
type Service struct {
    checkInterval time.Duration
    advancePeriod time.Duration

    Logger *zap.Logger

    done chan struct{}
    wg   sync.WaitGroup

    MetaClient interface {
        PrecreateShardGroups(now, cutoff time.Time) error
    }
}

// NewService returns an instance of the precreation service.
func NewService(c Config) *Service {
    return &Service{
        checkInterval: time.Duration(c.CheckInterval),
        advancePeriod: time.Duration(c.AdvancePeriod),
        Logger:        zap.NewNop(),
    }
}

下面我们就通过实际的这个Service的运行Code进行下验证。从源码中可以发现,这个Service主要作用就是在checkInterval的周期到达的时候,通过时间戳进行ShardGroups的创建。

// services/precreator/service.go 71行
// runPrecreation continually checks if resources need precreation.
func (s *Service) runPrecreation() {
    defer s.wg.Done()

    for {
        select {
        case <-time.After(s.checkInterval):
            if err := s.precreate(time.Now().UTC()); err != nil {
                s.Logger.Info("Failed to precreate shards", zap.Error(err))
            }
        case <-s.done:
            s.Logger.Info("Terminating precreation service")
            return
        }
    }
}

// precreate performs actual resource precreation.
func (s *Service) precreate(now time.Time) error {
    cutoff := now.Add(s.advancePeriod).UTC()
    return s.MetaClient.PrecreateShardGroups(now, cutoff)
}

在代码中有一个很有趣的用法cutoff := now.Add(s.advancePeriod).UTC(),这个的作用是什么呢,查询了一下官方的文档,这个参数的主要作用是提前多长时间创建ShardGroups,默认情况下checkInterval的取值为10m,而advancePeriod的取值为30m,也就是说Influxdb会每10分钟进行一下检查,看是否需要为30分钟后预创建ShardGroups。接下来深入到创建ShardGroups的源码中。

// services/meta/client.go 777行
// PrecreateShardGroups creates shard groups whose endtime is before the 'to' time passed in, but
// is yet to expire before 'from'. This is to avoid the need for these shards to be created when data
// for the corresponding time range arrives. Shard creation involves Raft consensus, and precreation
// avoids taking the hit at write-time.
func (c *Client) PrecreateShardGroups(from, to time.Time) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    data := c.cacheData.Clone()
    var changed bool

    for _, di := range data.Databases {
        for _, rp := range di.RetentionPolicies {
            if len(rp.ShardGroups) == 0 {
                // No data was ever written to this group, or all groups have been deleted.
                continue
            }
            g := rp.ShardGroups[len(rp.ShardGroups)-1] // Get the last group in time.
            if !g.Deleted() && g.EndTime.Before(to) && g.EndTime.After(from) {
                // Group is not deleted, will end before the future time, but is still yet to expire.
                // This last check is important, so the system doesn't create shards groups wholly
                // in the past.

                // Create successive shard group.
                nextShardGroupTime := g.EndTime.Add(1 * time.Nanosecond)
                // if it already exists, continue
                if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil {
                    c.logger.Info("Shard group already exists",
                        logger.ShardGroup(sg.ID),
                        logger.Database(di.Name),
                        logger.RetentionPolicy(rp.Name))
                    continue
                }
                newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime)
                if err != nil {
                    c.logger.Info("Failed to precreate successive shard group",
                        zap.Uint64("group_id", g.ID), zap.Error(err))
                    continue
                }
                changed = true
                c.logger.Info("New shard group successfully precreated",
                    logger.ShardGroup(newGroup.ID),
                    logger.Database(di.Name),
                    logger.RetentionPolicy(rp.Name))
            }
        }
    }

    if changed {
        if err := c.commit(data); err != nil {
            return err
        }
    }

    return nil
}

在前几篇文章中我们稍微涉及了一些关于Influxdb存储结构上的内容,一个Influxdb可以包含多个库,每个库的存储是根据retention policy来分目录的,而retention policy之下才是真正的Shard,因此在预创建Shard的时候,需要为每一个符合条件的库以及库之下的retention policy都创建Shard。
如果此时retention policy下没有任何的Shard,则会认为无需预创建,因为大部分的情况下,这种场景意味着数据被清空的中间态。为了保证创建的Shard的时序性,在预创建的时候会获取一个retention policy下的最后一个shard,并检查当前shard的所属时间序列是否会和新创建的Shard有重合,只有新创建的Shard与最新的Shard之间不存在重合关系的时候,才会进行创建。继续跟踪代码,查看创建Shard的流程。

// services/meta/data.go 350行
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
    // Find retention policy.
    rpi, err := data.RetentionPolicy(database, policy)
    if err != nil {
        return err
    } else if rpi == nil {
        return influxdb.ErrRetentionPolicyNotFound(policy)
    }

    // Verify that shard group doesn't already exist for this timestamp.
    if rpi.ShardGroupByTimestamp(timestamp) != nil {
        return nil
    }

    // Create the shard group.
    data.MaxShardGroupID++
    sgi := ShardGroupInfo{}
    sgi.ID = data.MaxShardGroupID
    sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
    sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
    if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
        // Shard group range is [start, end) so add one to the max time.
        sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
    }

    data.MaxShardID++
    sgi.Shards = []ShardInfo{
        {ID: data.MaxShardID},
    }

    // Retention policy has a new shard group, so update the policy. Shard
    // Groups must be stored in sorted order, as other parts of the system
    // assume this to be the case.
    rpi.ShardGroups = append(rpi.ShardGroups, sgi)
    sort.Sort(ShardGroupInfos(rpi.ShardGroups))

    return nil
}

当我们看到创建Shard的代码逻辑的时候,其实是有些失望的,此时创建的Shard支持在内存中进行了创建,并没有在磁盘中创建相应的目录结构,那么在磁盘上面Shard的目录结构和对应的Shard的名字是怎么来的呢。实际上这个Shard的信息只是在meta与内存中作了一个标示,并不是真的开始存储数据,这部分的内容会在PointsWriter.WritePoints的时候刷到磁盘上的,在后面解析PointsWriter的部分会进行深入的讨论。

总结

至此PrecreatorService的部分已经比较明晰了,他的原理非常简单,通过异步定时执行,建立在内存中和meta数据中的shard索引,为后续写入数据的Shard分布提供索引,这样当数据通过PointsWriter写入数据时会根据对应的索引hash分布到真正的Shard中。也就是说通过预分配Shard的部分可以让数据更好的Hash到不同的存储文件中,通过时间戳可以对应到Shard,从而可以使得数据更好的切分,更快的查询。

目录
相关文章
|
10月前
|
安全
禁用Kibana安全提示(Elasticsearch built-in security features are not enabled)
禁用Kibana安全提示(Elasticsearch built-in security features are not enabled)
103 0
|
机器学习/深度学习 Ubuntu iOS开发
【Elastic Engineering】Beats:解密 Filebeat 中的 setup 命令
这个步骤非常重要,但是描述的内容并不是很多。为什么需要这个步骤呢?它到底能够做什么呢?
515 0
【Elastic Engineering】Beats:解密 Filebeat 中的 setup 命令
|
存储 测试技术 API
【Elastic Engineering】Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
181 0
【Elastic Engineering】Elasticsearch:Runtime fields 入门, Elastic 的 schema on read 实现 - 7.11 发布
|
弹性计算 API 索引
|
JSON 自然语言处理 数据库
|
数据可视化 API 索引
【Elastic Engineering】Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布
Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布
287 0
【Elastic Engineering】Elasticsearch:创建 Runtime field 并在 Kibana 中使用它 - 7.11 发布
|
索引
|
机器学习/深度学习 存储 安全
|
存储 索引