HBase RegionServer挂掉后的源码分析

编程技术  /  houtizong 发布于 3年前   158
首先,HMaster 通过 ZooKeeper(ZK)发现某个 RegionServer(RS)挂掉了。HMaster 使用成员变量 private ServerManager serverManager 来处理 RS 的信息,入口是 expireServer 方法:
  /**
   * Handles the expiration of a region server.
   * Moves the server from the online set to the dead set. Unless the cluster is shutting
   * down, it submits a shutdown handler to the executor service. That handler is a
   * MetaServerShutdownHandler when the dead server was hosting -ROOT- or .META., and a
   * plain ServerShutdownHandler otherwise.
   *
   * @param hsi info describing the expired server
   */
  public synchronized void expireServer(final HServerInfo hsi) {
    // First look up the HServerInfo in onlineServers.
    // Server name format: <hostname> , <port> , <startcode>
    String serverName = hsi.getServerName();
    HServerInfo info = this.onlineServers.get(serverName);
    // Not online: warn and return.
    if (info == null) {
      LOG.warn("Received expiration of " + hsi.getServerName() +
        " but server is not currently online");
      return;
    }
    // Even the original author questions whether this can happen; there is no obvious
    // scenario in which an online server is already in the dead set.
    if (this.deadservers.contains(serverName)) {
      // TODO : Can this happen?  It shouldn't be online in this case?
      LOG.warn("Received expiration of " + hsi.getServerName() +
          " but server shutdown is already in progress");
      return;
    }
    // Add to the dead list; remove from the online list and its connection.
    this.deadservers.add(serverName);
    this.onlineServers.remove(serverName);
    this.serverConnections.remove(serverName);
    // If the cluster is shutting down, do no recovery work and return directly.
    // The master is stopped once the last online server has gone.
    if (this.clusterShutdown) {
      LOG.info("Cluster shutdown set; " + hsi.getServerName() +
        " expired; onlineServers=" + this.onlineServers.size());
      if (this.onlineServers.isEmpty()) {
        master.stop("Cluster shutdown set; onlineServer=0");
      }
      return;
    }
    // Ask the CatalogTracker whether the dead server was hosting -ROOT- or .META.
    // The tracker is only queried here, not modified.
    CatalogTracker ct = this.master.getCatalogTracker();
    // Was this server carrying root?
    boolean carryingRoot;
    try {
      HServerAddress address = ct.getRootLocation();
      // Carrying root iff the current -ROOT- location is this server's address.
      carryingRoot = address != null && hsi.getServerAddress().equals(address);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOG.info("Interrupted");
      return;
    }
    HServerAddress address = ct.getMetaLocation();
    // Carrying meta iff the current .META. location is this server's address.
    boolean carryingMeta = address != null && hsi.getServerAddress().equals(address);
    // MetaServerShutdownHandler extends ServerShutdownHandler; it is used when -ROOT-
    // and/or .META. must be reassigned as part of recovery.
    if (carryingRoot || carryingMeta) {
      this.services.getExecutorService().submit(new MetaServerShutdownHandler(this.master,
        this.services, this.deadservers, info, carryingRoot, carryingMeta));
    } else {
      this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
        this.services, this.deadservers, info));
    }
    LOG.debug("Added=" + serverName +
      " to dead servers, submitted shutdown handler to be executed, root=" +
        carryingRoot + ", meta=" + carryingMeta);
  }


