Hive、MapReduce、Spark分布式生成唯一数值型ID

简介:

在实际业务场景下,经常会遇到在Hive、MapReduce、Spark中需要生成唯一的数值型ID。

一般常用的做法有:

MapReduce中使用1个Reduce来生成;

Hive中使用row_number分析函数来生成,其实也是1个Reduce;

借助HBase或Redis或Zookeeper等其它框架的计数器来生成;

数据量不大的情况下,可以直接使用1和2方法来生成,但如果数据量巨大,1个Reduce处理起来就非常慢。

在数据量非常大的情况下,如果你仅仅需要唯一的数值型ID,注意:不是需要”连续的唯一的数值型ID”,那么可以考虑采用本文中介绍的方法,否则,请使用第3种方法来完成。

Spark中生成这样的非连续唯一数值型ID,非常简单,直接使用zipWithUniqueId()即可。

参考zipWithUniqueId()的方法,在MapReduce和Hive中,实现如下:

在Spark中,zipWithUniqueId是通过使用分区Index作为每个分区ID的开始值,在每个分区内,ID增长的步长为该RDD的分区数,那么在MapReduce和Hive中,也可以照此思路实现,Spark中的分区数,即为MapReduce中的Map数,Spark分区的Index,即为Map Task的ID。Map数,可以通过JobConf的getNumMapTasks(),而Map Task ID,可以通过参数mapred.task.id获取,格式如:attempt_1478926768563_0537_m_000004_0,截取m_000004_0中的4,再加1,作为该Map Task的ID起始值。注意:这两个只均需要在Job运行时才能获取。另外,从图中也可以看出,每个分区/Map Task中的数据量不是绝对一致的,因此,生成的ID不是连续的。

下面的UDF可以在Hive中直接使用:

 
  1. package com.lxw1234.hive.udf; 
  2.   
  3. import org.apache.hadoop.hive.ql.exec.MapredContext; 
  4. import org.apache.hadoop.hive.ql.exec.UDFArgumentException; 
  5. import org.apache.hadoop.hive.ql.metadata.HiveException; 
  6. import org.apache.hadoop.hive.ql.udf.UDFType; 
  7. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; 
  8. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; 
  9. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; 
  10. import org.apache.hadoop.io.LongWritable; 
  11.   
  12. @UDFType(deterministic = false, stateful = true
  13. public class RowSeq2 extends GenericUDF { 
  14.      
  15.     private static LongWritable result = new LongWritable(); 
  16.     private static final char SEPARATOR = '_'
  17.     private static final String ATTEMPT = "attempt"
  18.     private long initID = 0l; 
  19.     private int increment = 0; 
  20.      
  21.      
  22.     @Override 
  23.     public void configure(MapredContext context) { 
  24.         increment = context.getJobConf().getNumMapTasks(); 
  25.         if(increment == 0) { 
  26.             throw new IllegalArgumentException("mapred.map.tasks is zero"); 
  27.         } 
  28.          
  29.         initID = getInitId(context.getJobConf().get("mapred.task.id"),increment); 
  30.         if(initID == 0l) { 
  31.             throw new IllegalArgumentException("mapred.task.id"); 
  32.         } 
  33.          
  34.         System.out.println("initID : " + initID + "  increment : " + increment); 
  35.     } 
  36.      
  37.     @Override 
  38.     public ObjectInspector initialize(ObjectInspector[] arguments) 
  39.             throws UDFArgumentException { 
  40.         return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 
  41.     } 
  42.   
  43.     @Override 
  44.     public Object evaluate(DeferredObject[] arguments) throws HiveException { 
  45.         result.set(getValue()); 
  46.         increment(increment); 
  47.         return result; 
  48.     } 
  49.      
  50.     @Override 
  51.     public String getDisplayString(String[] children) { 
  52.         return "RowSeq-func()"
  53.     } 
  54.      
  55.     private synchronized void increment(int incr) { 
  56.         initID += incr; 
  57.     } 
  58.      
  59.     private synchronized long getValue() { 
  60.         return initID; 
  61.     } 
  62.      
  63.     //attempt_1478926768563_0537_m_000004_0 // return 0+1 
  64.     private long getInitId (String taskAttemptIDstr,int numTasks) 
  65.             throws IllegalArgumentException { 
  66.         try { 
  67.             String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); 
  68.             if(parts.length == 6) { 
  69.                 if(parts[0].equals(ATTEMPT)) { 
  70.                     if(!parts[3].equals("m") && !parts[3].equals("r")) { 
  71.                         throw new Exception(); 
  72.                     } 
  73.                     long result = Long.parseLong(parts[4]); 
  74.                     if(result >= numTasks) { //if taskid >= numtasks 
  75.                         throw new Exception("TaskAttemptId string : " + taskAttemptIDstr 
  76.                                 + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); 
  77.                     } 
  78.                     return result + 1; 
  79.                 } 
  80.             } 
  81.         } catch (Exception e) {} 
  82.         throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
  83.                 + " is not properly formed"); 
  84.     } 
  85.      
  86.   

