上一篇说了HDFSEventSink的实现,这里根据hdfs sink的配置和调用分析来看下sink中整个hdfs数据写入的过程:
线上hdfs sink的几个重要设置

1
2
3
4
5
6
7
8
hdfs.path = hdfs: //xxxxx/%{logtypename}/%Y%m%d/%H:
hdfs.rollInterval =  60
hdfs.rollSize =  0  //想让文件只根据实际来roll
hdfs.rollCount =  0
hdfs.batchSize =  2000
hdfs.txnEventMax =  2000
hdfs.fileType = DataStream 
hdfs.writeFormat = Text

这里说下和类相关的hdfs.fileType和hdfs.writeFormat,一个定义了文件流式用的类,一个定义了具体的数据序列化的类.
1)hdfs.fileType 有3个可选项:SequenceFile/DataStream/CompressedStream,DataStream可以想象成hdfs的textfile,默认是SequenceFileType,CompressedStream是用于压缩时设置
2)hdfs.writeFormat 定义了3种序列化方法,TEXT只写Event的body部分,HEADER_AND_TEXT写Event的body和header,AVRO_EVENT是avro的序列化方式

上面的设置,其数据写入流程大概如下:

1
SinkRunner.process->SinkProcessor.process->HDFSEventSink.process->HDFSEventSink.append->BucketWriter.append->HDFSWriter.append->HDFSDataStream.append->BodyTextEventSerializer.write->java.io.OutputStream.write

简单说下:
在HDFSEventSink中会实例化BucketWriter和HDFSWriter:

1
2
3
4
5
6
7
         if  (bucketWriter ==  null ) {
           HDFSWriter hdfsWriter = writerFactory.getWriter(fileType );  //获取HDFSWriter 对象
....
           bucketWriter =  new  BucketWriter(rollInterval , rollSize , rollCount ,
               batchSize, context , realPath, realName, inUsePrefix, inUseSuffix,
               suffix, codeC, compType, hdfsWriter, timedRollerPool,
               proxyTicket, sinkCounter , idleTimeout , idleCallback, lookupPath);  //根据HDFSWriter 对象获取BucketWriter对象

这里获取HDFSWriter 对象时用到了org.apache.flume.sink.hdfs.HDFSWriterFactory的getWriter方法,根据hdfs.fileType的设置会返回具体的org.apache.flume.sink.hdfs.HDFSWriter实现类的对象
目前只支持3种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   static  final  String SequenceFileType =  "SequenceFile"  ;
   static  final  String DataStreamType =  "DataStream"  ;
   static  final  String CompStreamType =  "CompressedStream"  ;
....
   public  HDFSWriter getWriter(String fileType)  throws  IOException {
     if  (fileType.equalsIgnoreCase( SequenceFileType)) {  //SequenceFile,sequencefile
       return  new  HDFSSequenceFile();
     else  if  (fileType.equalsIgnoreCase(DataStreamType)) {  //DataStream
       return  new  HDFSDataStream();
     else  if  (fileType.equalsIgnoreCase(CompStreamType)) {  //CompressedStream
       return  new  HDFSCompressedDataStream();
     else  {
       throw  new  IOException( "File type "  + fileType +  " not supported" );
     }

BucketWriter可以理解成是对下层数据操作的一个封装,比如数据写入时其实调用了其append方法,append主要有下面几个步骤:
1)首先判断文件是否打开:

1
2
3
4
5
6
7
     if  (! isOpen) {
       if (idleClosed) {
         throw  new  IOException( "This bucket writer was closed due to idling and this handle "  +
             "is thus no longer valid" );
       }
       open();  //如果没有打开,则调用open->doOpen->HDFSWriter.open方法打开bucketPath (bucketPath是临时写入目录,比如tmp结尾的目录,targetPath是最终目录)
     }

doOpen的主要步骤
a.设置两个文件名:

1
2
3
         bucketPath = filePath + DIRECTORY_DELIMITER + inUsePrefix
           + fullFileName + inUseSuffix;
         targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;

b.调用HDFSWriter.open方法打开bucketPath

1
2
3
4
5
6
7
8
9
10
11
12
          if  (codeC ==  null ) {
           // Need to get reference to FS using above config before underlying
           // writer does in order to avoid shutdown hook & IllegalStateExceptions
           fileSystem =  new  Path(bucketPath ).getFileSystem(config);
           LOG.info( "Creating "  + bucketPath );
           writer.open( bucketPath);
         else  {
           // need to get reference to FS before writer does to avoid shutdown hook
           fileSystem =  new  Path(bucketPath ).getFileSystem(config);
           LOG.info( "Creating "  + bucketPath );
           writer.open( bucketPath, codeC , compType );
         }

c.如果设置了rollInterval ,则执行计划任务调用close方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
     // if time-based rolling is enabled, schedule the roll
     if  (rollInterval >  0 ) {
       Callable<Void> action =  new  Callable<Void>() {
         public  Void call()  throws  Exception {
           LOG.debug( "Rolling file ({}): Roll scheduled after {} sec elapsed."  ,
               bucketPath, rollInterval );
           try  {
             close();
           catch (Throwable t) {
             LOG.error( "Unexpected error"  , t);
           }
           return  null  ;
         }
       };
       timedRollFuture = timedRollerPool.schedule(action, rollInterval ,
           TimeUnit. SECONDS);
     }

2)判断文件是否需要翻转(达到hdfs.rollSize或者hdfs.rollCount设置):

1
2
3
4
5
     // check if it's time to rotate the file
     if  (shouldRotate()) {
       close();  //close调用flush+doClose,flush调用doFlush,doFlush调用HDFSWriter.sync方法把数据同步到hdfs中
       open();
     }

其中shouldRotate(基于数量和大小的roll方式):

1
2
3
4
5
6
7
8
9
10
11
12
   private  boolean  shouldRotate() {
     boolean  doRotate =  false ;
     if  (( rollCount >  0 ) && (rollCount <= eventCounter )) {  //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
       LOG.debug(  "rolling: rollCount: {}, events: {}"  , rollCount , eventCounter );
       doRotate =  true ;
     }
     if  (( rollSize >  0 ) && ( rollSize <= processSize)) {  //hdfs.rollCount大于0并且处理的event的数量大于或等于hdfs.rollCount,doRotate 设置为true
       LOG.debug(  "rolling: rollSize: {}, bytes: {}"  , rollSize , processSize );
       doRotate =  true ;
     }
     return  doRotate;
   }

其中doClose主要的步骤
a.调用HDFSWriter.close方法
b.调用renameBucket方法把tmp文件命名为最终文件:

1
2
3
4
     if  (bucketPath !=  null  && fileSystem !=  null ) {
       renameBucket();  // could block or throw IOException
       fileSystem =  null ;
     }

其中renameBucket:

1
fileSystem.rename(srcPath, dstPath)

3)调用HDFSWriter.append方法写入Event

1
writer.append(event);

4) 更新计数器

1
2
3
4
     // update statistics
     processSize += event.getBody(). length;
     eventCounter++;
     batchCounter++;

5)判断是否需要flush(达到hdfs.batchSize的设置),batch写入数据到hdfs

1
2
3
     if  (batchCounter == batchSize) {
       flush();
     }

Event写入时BucketWriter的append方法调用org.apache.flume.sink.hdfs.HDFSWriter实现类的append方法,比如这里的HDFSDataStream类,HDFSDataStream的主要方法:
configure用于设置serializer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   public  void  configure(Context context) {
     serializerType = context.getString(  "serializer" "TEXT"  );  //默认序列化方式为TEXT
     useRawLocalFileSystem = context.getBoolean(  "hdfs.useRawLocalFileSystem" ,
         false );
     serializerContext =
         new  Context(context.getSubProperties(EventSerializer.CTX_PREFIX));
     logger.info(  "Serializer = "  + serializerType +  ", UseRawLocalFileSystem = "
         + useRawLocalFileSystem);
   }
append方法用于Event的写入,调用EventSerializer.write方法:
   public  void  append(Event e)  throws  IOException {
     // shun flumeformatter...
     serializer.write(e);  //调用EventSerializer.write方法写入Event
   }

open方法主要步骤:
1)根据hdfs.append.support的设置(默认为false)打开或者新建文件

1
2
3
4
5
6
7
8
     boolean  appending =  false ;
     if  (conf.getBoolean(  "hdfs.append.support" false  ) ==  true  && hdfs.isFile
             (dstPath)) {  //默认hdfs.append.support为false
       outStream = hdfs.append(dstPath);
       appending =  true ;
     else  {
       outStream = hdfs.create(dstPath);  //如果不支持append,则创建文件
     }

2)使用EventSerializerFactory.getInstance方法创建EventSerializer的对象

1
2
     serializer = EventSerializerFactory.getInstance(
         serializerType, serializerContext , outStream );  //实例化EventSerializer对象

3)如果EventSerializer对象支持reopen,并且hdfs.append.support设置为true时会抛出异常

1
2
3
4
5
6
     if  (appending && ! serializer.supportsReopen()) {
       outStream.close();
       serializer =  null ;
       throw  new  IOException( "serializer ("  + serializerType +
           ") does not support append" );
     }

4)调用文件打开或者reopen之后的操作

1
2
3
4
5
6
     if  (appending) {
       serializer.afterReopen();
     else  {
       serializer.afterCreate();
     }
   }

这里hdfs.writeFormat的3种设置和对应的类:

1
2
3
   TEXT(BodyTextEventSerializer.Builder.  class ),  //支持reopen
   HEADER_AND_TEXT(HeaderAndBodyTextEventSerializer.Builder.  class ),  //支持reopen
   AVRO_EVENT(FlumeEventAvroEventSerializer.Builder.  class ),  // 不支持reopen

默认设置为TEXT,即BodyTextEventSerializer类:

1
2
3
4
5
6
7
8
9
10
   private  BodyTextEventSerializer(OutputStream out, Context ctx) {  //构造方法
     this . appendNewline = ctx.getBoolean(APPEND_NEWLINE , APPEND_NEWLINE_DFLT );  //默认为true
     this . out = out;
   }
....
   public  void  write(Event e)  throws  IOException {  //write方法
     out.write(e.getBody());  //java.io.OutputStream.write,只写Event的body
     if  (appendNewline) {  //每一行之后增加一个回车
       out.write( '\n' );
     }