接下来进入 ServerShutdownHandler(继承自 EventHandler),最终执行它的 process 方法:
  /**
   * Processes the shutdown of a dead region server. It first splits the server's
   * write-ahead logs. Next it reassigns -ROOT- and/or .META. if the server carried
   * them. Finally it reassigns the user regions the server was hosting, skipping
   * regions already in transition unless they are CLOSING or PENDING_CLOSE.
   *
   * <p>If the master stops before .META. can be read, no user regions are reassigned.
   * The server is always marked finished in {@code deadServers}, even on failure.
   *
   * @throws IOException if interrupted while waiting for .META. (wrapping the
   *     InterruptedException), or as propagated from the called services
   */
  public void process() throws IOException {
    final String serverName = this.hsi.getServerName();
    LOG.info("Splitting logs for " + serverName);
    try {
      // Split the dead server's logs so that their edits follow their regions.
      // (The splitting strategy differs considerably between HBase versions; 0.92
      // introduced distributed log splitting.)
      this.services.getMasterFileSystem().splitLog(serverName);

      // Regions of this server that are in transition, as computed by the
      // assignment manager from hsi.
      List<RegionState> regionsInTransition =
        this.services.getAssignmentManager().processServerShutdown(this.hsi);

      // -ROOT- first: verify its location, then reassign it (with retries).
      if (isCarryingRoot()) { // -ROOT-
        LOG.info("Server " + serverName + " was carrying ROOT. Trying to assign.");
        verifyAndAssignRootWithRetries();
      }

      // Then .META.
      if (isCarryingMeta()) {
        LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
        this.services.getAssignmentManager().assignMeta();
      }

      // Wait until .META. is available, then read the user regions it records for
      // the dead server. hris stays null if the server stops before this succeeds.
      NavigableMap<HRegionInfo, Result> hris = null;
      while (!this.server.isStopped()) {
        try {
          this.server.getCatalogTracker().waitForMeta();
          hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
              this.hsi);
          break;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted", e);
        } catch (IOException ioe) {
          LOG.info("Received exception accessing META during server shutdown of " +
              serverName + ", retrying META read", ioe);
        }
      }

      // Skip regions that were in transition unless CLOSING or PENDING_CLOSE.
      // Guarded: hris is null when the server stopped before META was read, and
      // dereferencing it here used to throw a NullPointerException.
      if (hris != null) {
        for (RegionState rit : regionsInTransition) {
          if (!rit.isClosing() && !rit.isPendingClose()) {
            LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() +
              " from list of regions to assign because in RIT");
            hris.remove(rit.getRegion());
          }
        }
      }

      LOG.info("Reassigning " + (hris == null? 0: hris.size()) +
        " region(s) that " + serverName +
        " was carrying (skipping " + regionsInTransition.size() +
        " regions(s) that are already in transition)");

      // Reassign each region. processDeadRegion decides whether the region should be
      // assigned; per the surrounding analysis, it checks whether the table is being
      // disabled or the cluster is shutting down.
      if (hris != null) {
        for (Map.Entry<HRegionInfo, Result> e : hris.entrySet()) {
          if (processDeadRegion(e.getKey(), e.getValue(),
              this.services.getAssignmentManager(),
              this.server.getCatalogTracker())) {
            this.services.getAssignmentManager().assign(e.getKey(), true);
          }
        }
      }
    } finally {
      this.deadServers.finish(serverName);
    }

    LOG.info("Finished processing of shutdown of " + serverName);
  }


