HDFS源码分析数据块之CorruptReplicasMap

简介:         CorruptReplicasMap用于存储文件系统中所有损坏数据块的信息。仅当它的所有副本损坏时一个数据块才被认定为损坏。当汇报数据块的副本时,我们隐藏所有损坏副本。一旦一个数据块被发现完好副本达到预期,它将从CorruptReplicasMap中被移除。

        CorruptReplicasMap用于存储文件系统中所有损坏数据块的信息。仅当它的所有副本损坏时一个数据块才被认定为损坏。当汇报数据块的副本时,我们隐藏所有损坏副本。一旦一个数据块被发现完好副本达到预期,它将从CorruptReplicasMap中被移除。

        我们先看下CorruptReplicasMap都有哪些成员变量,如下所示:

  // 存储损坏数据块Block与它对应每个数据节点与损坏原因集合映射关系的集合
  private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
    new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
        是的,你没看错,就这一个corruptReplicasMap集合,它是一个用来存储损坏数据块Block实例与它对应每个数据节点和损坏原因集合的映射关系的集合。

        我们看下CorruptReplicasMap都提供了哪些有用的方法。

        一、addToCorruptReplicasMap()

        标记属于指定数据节点的数据块为损坏

  /**
   * Mark the block belonging to datanode as corrupt.
   * 标记属于指定数据节点的数据块为损坏
   *
   * @param blk Block to be added to CorruptReplicasMap
   * @param dn DatanodeDescriptor which holds the corrupt replica
   * @param reason a textual reason (for logging purposes)
   * @param reasonCode the enum representation of the reason
   */
  void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
      String reason, Reason reasonCode) {
	  
	// 先从corruptReplicasMap集合中查找是否存在对应数据块blk
    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
    
    // 如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap
    if (nodes == null) {
      nodes = new HashMap<DatanodeDescriptor, Reason>();
      corruptReplicasMap.put(blk, nodes);
    }
    
    String reasonText;
    if (reason != null) {
      reasonText = " because " + reason;
    } else {
      reasonText = "";
    }
    
    // 判断nodes中是否存在对应数据节点dn,分别记录日志信息
    if (!nodes.keySet().contains(dn)) {
      NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                   blk.getBlockName() +
                                   " added as corrupt on " + dn +
                                   " by " + Server.getRemoteIp() +
                                   reasonText);
    } else {
    	
      NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
                                   "duplicate requested for " + 
                                   blk.getBlockName() + " to add as corrupt " +
                                   "on " + dn +
                                   " by " + Server.getRemoteIp() +
                                   reasonText);
    }
    
    // Add the node or update the reason.
    // 将数据节点dn、损坏原因编码reasonCode加入或更新入nodes
    nodes.put(dn, reasonCode);
  }
        处理逻辑很简单,大体如下:

        1、先从corruptReplicasMap集合中查找是否存在对应数据块blk;

        2、如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap;

        3、判断nodes中是否存在对应数据节点dn,分别记录日志信息;

        4、将数据节点dn、损坏原因编码reasonCode加入或更新入nodes。

        二、removeFromCorruptReplicasMap()

        将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除

  // 将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除
  boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
      Reason reason) {
	  
	// 先从corruptReplicasMap集合中查找是否存在对应数据块blk,获得datanodes
    Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
    
    // 如果不存在,直接返回false,表明移除失败
    if (datanodes==null)
      return false;

    // if reasons can be compared but don't match, return false.
    
    // 取出数据节点datanode对应的存储损坏原因storedReason
    Reason storedReason = datanodes.get(datanode);
    
    // 判断存储损坏原因storedReason与参数损坏原因reason是否一致,不一致直接返回false,表明移除失败,
    // 判断的依据为参数损坏原因reason不是ANY且存储损坏原因storedReason不为空的情况下,两者不一致
    if (reason != Reason.ANY && storedReason != null &&
        reason != storedReason) {
      return false;
    }

    // 将datanode对应数据从datanodes中移除
    if (datanodes.remove(datanode) != null) { // remove the replicas
      
      // 移除datanode后,如果datanodes为空
      if (datanodes.isEmpty()) {
        // remove the block if there is no more corrupted replicas
    	// 将数据块blk从集合corruptReplicasMap中移除
        corruptReplicasMap.remove(blk);
      }
      
      // 返回true,表明移除成功
      return true;
    }
    
    // 其他情况下直接返回false,表明移除失败
    return false;
  }
        三、getCorruptReplicaBlockIds()

        获取指定大小和起始数据块ID的损坏数据块ID数组

  /**
   * Return a range of corrupt replica block ids. Up to numExpectedBlocks 
   * blocks starting at the next block after startingBlockId are returned
   * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId 
   * is null, up to numExpectedBlocks blocks are returned from the beginning.
   * If startingBlockId cannot be found, null is returned.
   * 获取指定大小和起始数据块ID的损坏数据块ID数组
   *
   * @param numExpectedBlocks Number of block ids to return.
   *  0 <= numExpectedBlocks <= 100
   * @param startingBlockId Block id from which to start. If null, start at
   *  beginning.
   * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
   *
   */
  long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
                                   Long startingBlockId) {
	  
	// 校验numExpectedBlocks,需要获取的数据块ID数组最多有100个元素
    if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
      return null;
    }
    
    // 获得corruptReplicasMap集合的数据块迭代器blockIt
    Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
    
    // if the starting block id was specified, iterate over keys until
    // we find the matching block. If we find a matching block, break
    // to leave the iterator on the next block after the specified block. 
    
    // 如果设定了起始数据块艾迪startingBlockId
    if (startingBlockId != null) {
      boolean isBlockFound = false;
      
      // 遍历corruptReplicasMap,查看是否存在startingBlockId,如果存在,跳出循环,此时已记录住迭代器的位置了
      while (blockIt.hasNext()) {
        Block b = blockIt.next();
        if (b.getBlockId() == startingBlockId) {
          isBlockFound = true;
          break; 
        }
      }
      
      // 如果不存在,直接返回null
      if (!isBlockFound) {
        return null;
      }
    }

    // 构造一个存储数据块ID的列表corruptReplicaBlockIds
    ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();

    // append up to numExpectedBlocks blockIds to our list
    
    // 遍历corruptReplicasMap,将最多numExpectedBlocks个数据块ID添加到列表corruptReplicaBlockIds,
    // 此时的迭代器可能不是从头开始取数据的,在startingBlockId需要并存在的情况下,它是从下一个元素开始获取的
    for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
      corruptReplicaBlockIds.add(blockIt.next().getBlockId());
    }
    
    // 将数据块ID列表corruptReplicaBlockIds转换成数组ret
    long[] ret = new long[corruptReplicaBlockIds.size()];
    for(int i=0; i<ret.length; i++) {
      ret[i] = corruptReplicaBlockIds.get(i);
    }
    
    // 返回数据块ID数组ret
    return ret;
  }
        四、getNodes()

        根据损坏数据块获取对应数据节点集合

  /**
   * Get Nodes which have corrupt replicas of Block
   * 根据损坏数据块获取对应数据节点集合
   * 
   * @param blk Block for which nodes are requested
   * @return collection of nodes. Null if does not exists
   */
  Collection<DatanodeDescriptor> getNodes(Block blk) {
    Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
    if (nodes == null)
      return null;
    return nodes.keySet();
  }
        五、isReplicaCorrupt()

        检测指定数据块和数据节点是否为损坏的

  /**
   * Check if replica belonging to Datanode is corrupt
   * 检测指定数据块和数据节点是否为损坏的
   *
   * @param blk Block to check
   * @param node DatanodeDescriptor which holds the replica
   * @return true if replica is corrupt, false if does not exists in this map
   */
  boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
    Collection<DatanodeDescriptor> nodes = getNodes(blk);
    return ((nodes != null) && (nodes.contains(node)));
  }
        六、numCorruptReplicas()

        获取给定数据块对应数据节点数量

  // 获取给定数据块对应数据节点数量
  int numCorruptReplicas(Block blk) {
    Collection<DatanodeDescriptor> nodes = getNodes(blk);
    return (nodes == null) ? 0 : nodes.size();
  }
        七、size()

        获取损坏数据块数量

  // 获取损坏数据块数量
  int size() {
    return corruptReplicasMap.size();
  }






        

