squid 日志分析 - hadoop hive

简介:

 #!/usr/bin/env python

#-*-coding:UTF-8-*-
"""
@Item   :  Hadoop analysis squid log
@Author :  Villiam Sheng
@Group  :  Linux Group
@Date   :  2012-09-13
@Funtion:
            Use hadoop squid processing log ......................
"""
import os,re,threading,tarfile,shutil,time,sys,datetime
import sys,traceback
sys.path.append('/usr/local/hive/lib/py')
from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hive_service import ThriftHive
from hive_service.ttypes import HiveServerException
 
 
class split_log(threading.Thread):
    def __init__(self,files,lock):
        threading.Thread.__init__(self)
        self.files  = files
        self.path = '/data1/squid_log'
        self.lock = lock
 
    def log(self,info):
        files = open('%s/split_log'%self.path,'a')
        try:
            files.write(info)
        except IOError:
            files.close()
        files.close()
        size = os.path.getsize('%s/split_log'%self.path) / 1024
        if size >= 1024:
            fp = open('%s/split_log'%self.path,'w')
            fp.write('')
            fp.close()
 
    def run(self):
        try:
            if not os.path.isdir(self.files):
                tar = tarfile.open('%s/%s'%(self.path,self.files))
                for name in tar.getnames():
                    tar.extract(name,'/%s/%s/'%(self.path,self.files.split('.tgz')[0]))
                tar.close()
                self.log('%s file:%s run.info:tar done \n'%(time.ctime(),self.files))
        except Exception,e:
            self.log('%s files:%s run.info:%s \n'%(time.ctime(),self.files,e))
        try:
            fp = open('%s/%s/access.log.0'%(self.path,self.files.split('.tgz')[0]),'r')
            for i in fp.readlines():
                b = i.split()
                fps = open('%s/%s/tmp.log'%(self.path,self.files.split('.tgz')[0]),'a+')
                fps.write("%s\t%s\t%s\t%s\t%s\n" %(datetime.datetime.strptime(b[3].split('[')[1],'%d/%b/%Y:%H:%M:%S').strftime('%Y%m%d%H%M%S'),b[0],i.split('/')[4],b[-5],b[-1]))
            fps.close()
            fp.close()
        except Exception,e:
             self.log('%s file:%s inert.info:insert done \n'%(time.ctime(),self.files.split('.tgz')[0]))
        try:
            asql="load data local inpath \'%s/%s/tmp.log\' overwrite into table squid.squid_tmp partition(pt_ip = \'%s\')" %(self.path,self.files.split('.tgz')[0],self.files.split('.tgz')[0])
            self.lock.acquire()
            transport = TSocket.TSocket('127.0.0.1',10000)
            transport = TTransport.TBufferedTransport(transport)
            protocol = TBinaryProtocol.TBinaryProtocol(transport)
            client = ThriftHive.Client(protocol)
            transport.open()
            try:
                client.execute(asql)
            except:
                pass
            transport.close()
            self.lock.release()
            os.remove ('%s/%s'%(self.path,self.files))
            shutil.rmtree('%s/%s' %(self.path,self.files.split('.tgz')[0]))
        except:
            self.log('Exception error : %s '%traceback.print_exc())
   
 
def work():
    ip_list = re.compile('^(([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))\.)(([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))\.){2}([1-9]|([1-9]\d)|(1\d\d)|(2([0-4]\d|5[0-5])))$')
    lock = threading.RLock()
    for i in os.listdir('/data1/squid_log/'):
        if not ip_list.match(i.split('.tgz')[0]):
            continue
        st = split_log(i,lock)
        st.start()
    while threading.active_count() != 1:
        time.sleep(1)
 
    transport = TSocket.TSocket('127.0.0.1',10000)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = ThriftHive.Client(protocol)
    transport.open()
    client.execute("use squid")
    client.execute("set hive.exec.dynamic.partition=true")
    client.execute("set hive.exec.dynamic.partition.mode=nonstrict")
    sql = "insert into table squid_log partition(pt_ip, pt_dt) select visittime, clientip, visitdom, visiturl, visitstat, pt_ip, substr(visittime, 1, 8) as pt_dt from squid_tmp"
    client.execute(sql)
    transport.close()
 
if __name__ == "__main__":
    try:
        pid = os.fork()
        if pid > 0 :
            sys.exit(0)
        os.setsid()
        os.chdir('/')
        sys.stdin = open("/dev/null","r+")
        sys.stdout = os.dup(sys.stdin.fileno())
        sys.stderr = os.dup(sys.stdin.fileno())
        while True:
            work()
            time.sleep(300)
        work()
    except:
        pass

本文转自 swq499809608 51CTO博客,原文链接:http://blog.51cto.com/swq499809608/1142212
相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
4月前
|
SQL 分布式计算 Hadoop
干翻Hadoop系列文章【02】:Hadoop、Hive、Spark的区别和联系
干翻Hadoop系列文章【02】:Hadoop、Hive、Spark的区别和联系
|
4月前
|
SQL 存储 大数据
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
【大数据技术Hadoop+Spark】Hive基础SQL语法DDL、DML、DQL讲解及演示(附SQL语句)
77 0
|
4月前
|
SQL 存储 分布式计算
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
【大数据技术Hadoop+Spark】Hive数据仓库架构、优缺点、数据模型介绍(图文解释 超详细)
186 0
|
14天前
|
SQL 分布式计算 Hadoop
利用Hive与Hadoop构建大数据仓库:从零到一
【4月更文挑战第7天】本文介绍了如何使用Apache Hive与Hadoop构建大数据仓库。Hadoop的HDFS和YARN提供分布式存储和资源管理,而Hive作为基于Hadoop的数据仓库系统,通过HiveQL简化大数据查询。构建过程包括设置Hadoop集群、安装配置Hive、数据导入与管理、查询分析以及ETL与调度。大数据仓库的应用场景包括海量数据存储、离线分析、数据服务化和数据湖构建,为企业决策和创新提供支持。
55 1
|
3月前
|
SQL 数据采集 分布式计算
Hadoop和Hive中的数据倾斜问题及其解决方案
Hadoop和Hive中的数据倾斜问题及其解决方案
44 0
|
3月前
|
SQL 分布式计算 安全
HIVE启动错误:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeExcept
HIVE启动错误:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.SafeModeExcept
136 0
|
3月前
|
SQL 存储 分布式计算
Hadoop中的Hive是什么?请解释其作用和用途。
Hadoop中的Hive是什么?请解释其作用和用途。
38 0
|
4月前
|
SQL 分布式计算 Hadoop
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
[AIGC ~大数据] 深入理解Hadoop、HDFS、Hive和Spark:Java大师的大数据研究之旅
|
4月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
263 0
|
4月前
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.15 安装HIVE
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
106 1
Hadoop学习笔记(HDP)-Part.15 安装HIVE