分享改进 高性能数据同步工具(一)

本文涉及的产品
云数据库 RDS SQL Server,独享型 2核4GB
简介:

题外:在博文索引中暂时列出了开源的计划一览,虫子开源的目的是希望能有更多的交流,部分软件可能小得连开源协议的认证价值都没有。不管程序有多小多简单,用心把一个完整的设计思路、实现过程以及测试结果展现给大家。欢迎大牛拍砖,小牛问路。

虫子的博文索引 http://www.cnblogs.com/dubing/archive/2011/11/03/2234599.html

软件背景

拿本次高性能数据同步工具来说,目前还处于开发阶段,大概是1/4的样子。为了避免模糊,就先把这1/4分享给大家。

数据作为系统的核心价值,因为其流动性所以经常会有载体的变更。如何高性能、安全的将数据搬移是一个大家经常接触也一直在用的课题。如果只是sql to sql可能作为程序员而言,DBA更适合这个内容,例如dts导入等。但是更多的实际场景下,可能会有文件、服务、甚至其他类型的数据流来源。所以作为码农,我们不妨多了解一下这方面的内容。


设计思路

暂时开源程序中只做了sql to sql的一部分。直接就以这个开始来讲吧。

首先是入参和返参的设计

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/// <summary>
     /// 入参接口
     /// </summary>
     public  interface  IAOPParam
     {
          /// <summary>
          /// 目标地址
          /// </summary>
          string  T_ConnectionString {  get ;  }
 
          /// <summary>
          /// 请求行数
          /// </summary>
          long  MaxSize {  get ; }
 
          /// <summary>
          /// 表名
          /// </summary>
          string  TableName {  get ; }
 
          /// <summary>
          /// 当前行数
          /// </summary>
          long  CurrentSize {  get ; }
 
          /// <summary>
          /// 域名
          /// </summary>
          string  p_Domain {  get ;  }
 
          /// <summary>
          /// 断点文件地址
          /// </summary>
          string  p_InitPath {  get ;  }
 
          /// <summary>
          /// 断点时间
          /// </summary>
          DateTime p_Previous {  get ;  }
 
          /// <summary>
          /// 是否结束
          /// </summary>
          bool  p_IsEnd {  get ;  }
 
          /// <summary>
          /// 排序方式
          /// </summary>
          string  SortName {  get set ; }
 
          /// <summary>
          /// 单次请求大小
          /// </summary>
          long  SingleSize {  get ; }
 
          /// <summary>
          /// 排序主键
          /// </summary>
          string  Sortkey {  get ;  }
          
          /// <summary>
          /// 是否支持事务
          /// </summary>
          bool  IsTransaction {  get ;  }
 
          /// <summary>
          /// true为支持断点 发生断点或异常后程序终止   false为不支持断点 遇到断点或异常继续填充直到此次请求完成
          /// </summary>
          bool  IsBreakPoints {  get ;  }
 
          /// <summary>
          /// guid
          /// </summary>
          string  T_Guid {  get ; }
     }
 
     /// <summary>
     /// 对象处理返回的入参接口(泛型)
     /// </summary>
     public  interface  IAOPParam<T> : IAOPParam
     {
         /// <summary>
         /// 泛型附加对象
         /// </summary>
         T ParamAttachObjectEx {  get ; }
     }

 这样设计的目的是考虑到服务器的内存与资源占用问题,如果数据来源的体积过大,我们将会对请求的来源进行分块处理。另外通过排序字段或者自定义的sql语句或者存储过程(暂未补充)可以对数据源进行高级过滤,断点续传的设计目前比较简单,web程序的话植入cookie、控制台或者cs程序通过文本媒介json格式来控制。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#region IAOPResult
 
    /// <summary>
    /// 对象处理返回的结果接口
    /// </summary>
    /// <remarks>
    /// 建议在代码调用返回值中都采用此类实例为返回值<br />
    /// 一般ResultNo小于0表示异常,0表示成功,大于0表示其它一般提示信息
    /// </remarks>
    public  interface  IAOPResult
    {
        /// <summary>
        /// 返回代码
        /// </summary>
        int  ResultNo {  get ; }
 
        /// <summary>
        /// 对应的描述信息
        /// </summary>
        string  ResultDescription {  get ; }
 
        /// <summary>
        /// 相应的附加信息
        /// </summary>
        object  ResultAttachObject {  get ; }
 
        /// <summary>
        /// 内部AOPResult
        /// </summary>
        IAOPResult InnerAOPResult {  get ; }
 
        /// <summary>
        /// 处理结果是否成功(ResultNo == 0)
        /// </summary>
        bool  IsSuccess {  get ; }
 
        /// <summary>
        /// 处理结果是否失败(ResultNo != 0 )
        /// </summary>
        bool  IsNotSuccess {  get ; }
 
        /// <summary>
        /// 处理结果是否失败(ResultNo < 0 )
        /// </summary>
        bool  IsFailed {  get ; }
 
        /// <summary>
        /// 已处理,但有不致命的错误(ResultNo > 0)
        /// </summary>
        bool  IsPassedButFailed {  get ; }
 
        /// <summary>
        /// 如果处理失败,则抛出异常
        /// </summary>
        /// <returns>返回本身</returns>
        IAOPResult ThrowErrorOnFailed();
    }
 
    #endregion IAOPResult
 
    #region IAOPResult<T>
 
    /// <summary>
    /// 对象处理返回的结果接口(泛型)
    /// </summary>
    public  interface  IAOPResult<T> : IAOPResult
    {
        /// <summary>
        /// 泛型附加对象
        /// </summary>
        T ResultAttachObjectEx {  get ; }
    }
 
    #endregion

 返参的设计比较通用化,大家可以自己摸索下。自己也可以补充添加。

 异常基类。

 日志采取lognet 不赘述

 单例通用类 关于作用可以参考虫子设计模式随笔中的相关博文

 状态类,通过这个类可以反映出当前数据同步的进度。