相关文章
|
存储 缓存 分布式计算
|
存储 机器学习/深度学习 分布式计算
|
存储 调度 机器学习/深度学习
HDFS源码分析心跳汇报之数据块汇报
        在《HDFS源码分析心跳汇报之数据块增量汇报》一文中,我们详细介绍了数据块增量汇报的内容,了解到它是时间间隔更长的正常数据块汇报周期内一个smaller的数据块汇报,它负责将DataNode上数据块的变化情况及时汇报给NameNode。
1112 0
|
机器学习/深度学习
HDFS源码分析心跳汇报之整体结构
        我们知道,HDFS全称是Hadoop Distribute FileSystem,即Hadoop分布式文件系统。既然它是一个分布式文件系统,那么肯定存在很多物理节点,而这其中,就会有主从节点之分。
1237 0
|
机器学习/深度学习
HDFS源码分析心跳汇报之数据结构初始化
        在《HDFS源码分析心跳汇报之整体结构》一文中,我们详细了解了HDFS中关于心跳的整体结构,知道了BlockPoolManager、BPOfferService和BPServiceActor三者之间的关系。
1120 0
|
存储 缓存
HDFS源码分析DataXceiver之读数据块
         在《HDFS源码分析DataXceiver之整体流程》一文中我们知道,无论来自客户端还是其他数据节点的请求达到DataNode时,DataNode上的后台线程DataXceiverServer均为每个请求创建一个单独的后台工作线程来处理,这个工作线程就是DataXceiver。
1196 0
|
存储 分布式计算 Hadoop
HDFS源码分析DataXceiver之整体流程
        在《HDFS源码分析之DataXceiverServer》一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。
1538 0