python大数据分析代码案例

本文涉及的产品
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介:

#查询用户余额代码案例


import sys

import MySQLdb

import pandas as pd


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.14',

                'dbport' : 3306,

                'dbname' : 'HBAODB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusercoin(userid):

    i = int(userid) % 10

    reqsql = "select ID,COINREFER from CHARCOIN%u where ID=%u" % (int(i), int(userid))

    #print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    return ret[0]

    

def getall(userlist):

    userdata = pd.DataFrame(columns=('userid', 'coin'))

    index = 0

    for userid in userlist:

        coins = getusercoin(userid) #调用前面的函数

        #print coins[0],coins[1]/100.0

        if coins[0] is not None:

            userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        else:

            userdata.loc[index] = (str(userid), 0)

        index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(50)

   




#用户消费查询代码案例


import sys

import MySQLdb

import pandas as pd

import datetime

import time


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.12',

                'dbport' : 3306,

                'dbname' : 'JIESUANDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

#用户人民币消费

def getuserconsume(userid, startday): #定义带参函数

    strdate = startday.strftime("%y%m%d")

    # 送礼物 +  守护 +  点歌 +  表情贴

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DBIOPTYPE=2 AND (OPTYPE=1 OR OPTYPE=4 OR OPTYPE=17 OR OPTYPE=25)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql) #调用前面的函数

    #print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

        

#用户充值

def getusercharge(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,SUM(DUBIOPNUM) from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u AND DUBIOPTYPE=1 AND (OPTYPE=1016 OR OPTYPE=1020 OR OPTYPE=1021)" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)#调用前面的函数

    print ret

    if ret[0][0] is not None:

        return float(ret[0][1])/100.0

    else:

        return 0

    

#用户当天结余人民币

def getusercurcoin(userid, startday):

    strdate = startday.strftime("%y%m%d")

    reqsql = "select CONSUMERID,CURRENTNUM from `DUBIJIESUANTONGJI_%s` where CONSUMERID=%u ORDER BY OPTIME DESC LIMIT 1" % (strdate, int(userid))

    print reqsql

    ret = sql_select(reqsql)

    print ret

    if ret:

        return float(ret[0][1])/100.0

    else:

        return 0

        

def getconsume():

    startdate = datetime.date(2017, 1, 1)

    enddate = datetime.date(2017, 2, 2)

    userid = 3101011990

    

    userdata = pd.DataFrame(columns=('date','userid','charge', 'consume', 'dayleftcoin'))


    index = 0

    

    # 计算日差

    td = enddate - startdate

    datelen = td.days + 1

    #print datelen

    delta = datetime.timedelta(days=1)

    allcoins = 0 

    for i in range(0,datelen):

        startday = startdate + delta * i

        consume_coin = getuserconsume(userid, startday)#调用前面的函数

        charge = getusercharge(userid, startday)#调用前面的函数

        dayleftcoin = getusercurcoin(userid, startday)#调用前面的函数

        

        

        userdata.loc[index] = (startday.strftime("%Y-%m-%d"),str(userid), charge, consume_coin, dayleftcoin)

        index += 1

        

    #userdata.loc[index] = ('total',str(userid), allcoins, 0)

    print userdata.tail(100)

    return

    

getconsume()






#查询用户机器ID 代码案例


import sys

import MySQLdb

import pandas as pd

import datetime


optmap = {

                'dbuser' : 'aduser',

                'dbpass' : '123654',

                'dbhost' : '192.168.10.15',

                'dbport' : 3306,

                'dbname' : 'JIQIDB'

                 }


def sql_select(reqsql):

    try:

        db_conn = MySQLdb.connect(user=optmap['dbuser'], passwd=optmap['dbpass'], host=optmap['dbhost'], port=optmap['dbport'], db=optmap['dbname'])

        db_cursor=db_conn.cursor()

        db_conn.query("use %s"%optmap['dbname'])

        count = db_cursor.execute(reqsql)

        ret = db_cursor.fetchall()


        db_cursor.close()

        db_conn.close

        return ret

    except MySQLdb.Error,e:

        print "Mysql ERROR %d:%s"  %(e.args[0], e.args[1])

    return ''

    

def getusermid(userid, months):

    i = int(userid) % 50

    reqsql = "select USERID,MACHINEID from LOGINHISTORY%s%u where USERID=%u group by MACHINEID" % (months,int(i), int(userid))

    print reqsql

    ret = sql_select(reqsql)

    #print ret

    #print ret[0]

    return ret

    

def getall(userlist):

    today = datetime.date.today()

    months = today.strftime("%Y%m")

    userdata = pd.DataFrame(columns=('USERID', 'MACHINEID'))

    index = 0

    for userid in userlist:

        coins = getusermid(userid, months)

        for i in range(len(coins)):

            #print coins[i]

            userdata.loc[index] = (str(coins[i][0]), str(coins[i][1]))

            index += 1

        

        #print coins[0],coins[1]/100.0

        #userdata.loc[index] = (str(coins[0]), coins[1]/100.0)

        #index += 1

        #print userdata.tail(10)

        

    df = spark.createDataFrame(userdata)

    #df.createOrReplaceTempView('userdata')

    df.show(1000)





#人民币统计代码案例

from pyspark.sql import Row

from pyspark.sql.types import *

from pyspark.sql.functions import udf

import MySQLdb

import mysql_op

import datetime

import time

from mysql_op import MySQL

import pandas as pd

import numpy as np

from fastparquet import ParquetFile

from fastparquet import write


def fromDayToDay(startdate, datelen, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        endday = startdate + delta * (i + 1)

        func(startday, endday)

    return

def fromDayToEndDay(startdate, datelen, endday, func):

    delta = datetime.timedelta(days=1)

    for i in range(0,datelen):

        startday = startdate + delta * i

        #endday = startdate + delta * (i + 1)

        func(startday, endday)

    return


# 获取人民币数据

def saveDayPackageData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.12',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'JIESUANDB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "SELECT OPTIME,CONSUMERID,PRESERVENUM,CURRENTNUM,DUBIOPTYPE,DUBIOPNUM,OPTYPE,OPDETAIL,OPNUM FROM `DUBIJIESUANTONGJI_%s`" % (strdate)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/billdata"+strday+".parq", pddf)

    return


def savePackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayPackageData)

    

# 获取WF册数据

def saveDayWifiPhoneRegData(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.15',

                        'port': 3306,

                        'user':'user',

                        'passwd':'123654',

                        'db':'AADB',

                        'charset':'utf8'}


    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    strdate = startday.strftime("%y%m%d")


    sql = "select USERID from NEW_WEB_USER where TIME< %d AND TYPE=17" % (tsend)

    print sql

    pddf = pd.read_sql(sql, con=mysql_cn)

    mysql_cn.close()

    print pddf.head(5)

    dflen = len(pddf.index)

    if dflen > 0:

        print pddf.describe()

        write("/home/haoren/logstatis/wifiphonereg"+strday+".parq", pddf)

    return


def saveWifiPhoneReg():

    startday = datetime.date(2016, 12, 1)

    endday = datetime.date(2016, 12, 1)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, saveDayWifiPhoneRegData)

    