边缘化的准备工作大体如此,下面是主要的实现过程。过程中有几个注意点,同步读写还是异步读写、是否存在线程安全甚至进程的资源安全(例如我在读写前5000条的时候突然在另外一个客户端CRUD了N条数据),另外,我们读写的时候是用连接的方式还是使用非连接的方式,如何解决服务器端内存占用问题,如何实现excel、txt、sql、oracle等不同数据来源的多态性。


实现过程 

这里就先介绍下已经解决的一些问题

 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public  class  AnalyseDataManager
{
     public  Status MStatus {  get set ; }
     public  static  int  SingleSize = 5000;
     public  static  int  StatusSize = 1000;
     readonly  Sqlhelper _sh =  new  Sqlhelper();
 
     public  AnalyseDataManager()
     {
     }
 
     public  AnalyseDataManager(Status st)
         this ()
     {
         MStatus = st;
     }
 
     public  IAsyncResult OutMethod(AopParam app)
     {
         MStatus.MTotalSize = app.MaxSize;
         var  func =  new  Func<AopParam,  bool >(ServerMethod);
         return  func.BeginInvoke(app, CallbackMethod, func);
     }
 
     /// <summary>
     /// 复制数据
     /// </summary>
     /// <returns>是否成功</returns>
     public  bool  ServerMethod(AopParam app)
     {
         try
         {
             _sh.App = app;
             if  (_sh.OpenConn().IsSuccess)
             {
                 while  (app.MaxSize > MStatus.MCurrentSize)
                 {
                     app.CurrentSize = MStatus.MCurrentSize;
                     if  (!AsyncDataToServer(app) && app.IsBreakPoints)
                     {
                         break ;
                     }
                 }
             }
         }
         catch  (Exception ex)
         {
             Log4N.WarnLog( "ServerMethod出错" , ex);
             if  (app.IsBreakPoints)
             {
                 return  false ;
             }
         }
         finally
         {
             _sh.Dispose();
         }
         return  true ;
     }
 
     private  bool  AsyncDataToServer(AopParam app)
     {
         Log4N.InfoLog( string .Format( "数据同步开始\r\n来源数据{0}\r\n表的名字{1}\r\n一次性提交的行数{2}\r\n当前行数{3}" , app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));
 
         using  ( var  bcp =  new  SqlBulkCopy(_sh.TconnSql))
         {
             MStatus.Statusflag = Status.CopyStatus.Doing;
             bcp.BatchSize = SingleSize;
             bcp.DestinationTableName = app.TableName;
 
             bcp.SqlRowsCopied +=
               OnSqlRowsCopied;
             bcp.NotifyAfter = StatusSize;
 
             try
             {
                 bcp.WriteToServer(_sh.GetDtResultImp());
             }
             catch  (Exception ex)
             {
                 Log4N.WarnLog( "AsyncDataToServer出错" , ex);
                 return  false ;
             }
             finally
             {
                 _sh.IreaderSql.Close();
             }
 
             return  true ;
 
         }
     }
     private  void  OnSqlRowsCopied( object  sender, SqlRowsCopiedEventArgs e)
     {
         Thread.Sleep(1000);
         MStatus.MCurrentSize += StatusSize;
     }
 
     public  void  CallbackMethod(IAsyncResult ar)
     {
         var  caller = (Func<AopParam, bool  >)ar.AsyncState;
         if  (caller.EndInvoke(ar))
         {
             MStatus.Statusflag = Status.CopyStatus.Finished;
         }
     }
 
}

Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。其中 SqlRowsCopied 在每次处理完 NotifyAfter 属性指定的行数时发生。