有一张去重后的用户id(字符串类型)表,需要位每个用户id生成一个唯一的数值型seq:

 
  1. ADD jar file:///tmp/udf.jar; 
  2. CREATE temporary function seq2 as 'com.lxw1234.hive.udf.RowSeq2'
  3.   
  4. hive>> desc lxw_all_ids; 
  5. OK 
  6. id                      string                                       
  7. Time taken: 0.074 seconds, Fetched: 1 row(s) 
  8. hive> select * from lxw_all_ids limit 5; 
  9. OK 
  10. 01779E7A06ABF5565A4982_cookie 
  11. 031E2D2408C29556420255_cookie 
  12. 03371ADA0B6E405806FFCD_cookie 
  13. 0517C4B701BC1256BFF6EC_cookie 
  14. 05F12ADE0E880455931C1A_cookie 
  15. Time taken: 0.215 seconds, Fetched: 5 row(s) 
  16. hive> select count(1) from lxw_all_ids; 
  17. 253402337 
  18.   
  19. hive> create table lxw_all_ids2 as select id,seq2() as seq from lxw_all_ids; 
  20. … 
  21. Hadoop job information for Stage-1: number of mappers: 27; number of reducers: 0 
  22. … 
  23.   
  24.   
  25.   

该Job使用了27个Map Task,没有使用Reduce,那么将会产生27个结果文件。

再看结果表中的数据:

 
  1. hive> select * from lxw_all_ids2 limit 10; 
  2. OK 
  3. 766CA2770527B257D332AA_cookie   1 
  4. 5A0492DB0000C557A81383_cookie   28 
  5. 8C06A5770F176E58301EEF_cookie   55 
  6. 6498F47B0BCAFE5842B83A_cookie   82 
  7. 6DA33CB709A23758428A44_cookie   109 
  8. B766347B0D27925842AC2D_cookie   136 
  9. 5794357B050C99584251AC_cookie   163 
  10. 81D67A7B011BEA5842776C_cookie   190 
  11. 9D2F8EB40AEA525792347D_cookie   217 
  12. BD21077B09F9E25844D2C1_cookie   244 
  13.   
  14. hive> select count(1),count(distinct seq) from lxw_all_ids2; 
  15. 253402337       253402337 
  16.   

limit 10只从第一个结果文件,即MapTaskId为0的结果文件中拿了10条,这个Map中,start=1,increment=27,因此生成的ID如上所示。

count(1),count(distinct seq)的值相同,说明seq没有重复值,你可以试试max(seq),结果必然大于253402337,说明seq是”非连续唯一数值型ID“.


本文作者:佚名

来源:51CTO

相关文章
|
7天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
3月前
|
缓存 算法 NoSQL
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式系统通过副本控制协议,使得从系统外部读取系统内部各个副本的数据在一定的约束条件下相同,称之为副本一致性(consistency)。副本一致性是针对分布式系统而言的,不是针对某一个副本而言。强一致性(strong consistency):任何时刻任何用户或节点都可以读到最近一次成功更新的副本数据。强一致性是程度最高的一致性要求,也是实践中最难以实现的一致性。单调一致性(monotonic consistency):任何时刻,任何用户一旦读到某个数据在某次更新后的值,这个用户不会再读到比这个值更旧的值。
380 0
|
1天前
|
SQL 缓存 分布式计算
【Hive】Hive的两张表关联,使用MapReduce怎么实现?
【4月更文挑战第16天】【Hive】Hive的两张表关联,使用MapReduce怎么实现?
|
20天前
|
缓存 算法 关系型数据库
深度思考:雪花算法snowflake分布式id生成原理详解
雪花算法snowflake是一种优秀的分布式ID生成方案,其优点突出:它能生成全局唯一且递增的ID,确保了数据的一致性和准确性;同时,该算法灵活性强,可自定义各部分bit位,满足不同业务场景的需求;此外,雪花算法生成ID的速度快,效率高,能有效应对高并发场景,是分布式系统中不可或缺的组件。
深度思考:雪花算法snowflake分布式id生成原理详解
|
1月前
|
算法 Java 数据中心
分布式ID生成系统之雪花算法详解
在当今的云计算和微服务架构盛行的时代,分布式系统已成为软件开发的重要组成部分。随着系统规模的扩大和业务的复杂化,对数据一致性和唯一性的要求也越来越高,尤其是在全局唯一标识符(ID)的生成上。因此,分布式ID生成系统应运而生,成为保证数据唯一性和提高系统可扩展性的关键技术之一。雪花算法(Snowflake)是Twitter开源的一种算法,用于生成64位的全局唯一ID,非常适用于分布式系统中生成唯一标识符。下面我们将深入探讨雪花算法的原理、结构和实现方式。
93 2
 分布式ID生成系统之雪花算法详解
|
1月前
|
NoSQL 算法 Java
【Redis】4、全局唯一 ID生成、单机(非分布式)情况下的秒杀和一人一单
【Redis】4、全局唯一 ID生成、单机(非分布式)情况下的秒杀和一人一单
62 0
|
2月前
|
存储 算法 NoSQL
全网最全的分布式ID生成方案解析
全网最全的分布式ID生成方案解析
85 0
|
3月前
|
算法 NoSQL 关系型数据库
9种 分布式ID生成方式
9种 分布式ID生成方式
411 0
|
3月前
|
SQL 存储 分布式计算
Spark与Hive的集成与互操作
Spark与Hive的集成与互操作
|
3月前
|
分布式计算 大数据 数据处理
Spark RDD(弹性分布式数据集)
Spark RDD(弹性分布式数据集)