使用EMR来进行mysqlbinlog日志准实时传输

  1. 云栖社区>
  2. 博客列表>
  3. 正文

使用EMR来进行mysqlbinlog日志准实时传输

qiaozhou 2018-01-04 10:40:28 浏览775 评论1

摘要: 如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输

简介

本文将介绍如何利用阿里云的sls插件功能和emr来进行mysql binlog的准实时传输

基本架构

rds -> sls -> spark streaming -> spark hdfs

主要包含3个链路:

1. 怎么把rds的binlog收集到sls

2.怎么通过spark streaming将sls中的日志读取出来,进行分析

3.怎么把2中读取和处理过的日志,保存到spark hdfs中

环境准备

需要mysql类型数据库、开通sls服务并添加对应的project和logstore、然后创建一个emr集群。

具体步骤

准备好mysql数据库环境,添加用户权限

首先需要一个mysql类型数据库(使用mysql协议,例如RDS、DRDS等),
数据库开启binlog,且配置binlog类型为ROW模式(RDS默认开启)。

  1. 先检查下环境:

  ### 查看是否开启binlog
  mysql> show variables like "log_bin";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | log_bin       | ON    |
  +---------------+-------+
  1 row in set (0.02 sec)
  ### 查看binlog类型
  mysql> show variables like "binlog_format";
  +---------------+-------+
  | Variable_name | Value |
  +---------------+-------+
  | binlog_format | ROW   |
  +---------------+-------+
  1 row in set (0.03 sec)

  2. 添加权限,当然也可以直接通过rds控制台添加:

  CREATE USER canal IDENTIFIED BY 'canal';
  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
  FLUSH PRIVILEGES;

开通sls,添加对应的配置文件,并检查数据是否正常采集

需要开通sls,并在sls控制台添加对应的project和logstore,例如:我创建的project叫canaltest,logstore叫canal.
然后对sls进行配置,配置文件目录在:/etc/ilogtail下创建文件:user_local_config.json,具体配置文件如下:


 {
      "metrics": {
          "##1.0##canaltest$plugin-local": {
              "aliuid": "****",
              "enable": true,
              "category": "canal",
              "defaultEndpoint": "*******",
              "project_name": "canaltest",
              "region": "cn-hangzhou",
              "version": 2
              "log_type": "plugin",
              "plugin": {
                  "inputs": [
                      {
                          "type": "service_canal",
                          "detail": {
                              "Host": "*****",
                              "Password": "****",
                              "ServerID": ****,
                              "User" : "***",
                              "DataBases": [
                                  "yourdb"
                              ],
                              "IgnoreTables": [
                                  "\\S+_inner"
                              ],
                               "TextToString" : true
                          }
                      }
                  ],
                  "flushers": [
                      {
                          "type": "flusher_sls",
                          "detail": {}
                      }
                  ]
              }
          }
      }
  }

其中detail中的Host和Password等信息为MySQL数据库信息,User请用之前授权过的用户名。
aliUid、defaultEndpoint、project_name、category请根据自己的实际情况填写对应的用户和sls信息。
配置好后,我们过2分钟左右,通过sls控制台,看下数据有没有上来,具体如图:
sls日志.png
如果没有正常采集上来,请根据sls的提示,查看sls的采集日志进行排查。

准备好代码,编译成jar包,上传到oss

  • 代码准备

我们可以将emr的example代码通过git复制下来,修改下,具体命令如下:

git clone https://github.com/aliyun/aliyun-emapreduce-demo.git

example代码中已经有LoghubSample这个类,主要用于从sls采集数据,然后直接打印出来。
我们只要稍做修改,就可以达到我们的目的了。
以下是我修改后的代码:


package com.aliyun.emr.example

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}

  object LoghubSample {
    def main(args: Array[String]): Unit = {
      if (args.length < 7) {
        System.err.println(
          """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
            |            
            |           
          """.stripMargin)
        System.exit(1)
      }

      val loghubProject = args(0)
      val logStore = args(1)
      val loghubGroupName = args(2)
      val endpoint = args(3)
      val accessKeyId = args(4)
      val accessKeySecret = args(5)
      val batchInterval = Milliseconds(args(6).toInt * 1000)

      val conf = new SparkConf().setAppName("Mysql Sync")
  //    conf.setMaster("local[4]");
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        1,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.foreachRDD(rdd =>
          rdd.saveAsTextFile("/mysqlbinlog")
      )

      ssc.start()
      ssc.awaitTermination()
    }
  }

