CanalSharp-mysql数据库binlog的增量订阅&消费组件Canal的.NET客户端

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

一.前言

CanalSharp是阿里巴巴开源项目mysql数据库binlog的增量订阅&消费组件 Canal 的.NET客户端,关于什么是 Canal?又能做什么?我会在后文为大家一一介绍。CanalSharp 这个项目,是由我和 WithLin (主要贡献) 完成,并将一直进行维护的Canal的.NET客户端项目。目前开源在github:https://github.com/CanalSharp/CanalSharp/ 希望大家多多支持,旨在为.NET开发者提供一个友好的对接Canal的选择,为.NET社区生态做贡献。

二.Canal介绍

1.背景

早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。

  1. 目前内部版本已经支持mysql和oracle部分版本的日志解析,当前的canal开源版本支持5.7及以下的版本(阿里内部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)

基于日志增量订阅&消费支持的业务:

  1. 数据库镜像
  2. 数据库实时备份
  3. 多级索引 (卖家和买家各自分库索引)
  4. search build
  5. 业务cache刷新
  6. 价格变化等重要业务消息

2.工作原理

2.1 mysql主备复制实现

1537856325274

从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据。

2.2 Canal的工作原理

1537856417016

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

以上内容摘自Canal项目官方资料 https://github.com/alibaba/canal

3.Canal的安装以及使用

Canal的安装以及使用请查阅官方文档,本文不在赘述。 https://github.com/alibaba/canal/wiki

三.CanalSharp介绍

1.工作原理

CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。

2.工作流程

1.Canal连接到mysql数据库,模拟slave

2.CanalSharp与Canal建立连接

2.数据库发生变更写入到binlog

5.Canal向数据库发送dump请求,获取binlog并解析

4.CanalSharp向Canal请求数据库变更

4.Canal发送解析后的数据给CanalSharp

5.CanalSharp收到数据,消费成功,发送回执。(可选)

6.Canal记录消费位置。

以一张图来表示:

1537860226808

3.应用场景

CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。这里我举一些实际的使用例子:

1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等

3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis

4.数据库异地备份、数据同步

5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。

6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。

四.CanalSharp的使用

1.使用前的准备

使用 CanalSharp 之前,必然要先准备好mysql数据库以及Canal才行,这个步骤请直接查阅Canal官方文档 https://github.com/alibaba/canal/wiki 。但是为了让大家能快速跑通CanalSharp,CanalSharp 项目为大家提供了一个通过 docker-compose 同时运行 mysql和canal。

2.通过docker-compose运行mysql和canal:

git clone https://github.com/CanalSharp/CanalSharp.git
cd docker
docker-compose up -d

出现下图表示运行成功:

1537866674285

3.使用navicat等数据库管理工具连接mysql

ip:运行docker的服务器ip

mysql用户:root

mysql密码:000000

mysql端口:4406

默认提供了一个test数据库,然后有一张名为test的表。

1537866852816

4.创建一个 .NET Core 控制台项目

5.添加 Nuget 程序包

Install-Package CanalSharp.Client

1537867132107

6.编码

也可以直接下载源码运行 Sample 项目 https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient

(1)建立连接

   //canal 配置的 destination,默认为 example
    var destination = "example";
    //创建一个简单CanalClient连接对象(此对象不支持集群)传入参数分别为 canal地址、端口、destination、用户名、密码
    var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
    //连接 Canal
    connector.Connect();
    //订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
    connector.Subscribe(".*\\\\..*");

(2)获取数据

  while (true)
    {
        //获取数据 1024表示数据大小 单位为字节
        var message = connector.Get(1024);
        //批次id 可用于回滚
        var batchId = message.Id;
        if (batchId == -1 || message.Entries.Count <= 0)
        {
            Thread.Sleep(300);
            continue;
        }

        PrintEntry(message.Entries);
    }

(3)输出数据

