HDFS源码分析数据块校验之DataBlockScanner

简介:         DataBlockScanner是运行在数据节点DataNode上的一个后台线程。它为所有的块池管理块扫描。针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描、校验数据块。

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程。它为所有的块池管理块扫描。针对每个块池,一个BlockPoolSliceScanner对象将会被创建,其运行在一个单独的线程中,为该块池扫描、校验数据块。当一个BPOfferService服务变成活跃或死亡状态,该类中的blockPoolScannerMap将会更新。

        我们先看下DataBlockScanner的成员变量,如下:

  // 所属数据节点DataNode实例
  private final DataNode datanode;
  // 所属存储FsDatasetSpi实例
  private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
  // 配置信息Configuration实例
  private final Configuration conf;
  
  // 线程休眠周期,5s
  static final int SLEEP_PERIOD_MS = 5 * 1000;

  /**
   * Map to find the BlockPoolScanner for a given block pool id. This is updated
   * when a BPOfferService becomes alive or dies.
   * 存储块池ID到对应BlockPoolScanner实例的映射。
   * 当一个BPOfferService服务变成活跃或死亡状态,blockPoolScannerMap将会随之更新。
   */
  private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap = 
    new TreeMap<String, BlockPoolSliceScanner>();
  
  // 数据块扫描线程
  Thread blockScannerThread = null;
        首先是由构造函数确定的三个成员变量:所属数据节点DataNode实例datanode、所属存储FsDatasetSpi实例dataset、配置信息Configuration实例conf,对应构造函数如下:

  // 构造函数
  DataBlockScanner(DataNode datanode,
      FsDatasetSpi<? extends FsVolumeSpi> dataset,
      Configuration conf) {
    this.datanode = datanode;
    this.dataset = dataset;
    this.conf = conf;
  }
        然后设定了一个静态变量,5s的线程休眠周期,即SLEEP_PERIOD_MS,另外两个重要的成员变量是:

       1、TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap

             存储块池ID到对应BlockPoolScanner实例的映射。当一个BPOfferService服务变成活跃或死亡状态,blockPoolScannerMap将会随之更新。

        2、Thread blockScannerThread

              数据块扫描线程。

        既然DataBlockScanner实现了Runnable接口,那么它肯定是作为一个线程在DataNode节点上运行的,我们看下DataNode是如何对其进行构造及启动的,代码如下:

  /**
   * See {@link DataBlockScanner}
   */
  private synchronized void initDataBlockScanner(Configuration conf) {
    
	// 如果blockScanner不为null,直接返回
	if (blockScanner != null) {
      return;
    }
	
	// 数据块校验功能无法开启的原因
    String reason = null;
    assert data != null;
    
    // 如果参数dfs.datanode.scan.period.hours未配置,或者配置为0,说明数据块校验功能已关闭
    if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
                    DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
      reason = "verification is turned off by configuration";
      
    // SimulatedFSDataset不支持数据块校验
    } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
      reason = "verifcation is not supported by SimulatedFSDataset";
    }
    
    // 如果数据块校验功能无法开启的原因为null,构造DataBlockScanner实例,并调用其start()方法启动该线程
    if (reason == null) {
      blockScanner = new DataBlockScanner(this, data, conf);
      blockScanner.start();
    } else {
    	
      // 否则在日志文件中记录周期性数据块校验扫描无法启用的原因
      LOG.info("Periodic Block Verification scan disabled because " + reason);
    }
  }
        首先,如果blockScanner不为null,直接返回,说明之前已经初始化并启动了,然后,确定数据块校验功能无法开启的原因reason:

        1、如果参数dfs.datanode.scan.period.hours未配置,或者配置为0,说明数据块校验功能已关闭;

        2、SimulatedFSDataset不支持数据块校验;

        如果数据块校验功能无法开启的原因为null,构造DataBlockScanner实例,并调用其start()方法启动该线程,否则在日志文件中记录周期性数据块校验扫描无法启用的原因。

        DataBlockScanner线程启动的start()方法如下:

  public void start() {
	  
	// 基于DataBlockScanner实例创建一个线程blockScannerThread
    blockScannerThread = new Thread(this);
    // 将线程blockScannerThread设置为后台线程
    blockScannerThread.setDaemon(true);
    // 启动线程blockScannerThread
    blockScannerThread.start();
  }
        实际上它是基于DataBlockScanner实例创建一个线程blockScannerThread,将线程blockScannerThread设置为后台线程,然后启动线程blockScannerThread。

        DataBlockScanner线程已创建,并启动,那么我们看下它是如何工作的,接下来看下它的run()方法,代码如下:

  // 线程核心run()方法
  @Override
  public void run() {
	  
	// 当前块池ID,默认为空
    String currentBpId = "";
    
    // 第一次运行标志,默认当然应该为true
    boolean firstRun = true;
    
    // 如果所属数据节点DataNode实例datanode正常运行,且当前线程没有被中断
    while (datanode.shouldRun && !Thread.interrupted()) {
      //Sleep everytime except in the first iteration.
    	
      // 如果不是第一次运行,线程休眠5s
      if (!firstRun) {
        try {
          Thread.sleep(SLEEP_PERIOD_MS);
        } catch (InterruptedException ex) {
          // Interrupt itself again to set the interrupt status
        	
          // 如果发生InterruptedException异常,中断blockScannerThread线程,然后跳过,继续下一轮循环
          blockScannerThread.interrupt();
          continue;
        }
      } else {
    	// 第一次运行时先将firstRun标志设置为false
        firstRun = false;
      }
      
      // 获取下一个块池切片扫描器BlockPoolSliceScanner实例bpScanner
      BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
      
      // 如果bpScanner为null,跳过,继续下一轮循环
      if (bpScanner == null) {
        // Possible if thread is interrupted
        continue;
      }
      
      // 设置当前块池ID,即currentBpId,从块池切片扫描器BlockPoolSliceScanner实例bpScanner中获取
      currentBpId = bpScanner.getBlockPoolId();
      
      // If BPOfferService for this pool is not alive, don't process it
      // 如果当前块池对应的心跳服务BPOfferService不是活跃的,不对它进行处理,调用removeBlockPool()方法从blockPoolScannerMap中移除数据,
      // 并关闭对应BlockPoolSliceScanner,然后跳过,执行下一轮循环
      if (!datanode.isBPServiceAlive(currentBpId)) {
        LOG.warn("Block Pool " + currentBpId + " is not alive");
        // Remove in case BP service died abruptly without proper shutdown
        removeBlockPool(currentBpId);
        continue;
      }
      
      // 调用块池切片扫描器BlockPoolSliceScanner实例bpScanner的scanBlockPoolSlice()方法,
      // 扫描对应块池里的数据块,进行数据块校验
      bpScanner.scanBlockPoolSlice();
    }

    // Call shutdown for each allocated BlockPoolSliceScanner.
    // 退出循环后,遍历blockPoolScannerMap中的每个BlockPoolSliceScanner实例bpss,
    // 挨个调用对应shutdown()方法,停止块池切片扫描器BlockPoolSliceScanner
    for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
      bpss.shutdown();
    }
  }
        run()方法逻辑比较清晰,大体如下:

        1、首先初始化当前块池ID,即currentBpId,默认为空,再确定第一次运行标志firstRun,默认当然应该为true;

        2、接下来进入一个while循环,循环的条件是如果所属数据节点DataNode实例datanode正常运行,且当前线程没有被中断:

               2.1、处理第一次运行标志位firstRun:

                         2.1.1、如果不是第一次运行,线程休眠5s:即firstRun为false,这时如果发生InterruptedException异常,中断blockScannerThread线程,然后跳过,继续下一轮循环;

                         2.1.2、第一次运行时先将firstRun标志设置为false;

               2.2、获取下一个块池切片扫描器BlockPoolSliceScanner实例bpScanner,通过调用getNextBPScanner()方法,传入当前块池ID,即currentBpId来实现,首次循环,currentBpId为空,后续会传入之前处理的值,下面会对其进行更新;

               2.3、如果bpScanner为null,跳过,继续下一轮循环;

               2.4、设置当前块池ID,即currentBpId,从块池切片扫描器BlockPoolSliceScanner实例bpScanner中获取;

               2.5、如果当前块池对应的心跳服务BPOfferService不是活跃的,不对它进行处理,调用removeBlockPool()方法从blockPoolScannerMap中移除数据,并关闭对应BlockPoolSliceScanner,然后跳过,执行下一轮循环;

               2.6、调用块池切片扫描器BlockPoolSliceScanner实例bpScanner的scanBlockPoolSlice()方法,扫描对应块池里的数据块,进行数据块校验;

        3、退出循环后,遍历blockPoolScannerMap中的每个BlockPoolSliceScanner实例bpss,挨个调用对应shutdown()方法,停止块池切片扫描器BlockPoolSliceScanner。

        我们接下来看下比较重要的getNextBPScanner()方法,代码如下:

  /**
   * Find next block pool id to scan. There should be only one current
   * verification log file. Find which block pool contains the current
   * verification log file and that is used as the starting block pool id. If no
   * current files are found start with first block-pool in the blockPoolSet.
   * However, if more than one current files are found, the one with latest 
   * modification time is used to find the next block pool id.
   * 寻找下一个块池ID以进行scan。
   * 此时应该只有一个当前验证日志文件。
   */
  private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
    
    String nextBpId = null;
    
    // 如果所属数据节点DataNode实例datanode正常运行,且当前blockScannerThread线程没有被中断
    while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
    	
      // 等待初始化
      waitForInit();
      
      synchronized (this) {
    	  
    	// 当blockPoolScannerMap大小大于0,即存在BlockPoolSliceScanner实例时,做以下处理:
        if (getBlockPoolSetSize() > 0) {          
          // Find nextBpId by the minimum of the last scan time
          // lastScanTime用于记录上次浏览时间
          long lastScanTime = 0;
          
          // 遍历blockPoolScannerMap集合,取出每个块池ID,即bpid
          for (String bpid : blockPoolScannerMap.keySet()) {
        	  
        	// 根据块池ID,即bpid,取出其对应BlockPoolSliceScanner实例的上次浏览时间t
            final long t = getBPScanner(bpid).getLastScanTime();
            
            // 如果t不为0,且如果块池ID为null,或者t小于lastScanTime,则将t赋值给lastScanTime,bpid赋值给nextBpId
            // 也就是计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId
            if (t != 0L) {
              if (bpid == null || t < lastScanTime) {
                lastScanTime =  t;
                nextBpId = bpid;
              }
            }
          }
          
          // nextBpId can still be null if no current log is found,
          // find nextBpId sequentially.
          
          // 如果对应块池ID,即nextBpId为null,则取比上次处理的块池currentBpId高的key作为nextBpId,
          // 如果还不能取出的话,那么取第一个块池ID,作为nextBpId
          if (nextBpId == null) {
            nextBpId = blockPoolScannerMap.higherKey(currentBpId);
            if (nextBpId == null) {
              nextBpId = blockPoolScannerMap.firstKey();
            }
          }
          
          // 如果nextBpId不为空,那么从blockPoolScannerMap中获取其对应BlockPoolSliceScanner实例返回
          if (nextBpId != null) {
            return getBPScanner(nextBpId);
          }
        }
      }
      
      // 记录warn日志,No block pool is up, going to wait,然后等待
      LOG.warn("No block pool is up, going to wait");
      
      try {
    	// 线程休眠5s
        Thread.sleep(5000);
      } catch (InterruptedException ex) {
        LOG.warn("Received exception: " + ex);
        blockScannerThread.interrupt();
        return null;
      }
    }
    return null;
  }
        它的主要作用就是寻找下一个块池ID以进行scan,其存在一个整体的while循环,循环的条件为如果所属数据节点DataNode实例datanode正常运行,且当前blockScannerThread线程没有被中断,循环内做以下处理:

        1、调用waitForInit()方法等待初始化;

        2、当前对象上使用synchronized进行同步,当blockPoolScannerMap大小大于0,即存在BlockPoolSliceScanner实例时,做以下处理:

               2.1、设定lastScanTime用于记录上次浏览时间,默认值为0;

               2.2、遍历blockPoolScannerMap集合,取出每个块池ID,即bpid,计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId:

                        2.2.1、根据块池ID,即bpid,取出其对应BlockPoolSliceScanner实例的上次浏览时间t;

                        2.2.2、如果t不为0,且如果块池ID为null,或者t小于lastScanTime,则将t赋值给lastScanTime,bpid赋值给nextBpId,也就是计算最早的上次浏览时间lastScanTime,和对应块池ID,即nextBpId;

               2.3、如果对应块池ID,即nextBpId为null,则取比上次处理的块池currentBpId高的key作为nextBpId,如果还不能取出的话,那么取第一个块池ID,作为nextBpId;

               2.4、如果nextBpId不为空,那么从blockPoolScannerMap中获取其对应BlockPoolSliceScanner实例返回;

        3、如果blockPoolScannerMap大小等于0,或者上述2找不到的话,记录warn日志,No block pool is up, going to wait,然后等待5s后继续下一轮循环;

        最后,实在找不到就返回null。

        可见,getNextBPScanner()方法优先选取最早处理过的块池,找不到的话再按照之前处理过的块池ID增长的顺序,找下一个块池ID,按照块池ID大小顺序到尾部的话,再折回取第一个。

        其中等待初始化的waitForInit()方法比较简单,代码如下:

  // Wait for at least one block pool to be up
  private void waitForInit() {
	  
	// 如果BlockPoolSliceScanner的个数小于数据节点所有BpOS个数,或者BlockPoolSliceScanner的个数小于1,一直等待
	// BpOS你可以理解为DataNode上每个块池或命名空间对应的一个实例,它处理该命名空间到对应活跃或备份状态NameNode的心跳。
    while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
        || (getBlockPoolSetSize() < 1)) {
      try {
    	  
    	// 线程休眠5s
        Thread.sleep(SLEEP_PERIOD_MS);
      } catch (InterruptedException e) {
    	  
    	// 如果发生InterruptedException异常,中断blockScannerThread线程,然后返回
        blockScannerThread.interrupt();
        return;
      }
    }
  }
        它本质上是等所有块池都被上报至blockPoolScannerMap集合后,才认为已完成初始化,然后再挑选块池ID,否则线程休眠5s,继续等待。代码注释比较详细,这里不再赘述!

        获取到块池ID,并获取到其对应的块池切片扫描器BlockPoolSliceScanner实例bpScanner了,接下来就是调用bpScanner的scanBlockPoolSlice()方法,扫描该块池的数据块,并做数据块校验工作了。这方面的内容,请阅读《HDFS源码分析数据块校验之BlockPoolSliceScanner》一文,这里不再做介绍。

        到了这里,各位看官可能有个疑问,选取块池所依赖的blockPoolScannerMap集合中的数据是哪里来的呢?答案就在处理数据节点心跳的BPServiceActor线程中,在完成数据块汇报、处理来自名字节点NameNode的相关命令等操作后,有如下代码被执行:

        // Now safe to start scanning the block pool.
        // If it has already been started, this is a no-op.
        // 现在可以安全地扫描块池,如果它已经启动,这是一个空操作。
        if (dn.blockScanner != null) {
          dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
        }
        很简单,数据节点汇报数据块给名字节点,并执行来自名字节点的相关命令后,就可以通过数据节点DataNode中成员变量blockScanner的addBlockPool()方法,添加块池,代码如下:

  public synchronized void addBlockPool(String blockPoolId) {
    
	// 如果blockPoolScannerMap集合中存在块池blockPoolId,直接返回
	if (blockPoolScannerMap.get(blockPoolId) != null) {
      return;
    }
	
	// 根据块池blockPoolId、数据节点datanode、存储dataset、配置信息conf等构造BlockPoolSliceScanner实例bpScanner
    BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
        datanode, dataset, conf);
    
    // 将块池blockPoolId与bpScanner的映射关系存储到blockPoolScannerMap中
    blockPoolScannerMap.put(blockPoolId, bpScanner);
    
    // 记录日志信息
    LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
        + blockPoolScannerMap.size());
  }
        逻辑很简单,首先需要看看blockPoolScannerMap集合中是否存在块池blockPoolId,存在即返回,否则根据块池blockPoolId、数据节点datanode、存储dataset、配置信息conf等构造BlockPoolSliceScanner实例bpScanner,将块池blockPoolId与bpScanner的映射关系存储到blockPoolScannerMap中,最后记录日志信息。

        我们在上面也提到了如果当前块池对应的心跳服务BPOfferService不是活跃的,那么会调用removeBlockPool()方法,移除对应的块池,代码如下:

  public synchronized void removeBlockPool(String blockPoolId) {
	  
	// 根据块池blockPoolId,从blockPoolScannerMap中移除数据,并得到对应BlockPoolSliceScanner实例bpss
    BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
    
    // 调用bpss的shutdown()方法,关闭bpss
    if (bpss != null) {
      bpss.shutdown();
    }
    
    // 记录日志信息
    LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
  }
        代码很简单,不再赘述。


        总结

        DataBlockScanner是运行在数据节点DataNode上的一个后台线程,它负责管理所有块池的数据块扫描工作。当数据节点DataNode发送心跳给名字节点NameNode进行数据块汇报并执行完返回的命令时,会在DataBlockScanner的内部集合blockPoolScannerMap中注册块池ID与为此新创建的BlockPoolSliceScanner对象的关系,然后DataBlockScanner内部线程blockScannerThread周期性的挑选块池currentBpId,并获取块池切片扫描器BlockPoolSliceScanner实例bpScanner,继而调用其scanBlockPoolSlice()方法,扫描对应块池里的数据块,进行数据块校验。块池选择的主要依据就是优先选择扫描时间最早的,也就是自上次扫描以来最长时间没有进行扫描的,按照这一依据选择不成功的话,则默认按照块池ID递增的顺序循环选取块池。










相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
存储 缓存 分布式计算
|
存储 机器学习/深度学习 分布式计算
|
存储 调度 机器学习/深度学习
HDFS源码分析心跳汇报之数据块汇报
        在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。
1112 0
|
机器学习/深度学习
HDFS源码分析心跳汇报之整体结构
        我们知道,HDFS全称是Hadoop Distribute FileSystem,即Hadoop分布式文件系统。既然它是一个分布式文件系统,那么肯定存在很多物理节点,而这其中,就会有主从节点之分。
1237 0
|
机器学习/深度学习
HDFS源码分析心跳汇报之数据结构初始化
        在《HDFS源码分析心跳汇报之整体结构》一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager、BPOfferService和BPServiceActor三者之间的关系。
1120 0
|
存储 缓存
HDFS源码分析DataXceiver之读数据块
         在《HDFS源码分析DataXceiver之整体流程》一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver。
1196 0
|
存储 分布式计算 Hadoop
HDFS源码分析DataXceiver之整体流程
        在《HDFS源码分析之DataXceiverServer》一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。
1538 0