阿里云InfluxDB® 版对接Kapacitor指南

简介: Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环,阿里云InfluxDB® 版对接Kapacitor指南新鲜出炉

背景

InfluxDB subscription(订阅)主要用于Kapacitor,但是任何可以接受UDP、HTTP或者HTTPS连接的终端,都可以订阅InfluxDB并获得写入InfluxDB的数据。
在InfluxDB中,创建、删除和查看subscription都需要admin权限,为了满足客户的需求,完善InfluxDB®的生态,我们最新的引擎版本已经支持客户通过控制台创建admin权限账号,详情见文档

介绍

Kapacitor是一个数据处理框架,用于告警、ETL(Extract-Transform-Load)和检测异常。Kapacitor是TICK生态中的最后一环。Kapacitor的主要特性包括:

  • 既可以处理流数据(streaming data),也可以处理批量数据(batch data)
  • 定期从InfluxDB查询数据,并且通过行协议或其它InfluxDB支持的方式获得数据
  • 执行InfluxQL支持的函数
  • 将处理结果存回InfluxDB
  • 支持添加用户自定义的函数检测异常
  • 与HipChat、OpsGenie、Alerta、Sensu、PagerDuty和Slack等集成

具体的介绍可查看Kapacitor官方文档

Kapacitor & Pull

Kapacitor将Prometheus关于service discover和scraping的代码集成到Kapacitor,所以,Kapacitor也可以支持pull模式。通过Kapacitor拉取数据写入InfluxDB,InfluxDB也可以间接地实现pull模式。

流处理 & 批处理

在Kapacitor中,用户可以使用TICKscript实现数据转换、降精度和告警等功能,特别地,可分为流处理和批处理:

  • 流处理:在InfluxDB中创建subscription,每个写入InfluxDB的数据点,同时也写入Kapacitor。
  • 批处理:Kapacitor周期性地从InfluxDB读取数据(在指定时间区间范围内的数据)。

什么时候需要流处理:

  • 需要实时地对每个数据点进行转换(这种操作也可以使用批处理,但是会有延迟)
  • 需要非常低的延迟,需要马上触发警报的场景
  • 当InfluxDB处理大量查询负载时,需要减轻InfluxDB的查询压力
  • 流处理根据数据点的时间戳了解时间,不需要判断一个数据点是否会在时间窗口外,然而在批处理中,数据点可能会因为延迟而落在相关时间窗口外

什么时候需要批处理:

  • 执行聚合操作
  • 不需要对每个数据点都进行判断是否需要告警(因为不经常发生状态改变)
  • 数据降精度
  • 稍微的延迟不会严重影响您的操作
  • 对于有超高吞吐量的InfluxDB实例(因为Kapacitor处理数据的速度无法像数据写入InfluxDB那样快)
  • 需要注意的是,流处理需要缓存更多的数据,占用较多的内存。

Kapacitor & Continuous Queries

Kapacitor的批处理跟InfluxDB的连续查询(Continuous Queries)有点类似,都是周期性地查询InfluxDB中的数据。那么,什么时候该用Kapacitor,什么时候该用CQ呢?

  • 大量CQ导致InfluxDB查询负载大时用Kapacitor,需要用流处理,因为批处理同样会增加InfluxDB的查询压力
  • 需要执行复杂的数据转换时用Kapacitor,Kapacitor支持用户自定义的函数,不仅仅是InfluxQL函数
  • 需要告警、异常检查等功能时用Kapacitor
  • 只需将少量数据进行降精度时用CQ,当查询压力不大时,用CQ即可

Kapacitor安装

登录Influxdata官网根据操作系统类型选择所需版本,下载链接,本文以centos为例

  • 1、 下载rpm包 wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.5.3.x86_64.rpm
  • 2、安装sudo yum localinstall kapacitor-1.5.3.x86_64.rpm
  • 3、修改配置文件 /etc/kapacitor/kapacitor.conf ,主要是influxdb相关配置,现在Influxdb只能通过VPC网络外发数据,所以需要登录TSDB控制台,开通VPC网络双向访问,demo如下
