hadoop中典型Writable类详解

简介:

Hadoop将很多Writable类归入org.apache.hadoop.io包中,在这些类中,比较重要的有Java基本类、Text、Writable集合、ObjectWritable等,重点介绍Java基本类和ObjectWritable的实现。

1. Java基本类型的Writable封装

目前Java基本类型对应的Writable封装如下表所示。所有这些Writable类都继承自WritableComparable。也就是说,它们是可比较的。同时,它们都有get()和set()方法,用于获得和设置封装的值。

Java基本类型对应的Writable封装

Java基本类型 Writable 序列化后长度
布尔型(boolean) BooleanWritable 1
字节型(byte) ByteWritable 1
整型(int)

IntWritable

VIntWritable

4

1~5

浮点型(float) FloatWritable 4
长整型(long)

LongWritable

VLongWritable

8

1~9

双精度浮点型(double) DoubleWritable 8

在表中,对整型(int和long)进行编码的时候,有固定长度格式(IntWritable和LongWritable)和可变长度格式(VIntWritable和VLongWritable)两种选择。固定长度格式的整型,序列化后的数据是定长的,而可变长度格式则使用一种比较灵活的编码方式,对于数值比较小的整型,它们往往比较节省空间。同时,由于VIntWritable和VLongWritable的编码规则是一样的,所以VIntWritable的输出可以用VLongWritable读入。下面以VIntWritable为例,说明Writable的Java基本类封装实现。代码如下:

public class VIntWritable implements WritableComparable {
   private int value;
   ……
   // 设置VIntWritable的值
   public void set(int value) { this.value = value; }

   // 获取VIntWritable的值
   public int get() { return value; }

   public void readFields(DataInput in) throws IOException {
      value = WritableUtils.readVInt(in);
   }

   public void write(DataOutput out) throws IOException {
      WritableUtils.writeVInt(out, value);
   }
   ……
}

首先,每个Java基本类型的Writable封装,其类的内部都包含一个对应基本类型的成员变量value,get()和set()方法就是用来 对该变量进行取值/赋值操作的。而Writable接口要求的readFields()和write()方法,VIntWritable则是通过调用 Writable工具类中提供的readVInt()和writeVInt()读/写数据。方法readVInt()和writeVInt()的实现也只 是简单调用了readVLong()和writeVLong(),所以,通过writeVInt()写的数据自然可以通过readVLong()读入。

writeVLong ()方法实现了对整型数值的变长编码,它的编码规则如下:

如果输入的整数大于或等于–112同时小于或等于127,那么编码需要1字节;否则,序列化结果的第一个字节,保存了输入整数的符号和后续编码的字节数。符号和后续字节数依据下面的编码规则(又一个规则):

如果是正数,则编码值范围落在–113和–120间(闭区间),后续字节数可以通过–(v+112)计算。

如果是负数,则编码值范围落在–121和–128间(闭区间),后续字节数可以通过–(v+120)计算。

后续编码将高位在前,写入输入的整数(除去前面全0字节)。代码如下:

public final class WritableUtils  {
   public stati cvoid writeVInt(DataOutput stream, int i) throws IOException
   {
      writeVLong(stream, i);
   }
   /**
    * @param stream保存系列化结果输出流
    * @param i 被序列化的整数
    * @throws java.io.IOException
    */
   public static void writeVLong(DataOutput stream, long i) throws……
   {
      //处于[-112, 127]的整数
      if (i >= -112 && i <= 127) {
         stream.writeByte((byte)i);
         return;
      }
      //计算情况2的第一个字节
      int len = -112;
      if (i < 0) {
         i ^= -1L;
         len = -120;
      }
      long tmp = i;
      while (tmp != 0) {
         tmp = tmp >> 8;
         len--;
      }
      stream.writeByte((byte)len);
      len = (len < -120) ? -(len + 120) : -(len + 112);
      //输出后续字节
      for (int idx = len; idx != 0; idx--) {
         int shiftbits = (idx - 1) * 8;
         long mask = 0xFFL << shiftbits;
         stream.writeByte((byte)((i & mask) >> shiftbits));
      }
   }
}

2. ObjectWritable类的实现

针对Java基本类型、字符串、枚举、Writable、空值、Writable的其他子类,ObjectWritable提供了一个封装,适用于字段需要使用多种类型。ObjectWritable可应用于Hadoop远程过程调用中参数的序列化和反序列化;ObjectWritable的另一个典型应用是在需要序列化不同类型的对象到某一个字段,如在一个SequenceFile的值中保存不同类型的对象(如LongWritable值或Text值)时,可以将该值声明为ObjectWritable。

ObjectWritable的实现比较冗长,需要根据可能被封装在ObjectWritable中的各种对象进行不同的处理。 ObjectWritable有三个成员变量,包括被封装的对象实例instance、该对象运行时类的Class对象和Configuration对 象。

ObjectWritable的write方法调用的是静态方法ObjectWritable.writeObject(),该方法可以往DataOutput接口中写入各种Java对象。

writeObject()方法先输出对象的类名(通过对象对应的Class 对象的getName()方法获得),然后根据传入对象的类型,分情况序列化对象到输出流中,也就是说,对象通过该方法输出对象的类名,对象序列化结果对 到输出流中。在ObjectWritable.writeObject()的逻辑中,需要分别处理null、Java数组、字符串String、Java 基本类型、枚举和Writable的子类6种情况,由于类的继承,处理Writable时,序列化的结果包含对象类名,对象实际类名和对象序列化结果三部 分。

为什么需要对象实际类名呢?根据Java的单根继承规则,ObjectWritable中传入的declaredClass,可以是传入instance对象对应的类的类对象,也可以是instance对象的父类的类对象。但是,在序列化和反序列化的时候,往往不能使用父类的序列化方法(如write方法)来序列化子类对象,所以,在序列化结果中必须记住对象实际类名。相关代码如下:

public class ObjectWritable implements Writable, Configurable {
   private Class declaredClass;//保存于ObjectWritable的对象对应的类对象
   private Object instance;//被保留的对象
   private Configuration conf;

   public ObjectWritable() {}

   public ObjectWritable(Object instance) {
      set(instance);
   }

   public ObjectWritable(Class declaredClass, Object instance) {
      this.declaredClass = declaredClass;
      this.instance = instance;
   }           
   ……
   public void readFields(DataInput in) throws IOException {
      readObject(in, this, this.conf);
   }

   public void write(DataOutput out) throws IOException {
      writeObject(out, instance, declaredClass, conf);
   }
   ……
   public static void writeObject(DataOutput out, Object instance,
         Class declaredClass,Configuration conf) throws……{

      if (instance == null) {//
         instance = new NullInstance(declaredClass, conf);
         declaredClass = Writable.class;
      }

      // 写出declaredClass的规范名
      UTF8.writeString(out, declaredClass.getName());

      if (declaredClass.isArray()) {//数组
         ……
      } else if (declaredClass == String.class) {//字符串
         ……
      } else if (declaredClass.isPrimitive()) {//基本类型
         if (declaredClass == Boolean.TYPE) {        //boolean
            out.writeBoolean(((Boolean)instance).booleanValue());
         } else if (declaredClass == Character.TYPE) { //char
            ……
         }
      } else if (declaredClass.isEnum()) {//枚举类型
         ……
      } else if (Writable.class.isAssignableFrom(declaredClass)) {
         //Writable的子类
         UTF8.writeString(out, instance.getClass().getName());
         ((Writable)instance).write(out);
      } else {
         ……
   }

   public static Object readObject(DataInput in,
                  ObjectWritable objectWritable, Configuration conf){
      ……
      Class instanceClass = null;
      ……
      Writable writable = WritableFactories.newInstance(instanceClass,
            conf);
      writable.readFields(in);
      instance = writable;
      ……
   }
}

和输出对应,ObjectWritable的readFields()方法调用的是静态方法 ObjectWritable.readObject(),该方法的实现和writeObject()类似,唯一值得研究的是Writable对象处理部 分,readObject()方法依赖于WritableFactories类。WritableFactories类允许非公有的Writable子类 定义一个对象工厂,由该工厂创建Writable对象,如在上面的readObject()代码中,通过WritableFactories的静态方法 newInstance(),可以创建类型为instanceClass的Writable子对象。相关代码如下:

public class WritableFactories {
   //保存了类型和WritableFactory工厂的对应关系
   private static final HashMap<Class, WritableFactory>CLASS_TO_FACTORY
      = new HashMap<Class, WritableFactory>();
   ……
   public static Writable newInstance(Class<? extends Writable> c,
                                      Configuration conf) {
      WritableFactory factory = WritableFactories.getFactory(c);
      if (factory != null) {
         Writable result = factory.newInstance();
         if (result instanceof Configurable) {
            ((Configurable) result).setConf(conf);
         }
         return result;
      } else {
         //采用传统的反射工具ReflectionUtils,创建对象
         return ReflectionUtils.newInstance(c, conf);
      }
   }
}

WritableFactories.newInstance()方法根据输入的类型查找对应的WritableFactory工厂对象,然后调用 该对象的newInstance()创建对象,如果该对象是可配置的,newInstance()还会通过对象的setConf()方法配置对象。

WritableFactories提供注册机制,使得这些Writable子类可以将该工厂登记到WritableFactories的静态成员 变量CLASS_TO_FACTORY中。下面是一个典型的WritableFactory工厂实现,来自于HDFS的数据块Block。其 中,WritableFactories.setFactory()需要两个参数,分别是注册类对应的类对象和能够构造注册类的 WritableFactory接口的实现,在下面的代码里,WritableFactory的实现是一个匿名类,其newInstance()方法会创 建一个新的Block对象。

public class Block implements Writable, Comparable<Block> {
   static {
      WritableFactories.setFactory
         (Block.class,//类对象
         new WritableFactory() {//对应类的WritableFactory实现
            public Writable newInstance() { return new Block(); }
         });
   }           
   ……
}

ObjectWritable作为一种通用机制,相当浪费资源,它需要为每一个输出写入封装类型的名字。如果类型的数量不是很多,而且可以事先知 道,则可以使用一个静态类型数组来提高效率,并使用数组索引作为类型的序列化引用。GenericWritable就是因为这个目的被引入 org.apache.hadoop.io包中。

目录
相关文章
|
4月前
|
分布式计算 Hadoop 大数据
|
存储 分布式计算 自然语言处理
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
Hadoop序列化、概述、自定义bean对象实现序列化接口(Writable)、序列化案例实操、编写流量统计的Bean对象、编写Mapper类、编写Reducer类、编写Driver驱动类
|
分布式计算 Java Hadoop
|
分布式计算 Hadoop
|
分布式计算 Hadoop Java
|
存储 分布式计算 Hadoop
|
11天前
|
存储 分布式计算 Hadoop
大数据处理架构Hadoop
【4月更文挑战第10天】Hadoop是开源的分布式计算框架,核心包括MapReduce和HDFS,用于海量数据的存储和计算。具备高可靠性、高扩展性、高效率和低成本优势,但存在低延迟访问、小文件存储和多用户写入等问题。运行模式有单机、伪分布式和分布式。NameNode管理文件系统,DataNode存储数据并处理请求。Hadoop为大数据处理提供高效可靠的解决方案。
33 2
|
11天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。

热门文章

最新文章

相关实验场景

更多