hcatalog简介和使用

简介:

Hcatalog是apache开源的对于表和底层数据管理统一服务平台,目前最新release版本是0.5,不过需要hive 0.10支持,由于我们hive集群版本是0.9.0,所以只能降级使用hcatalog 0.4,由于hcatalog中所有的底层数据信息都是保存在hive metastore里,所以hive版本升级后schema变动或者api变动会对hacatalog产生影响,因此在hive 0.11中已经集成了了hcatalog,以后也会成为hive的一部分,而不是独立的项目。

HCatalog底层依赖于Hive Metastore,执行过程中会创建一个HiveMetaStoreClient,通过这个instance提供的api来获取表结构数据,如果是local metastore mode的话,会直接返回一个HiveMetaStore.HMSHandler,如果是remote mode的话(hive.metastore.local设置为false),会依据hive.metastore.uris(比如thrift://10.1.8.42:9083, thrift://10.1.8.51:9083)中设定的一串uri逐一顺序建立连接。只要有一个链接建立就可以了,同时为了避免所有client都和第一个uri建立连接,导致负载过大,我加了点小trick,对这串uris随机shuffle来做load balance

由于我们的集群开启了kerberos security,需要获取DelegationToken,但是local mode是不支持的,所以只用能remote mode

HiveMetaStoreClient.java


  public String getDelegationToken(String owner, String renewerKerberosPrincipalName) throws
      MetaException, TException {
    if (localMetaStore) {
      throw new UnsupportedOperationException("getDelegationToken() can be " +
          "called only in thrift (non local) mode");
    }
    return client.get_delegation_token(owner, renewerKerberosPrincipalName);
  }


HCatInputFormat和HCatOutputFormat提供一些mapreduce api来读取表和写入表

HCatInputFormat API:


  public static void setInput(Job job,
      InputJobInfo inputJobInfo) throws IOException;

先实例化一个InputJobInfo对象,该对象包含三个参数dbname,tablename,filter,然后传给setInput函数,来读取相应的数据

public static HCatSchema getTableSchema(JobContext context) 
    throws IOException;

在运行时(比如mapper阶段的setup函数中),可以传进去JobContext,调用静态getTableSchema来获取先前setInput时设置的table schema信息

HCatOutputFormat API:

public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;

OutPutJobInfo接受三个参数databaseName, tableName, partitionValues,其中第三个参数类型是Map<String, String>,partition key放在map key里,partition value放在对应map key的value中,该参数可传入null或空map,如果指定的partition存在的话,会抛org.apache.hcatalog.common.HCatException : 2002 : Partition already present with given partition key values

比如要要写入指定的partition(dt=’2013-06-13′,country=’china’ ),可以这样写

Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("dt", "2013-06-13");
partitionValues.put("country", "china");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);

public static HCatSchema getTableSchema(JobContext context) throws IOException;

获取之前HCatOutputFormat.setOutput指定的table schema信息

public static void setSchema(final Job job, final HCatSchema schema) throws IOException;

设置最终写入数据的schema信息,若不调用这个方法,则默认会使用table schema信息

