分享改进 高性能数据同步工具(一)

  1. 云栖社区>
  2. 博客>
  3. 正文

分享改进 高性能数据同步工具(一)

科技小能手 2017-11-12 16:27:00 浏览777
展开阅读全文

题外:在博文索引中暂时列出了开源的计划一览,虫子开源的目的是希望能有更多的交流,部分软件可能小得连开源协议的认证价值都没有。不管程序有多小多简单,用心把一个完整的设计思路、实现过程以及测试结果展现给大家。欢迎大牛拍砖,小牛问路。

虫子的博文索引http://www.cnblogs.com/dubing/archive/2011/11/03/2234599.html

软件背景

拿本次高性能数据同步工具来说,目前还处于开发阶段,大概是1/4的样子。为了避免模糊,就先把这1/4分享给大家。

数据作为系统的核心价值,因为其流动性所以经常会有载体的变更。如何高性能、安全的将数据搬移是一个大家经常接触也一直在用的课题。如果只是sql to sql可能作为程序员而言,DBA更适合这个内容,例如dts导入等。但是更多的实际场景下,可能会有文件、服务、甚至其他类型的数据流来源。所以作为码农,我们不妨多了解一下这方面的内容。


设计思路

暂时开源程序中只做了sql to sql的一部分。直接就以这个开始来讲吧。

首先是入参和返参的设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/// <summary>
    /// 入参接口
    /// </summary>
    public interface IAOPParam
    {
         /// <summary>
         /// 目标地址
         /// </summary>
         string T_ConnectionString { get;  }
 
         /// <summary>
         /// 请求行数
         /// </summary>
         long MaxSize { get; }
 
         /// <summary>
         /// 表名
         /// </summary>
         string TableName { get; }
 
         /// <summary>
         /// 当前行数
         /// </summary>
         long CurrentSize { get; }
 
         /// <summary>
         /// 域名
         /// </summary>
         string p_Domain { get;  }
 
         /// <summary>
         /// 断点文件地址
         /// </summary>
         string p_InitPath { get;  }
 
         /// <summary>
         /// 断点时间
         /// </summary>
         DateTime p_Previous { get;  }
 
         /// <summary>
         /// 是否结束
         /// </summary>
         bool p_IsEnd { get;  }
 
         /// <summary>
         /// 排序方式
         /// </summary>
         string SortName { getset; }
 
         /// <summary>
         /// 单次请求大小
         /// </summary>
         long SingleSize { get; }
 
         /// <summary>
         /// 排序主键
         /// </summary>
         string Sortkey { get;  }
          
         /// <summary>
         /// 是否支持事务
         /// </summary>
         bool IsTransaction { get;  }
 
         /// <summary>
         /// true为支持断点 发生断点或异常后程序终止   false为不支持断点 遇到断点或异常继续填充直到此次请求完成
         /// </summary>
         bool IsBreakPoints { get;  }
 
         /// <summary>
         /// guid
         /// </summary>
         string T_Guid { get; }
    }
 
    /// <summary>
    /// 对象处理返回的入参接口(泛型)
    /// </summary>
    public interface IAOPParam<T> : IAOPParam
    {
        /// <summary>
        /// 泛型附加对象
        /// </summary>
        T ParamAttachObjectEx { get; }
    }

 这样设计的目的是考虑到服务器的内存与资源占用问题,如果数据来源的体积过大,我们将会对请求的来源进行分块处理。另外通过排序字段或者自定义的sql语句或者存储过程(暂未补充)可以对数据源进行高级过滤,断点续传的设计目前比较简单,web程序的话植入cookie、控制台或者cs程序通过文本媒介json格式来控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#region IAOPResult
 
   /// <summary>
   /// 对象处理返回的结果接口
   /// </summary>
   /// <remarks>
   /// 建议在代码调用返回值中都采用此类实例为返回值<br />
   /// 一般ResultNo小于0表示异常,0表示成功,大于0表示其它一般提示信息
   /// </remarks>
   public interface IAOPResult
   {
       /// <summary>
       /// 返回代码
       /// </summary>
       int ResultNo { get; }
 
       /// <summary>
       /// 对应的描述信息
       /// </summary>
       string ResultDescription { get; }
 
       /// <summary>
       /// 相应的附加信息
       /// </summary>
       object ResultAttachObject { get; }
 
       /// <summary>
       /// 内部AOPResult
       /// </summary>
       IAOPResult InnerAOPResult { get; }
 
       /// <summary>
       /// 处理结果是否成功(ResultNo == 0)
       /// </summary>
       bool IsSuccess { get; }
 
       /// <summary>
       /// 处理结果是否失败(ResultNo != 0 )
       /// </summary>
       bool IsNotSuccess { get; }
 
       /// <summary>
       /// 处理结果是否失败(ResultNo < 0 )
       /// </summary>
       bool IsFailed { get; }
 
       /// <summary>
       /// 已处理,但有不致命的错误(ResultNo > 0)
       /// </summary>
       bool IsPassedButFailed { get; }
 
       /// <summary>
       /// 如果处理失败,则抛出异常
       /// </summary>
       /// <returns>返回本身</returns>
       IAOPResult ThrowErrorOnFailed();
   }
 
   #endregion IAOPResult
 
   #region IAOPResult<T>
 
   /// <summary>
   /// 对象处理返回的结果接口(泛型)
   /// </summary>
   public interface IAOPResult<T> : IAOPResult
   {
       /// <summary>
       /// 泛型附加对象
       /// </summary>
       T ResultAttachObjectEx { get; }
   }
 
   #endregion

 返参的设计比较通用化,大家可以自己摸索下。自己也可以补充添加。

 异常基类。

 日志采取lognet 不赘述

 单例通用类 关于作用可以参考虫子设计模式随笔中的相关博文

 状态类,通过这个类可以反映出当前数据同步的进度。