[[influxdb]]
  # Connect to an InfluxDB cluster
  # Kapacitor can subscribe, query and write to this cluster.
  # Using InfluxDB is not required and can be disabled.
  enabled = true
  default = true
  name = "localhost"
  urls = ["https://ts-*****.influxdata.rds.aliyuncs.com:3242"]
  username = "admin"
  password = "Admin1234"
  timeout = 0
  # Absolute path to pem encoded CA file.
  # A CA can be provided without a key/cert pair
  #   ssl-ca = "/etc/kapacitor/ca.pem"
  # Absolutes paths to pem encoded key and cert files.
  #   ssl-cert = "/etc/kapacitor/cert.pem"
  #   ssl-key = "/etc/kapacitor/key.pem"
  # Do not verify the TLS/SSL certificate.
  # This is insecure.
  insecure-skip-verify = false
  # Maximum time to try and connect to InfluxDB during startup
  startup-timeout = "5m"
  # Turn off all subscriptions
  disable-subscriptions = false
  # Subscription mode is either "cluster" or "server"
  subscription-mode = "cluster"
  # Which protocol to use for subscriptions
  # one of 'udp', 'http', or 'https'.
  subscription-protocol = "http"
  # Subscriptions resync time interval
  # Useful if you want to subscribe to new created databases
  # without restart Kapacitord
  subscriptions-sync-interval = "1m0s"
  # Override the global hostname option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to `hostname` if empty.
  kapacitor-hostname = "192.168.11.1"
  # Override the global http port option for this InfluxDB cluster.
  # Useful if the InfluxDB cluster is in a separate network and
  # needs special config to connect back to this Kapacitor instance.
  # Defaults to the port from `[http] bind-address` if 0.
  http-port = 0
  # Host part of a bind address for UDP listeners.
  # For example if a UDP listener is using port 1234
  # and `udp-bind = "hostname_or_ip"`,
  # then the UDP port will be bound to `hostname_or_ip:1234`
  # The default empty value will bind to all addresses.
  udp-bind = ""
  # Subscriptions use the UDP network protocl.
  # The following options of for the created UDP listeners for each subscription.
  # Number of packets to buffer when reading packets off the socket.
  udp-buffer = 1000
  # The size in bytes of the OS read buffer for the UDP socket.
  # A value of 0 indicates use the OS default.
  udp-read-buffer = 0
  [influxdb.subscriptions]
    # Set of databases and retention policies to subscribe to.
    # If empty will subscribe to all, minus the list in
    # influxdb.excluded-subscriptions
    #
    test =["autogen"]
    telegraf=["autogen"]
    # Format
    # db_name = <list of retention policies>
    # Example:
    # my_database = [ "default", "longterm" ]
  [influxdb.excluded-subscriptions]
    # Set of databases and retention policies to exclude from the subscriptions.
    # If influxdb.subscriptions is empty it will subscribe to all
    # except databases listed here.
    #
    # Format
    # db_name = <list of retention policies>
    #
    # Example:
    # my_database = [ "default", "longterm" ]
  • 4、启动服务 sudo service kapacitor start
  • 5、启动后Influxdb 服务端就可以看到订阅信息
> SHOW SUBSCRIPTIONS
name: test
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
name: telegraf
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
autogen          kapacitor-9ffbc8c7-fb55-46ca-84f7-e8102a7f07fb ANY  [http://192.168.11.1:9092]
> quit
[root@r10g04440.sqa.zmf /bin]

Kapacitor task例子

本文主要介绍stream方式的报警和流处理任务

  • 报警任务配置及发布如下
[root@izuf63izb8ng929n94h5b6z ~]# cat cpu.tick 
dbrp "test"."autogen"
stream
    // Select just the cpu measurement from our example database.
    |from()
        .measurement('cpu')
    |alert()
        .crit(lambda: int("usage_idle") <  70)
        // Whenever we get an alert write it to a file.
        .log('/tmp/alerts.log')
* kapacitor define cpu_alter -tick cpu.tick
* kapacitor list task
* kapacitor enable cpu_alter 
[root@izuf63izb8ng929n94h5b6z ~]# cat telegraf.tick 
dbrp "telegraf"."autogen"
stream
    |from()
        .database('telegraf')
        .measurement('cpu')
        .groupBy(*)
    |window()
        .period(5m)
        .every(5m)
        .align()
    |mean('usage_idle')
        .as('usage_idle')
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mean_cpu_idle')
        .precision('s')
* kapacitor define telgrafstream -tick telegraf.tick
* kapacitor list tasks
* kapacitor enable telgrafstream
* kapacitor list tasks
  • 运行效果
[root@izuf63izb8ng929n94h5b6z ~]# kapacitor list tasks
ID            Type      Status    Executing Databases and Retention Policies
cpu_alter     stream    enabled   true      ["test"."autogen"]
telgrafstream stream    enabled   true      ["telegraf"."autogen"]
目录
相关文章
|
4月前
sms4j对接阿里云短信
sms4j对接阿里云短信
185 0
|
3月前
新版阿里云内容安全对接
新版阿里云内容安全对接
72 1
|
3月前
对接阿里云RTC
对接阿里云RTC
34 0
|
3月前
对接阿里云RTC
对接阿里云RTC
52 0
|
6月前
|
存储 JavaScript 前端开发
html+vue组件实现阿里云OSS对接
html+vue组件实现阿里云OSS对接
426 0
|
7月前
|
存储 人工智能 安全
阿里云oss简介和如何对接使用
阿里云对象存储服务(Alibaba Cloud Object Storage Service,简称OSS)是阿里云提供的一种安全、稳定、高效的对象存储服务。它支持多元数据存储、持久化存储和共享访问,并且具有无限的扩展性和备份恢复能力。阿里云OSS适用于各类场景,如云计算、大数据分析、人工智能等,并且具备高可用性、高可扩展性和低成本等优势。
624 0
|
8月前
|
算法 API Python
使用 Python 对接阿里云 OpenAPI 自签名
使用 Python 对接阿里云 OpenAPI 自签名
272 1
|
9月前
|
对象存储
阿里云OSS如何对接外链网盘
阿里云OSS如何对接外链网盘
301 1
|
10月前
|
存储 数据采集 JavaScript
php对接阿里云API调用企业税号查询的高级实战案例解析(下拉筛选查询、远程调用API、xm-select组件应用)
php对接阿里云API调用企业税号查询的高级实战案例解析(下拉筛选查询、远程调用API、xm-select组件应用)
458 31
|
10月前
|
安全 Java API
阿里云短信服务对接流程
阿里云短信服务是阿里巴巴集团推出的一款通讯服务产品,支持发送短信到手机、验证码短信、语音短信等多种短信场景。通过阿里云短信服务,企业可以方便、快捷、安全地将消息发送给用户,提升用户体验和品牌形象。此外,阿里云短信服务还提供了丰富的API接口和SDK,方便开发者集成到自己的应用中。 基本上我们的生活已经不能脱离阿里云短信服务了,比如注册一个APP,登录一个APP,或者找回密码等。但是很多小白用户不知道怎么使用阿里云短信,下面小编就和大家系统讲解一下