0
0
0
1. 云栖社区>
2. 博客>
3. 正文

2、估算需要同步的时间
3、整个过程中的主要瓶颈是什么？
4、通过系统参数调优，优化整个数据同步过程。

# -*- coding=UTF-8 -*-
'''
Created on Nov 11, 2018

'''

import pandas as pd
import numpy as np
import datetime

def pt(comments):
    """Print a timestamped step marker to trace the script's progress.

    :param comments: short label for the step being entered (e.g. 'start')
    """
    # NOTE(review): the original paste lost this `def` line — the body
    # referenced an undefined `comments` at module level while pt(...)
    # was called throughout the script; restored as a function.
    print('===Step:' + comments + '|' + datetime.datetime.now().strftime(
        '%Y-%m-%d %H:%M:%S') + '===========================================')

def difference(left, right, on):
    """
    Anti-join: return the rows of *left* whose join key has no match in
    *right*, keeping only *left*'s columns.

    :param left: left dataframe
    :param right: right dataframe
    :param on: join key (column name or list of column names)
    :return: dataframe with left's columns and only the unmatched rows
    """
    df = pd.merge(left, right, how='left', on=on)
    left_columns = left.columns
    # First column contributed by `right` in the merged frame; it is NaN
    # exactly where the left row found no match.
    # NOTE(review): assumes `right` has at least one non-key column.
    col_y = df.columns[left_columns.size]
    df = df[df[col_y].isnull()]
    # `.ix` was deprecated and removed from pandas; positional slicing is
    # what the original intended, so use `.iloc`.
    df = df.iloc[:, 0:left_columns.size]
    df.columns = left_columns
    return df

pt('start')
# Observed output sizes from earlier, smaller runs (rows --> CSV size):
#50-->25M
#60-->25M
#70-->25M
# Number of synthetic rows to generate (10 million for this pressure test).
arrlength = 10000000
# lambda x: 0.0 if pd.isnull(x) else x      adjust the test data volume here
# Per-second timestamps used as the 'createtime' column.
rng = pd.date_range('1980-11-22', periods=arrlength, freq='S')
# NOTE(review): generated but never used below — presumably leftover.
yehaibo = pd.date_range(start='1980-1-1', periods=arrlength, freq='D')
# Synthetic person records: status, 8-digit station code (pcscode),
# age, creation timestamp, and a partition number 1..17.
dfpcs = pd.DataFrame({'personid': np.random.choice(['normal', 'pending', 'die'], arrlength),
'pcscode': np.random.randint(43000000, 43999900, arrlength),
'age': np.random.randint(1, 100, size=arrlength),
'createtime': rng,
'partition': np.random.randint(1, 18, arrlength)
})

# xjcode
pt('lambda')
# Derive parent codes from the 8-digit station code: fxjcode keeps the
# first 6 digits + '00'; sjcode keeps the first 4 digits + '0000'.
lambdaxj = lambda x: str(x)[0:6] + '00'  # 4:length
lambdasj = lambda x: str(x)[0:4] + '0000'  # 4:length
dfxjcode = pd.DataFrame({'fxjcode': dfpcs['pcscode'].map(lambdaxj)})
dfsjcode = pd.DataFrame({'sjcode': dfpcs['pcscode'].map(lambdasj)})
# Column-wise concat: original columns plus the two derived code columns.
dfconcat = pd.concat([dfpcs, dfxjcode, dfsjcode], axis=1)
# --- exploratory examples kept from the original (commented out) ---------
# print dfconcat
# pt('shape')
# print dfconcat.shape
# pt('')
# print dfconcat.ix[1:3, ['pcscode', 'age', 'sjcode']]
# pt('')
# print dfconcat[dfconcat['personid'] == 'pending']
# pt('not')
# print dfconcat[~(dfconcat['personid'] == 'pending')]
# pt('not in (-~)')
# print dfconcat[-(dfconcat['personid'].isin(['pending'])) & (dfconcat['age'] > 20)]
# pt('pl/sql  like')
# print dfconcat[dfconcat['personid'].str.contains("^end\d+$")]
# print dfconcat[dfconcat['personid'].str.contains("end")]
# pt('pl/sql substr')
# lambdasubstr = lambda x: str(x)[0:4] + '00'
# print dfconcat[dfconcat['personid'].map(lambdasubstr) == '430500']
# pt('')
# /home/hadoop/pandas/eclipse-workspace/TestPandas/yehaibo
# dfgroupby = dfconcat.groupby(['personid', 'fxjcode']).count()
# print dfgroupby
# pt('pl/sql ')
# dfindex = dfconcat.set_index('createtime')
# print dfindex
# print dfconcat[dfconcat.truncate(before='2017-11-22 00:01:38')]
# s = pd.Series(np.random.randn(10).cumsum(), index=np.arange(0, 100, 10))
# s.plot()
# df = pd.DataFrame(np.random.randn(10, 4).cumsum(0), columns=['A', 'B', 'C', 'D'], index=np.arange(0, 100, 10))
# df.plot()
# print df2.iloc[:,0:1]
# print '439932'.map(lambdaxj)
# pt('1')
# print dfpcs
# pt('2')
# -------------------------------------------------------------------------

# NOTE(review): the three statements below were fused into a single comment
# line by the paste that produced this file; they must execute for the CSV
# to be produced (the transcript below shows pcs10000000.csv was created),
# so they are restored here as live code.
print(dfconcat)
# datax will later read this file directly and import it into HBase
dfconcat.to_csv('/home/hadoop/PycharmProjects/GenerateCSVFilesForPressureTest/pcs' + str(arrlength) + '.csv',
                index=True, header=False)