OPTypeName = {

    0:"会员",

    1:"道具",


}


OpDetailName19 = {

    1:"购物保存收益",

    2:"下注和返注",

    3:"发红包",

    4:"抢红包",


}


OpDetailName22 = {

    1:"活动1收益到总账号",

    2:"活动2收益到总账号",

    3:"活动3收益到总账号",


}


OpDetailName23 = {

    0:"购买会员",

    1:"购买道具",

    2:"扫雷",


}


def getOpTypeName(func):

    name = OPTypeName.get(func)

    if name == None:

        return ""

    else:

        return name.decode('utf8')

    

def getOpDetailName(func, detail):

    if func == 19:

        if detail > 10000 and detail < 30000:

            return "包裹回滚".decode('utf8')

        elif detail > 50000 and detail < 60000:

            return "红包接龙".decode('utf8')

        else:

            name = OpDetailName19.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 22:

            name = OpDetailName22.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    elif func == 23:

            name = OpDetailName23.get(detail)

            if name == None:

                return ""

            else:

                return name.decode('utf8')

    else:

        return ""


def getDayPackageData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    df.show(10)

    #df.createOrReplaceTempView('billdata')

    #df.registerTempTable("billdata")

    #sqlret = sqlc.sql("SELECT count(*) from billdata")

    #sqlret.show(1)

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('billdata')

    return

    

def getPackageData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, getDayPackageData)#调用前面的函数

    print 'getPackageData finish'


# 获取充值数据

def getChargeInfo(startday, endday):

    #数据库连接参数  

    dbconfig = {'host':'192.168.10.14', 

     'port': 3306, 

     'user':'user', 

     'passwd':'123654', 

     'db':'BAOIMDB', 

     'charset':'utf8'}

    

    #连接数据库,创建这个类的实例

    mysql_cn= MySQLdb.connect(host=dbconfig['host'], port=dbconfig['port'],user=dbconfig['user'], passwd=dbconfig['passwd'], db=dbconfig['db'])

    strday = startday.strftime("%Y%m%d")

    tsstart=time.mktime(startday.timetuple())

    tsend=time.mktime(endday.timetuple())

    regdata = pd.DataFrame()

    for i in range(0, 20): 

        sql = "SELECT * FROM `USERCONSUMPTIONRECORD%d` where TIME > %d AND TIME < %d" % (i, tsstart, tsend)

        print sql

        #pddf = pd.DataFrame()

        pddf = pd.read_sql(sql, con=mysql_cn)

        #print pddf.head(5)

        if len(pddf.index) > 0:

            regdata = regdata.append(pddf,ignore_index=True)

            print regdata.tail(5)

    

    if len(regdata.index) > 0:

        print regdata.describe()

        write("/home/haoren/logstatis/register"+strday+".parq", regdata)

    mysql_cn.close()

    return

    

def pudf(x):

    return getOpTypeName(x.OPTYPE)

    