下面提供一个完整mapreduce例子计算一天每个guid访问页面次数,map阶段从表中读取guid字段,reduce阶段统计该guid对应pageview的总数,然后写回另外一张带有guid和count字段的表中


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class GroupByGuid extends Configured implements Tool {

	@SuppressWarnings("rawtypes")
	public static class Map extends
			Mapper<WritableComparable, HCatRecord, Text, IntWritable> {
		HCatSchema schema;
		Text guid;
		IntWritable one;

		@Override
		protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
				throws IOException, InterruptedException {
			guid = new Text();
			one = new IntWritable(1);
			schema = HCatInputFormat.getTableSchema(context);
		}

		@Override
		protected void map(WritableComparable key, HCatRecord value,
				Context context) throws IOException, InterruptedException {
			guid.set(value.getString("guid", schema));
			context.write(guid, one);
		}
	}

	@SuppressWarnings("rawtypes")
	public static class Reduce extends
			Reducer<Text, IntWritable, WritableComparable, HCatRecord> {
		HCatSchema schema;

		@Override
		protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
				throws IOException, InterruptedException {
			schema = HCatOutputFormat.getTableSchema(context);
		}

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			Iterator<IntWritable> iter = values.iterator();
			while (iter.hasNext()) {
				sum++;
				iter.next();
			}
			HCatRecord record = new DefaultHCatRecord(2);
			record.setString("guid", schema, key.toString());
			record.setInteger("count", schema, sum);
			context.write(null, record);
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		String dbname = args[0];
		String inputTable = args[1];
		String filter = args[2];
		String outputTable = args[3];
		int reduceNum = Integer.parseInt(args[4]);

		Job job = new Job(conf,
				"GroupByGuid, Calculating every guid's pageview");
		HCatInputFormat.setInput(job,
				InputJobInfo.create(dbname, inputTable, filter));

		job.setJarByClass(GroupByGuid.class);
		job.setInputFormatClass(HCatInputFormat.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(WritableComparable.class);
		job.setOutputValueClass(DefaultHCatRecord.class);
		job.setNumReduceTasks(reduceNum);

		HCatOutputFormat.setOutput(job,
				OutputJobInfo.create(dbname, outputTable, null));
		HCatSchema s = HCatOutputFormat.getTableSchema(job);
		HCatOutputFormat.setSchema(job, s);

		job.setOutputFormatClass(HCatOutputFormat.class);

		return (job.waitForCompletion(true) ? 0 : 1);
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new GroupByGuid(), args);
		System.exit(exitCode);
	}
}


其实hcatalog还支持动态分区dynamic partition,我们可以在OutJobInfo中指定部分partition keyvalue pair,在运行时候根据传进来的值设置HCatRecord对应的其他partition keyvalue pair,这样就能在一个job中同时写多个partition了

本文链接http://blog.csdn.net/lalaguozhe/article/details/9083905,转载请注明

目录
相关文章
|
存储 编解码
H264简介
H.264 原始码流(又称为裸流),是有一个接一个的 NALU 组成的,而它的功能分为两层:视频编码层(VCL, Video Coding Layer)和网络提取层(NAL, Network Abstraction Layer),其中,前者负责有效表示视频数据的内容,而后者则负责格式化数据并提供头信息,以保证数据适合各种信道和存储介质上的传输。
H264简介
|
Java 测试技术 开发工具
AssertJ简介
JUnit和Hamcrest 学Java的一定知道JUnit吧,它是一个著名的Java单元测试框架。我们在使用JUnit测试的时候,一般情况下会使用它的Assert类下的各种assertXXX方法。
1569 0
|
索引 容器
NSIndexSet 简介
NSIndexSet (以及它的可修改子类, NSMutableIndexSet) 是一个排好序的,无重复元素的整数集合。它看上去有点像 支持离散整数的 NSRange .它能用于快速查找特定范围的值的索引,也能用于快速计算交集, 同时,Foundation collection class 提供了很多好用的方法,方便你使用 NSIndexSet.
253 0
NSIndexSet 简介
|
存储 数据安全/隐私保护
TrueLicense简介
原文 TrueLicense是一个开源的证书管理引擎,官网 使用场景:当项目交付给客户之后用签名来保证客户不能随意使用项目 默认校验了开始结束时间,可扩展增加mac地址校验等。 其中还有ftp的校验没有尝试,本文详细介绍的是本地校验 license授权机制的原理: 生成密钥对,方法有很多。
8258 0
|
数据安全/隐私保护 网络架构 网络协议
|
关系型数据库 数据安全/隐私保护 监控
|
Java Windows Unix
|
安全 网络协议 Shell
|
JSON Java 数据库
|
前端开发 JavaScript 中间件