Learn Influxdb the hard way (2) - Dive into the code backbone

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介:

前言

在上一篇文章我们从上帝视角鸟瞰了一下Influxdb的组件结构,在这篇文章中,我们会开始深入代码,从代码的流程中帮助大家切割Influxdb的核心组件。希望在看本篇文章的时候,大家能够将Influxdb的代码下载的本地,对照进行查看。

git clone -b v1.5.0 git@github.com:influxdata/influxdb.git
AI 代码解读

代码主干流程

Influxdb的源码仓库中包含了influx、influx_inspect、influx_stress、influx_tsm、influxd、store等多个子项目,对于本系列而言,更多的侧重在Influxd也就是Influxdb的主要存储Server。

首先我们进入到cmd/influxd/main.go的入口文件,进行代码跟踪,进入到run命令的代码分支流程下。

    //cmd/influxd/run/command.go 133行
    
    s, err := NewServer(config, buildInfo)
    if err != nil {
        return fmt.Errorf("create server: %s", err)
    }
    s.Logger = cmd.Logger
    s.CPUProfile = options.CPUProfile
    s.MemProfile = options.MemProfile
    if err := s.Open(); err != nil {
        return fmt.Errorf("open server: %s", err)
    }
    cmd.Server = s

    // Begin monitoring the server's error channel.
    go cmd.monitorServerErrors()
AI 代码解读

可以看到Influxdb中Server的构建,并最终调用了Server的Open方法启动Server,这个Server对象是Influxdb的逻辑封装。我们来看下Server包含的内容。

    //cmd/influxd/run/server.go 158行
    
    s.Monitor = monitor.New(s, c.Monitor)
    s.config.registerDiagnostics(s.Monitor)

    if err := s.MetaClient.Open(); err != nil {
        return nil, err
    }

    s.TSDBStore = tsdb.NewStore(c.Data.Dir)
    s.TSDBStore.EngineOptions.Config = c.Data

    // Copy TSDB configuration.
    s.TSDBStore.EngineOptions.EngineVersion = c.Data.Engine
    s.TSDBStore.EngineOptions.IndexVersion = c.Data.Index

    // Create the Subscriber service
    s.Subscriber = subscriber.NewService(c.Subscriber)

    // Initialize points writer.
    s.PointsWriter = coordinator.NewPointsWriter()
    s.PointsWriter.WriteTimeout = time.Duration(c.Coordinator.WriteTimeout)
    s.PointsWriter.TSDBStore = s.TSDBStore

    // Initialize query executor.
    s.QueryExecutor = query.NewQueryExecutor()
    s.QueryExecutor.StatementExecutor = &coordinator.StatementExecutor{
        MetaClient:  s.MetaClient,
        TaskManager: s.QueryExecutor.TaskManager,
        TSDBStore:   coordinator.LocalTSDBStore{Store: s.TSDBStore},
        ShardMapper: &coordinator.LocalShardMapper{
            MetaClient: s.MetaClient,
            TSDBStore:  coordinator.LocalTSDBStore{Store: s.TSDBStore},
        },
        Monitor:           s.Monitor,
        PointsWriter:      s.PointsWriter,
        MaxSelectPointN:   c.Coordinator.MaxSelectPointN,
        MaxSelectSeriesN:  c.Coordinator.MaxSelectSeriesN,
        MaxSelectBucketsN: c.Coordinator.MaxSelectBucketsN,
    }
    s.QueryExecutor.TaskManager.QueryTimeout = time.Duration(c.Coordinator.QueryTimeout)
    s.QueryExecutor.TaskManager.LogQueriesAfter = time.Duration(c.Coordinator.LogQueriesAfter)
    s.QueryExecutor.TaskManager.MaxConcurrentQueries = c.Coordinator.MaxConcurrentQueries

    // Initialize the monitor
    s.Monitor.Version = s.buildInfo.Version
    s.Monitor.Commit = s.buildInfo.Commit
    s.Monitor.Branch = s.buildInfo.Branch
    s.Monitor.BuildTime = s.buildInfo.Time
    s.Monitor.PointsWriter = (*monitorPointsWriter)(s.PointsWriter)
AI 代码解读