split log的相关代码:
 public void splitLog(final String serverName ) {    // 首先获取锁    this. splitLogLock.lock();    long splitTime = 0, splitLogSize = 0;    // 在hdfs中获取该rs的WAL位置 一般在/hbase/.logs文件夹中    Path logDir = new Path( this.rootdir , HLog.getHLogDirectoryName (serverName));    try {      //创建splitter对象负责split工作      HLogSplitter splitter = HLogSplitter. createLogSplitter(        conf, rootdir, logDir, oldLogDir, this .fs );      try {        //假如hdfs处于安全模式,那么阻塞知道文件系统退出安全模式        FSUtils. waitOnSafeMode(conf,          conf.getInt(HConstants. THREAD_WAKE_FREQUENCY, 1000));         //进行split        splitter.splitLog();      } catch (OrphanHLogAfterSplitException e) {        LOG.warn("Retrying splitting because of:" , e);        //发生异常后重新获得实例,再试一遍        splitter = HLogSplitter. createLogSplitter(conf, rootdir, logDir,  oldLogDir, this .fs );        splitter.splitLog();      }      //下面两个参数用来统计      splitTime = splitter.getTime();      splitLogSize = splitter.getSize();    } catch (IOException e) {      checkFileSystem();      LOG.error( "Failed splitting " + logDir.toString(), e);    } finally {      this. splitLogLock.unlock();    }    if (this. metrics != null) {      this. metrics.addSplit(splitTime, splitLogSize);    }  }


 //这边大多数就做些检查的工作  public List<Path> splitLog() throws IOException {    Preconditions.checkState(!hasSplit ,        "An HLogSplitter instance may only be used once" );       hasSplit = true;    long startTime = System. currentTimeMillis();    List<Path> splits = null;    //检查是否存在存放log的目录,假如不存在的话,直接返回    if (! fs.exists( srcDir)) {      // Nothing to do      return splits;    }    FileStatus[] logfiles = fs.listStatus( srcDir);    //检查目录下的文件是否有问题    if (logfiles == null || logfiles.length == 0) {      // Nothing to do      return splits;    }    LOG.info( "Splitting " + logfiles.length + " hlog(s) in "        + srcDir.toString());    //执行split动作    splits = splitLog(logfiles);       splitTime = System. currentTimeMillis() - startTime;    LOG.info( "hlog file splitting completed in " + splitTime + " ms for " + srcDir .toString());    return splits;  }


 /**
  * Splits the given HLog files.
  * Each file's lease is recovered and its entries are parsed into entryBuffers,
  * which the outputSink writer threads consume. Afterwards the source logs are
  * archived: corrupt ones separately from processed ones.
  *
  * @param logfiles the HLog files to split
  * @return the paths returned by outputSink.finishWritingAndClose(), which always
  *     runs, even when splitting fails
  * @throws IOException if a log fails to read and hbase.hlog.split.skip.errors is
  *     false, or an OrphanHLogAfterSplitException if srcDir contains more files
  *     after the split than were processed or marked corrupt
  */
 private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
    // Logs that were processed successfully, and logs found to be corrupt.
    List<Path> processedLogs = new ArrayList<Path>();
    List<Path> corruptedLogs = new ArrayList<Path>();
    List<Path> splits = null;

    // Error policy (default true). When true, a log that fails to read is marked
    // corrupt (and later moved to the .corrupt directory). When false, that
    // IOException is rethrown and the whole split aborts immediately.
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
    splitSize = 0;
    // Start the writer threads that drain entryBuffers.
    outputSink.startWriterThreads(entryBuffers);

    try {
      int i = 0;
      // Iterate over the logs to split.
      for (FileStatus log : logfiles) {
        Path logPath = log.getPath();
        long logLength = log.getLen();
        splitSize += logLength;
        LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length
            + ": " + logPath + ", length=" + logLength);
        try {
          // Presumably takes over the HDFS lease the dead server held on this file.
          recoverFileLease(fs, logPath, conf);

          // Parse the log into entryBuffers. Each entry pairs an HLogKey (which
          // carries the region and table names) with its WALEdit; the entries
          // end up grouped per region.
          parseHLog(log, entryBuffers, fs, conf);
          // Record the log as processed.
          processedLogs.add(logPath);
        } catch (EOFException eof) {
          // truncated files are expected if a RS crashes (see HBASE-2643)
          LOG.info("EOF from hlog " + logPath + ". Continuing");
          processedLogs.add(logPath);
        } catch (FileNotFoundException fnfe) {
          // A file may be missing if the region server was able to archive it
          // before shutting down. This means the edits were persisted already
          LOG.info("A log was missing " + logPath +
              ", probably because it was moved by the" +
              " now dead region server. Continuing");
          processedLogs.add(logPath);
        } catch (IOException e) {
          // If the IOE resulted from bad file format,
          // then this problem is idempotent and retrying won't help
          if (e.getCause() instanceof ParseException) {
            LOG.warn("Parse exception from hlog " + logPath + ".  continuing", e);
            processedLogs.add(logPath);
          } else {
            if (skipErrors) {
              LOG.info("Got while parsing hlog " + logPath +
                ". Marking as corrupted", e);
              corruptedLogs.add(logPath);
            } else {
              throw e;
            }
          }
        }
      }
      // Files that appeared during the split mean another writer was active.
      if (fs.listStatus(srcDir).length > processedLogs.size()
          + corruptedLogs.size()) {
        throw new OrphanHLogAfterSplitException(
            "Discovered orphan hlog after split. Maybe the "
            + "HRegionServer was not dead when we started");
      }
      // Archive the logs: corrupt/skipped ones go to .corrupt and processed ones
      // to .oldlogs. Per the surrounding analysis, the dead server's directory
      // under .logs is then removed.
      archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);

    } finally {
      // Wait for the writer threads to finish and close the outputs.
      splits = outputSink.finishWritingAndClose();
    }
    return splits;
  }


