使用Hive进行OSS数据处理的一个最佳实践

简介:

本文主要介绍如何使用Hive来处理保存在OSS上的数据源,并通过E-MapReduce计算,最终的结果保存在OSS上,并能够每天自动的进行Hive的分区数据的调度

处理条件:

数据源:我们假设在OSS上我们的数据是按照一定的目录格式来保存的,比如时间,按照类似2016/06/01这样的年/月/日的方式存放。而原始数据内容都是一些非格式化的数据,完全没有经过处理。
类似如下的一个格式:

123|service control exceed 100. others content|192.168.0.1|2016-05-31

结果数据:我们需要把每个目录下的数据经过处理,写到OSS上类似2016/06/01的一个结果目录下

处理过程:

创建元数据表

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

通过这一步,我们有了一张Hive的分区表,Hive只是在它的元数据库中记录了这个表的信息,这个时候还没有数据的处理。而数据也还在我们的OSS上躺着。

接着把需要的分区都加入到表中,这里我假设我们有很多个分区

ALTER TABLE logoss ADD PARTITION (year='2016', month='05', day='31') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/05/31' PARTITION (year='2016', month='06', day='01') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/01' PARTITION (year='2016', month='06', day='02') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/02' PARTITION (year='2016', month='06', day='03') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/2016/06/03';

接下来我们select数据看一下,执行如下

select * from logoss limit 100;

我们就会看到我们的分区中的内容了。

处理原始数据

我们要把原来OSS上的原始数据,经过处理然后写到一个HDFS上的表,然后用这个HDFS的表进行后续的一系列处理。这里把所有的中间步骤都在HDFS上走,这样速度会快很多。

首先建立一个基于HDFS的Hive表,目前数据也还是空

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

然后将OSS的数据进行处理并写入到HDFS的表中,这里我们使用IF NOT EXISTS,为了防止这个分区已经存在被我们覆盖掉,如果你希望数据直接覆盖,可以去掉这个条件判断。

INSERT OVERWRITE TABLE loghdfs PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

业务处理

好了,到了这一步,我们就已经有了一个hdfs上的表了,我们可以对这个表进行任意的后续处理,
比如groupby 所有的ip,然后看他们的总数值

CREATE TABLE userip as select ip, count(id) from loghdfs group by ip;

中间可以进行类似的各种操作,由你的业务决定。
当所有的操作都完成以后,如果要把数据写到OSS上,那么来到最后一步

写回OSS

首先我们会创建一个对应OSS路径的Hive表,与第一步很类似

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path';

最后把我们的业务数据写入到对应的分区中去

INSERT OVERWRITE TABLE resultoss PARTITION (year='2016', month='05', day='31') IF NOT EXISTS select ip, count FROM userip;

这样我们的结果数据就写到了OSS上对应的目录下,类似这样的路径

/path/year=2016/month=05/day=31/

如何自动化

看了上面的这个过程,会发现这中间这个时间的分区需要我们手工写在里面,实在是太麻烦了,完全没有办法自动跑啊,那么下面我们就来更加进化一下。

job上配置自动时间

我们首先在E-MapReduce控制台上编辑的时候使用hivevar来指定时间变量,如下

-hivevar year='2016' -hivevar month='05' -hivevar day='31' -f ossref://mypath/job.hql

然后,我们需要把这个里面的常量变成每天自动变化的时间,我们使用E-MapReduce提供的时间变量
如下

-hivevar year=' ${yyyy-1d}' -hivevar month=' ${MM-1d}' -hivevar day=' ${dd-1d}' -f ossref://mypath/job.hql

时间配置的说明请参考这里

完整的作业配置及代码

screenshot

现在我们看看修改完成以后的完整的代码,中间的分区时间都是用变量进行了替换

CREATE EXTERNAL TABLE logoss (logcontent string) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/';

ALTER TABLE logoss ADD PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/path/${hivevar:year}/${hivevar:month}/${hivevar:day}';