其中主要改动是:



loghubStream.foreachRDD(rdd =>
       rdd.saveAsObjectFile("/mysqlbinlog")
)


这样在emr里运行时,就会把spark streaming中流出来的数据,保存到emr的hdfs中。

注意

  1. 由于如果要在本地运行起来,请在本地环境提前搭建hadoop集群。
  2. 由于emr的spark sdk做了升级,他的example code有点旧,不能直接在参数中传递oss的accessKeyId、accessKeySecret 而是需要通过sparkConf设置进来。

trait RunLocally {
  val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.mapreduce.job.run-local", "true")
  conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
  conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
  conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
  conf.set("spark.hadoop.job.runlocal", "true")
  conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
  conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")


  val sc = new SparkContext(conf)

  def getAppName: String
}

  1. 在本地调试时,需要把


loghubStream.foreachRDD(rdd =>
rdd.saveAsObjectFile("/mysqlbinlog")
)

中的/mysqlbinlog修改成本地hdfs的地址

  • 代码编译

在本地调试完成后,我们可以通过如下命令进行打包编译:

mvn clean install

  • 上传jar包

通过oss控制台或oss的sdk将target/shaded目录下的examples-1.1-shaded.jar上传到oss上

我在oss上建立了bucket为qiaozhou-emr/jar的目录,这个上传后的jar包地址为oss://qiaozhou-emr/jar/examples-1.1-shaded.jar

注意,这个地址在后续会用上,如下图:

4e0bc88da76dab1f1660ead1e3f0fa233460da8f


搭建emr集群,创建任务,运行执行计划,查看结果

  1. 首先我们需要通过emr控制台创建一个emr集群,这个过程比较漫长,需要10分钟左右,请耐心等待
  2. 创建类型为spark的作业
    --master yarn --deploy-mode client --driver-memory 4g --executor-memory 2g --executor-cores 2 --class com.aliyun.emr.example.LoghubSample ossref://emr-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $sls_endpoint $sls_access_id $sls_secret_key 1
    
    

请根据你的具体配置将

slsendpointslsendpointsls_access_id $sls_secret_key替换成真实值
注意参数的顺序,否则会报错。
3. 创建执行计划,将作业和emr集群绑定后,开始运行
4. 找到master节点的ip,如图:
Master.png

通过ssh登录上去后,执行命令:

hadoop fs -ls /


就会看到mysqlbinlog开头的目录,再通过命令:

hadoop fs -ls /mysqlbinlog


查看mysqlbinlog文件,如图:
hdfs1.png

最后还可以通过命令:

hadoop fs -cat /mysqlbinlog/part-00000


如图:

查看文件内容,是不是很神奇呀?

错误排查

如果没有出来正常的结果,可以通过emr的运行记录,来进行问题排查,如图:
errortrace.png

用云栖社区APP,舒服~

【云栖快讯】云栖社区技术交流群汇总,阿里巴巴技术专家及云栖社区专家等你加入互动,老铁,了解一下?  详情请点击

网友评论

1F
jeokerman

user_local_config.json这个配置文件是需要在MYSQL所在的机器上面配置吗?还需要安装Logtail吗?logstore中选择的数据类型应该是什么?

元乙

您好,user_local_config.json 只需要在控制台配置会自动下发,Logtail需要安装在可以通mysql的机器就可以。详细可以参见https://yq.aliyun.com/articles/338423。binlog采集可以加钉钉群联系更方便:[Aliyun用户]日志服务-SLS,也可以加我钉钉:元乙

评论
qiaozhou
文章2篇 | 关注3
关注
一款端到端一体化实时监控解决方案的PaaS级阿里云产品。通过该产品,用户可以基于海量的数据迅... 查看详情
支持以数据库为核心的结构化存储产品之间的数据传输。 它是一种集数据迁移、数据订阅及数据实时同... 查看详情
针对日志类数据的一站式服务,用户无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,... 查看详情
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效... 查看详情
阿里中间件云大使

阿里中间件云大使