def getMergeData(strday):

    dfbill = ParquetFile("/home/haoren/logstatis/billdata"+strday+".parq").to_pandas()

    dfwifireg = ParquetFile("/home/haoren/logstatis/wifiphonereg"+strday+".parq").to_pandas()

    tempdf = pd.merge(dfbill, dfwifireg, left_on='CONSUMERID', right_on='USERID')

    #write("/home/haoren/logstatis/analyze"+strday+".parq", tempdf)

    #print tempdf.head(10)

    tempdf['OPTYPENAME'] = tempdf.apply(lambda x:getOpTypeName(x.OPTYPE), axis=1)

    #print tempdf.head(10)

    tempdf['DETAILNAME'] = tempdf.apply(lambda x:getOpDetailName(x.OPTYPE,x.OPDETAIL), axis=1)

    df = spark.createDataFrame(tempdf)

    df.show(10)

    return df

    

def analyzeDayBillData(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'


    df = spark.read.load("/home/haoren/logstatis/billdata"+strday+".parq")

    dfwifireg = spark.read.load("/home/haoren/logstatis/wifiphonereg"+strday+".parq")

    df3 = df.join(dfwifireg, df.CONSUMERID == dfwifireg.USERID)

    df3.show(10)

    df3.write.parquet("/home/haoren/logstatis/analyze"+strday+".parq")

    

    #df2 = df3.withColumn('OPTYPENAME', udf(getOpTypeName)(df3.OPTYPE))

    #df2.show(10)

    #df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    #df.show(10)

    #df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeDayBillData2(startday, endday):

    strday = startday.strftime("%Y%m%d")

    print strday + '人民币数据'

    #df = spark.read.load("/home/haoren/logstatis/analyze"+strday+".parq")

    df = getMergeData(strday)

    return

    df2 = df.withColumn('OPTYPENAME', udf(getOpTypeName)(df.OPTYPE))

    df2.show(10)

    df = df2.withColumn('DETAILNAME', udf(getOpDetailName)(df2.OPTYPE, df2.OPDETAIL))

    df.show(10)

    df.createOrReplaceTempView('analyzebilldata')

    return

    

def analyzeBillData():

    startday = datetime.date(2016, 12, 28)

    endday = datetime.date(2016, 12, 28)

    td = endday - startday

    datelen = td.days + 1

    # 获取包裹数据

    fromDayToDay(startday, datelen, analyzeDayBillData2)#调用前面的函数

    print 'analyzeBillData finish'

    

savePackageData()

getPackageData()

#saveWifiPhoneReg()

#analyzeBillData()










本文转自 chengxuyonghu 51CTO博客,原文链接:http://blog.51cto.com/6226001001/1895757,如需转载请自行联系原作者
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
1天前
|
机器学习/深度学习 人工智能 自然语言处理
总结几个GPT的超实用之处【附带Python案例】
总结几个GPT的超实用之处【附带Python案例】
|
2天前
|
vr&ar Python
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列4
Python 用ARIMA、GARCH模型预测分析股票市场收益率时间序列
20 0
|
2天前
|
机器学习/深度学习 算法 数据可视化
python用支持向量机回归(SVR)模型分析用电量预测电力消费
python用支持向量机回归(SVR)模型分析用电量预测电力消费
25 7
机器学习/深度学习 算法 Python
16 0
|
2天前
|
算法 数据可视化 Python
Python中LARS和Lasso回归之最小角算法Lars分析波士顿住房数据实例
Python中LARS和Lasso回归之最小角算法Lars分析波士顿住房数据实例
11 0
|
2天前
|
数据安全/隐私保护 Python
Python中的装饰器:提升代码可读性与灵活性
Python中的装饰器是一种强大的工具,可以在不改变函数原有逻辑的情况下,为函数添加额外的功能。本文将介绍装饰器的基本概念和用法,并通过实例演示如何利用装饰器提升代码的可读性和灵活性,使代码更加简洁、易于维护。
|
2天前
|
机器学习/深度学习 数据采集 数据可视化
Python数据处理与分析
【4月更文挑战第13天】Python在数据处理与分析中扮演重要角色,常用库包括Pandas(数据处理)、NumPy(数值计算)、Matplotlib和Seaborn(数据可视化)、SciPy(科学计算)、StatsModels(统计建模)及Scikit-learn(机器学习)。数据处理流程涉及数据加载、清洗、探索、特征工程、模型选择、评估与优化,以及结果展示。选择哪个库取决于具体需求和数据类型。
13 1
|
2天前
|
BI 开发者 数据格式
Python代码填充数据到word模板中
【4月更文挑战第16天】
|
3天前
|
数据采集 NoSQL 搜索推荐
五一假期畅游指南:Python技术构建的热门景点分析系统解读
五一假期畅游指南:Python技术构建的热门景点分析系统解读
|
3天前
|
机器学习/深度学习 人工智能 分布式计算
R和Python机器学习:广义线性回归glm,样条glm,梯度增强,随机森林和深度学习模型分析
R和Python机器学习:广义线性回归glm,样条glm,梯度增强,随机森林和深度学习模型分析

热门文章

最新文章