分配region的时候:
 /**
  * Tries up to maximumAssignmentAttempts times to assign the region held in state.
  * Each attempt runs these steps:
  * <ol>
  *   <li>Optionally set the region OFFLINE in ZooKeeper.</li>
  *   <li>Obtain a RegionPlan.</li>
  *   <li>Move the state to PENDING_OPEN.</li>
  *   <li>Ask the plan's destination server to open the region.</li>
  * </ol>
  * If the open request throws, the state goes back to OFFLINE, a new plan is forced
  * away from the failed destination, and the loop retries.
  *
  * @param state the region's transition state; updated in place
  * @param setOfflineInZK whether to set the region OFFLINE in ZooKeeper before each
  *     attempt; if that fails, the method returns
  * @param forceNewPlan passed to getRegionPlan; presumably forces a fresh plan
  */
 private void assign(final RegionState state, final boolean setOfflineInZK, final boolean forceNewPlan) {
    for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
      if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
      // The master is stopped, so there is nothing left to do: bail out.
      if (this.master.isStopped()) {
        LOG.debug("Server stopped; skipping assign of " + state);
        return;
      }
      // Get a RegionPlan. NOTE(review): per the surrounding analysis, getRegionPlan
      // uses LoadBalancer.randomAssignment(servers), i.e. picks a random server;
      // that is not visible in this block.
      RegionPlan plan = getRegionPlan(state, forceNewPlan);
      // No plan could be determined: give up for now.
      if (plan == null) {
        debugLog(state.getRegion(),
            "Unable to determine a plan to assign " + state);
        return; // Should get reassigned later when RIT times out.
      }
      try {
        debugLog(state.getRegion(),
          "Assigning region " + state.getRegion().getRegionNameAsString() +
          " to " + plan.getDestination().getServerName());
        // Move the region's state to PENDING_OPEN.
        state.update(RegionState.State.PENDING_OPEN);
        // Ask the destination region server to open the region.
        serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
        break;
      } catch (Throwable t) {
        LOG.warn("Failed assignment of " +
          state.getRegion().getRegionNameAsString() + " to " +
          plan.getDestination() + ", trying to assign elsewhere instead; " +
          "retry=" + i, t);
        // Clean out plan we failed execute and one that doesn't look like it'll
        // succeed anyways; we need a new plan!
        // Transition back to OFFLINE
        state.update(RegionState.State.OFFLINE);
        // Force a new plan and reassign. Will return null if no servers.
        if (getRegionPlan(state, plan.getDestination(), true) == null) {
          LOG.warn("Unable to find a viable location to assign region " +
            state.getRegion().getRegionNameAsString());
          return;
        }
      }
    }
  }


最后,RegionServer 会通过 OpenRegionHandler 打开这个 region,这是一个异步的过程。其中还涉及 region 在 ZK 中的状态变化,以及打开 region 之前的 HLog 恢复:恢复过程和正常向 HBase 写数据一样,先写入 memstore,最后还需要更新 .META. 表。
当然,如果要打开的是 -ROOT- 或 .META. region,处理方式会有所不同(分别交给 OpenRootHandler 和 OpenMetaHandler 处理):
   @Override  @QosPriority(priority= HIGH_QOS )  public void openRegion(HRegionInfo region)  throws IOException {    if ( this. regionsInTransitionInRS .contains(region.getEncodedNameAsBytes())) {      throw new RegionAlreadyInTransitionException("open" , region.getEncodedName());    }    LOG.info( "Received request to open region: " +      region.getRegionNameAsString());    if ( this. stopped) throw new RegionServerStoppedException();    this. regionsInTransitionInRS .add(region.getEncodedNameAsBytes());    //会因为region的类型不同而进入不一样的分支    if (region.isRootRegion()) {      this. service.submit( new OpenRootHandler( this, this, region));    } else if (region.isMetaRegion()) {      this. service.submit( new OpenMetaHandler( this, this, region));    } else {      this. service.submit( new OpenRegionHandler( this , this , region));    }  }

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Author ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客