Server的核心代码主要就是实例化内部组件,主要包含Monitor、Subscriber、PointsWriter和QueryExecutor。在上篇文章中已经简单的介绍过这几个组件的作用,在此先不过多赘述,我们再来看下Server的Open方法。

    //cmd/influxd/run/server.go 371行  
    
    s.appendMonitorService()
    s.appendPrecreatorService(s.config.Precreator)
    s.appendSnapshotterService()
    s.appendContinuousQueryService(s.config.ContinuousQuery)
    s.appendHTTPDService(s.config.HTTPD)
    s.appendStorageService(s.config.Storage)
    s.appendRetentionPolicyService(s.config.Retention)
    for _, i := range s.config.GraphiteInputs {
        if err := s.appendGraphiteService(i); err != nil {
            return err
        }
    }
    for _, i := range s.config.CollectdInputs {
        s.appendCollectdService(i)
    }
    for _, i := range s.config.OpenTSDBInputs {
        if err := s.appendOpenTSDBService(i); err != nil {
            return err
        }
    }
    for _, i := range s.config.UDPInputs {
        s.appendUDPService(i)
    }

    s.Subscriber.MetaClient = s.MetaClient
    s.PointsWriter.MetaClient = s.MetaClient
    s.Monitor.MetaClient = s.MetaClient

    s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)

    // Configure logging for all services and clients.
    if s.config.Meta.LoggingEnabled {
        s.MetaClient.WithLogger(s.Logger)
    }
    s.TSDBStore.WithLogger(s.Logger)
    if s.config.Data.QueryLogEnabled {
        s.QueryExecutor.WithLogger(s.Logger)
    }
    s.PointsWriter.WithLogger(s.Logger)
    s.Subscriber.WithLogger(s.Logger)
    for _, svc := range s.Services {
        svc.WithLogger(s.Logger)
    }
    s.SnapshotterService.WithLogger(s.Logger)
    s.Monitor.WithLogger(s.Logger)

    // Open TSDB store.
    if err := s.TSDBStore.Open(); err != nil {
        return fmt.Errorf("open tsdb store: %s", err)
    }

    // Open the subcriber service
    if err := s.Subscriber.Open(); err != nil {
        return fmt.Errorf("open subscriber: %s", err)
    }

    // Open the points writer service
    if err := s.PointsWriter.Open(); err != nil {
        return fmt.Errorf("open points writer: %s", err)
    }

    s.PointsWriter.AddWriteSubscriber(s.Subscriber.Points())

    for _, service := range s.Services {
        if err := service.Open(); err != nil {
            return fmt.Errorf("open service: %s", err)
        }
    }
AI 代码解读

在Server的Open中先进行了Service的注册,然后将实例化的内部组件进行启动并设置相应的Logger等配置,最后依次启动注册在Server上的服务。

我们可以挑选appendPrecreatorService作为范例进行分析,来看下一个Service的注册过程。

//cmd/influxd/run/server.go 320行  

func (s *Server) appendPrecreatorService(c precreator.Config) error {
    if !c.Enabled {
        return nil
    }
    srv := precreator.NewService(c)
    srv.MetaClient = s.MetaClient
    s.Services = append(s.Services, srv)
    return nil
}
AI 代码解读

在本例中讲解的是PrecreatorService这个Service的注册过程,首先实例化一个PrecreatorService,然后设置其使用的MetaClient,最后将PrecreatorService的实例注册到Server上,在Influxdb中Service是一个接口类型,需要实现如下三个方法,分别负责统一的日志接入,启动和停止。

//cmd/influxd/run/server.go 558行

// Service represents a service attached to the server.
type Service interface {
    WithLogger(log *zap.Logger)
    Open() error
    Close() error
}
AI 代码解读

这样Influxdb的代码基本已经切割清晰了,首先是一个Server负责实例化几个全局的内部组件单例,然后生成上层业务包装的Service,最后再依次启动。

最后

在接下来的文章中,我们会深入到每个Service中,依次讲解他们的功能与原理。如果有错误的地方麻烦大家多多指正。