ServerMethod为主方法提供单次客户端请求的逻辑。

OutMethod对外开放以bpm异步编程模型形式进行处理、sqlhelper之所以不设计成单列,为了保证可以多个客户端请求状态不干扰。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public  class  Sqlhelper : IDisposable
    {
        private  readonly  string  _sqlconn = ConfigurationSettings.AppSettings[ "BaseConn" ];
        public  bool  IblnTransBegin {  get set ; }
        public  SqlTransaction ItransSql {  get set ; }
        public  SqlConnection IconnSql {  get set ; }
        public  SqlConnection TconnSql {  get set ; }
        public  SqlDataReader IreaderSql {  get set ; }
        public  IAOPParam App {  get set ; }
 
        public  DataTable GetDtResult( string  sqlcommand)
        {
            var  ds =  new  DataSet();
            var  da =  new  SqlDataAdapter(sqlcommand,  new  SqlConnection(_sqlconn));
            da.Fill(ds);
            if  (ds.Tables[0] !=  null )
            {
                return  ds.Tables[0];
            }
            return  null ;
        }
 
        public  DataTable GetDtResult()
        {
            //string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3}  ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());
            string  sqlstr = GetCommandByApp();
            var  ds =  new  DataSet();
            var  da =  new  SqlDataAdapter(sqlstr,  new  SqlConnection(_sqlconn));
            da.Fill(ds);
            if  (ds.Tables[0] !=  null )
            {
                return  ds.Tables[0];
            }
            return  null ;
        }
 
        public  SqlDataReader GetDtResultImp()
        {
            var  sqlstr = GetCommandByApp();
            var  command =  new  SqlCommand(
               sqlstr, IconnSql);
            IreaderSql =
              command.ExecuteReader();
            return  IreaderSql;
        }
 
        public  IAOPResult OpenConn()
        {
            var  ar =  new  AOPResult(0);
            IconnSql =  new  SqlConnection(_sqlconn);
            TconnSql =  new  SqlConnection(App.T_ConnectionString);
            try
            {
                IconnSql.Open();
                TconnSql.Open();
            }
            catch  (SqlException ex)
            {
                ar.ResultNo = 1;
                Log4N.InfoLog( string .Format( "OpenConn失败,详细消息为{0},源表" , ex.Message), App);
            }
            return  ar;
        }
 
        public  IAOPResult CloseConn()
        {
 
            var  ar =  new  AOPResult(0);
            try
            {
                IconnSql.Close();
                TconnSql.Close();
            }
            catch  (SqlException ex)
            {
                ar.ResultNo = 1;
                Log4N.InfoLog( string .Format( "CloseConn失败,详细消息为{0},源表" , ex.Message), App);
            }
            return  ar;
        }
 
        public  IAOPResult BeginTran()
        {
            ItransSql = IconnSql.BeginTransaction();
            return  null ;
        }
 
        public  void  Dispose()
        {
            CloseConn();
        }
 
        public  string  GetCommandByApp()
        {
            string  sqlstr =  string .Empty;
            if (App.CurrentSize == 0)
            {
                switch  (App.SortName.ToLower())
                {
                    case  "asc" :
                        sqlstr =  string .Format( "Select Top {0} * From {1}  order by {2} asc" , App.SingleSize.ToString(), App.TableName, App.Sortkey);
                        break ;
                    case  "desc" :
                        sqlstr =  string .Format( "Select Top {0} * From {1}  order by {2} desc" , App.SingleSize.ToString(), App.TableName, App.Sortkey);
                        break ;
                }
            }
            else
            {
                switch  (App.SortName.ToLower())
                {
                    case  "asc" :
                        sqlstr =  string .Format( "Select Top {0} * From {1} Where {2} >(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc" , App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                        break ;
                    case  "desc" :
                        sqlstr =  string .Format( "Select Top {0} * From {1} Where {2} <(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc" , App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                        break ;
                }     
 
            }
            return  sqlstr;
 
        }

 数据库访问层中首先是一个类似分页sql的设计,来优化单次请求的效率。bcp的来源可以选择连接式的SqlDataReader 或者非连接式的Dataset,2者各有优缺。前者需要打开SqlConnection,但是是逐条读取,后者非连接但是占用内存大。至于具体的性能比,虫子在下一章节再和大家讨论。至于源程序目前还是草稿版,很多功能还未实现,细节处理也不够细腻,因为异步目前只设置了一个线程,还未涉及到并行框架,性能方面还有相当大的提高空间。先放出来让大家讨论,细节方面可以暂时先略过,大家可以说说在设计方面如何才能更高效、稳定。

源码地址:点击此处下载



本文转自 熬夜的虫子  51CTO博客,原文链接:http://blog.51cto.com/dubing/712455


相关实践学习
使用SQL语句管理索引
本次实验主要介绍如何在RDS-SQLServer数据库中,使用SQL语句管理索引。
SQL Server on Linux入门教程
SQL Server数据库一直只提供Windows下的版本。2016年微软宣布推出可运行在Linux系统下的SQL Server数据库,该版本目前还是早期预览版本。本课程主要介绍SQLServer On Linux的基本知识。 相关的阿里云产品:云数据库RDS&nbsp;SQL Server版 RDS SQL Server不仅拥有高可用架构和任意时间点的数据恢复功能,强力支撑各种企业应用,同时也包含了微软的License费用,减少额外支出。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/sqlserver
相关文章
|
4月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
420 0
|
4月前
|
消息中间件 SQL 分布式计算
一篇文章搞定数据同步工具SeaTunnel
一篇文章搞定数据同步工具SeaTunnel
515 0
|
4月前
|
存储 关系型数据库 MySQL
DataX: 阿里开源的又一款高效数据同步工具
DataX 是由阿里巴巴集团开源的一款大数据同步工具,旨在解决不同数据存储之间的数据迁移、同步和实时交换的问题。它支持多种数据源和数据存储系统,包括关系型数据库、NoSQL 数据库、Hadoop 等。 DataX 提供了丰富的数据读写插件,可以轻松地将数据从一个数据源抽取出来,并将其加载到另一个数据存储中。它还提供了灵活的配置选项和高度可扩展的架构,以适应各种复杂的数据同步需求。
|
存储 文件存储 对象存储
S3存储服务间数据同步工具Rclone迁移教程
目前大多项目我们都会使用各种存储服务,例如oss、cos、minio等。当然,因各种原因,可能需要在不同存储服务间进行数据迁移工作,所以今天就给大家介绍一个比较通用的数据迁移工具Rclone。
S3存储服务间数据同步工具Rclone迁移教程
|
7月前
|
canal SQL 关系型数据库
大数据同步工具Canal 2
大数据同步工具Canal
195 0
|
7月前
|
canal 消息中间件 关系型数据库
大数据同步工具Canal 1
大数据同步工具Canal
362 0
|
9月前
|
算法 Linux
Linux系统【文件传输】rsync命令 – 远程数据同步工具
rsync命令来自于英文词组“remote sync”的缩写,其功能是用于远程数据同步。rsync命令能够基于网络(含局域网和互联网)快速的实现多台主机间的文件同步工作,并与scp或ftp发送完整文件不同,rsync有独立的文件内容差异算法,会在传送前对两个文件进行比较,只传送两者内容间的差异部分,因此速度更快。
149 2
|
12月前
|
存储 SQL JSON
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!下
|
12月前
|
存储 JavaScript 小程序
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!上
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!上
|
存储 SQL JSON
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!(2)
阿里又开源一款数据同步工具 DataX,稳定又高效,好用到爆!
323 0

热门文章

最新文章