/// <summary>
/// 输出数据
/// </summary>
/// <param name="entrys">一个entry表示一个数据库变更</param>
private static void PrintEntry(List<Entry> entrys)
{
    foreach (var entry in entrys)
    {
        if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
        {
            continue;
        }

        RowChange rowChange = null;

        try
        {
            //获取行变更
            rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
        }
        catch (Exception e)
        {

        }

        if (rowChange != null)
        {
            //变更类型 insert/update/delete 等等
            EventType eventType = rowChange.EventType;
            //输出binlog信息 表名 数据库名 变更类型
            Console.WriteLine(
                $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

            //输出 insert/update/delete 变更类型列数据
            foreach (var rowData in rowChange.RowDatas)
            {
                if (eventType == EventType.Delete)
                {
                    PrintColumn(rowData.BeforeColumns.ToList());
                }
                else if (eventType == EventType.Insert)
                {
                    PrintColumn(rowData.AfterColumns.ToList());
                }
                else
                {
                    Console.WriteLine("-------> before");
                    PrintColumn(rowData.BeforeColumns.ToList());
                    Console.WriteLine("-------> after");
                    PrintColumn(rowData.AfterColumns.ToList());
                }
            }
        }

    }
}

/// <summary>
/// 输出每个列的详细数据
/// </summary>
/// <param name="columns"></param>
private static void PrintColumn(List<Column> columns)
{
    foreach (var column in columns)
    {
        //输出列明 列值 是否变更
        Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
    }
}

7.测试运行

首次运行会输出一堆数据,那些都是初始化运行创建表的数据,忽略即可

运行项目,然后一次执行sql观察输出:

insert into test values(1000,'111');
update test set name='222' where id=1000;
delete from test where id=1000;

通过新标签页打开图片查看大图

可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。

五.使用Canal的经验

1.mysql数据库版本有要求:5.7.13, 5.6.10,、5.5.18和5.1.40/48,不一定非要满足小版本号的要求,比如 5.7.x、5.6.x、5.5.x都应该可以,但是实际需要自己做测试。前面的具体版本号是Canal官方提供的资料,但是博主公司用的mysql 的版本是5.5.60,是可以正常使用Canal的。

2.mysql数据binlog的格式强烈建议设置为row

3.Canal并非必须连接到master数据库,它同样可以连接到slave数据库,只是从库出了需要开启写入binlog以外还需要设置 log-slave-updates 开启。

4.如果生产环境已经存在mysql集群,且集群主库的binlog格式为mixed,mysql数据库集群的主库binlog格式可以不用改依然为 mixed,设置某一个从库binlog格式配置为 row,让Canal连接从库,这样可以避免对生产环境的mysql集群产生影响。

5.mysql支持Statement,MiXED,以及ROW三种格式的binlog为什么推荐使用row格式binlog,经过博主实际测试,使用row格式兼容性是最好的,实际可以自己测试。

六.结束语

CanalSharp的介绍到这里就结束了,如果觉得这个项目有用的欢迎大家来个 star 。后续将会写几篇文章介绍更详细的使用方法以及实战。

七.资料

CanalSharp 开源地址:https://github.com/CanalSharp/CanalSharp

Canal 开源地址:https://github.com/alibaba/canal

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
28天前
|
关系型数据库 MySQL
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
elasticsearch对比mysql以及使用工具同步mysql数据全量增量
21 0
|
3月前
|
SQL 数据库 C++
C++ Qt开发:Charts与数据库组件联动
Qt 是一个跨平台C++图形界面开发库,利用Qt可以快速开发跨平台窗体应用程序,在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置,实现图形化开发极大的方便了开发效率,本章将重点介绍`Charts`组件与`QSql`数据库组件的常用方法及灵活运用。在之前的文章中详细介绍了关于`QCharts`绘图组件的使用方式,本章将继续延续这个知识点,通过使用`QSql`数据库模块动态的读取某一个时间节点上的数据,当用户点击查询数据时则动态的输出该事件节点的所有数据,并将数据绘制到图形组件内,实现动态查询图形的功能。
34 0
C++ Qt开发:Charts与数据库组件联动
|
4月前
|
存储 SQL 数据库
C++ Qt开发:数据库与TableView多组件联动
在Qt中,通常我们不会在`TableView`等组件中保存数据,一般会将这些数据存储至数据库或者是文件中保存,当使用时则动态的在数据库中调出来,以下案例将实现,当用户点击并选中`TableView`组件内的某一行时,我们通过该行中的`name`字段查询,并将查询结果关联到`ListView`组件内,同时将`TableView`中选中行的字段分别显示在窗体底部的`LineEdit`编辑框内。Qt 是一个跨平台C++图形界面开发库,利用Qt可以快速开发跨平台窗体应用程序,在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置,实现图形化开发极大的方便了开发效率,本章将重点介绍`TableView`
42 1
C++ Qt开发:数据库与TableView多组件联动
|
4月前
|
SQL 数据库连接 数据库
C++ Qt开发:QSqlDatabase数据库组件
Qt SQL模块是Qt框架的一部分,它提供了一组类和函数,用于在Qt应用程序中进行数据库操作。这个模块的目标是简化数据库访问和操作,并提供一致的接口,使得开发者可以方便地与不同数据库系统进行交互。一般SQL组件常用的操作,包括读取数据、插入数据、更新数据、删除数据功能,这四个功能我将分别介绍它是如何使用的。Qt 是一个跨平台C++图形界面开发库,利用Qt可以快速开发跨平台窗体应用程序,在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置,实现图形化开发极大的方便了开发效率,本章将重点介绍`QSqlDatabase`数据库模块的常用方法及灵活运用。
27 0
C++ Qt开发:QSqlDatabase数据库组件
|
1月前
|
SQL 关系型数据库 MySQL
【MySQL】— —熟练掌握用SQL语句实现数据库和基本表的创建。熟练掌握MySQL的安装、客户端登录方法;熟练掌握MySQL的编码、数据类型等基础知识;掌握实体完整性的定义和维护方法、掌握参照完整性
【MySQL】— —熟练掌握用SQL语句实现数据库和基本表的创建。熟练掌握MySQL的安装、客户端登录方法;熟练掌握MySQL的编码、数据类型等基础知识;掌握实体完整性的定义和维护方法、掌握参照完整性
101 1
|
5月前
|
Linux Android开发 iOS开发
基于.Net开发的ChatGPT客户端,兼容Windows、IOS、安卓、MacOS、Linux
基于.Net开发的ChatGPT客户端,兼容Windows、IOS、安卓、MacOS、Linux
88 0
|
1月前
|
关系型数据库 MySQL 数据库
rds安装数据库客户端工具
安装阿里云RDS的数据库客户端涉及在本地安装对应类型(如MySQL、PostgreSQL)的客户端工具。对于MySQL,可选择MySQL Command-Line Client或图形化工具如Navicat,安装后输入RDS实例的连接参数进行连接。对于PostgreSQL,可以使用`psql`命令行工具或图形化客户端如PgAdmin。首先从阿里云控制台获取连接信息,然后按照官方文档安装客户端,最后配置客户端连接以确保遵循安全指引。
86 1
|
3月前
|
存储 关系型数据库 MySQL
利用Xtrabackup进行mysql增量备份和全量备份
利用Xtrabackup进行mysql增量备份和全量备份
193 0
|
3月前
|
SQL 关系型数据库 MySQL
免费MySQL数据库客户端推荐
免费MySQL数据库客户端推荐
63 0
|
3月前
|
SQL 关系型数据库 MySQL
MySQL数据库免费客户端简介
MySQL数据库免费客户端简介
38 0