目录
打赏
0
0
0
0
78903
分享
相关文章
文献解读-Prediction of axillary lymph node metastasis in triple-negative breast cancer by multi-omics analysis and an integrated model
研究旨在为三阴性乳腺癌患者提供更准确的腋窝淋巴结转移风险评估工具。研究者综合分析了临床病理信息、基因组和转录组数据,构建了一个多组学预测模型。
59 4
文献解读-Processing UMI Datasets at High Accuracy and Efficiency with the Sentieon ctDNA Analysis Pipeline
Sentieon ctDNA分析流程通过创新的算法设计和高效的软件实现,为高深度、大panel的ctDNA测序数据分析提了一个快速而准确的解决方案。它在多个数据集上均展现出优于或等同于现有方法的性能,同时大幅提高了处理速度。这一进展有望推动ctDNA技术在临床肿瘤学中的广泛应用,特别是在早期癌症检测和最小残留病监测等领域。
75 8
文献解读-Sentieon DNAscope LongRead – A highly Accurate, Fast, and Efficient Pipeline for Germline Variant Calling from PacBio HiFi reads
PacBio® HiFi 测序是第一种提供经济、高精度长读数测序的技术,其平均读数长度超过 10kb,平均碱基准确率达到 99.8% 。在该研究中,研究者介绍了一种准确、高效的 DNAscope LongRead 管道,用于从 PacBio® HiFi 读数中调用胚系变异。DNAscope LongRead 是对 Sentieon 的 DNAscope 工具的修改和扩展,该工具曾获美国食品药品管理局(FDA)精密变异调用奖。
66 2
文献解读-Sentieon DNAscope LongRead – A highly Accurate, Fast, and Efficient Pipeline for Germline Variant Calling from PacBio HiFi reads
轻量级网络论文精度笔(一):《Micro-YOLO: Exploring Efficient Methods to Compress CNN based Object Detection Model》
《Micro-YOLO: Exploring Efficient Methods to Compress CNN based Object Detection Model》这篇论文提出了一种基于YOLOv3-Tiny的轻量级目标检测模型Micro-YOLO,通过渐进式通道剪枝和轻量级卷积层,显著减少了参数数量和计算成本,同时保持了较高的检测性能。
161 2
轻量级网络论文精度笔(一):《Micro-YOLO: Exploring Efficient Methods to Compress CNN based Object Detection Model》
【博士每天一篇文献-算法】Memory augmented echo state network for time series prediction
本文介绍了一种记忆增强的回声状态网络(MA-ESN),它通过在储层中引入线性记忆模块和非线性映射模块来平衡ESN的记忆能力和非线性映射能力,提高了时间序列预测的性能,并在多个基准数据集上展示了其优越的记忆能力和预测精度。
70 3
【博士每天一篇文献-算法】Memory augmented echo state network for time series prediction
【博士每天一篇论文-算法】Overview of Echo State Networks using Different Reservoirs and Activation Functions
本文研究了在物联网网络中应用回声状态网络(ESN)进行交通预测的不同拓扑结构,通过与SARIMA、CNN和LSTM等传统算法的比较,发现特定配置的ESN在数据速率和数据包速率预测方面表现更佳,证明了ESN在网络流量预测中的有效性。
50 4
[Initial Image Segmentation Generator]论文实现:Efficient Graph-Based Image Segmentation
[Initial Image Segmentation Generator]论文实现:Efficient Graph-Based Image Segmentation
101 1
Caffe(Convolutional Architecture for Fast Feature Embedding)
Caffe(Convolutional Architecture for Fast Feature Embedding)是一个流行的深度学习框架,主要用于图像分类、物体检测和语义分割等计算机视觉任务。它由Berkeley Vision and Learning Center(BVLC)开发,使用C++编写,提供了高效的神经网络实现和训练工具。
239 1
DEPPN:Document-level Event Extraction via Parallel Prediction Networks 论文解读
当在整个文档中描述事件时,文档级事件抽取(DEE)是必不可少的。我们认为,句子级抽取器不适合DEE任务,其中事件论元总是分散在句子中
181 0
DEPPN:Document-level Event Extraction via Parallel Prediction Networks 论文解读
On the Unreasonable Effectiveness of Feature propagation in Learning on Graphs with Missing 论文阅读笔记
On the Unreasonable Effectiveness of Feature propagation in Learning on Graphs with Missing 论文阅读笔记
250 0
On the Unreasonable Effectiveness of Feature propagation in Learning on Graphs with Missing 论文阅读笔记
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等