边缘化的准备工作大体如此,下面是主要的实现过程。过程中有几个注意点,同步读写还是异步读写、是否存在线程安全甚至进程的资源安全(例如我在读写前5000条的时候突然在另外一个客户端CRUD了N条数据),另外,我们读写的时候是用连接的方式还是使用非连接的方式,如何解决服务器端内存占用问题,如何实现excel、txt、sql、oracle等不同数据来源的多态性。


实现过程 

这里就先介绍下已经解决的一些问题

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class AnalyseDataManager
{
    public Status MStatus { getset; }
    public static int SingleSize = 5000;
    public static int StatusSize = 1000;
    readonly Sqlhelper _sh = new Sqlhelper();
 
    public AnalyseDataManager()
    {
    }
 
    public AnalyseDataManager(Status st)
        this()
    {
        MStatus = st;
    }
 
    public IAsyncResult OutMethod(AopParam app)
    {
        MStatus.MTotalSize = app.MaxSize;
        var func = new Func<AopParam, bool>(ServerMethod);
        return func.BeginInvoke(app, CallbackMethod, func);
    }
 
    /// <summary>
    /// 复制数据
    /// </summary>
    /// <returns>是否成功</returns>
    public bool ServerMethod(AopParam app)
    {
        try
        {
            _sh.App = app;
            if (_sh.OpenConn().IsSuccess)
            {
                while (app.MaxSize > MStatus.MCurrentSize)
                {
                    app.CurrentSize = MStatus.MCurrentSize;
                    if (!AsyncDataToServer(app) && app.IsBreakPoints)
                    {
                        break;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Log4N.WarnLog("ServerMethod出错", ex);
            if (app.IsBreakPoints)
            {
                return false;
            }
        }
        finally
        {
            _sh.Dispose();
        }
        return true;
    }
 
    private bool AsyncDataToServer(AopParam app)
    {
        Log4N.InfoLog(string.Format("数据同步开始\r\n来源数据{0}\r\n表的名字{1}\r\n一次性提交的行数{2}\r\n当前行数{3}", app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));
 
        using (var bcp = new SqlBulkCopy(_sh.TconnSql))
        {
            MStatus.Statusflag = Status.CopyStatus.Doing;
            bcp.BatchSize = SingleSize;
            bcp.DestinationTableName = app.TableName;
 
            bcp.SqlRowsCopied +=
              OnSqlRowsCopied;
            bcp.NotifyAfter = StatusSize;
 
            try
            {
                bcp.WriteToServer(_sh.GetDtResultImp());
            }
            catch (Exception ex)
            {
                Log4N.WarnLog("AsyncDataToServer出错", ex);
                return false;
            }
            finally
            {
                _sh.IreaderSql.Close();
            }
 
            return true;
 
        }
    }
    private void OnSqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
    {
        Thread.Sleep(1000);
        MStatus.MCurrentSize += StatusSize;
    }
 
    public void CallbackMethod(IAsyncResult ar)
    {
        var caller = (Func<AopParam,bool >)ar.AsyncState;
        if (caller.EndInvoke(ar))
        {
            MStatus.Statusflag = Status.CopyStatus.Finished;
        }
    }
 
}

Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。其中 SqlRowsCopied 在每次处理完 NotifyAfter 属性指定的行数时发生。

ServerMethod为主方法提供单次客户端请求的逻辑。

OutMethod对外开放以bpm异步编程模型形式进行处理、sqlhelper之所以不设计成单列,为了保证可以多个客户端请求状态不干扰。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public class Sqlhelper : IDisposable
   {
       private readonly string _sqlconn = ConfigurationSettings.AppSettings["BaseConn"];
       public bool IblnTransBegin { getset; }
       public SqlTransaction ItransSql { getset; }
       public SqlConnection IconnSql { getset; }
       public SqlConnection TconnSql { getset; }
       public SqlDataReader IreaderSql { getset; }
       public IAOPParam App { getset; }
 
       public DataTable GetDtResult(string sqlcommand)
       {
           var ds = new DataSet();
           var da = new SqlDataAdapter(sqlcommand, new SqlConnection(_sqlconn));
           da.Fill(ds);
           if (ds.Tables[0] != null)
           {
               return ds.Tables[0];
           }
           return null;
       }
 
       public DataTable GetDtResult()
       {
           //string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3}  ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());
           string sqlstr = GetCommandByApp();
           var ds = new DataSet();
           var da = new SqlDataAdapter(sqlstr, new SqlConnection(_sqlconn));
           da.Fill(ds);
           if (ds.Tables[0] != null)
           {
               return ds.Tables[0];
           }
           return null;
       }
 
       public SqlDataReader GetDtResultImp()
       {
           var sqlstr = GetCommandByApp();
           var command = new SqlCommand(
              sqlstr, IconnSql);
           IreaderSql =
             command.ExecuteReader();
           return IreaderSql;
       }
 
       public IAOPResult OpenConn()
       {
           var ar = new AOPResult(0);
           IconnSql = new SqlConnection(_sqlconn);
           TconnSql = new SqlConnection(App.T_ConnectionString);
           try
           {
               IconnSql.Open();
               TconnSql.Open();
           }
           catch (SqlException ex)
           {
               ar.ResultNo = 1;
               Log4N.InfoLog(string.Format("OpenConn失败,详细消息为{0},源表", ex.Message), App);
           }
           return ar;
       }
 
       public IAOPResult CloseConn()
       {
 
           var ar = new AOPResult(0);
           try
           {
               IconnSql.Close();
               TconnSql.Close();
           }
           catch (SqlException ex)
           {
               ar.ResultNo = 1;
               Log4N.InfoLog(string.Format("CloseConn失败,详细消息为{0},源表", ex.Message), App);
           }
           return ar;
       }
 
       public IAOPResult BeginTran()
       {
           ItransSql = IconnSql.BeginTransaction();
           return null;
       }
 
       public void Dispose()
       {
           CloseConn();
       }
 
       public string GetCommandByApp()
       {
           string sqlstr = string.Empty;
           if(App.CurrentSize == 0)
           {
               switch (App.SortName.ToLower())
               {
                   case "asc":
                       sqlstr = string.Format("Select Top {0} * From {1}  order by {2} asc", App.SingleSize.ToString(), App.TableName, App.Sortkey);
                       break;
                   case "desc":
                       sqlstr = string.Format("Select Top {0} * From {1}  order by {2} desc", App.SingleSize.ToString(), App.TableName, App.Sortkey);
                       break;
               }
           }
           else
           {
               switch (App.SortName.ToLower())
               {
                   case "asc":
                       sqlstr = string.Format("Select Top {0} * From {1} Where {2} >(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc", App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                       break;
                   case "desc":
                       sqlstr = string.Format("Select Top {0} * From {1} Where {2} <(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc", App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                       break;
               }     
 
           }
           return sqlstr;
 
       }

 数据库访问层中首先是一个类似分页sql的设计,来优化单次请求的效率。bcp的来源可以选择连接式的SqlDataReader 或者非连接式的Dataset,2者各有优缺。前者需要打开SqlConnection,但是是逐条读取,后者非连接但是占用内存大。至于具体的性能比,虫子在下一章节再和大家讨论。至于源程序目前还是草稿版,很多功能还未实现,细节处理也不够细腻,因为异步目前只设置了一个线程,还未涉及到并行框架,性能方面还有相当大的提高空间。先放出来让大家讨论,细节方面可以暂时先略过,大家可以说说在设计方面如何才能更高效、稳定。

源码地址:点击此处下载



本文转自 熬夜的虫子  51CTO博客,原文链接:http://blog.51cto.com/dubing/712455


网友评论

登录后评论
0/500
评论
科技小能手
+ 关注