# dfgroupby.to_csv('groupby.csv', index=True, header=True)
pt('end')

# Before running, record the CSV size and its gzip-compressed size, to
# estimate the Hadoop on-disk footprint later:
# [hadoop@master GenerateCSVFilesForPressureTest]$ tar czf pcs.tar.gz pcs10000000.csv
[hadoop@master GenerateCSVFilesForPressureTest]$ls -alh total 805M drwxrwxr-x. 3 hadoop hadoop 147 Sep 25 14:20 . drwxrwxr-x. 19 hadoop hadoop 4.0K Sep 24 15:10 .. lrwxrwxrwx. 1 hadoop hadoop 29 Sep 24 15:21 datax -> /home/hadoop/Datax3/datax/bin -rw-rw-r--. 1 hadoop hadoop 3.2K Sep 24 15:18 generatedata.py -rw-rw-r--. 1 hadoop hadoop 3.5K Sep 25 10:51 g.py drwxrwxr-x. 2 hadoop hadoop 105 Sep 25 11:13 .idea -rw-rw-r--. 1 hadoop hadoop 636M Sep 25 10:53 pcs10000000.csv 4.5倍（gzip压缩） 13.5倍（未压缩） -rw-rw-r--. 1 hadoop hadoop 6.2M Sep 25 10:30 pcs100000.csv -rw-rw-r--. 1 hadoop hadoop 6.1K Sep 24 15:18 pcs100.csv -rw-rw-r--. 1 hadoop hadoop 163M Sep 25 14:21 pcs.tar.gz 17倍（gzip压缩） 49.3倍（未压缩）  进行转化的datax脚本 [hadoop@master datax]$ cat t2h10000000.json
{
"job": {
"setting": {
"speed": {
"channel": 5
}
},
"content": [
{
"parameter": {
"charset": "UTF-8",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.rootdir": "hdfs://master:9000/hbase_db",
"hbase.cluster.distributed": "true",
"hbase.zookeeper.quorum": "master,Slave1,Slave2"
},
"table": "rkxx",
"mode": "normal",
"rowkeyColumn": [
{
"index": 0,
"type":"string"
}
],
"column": [
{
"index": 1,
"name": "rkxx:age",
"type": "string"
},
{
"index": 2,
"name": "rkxx:createday",
"type": "string"
},
{
"index":3,
"name": "rkxx:pcscode",
"type": "string"
},
{
"index":4,
"name": "rkxx:status",
"type": "string"
},
{
"index":5,
"name": "rkxx:fxjcode",
"type": "string"
},
{
"index":6,
"name": "rkxx:sjcode",
"type": "string"
}
],
"versionColumn":{
"index": -1,
"value":"123456789"
},
"encoding": "utf-8"
}
}
}
]
}
}
[hadoop@master datax]$ [hadoop@master bin]$ whereis python
[hadoop@master bin]$/usr/bin/python2.7 datax.py ./t2h100000.json 2019-09-25 11:11:13.776 [job-0] INFO JobContainer - 任务启动时刻 : 2019-09-25 10:57:13 任务结束时刻 : 2019-09-25 11:11:13 任务总计耗时 : 840s 任务平均流量 : 588.29KB/s 记录写入速度 : 11904rec/s 1秒1万条 读出记录总数 : 10000000 读写失败总数 : 0 CPU 磁盘是瓶颈 [hadoop@master aftp]$ hadoop fs -du -h /
8.1 G /hbase_db
54 /hdfs
0 /rkxx
4.6 M /spark
368.3 M /tmp
968.5 M /user
2.3 K /yehaibo

2.8 G /hbase_db
54 /hdfs
0 /rkxx
4.6 M /spark
368.6 M /tmp
968.5 M /user
2.3 K /yehaibo

-- External Hive table mapped onto the HBase table 'rkxx' so the imported
-- rows can be counted/queried from Hive.
-- NOTE(review): the mandatory STORED BY handler clause was missing in the
-- pasted version; Hive rejects WITH SERDEPROPERTIES("hbase.columns.mapping"...)
-- without it, so it is restored here.
CREATE EXTERNAL TABLE hisrkxx(key String, age String,createday String,fxjcode String,pcscode String,sjcode String,status String)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,rkxx:age,rkxx:createday,rkxx:fxjcode,rkxx:pcscode,rkxx:sjcode,rkxx:status","hbase.table.default.storage.type"="binary")
TBLPROPERTIES("hbase.table.name" = "rkxx");

hive> select count(key) from hisrkxx;
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1569374479979_0006, Tracking URL = http://Master:8088/proxy/application_1569374479979_0006/
Hadoop job information for Stage-1: number of mappers: 3; number of reducers: 1
2019-09-25 11:16:27,343 Stage-1 map = 0%,  reduce = 0%
2019-09-25 11:17:11,870 Stage-1 map = 33%,  reduce = 0%, Cumulative CPU 41.85 sec
2019-09-25 11:17:21,138 Stage-1 map = 67%,  reduce = 0%, Cumulative CPU 45.19 sec
2019-09-25 11:17:33,837 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 52.0 sec
2019-09-25 11:17:41,332 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 54.59 sec
MapReduce Total cumulative CPU time: 54 seconds 590 msec
Ended Job = job_1569374479979_0006
MapReduce Jobs Launched:
Stage-Stage-1: Map: 3  Reduce: 1   Cumulative CPU: 54.59 sec   HDFS Read: 18489 HDFS Write: 108 SUCCESS
Total MapReduce CPU Time Spent: 54 seconds 590 msec
OK
10000000                                                                  **1千万数据**
Time taken: 100.433 seconds, Fetched: 1 row(s)
hive>

1千万数据 csv文件大小：636M gzip压缩后：163M