CREATE TABLE loghdfs (id string, content string, ip string, oridate string) partitioned by (year string, month string, day string) stored AS textfile;

INSERT OVERWRITE TABLE loghdfs PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select split(logcontent,'\\|')[0] as id, split(logcontent,'\\|')[1] as content, split(logcontent,'\\|')[2] as ip, split(logcontent,'\\|')[3] as oridate FROM logoss;

CREATE TABLE userip as select ip, count(id) as count from loghdfs group by ip;

CREATE EXTERNAL TABLE resultoss (ip string, count int) partitioned by (year string, month string, day string) stored AS textfile location 'oss://akid:aksecret@bucket.oss-cn-hangzhou-internal.aliyuncs.com/outpath/';

INSERT OVERWRITE TABLE resultoss PARTITION (year='${hivevar:year}', month='${hivevar:month}', day='${hivevar:day}') IF NOT EXISTS select ip, count FROM userip;

然后你可以把这个作业加到一个周期执行的执行计划中,每天运行一次,就可以完全的自动每天跑数据啦。

相关实践学习
借助OSS搭建在线教育视频课程分享网站
本教程介绍如何基于云服务器ECS和对象存储OSS,搭建一个在线教育视频课程分享网站。
目录
相关文章
|
5月前
|
SQL 消息中间件 数据处理
DataX读取Hive Orc格式表丢失数据处理记录
DataX读取Hive Orc格式表丢失数据处理记录
135 0
|
1月前
|
存储 Cloud Native Serverless
云原生最佳实践系列 7:基于 OSS Object FC 实现非结构化文件实时处理
阿里云OSS对象存储方案利用函数计算FC,在不同终端请求时实时处理OSS中的原图,减少衍生图存储,降低成本。
|
2月前
|
SQL 关系型数据库 MySQL
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
157 0
|
8月前
|
SQL JSON 数据处理
大数据Hive JSON数据处理
大数据Hive JSON数据处理
89 0
|
4月前
|
SQL 分布式计算 关系型数据库
Sqoop数据导入到Hive表的最佳实践
Sqoop数据导入到Hive表的最佳实践
|
11月前
|
SQL 存储 监控
通过sdk查看oss投递(新版)延迟情况最佳实践
在投递任务中,日志服务会将运行日志写入到给定的logstore中,因而可以使用SDK来查看投递任务的当前状态,并进行批量查询,以了解多个Project和投递任务的状态。下面以查看oss投递的延迟为例,介绍客户提供操作步骤和常见的使用场景,以帮助客户更加方便地监控和管理投递任务。
通过sdk查看oss投递(新版)延迟情况最佳实践
|
存储 数据采集 运维
Alibaba Cloud Lens for OSS最佳实践
——其冀 阿里云智能产品专家
Alibaba Cloud Lens for OSS最佳实践
|
SQL 分布式计算 关系型数据库
Hive 数仓迁移 JindoFS/OSS 数据湖最佳实践
Hive 数仓是大多数迁移客户都会遇到的场景。在迁移过程中,不建议同时在新集群进行业务升级(比如从 Hive on MR 迁移到 Hive on Tez 或 Spark SQL等),这些业务升级可以在迁移完成后进行。1. 元数据同步Hive 元数据是对于 Hive 表来说非常关键,除了表结构信息,里面还记录着 Hive 表与底层文件系统的关联关系,许多上层服务都依赖 Hive 元数据提供服务。a.
575 0
|
存储 SQL 机器学习/深度学习
OSS加速器最佳实践-总述篇
OSS加速器最佳实践(总述篇)本最佳实践提供OSS加速器相关的信息和适合的场景,面向对oss和数据湖相关技术有一定了解的开发者。     大家可以通过这俩篇先做一些了解相关文档:《配置OSS加速器》https://help.aliyun.com/document_detail/190726.html《OSS加速器介绍》https://developer.aliyun.com/article/780
OSS加速器最佳实践-总述篇
|
5月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
108 1