A Cluster Load Scoring Method for HBase Load Balancing

HMaster is responsible for distributing regions evenly across the region servers. One of the HMaster's background threads is dedicated to balancing and runs every five minutes by default.

Each load balancing operation can be divided into two steps:

  • Generating the load balancing plan
  • Executing the plan through the AssignmentManager class

Let's look in detail at how HBase generates the load balancing plan.
First of all, note that load balancing is performed per table. Load balancing is not performed in the following cases:

  1. The master is not initialized.
  2. A load balancing run is already in progress.
  3. There are regions currently in the splitting state.
  4. There are failed region servers in the current cluster.
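
The four conditions above amount to a simple guard that runs before any plan is generated. The sketch below only illustrates that idea: `BalancerGuard`, `canRunBalancer`, and all of its parameters are hypothetical names, not HBase APIs.

```java
// Hypothetical sketch: none of these names come from HBase.
final class BalancerGuard {
    /** Returns true only when none of the four blocking conditions holds. */
    static boolean canRunBalancer(boolean masterInitialized,
                                  boolean balancerAlreadyRunning,
                                  int regionsSplitting,
                                  int failedServers) {
        if (!masterInitialized) return false;      // 1. master not initialized
        if (balancerAlreadyRunning) return false;  // 2. a balancing run is in progress
        if (regionsSplitting > 0) return false;    // 3. some regions are splitting
        if (failedServers > 0) return false;       // 4. the fourth condition in the list
        return true;
    }
}
```

For example, `BalancerGuard.canRunBalancer(true, false, 2, 0)` returns false because two regions are still splitting.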

Generating the list of RegionPlans:
Code package path:
org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer

The list of RegionPlans is generated by:

StochasticLoadBalancer.balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)

This method is particularly interesting. StochasticLoadBalancer has a set of algorithms that calculate a cluster load score for each table. The lower the score, the better balanced the load. The score is based on the following dimensions:

  • Region Load // number of regions per region server
  • Table Load
  • Data Locality
  • Memstore Sizes
  • Storefile Sizes // size of the stored files

First, a score x (0<=x<=1) is calculated for each region server along the five dimensions above. Then the scores of all the region servers serving the same table are added up to give the cluster load score of the current table. The lower the score, the better balanced the load.

There are then three strategies for adjusting the cluster load:

  • RandomRegionPicker
  • LoadPicker
  • LocalityPicker

RandomRegionPicker: random swap strategy. It works on a virtual cluster. (The virtual cluster is only a bookkeeping record and involves no actual region migration. The Cluster object holds all the relevant information about the region servers serving a table and the regions on each region server.) The strategy randomly selects two region servers, randomly picks one region on each, swaps the two regions between the servers, and then recalculates the score. If the new score is lower, the swap benefits the load balance of the cluster and the change is kept. Otherwise, the cluster is restored to its previous state by swapping the two regions back. On the region server with fewer regions, the random pick may come up empty. In that case the operation becomes a region move rather than a swap.
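
To make the swap, move, and undo mechanics concrete, here is a minimal sketch of a virtual cluster. `ToyCluster` and `moveOrSwap` are hypothetical names chosen for illustration, and using -1 to mean "no region picked" is an assumption of this sketch, not something the text states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the virtual cluster; not the HBase Cluster class.
final class ToyCluster {
    // regionsPerServer.get(s) holds the region ids currently assigned to server s.
    final List<List<Integer>> regionsPerServer = new ArrayList<>();

    ToyCluster(int[][] initial) {
        for (int[] regions : initial) {
            List<Integer> list = new ArrayList<>();
            for (int r : regions) list.add(r);
            regionsPerServer.add(list);
        }
    }

    /** Swaps two regions, or moves a single one when the other pick is -1 (empty). */
    void moveOrSwap(int leftServer, int rightServer, int leftRegion, int rightRegion) {
        if (leftRegion >= 0) move(leftRegion, leftServer, rightServer);
        if (rightRegion >= 0) move(rightRegion, rightServer, leftServer);
    }

    // Only the bookkeeping changes; no real region is migrated.
    private void move(int region, int from, int to) {
        regionsPerServer.get(from).remove(Integer.valueOf(region));
        regionsPerServer.get(to).add(region);
    }
}
```

Calling `moveOrSwap(left, right, a, b)` and then `moveOrSwap(left, right, b, a)` puts both regions back where they started, which is how a rejected change can be undone in this model.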

LoadPicker: region count balancing strategy. In the virtual cluster, the two region servers with the most and the fewest regions are picked first, which evens out the number of regions per region server. The rest of the process is the same as above.
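
The server selection described here can be sketched in a few lines. This is a simplified illustration over plain region counts; `LoadPickSketch` and `pickMostAndLeastLoaded` are hypothetical names, and the real picker may differ in detail.

```java
// Hypothetical sketch: picks the two servers a count-based strategy would pair up.
final class LoadPickSketch {
    /** Returns {indexOfMostLoaded, indexOfLeastLoaded} for the given region counts. */
    static int[] pickMostAndLeastLoaded(int[] regionCounts) {
        int most = 0;
        int least = 0;
        for (int i = 1; i < regionCounts.length; i++) {
            if (regionCounts[i] > regionCounts[most]) most = i;
            if (regionCounts[i] < regionCounts[least]) least = i;
        }
        return new int[] {most, least};
    }
}
```

For region counts {12, 3, 9, 20} this returns {3, 1}: server 3 holds the most regions and server 1 the fewest, so a region would be taken from server 3 and given to server 1.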

LocalityPicker: locality-first strategy. Locality matters because the underlying data of HBase is actually stored on HDFS. If a larger share of a region's data files is stored on one region server than on any other, that region server is the highest-locality region server for the region. In this strategy, a region server and one of its regions are selected at random. Then the region server with the highest locality for that region is found, and a region is picked at random on that server. The rest of the process for the two region servers is the same as above.
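
As a rough illustration of what "highest locality" means, the sketch below computes locality as the fraction of a region's bytes stored on a server's host and picks the server with the largest local share. `LocalitySketch`, its method names, and the byte-count inputs are assumptions made for this example; HBase obtains this information through its own region location finder.

```java
// Hypothetical sketch of the locality idea; inputs are made-up byte counts.
final class LocalitySketch {
    /** Locality of a region on one server: locally stored bytes divided by the region's size. */
    static double locality(long bytesOnHost, long regionBytes) {
        return regionBytes == 0 ? 0.0 : (double) bytesOnHost / regionBytes;
    }

    /**
     * bytesOnHost[s] is the number of bytes of the region's files stored on
     * server s's host. Returns the index of the server with the largest share.
     */
    static int highestLocalityServer(long[] bytesOnHost) {
        int best = 0;
        for (int s = 1; s < bytesOnHost.length; s++) {
            if (bytesOnHost[s] > bytesOnHost[best]) best = s;
        }
        return best;
    }
}
```

For a 100 MB region with 20, 90, and 50 MB stored on the hosts of servers 0, 1, and 2, server 1 has the highest locality (0.9).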

The specific process is as follows:

We analyze the main steps in the above flowchart:
0. Decide whether load balancing is required. This is based on the number of regions currently held by each region server; see the following code.

protected boolean needsBalance(ClusterLoadState cs) {
    ...
    // Average number of regions per region server in the cluster (also used for logging).
    float average = cs.getLoadAverage();
    // slop defaults to 0.2; floor is the minimum acceptable number of regions per server.
    int floor = (int) Math.floor(average * (1 - slop));
    // ceiling is the maximum acceptable number of regions per server.
    int ceiling = (int) Math.ceil(average * (1 + slop));
    // If both the most loaded and the least loaded region servers are within
    // [floor, ceiling], return false: no load balancing is needed.
    if (!(cs.getMaxLoad() > ceiling || cs.getMinLoad() < floor)) {
       ...
      return false;
    }
    return true;
}
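
To see the slop check with concrete numbers, here is a minimal self-contained sketch. `SlopCheck` and its array-based signature are made up for this example; the real method reads its inputs from `ClusterLoadState`.

```java
// Hypothetical standalone version of the slop check, taking plain region counts.
final class SlopCheck {
    static boolean needsBalance(int[] regionCounts, float slop) {
        int sum = 0;
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (int c : regionCounts) {
            sum += c;
            max = Math.max(max, c);
            min = Math.min(min, c);
        }
        float average = (float) sum / regionCounts.length;
        int floor = (int) Math.floor(average * (1 - slop));
        int ceiling = (int) Math.ceil(average * (1 + slop));
        // Balancing is needed only if some server falls outside [floor, ceiling].
        return max > ceiling || min < floor;
    }
}
```

With an average of 10 regions per server and a slop of 0.2, the acceptable range is roughly 8 to 12 regions. Counts of {9, 10, 11} therefore need no balancing, while {2, 10, 18} do.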

1. Calculate the score of the current cluster. Simply put, a cost value is calculated in each dimension from the region servers' values, and the total score is the sum of (weight * cost value) over all dimensions. The smaller the score, the more balanced the cluster, that is, the smaller the differences between region servers. Each cost value is derived as (current difference - minimum difference) / (maximum difference - minimum difference).

/* Calculate the total score of the cluster */

  protected double computeCost(Cluster cluster, double previousCost) {
    double total = 0;
    // Each CostFunction scores one dimension; costFunctions is initialized in the code further below.
    for (CostFunction c : costFunctions) {
      if (c.getMultiplier() <= 0) { // The multiplier is the weight; skip disabled dimensions.
        continue;
      }
      total += c.getMultiplier() * c.cost(cluster); // weight * cost of the current dimension
      if (total > previousCost) {
        // Already worse than the previous cost, so stop early.
        return total;
      }
    }
    return total;
  }
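
The weighted sum and its early exit can be tried out in isolation. The sketch below replaces the `CostFunction` objects with two plain arrays; `WeightedCostSketch` is a hypothetical name, and the weights in the example are made up apart from 500, which the post gives as the region count default.

```java
// Hypothetical sketch: multipliers[i] is the weight and costs[i] the cost (0..1) of dimension i.
final class WeightedCostSketch {
    static double computeCost(double[] multipliers, double[] costs, double previousCost) {
        double total = 0;
        for (int i = 0; i < multipliers.length; i++) {
            if (multipliers[i] <= 0) {
                continue; // a non-positive weight disables the dimension
            }
            total += multipliers[i] * costs[i];
            if (total > previousCost) {
                return total; // already worse than before, no need to finish the sum
            }
        }
        return total;
    }
}
```

With weights {500, 100, 25} and costs {0.1, 0.5, 0.2}, the full total is 50 + 50 + 5 = 105. If the previous cost was 60, the loop stops after the second dimension and returns 100, which is enough to know that the candidate layout is worse.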

// costFunctions initialization
    regionLoadFunctions = new CostFromRegionLoadFunction[] {
      new ReadRequestCostFunction(conf), // read request cost
      new WriteRequestCostFunction(conf), // write request cost
      new MemstoreSizeCostFunction(conf), // memstore size cost
      new StoreFileCostFunction(conf) // store file size cost
    };

    costFunctions = new CostFunction[]{
      new RegionCountSkewCostFunction(conf), // region count cost
      new MoveCostFunction(conf), // region migration cost
      localityCost, // locality cost
      new TableSkewCostFunction(conf), // table cost
      regionLoadFunctions[0],
      regionLoadFunctions[1],
      regionLoadFunctions[2],
      regionLoadFunctions[3],
    };
// Take RegionCountSkewCostFunction as an example:

public static class RegionCountSkewCostFunction extends CostFunction {
    private static final String REGION_COUNT_SKEW_COST_KEY =
        "hbase.master.balancer.stochastic.regionCountCost";
    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500; // The default weight is 500.
    private double[] stats = null;
    RegionCountSkewCostFunction(Configuration conf) {
      super(conf);
      // Load multiplier should be the greatest as it is the most general way to balance data.
      // Set the weight, read from the configuration with 500 as the default.
      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
    }

    @Override
    double cost(Cluster cluster) {
      if (stats == null || stats.length != cluster.numServers) {
        stats = new double[cluster.numServers];
      }
      for (int i = 0; i < cluster.numServers; i++) {
        // This dimension is scored by the number of regions on each region server.
        stats[i] = cluster.regionsPerServer[i].length;
      }
      return costFromArray(stats);
    }
  }

    // Computes the cost of one dimension from the per-region-server values in stats.
    protected double costFromArray(double[] stats) {
      double totalCost = 0;
      double total = getSum(stats); // Sum of the values over all region servers
      double mean = total/((double)stats.length); // Average value per region server
      double count = stats.length; // Total number of region servers
      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
      // a zero sum cost for this to make sense.
      // In other words, the worst case is assumed: count-1 region servers have a value of zero and
      // the remaining region server holds the entire total, so the load is extremely unbalanced
      // and all the pressure is on one region server. max is the maximum difference in that case.
      double max = ((count - 1) * mean) + (total - mean);
      for (double n : stats) { // Calculate the current difference
        double diff = Math.abs(mean - n);
        totalCost += diff;
      }

      // (current difference - minimum difference) / (maximum difference - minimum difference)
      double scaled =  scale(0, max, totalCost);
      return scaled;
    }
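
The following self-contained sketch runs the same calculation on plain arrays so the numbers can be checked by hand. The post does not show the `scale` helper, so the version here is an assumption: it maps a value linearly from [min, max] onto [0, 1] and clamps the result. `SkewCostSketch` is a hypothetical name.

```java
// Hypothetical standalone version of the skew cost; scale() is an assumed helper.
final class SkewCostSketch {
    /** Assumed behavior: linear mapping of value from [min, max] to [0, 1], clamped. */
    static double scale(double min, double max, double value) {
        if (max <= min || value <= min) {
            return 0;
        }
        return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
    }

    static double costFromArray(double[] stats) {
        double total = 0;
        for (double s : stats) {
            total += s;
        }
        double count = stats.length;
        double mean = total / count;
        // Worst case: one server holds everything and the others hold nothing.
        double max = ((count - 1) * mean) + (total - mean);
        double totalCost = 0;
        for (double n : stats) {
            totalCost += Math.abs(mean - n); // distance of each server from the mean
        }
        return scale(0, max, totalCost);
    }
}
```

For four servers holding 40 regions in total, the mean is 10 and max is 3 * 10 + 30 = 60. The counts {10, 10, 10, 10} give a cost of 0, {40, 0, 0, 0} give 60 / 60 = 1, and {20, 10, 10, 0} give 20 / 60, about 0.33.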

2. Set the number of iterations. It depends on the total number of region servers and the total number of regions in the cluster. The maximum, maxSteps, is 1000000.

long computedMaxSteps = Math.min(this.maxSteps, ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
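
A quick worked example of this formula follows. `MaxStepsSketch` is a hypothetical wrapper, and the `stepsPerRegion` value of 100 used below is an assumed number for illustration, not a documented default.

```java
// Hypothetical wrapper around the step-count formula.
final class MaxStepsSketch {
    static long computedMaxSteps(long maxSteps, int numRegions, int stepsPerRegion, int numServers) {
        // Cast to long before multiplying so large clusters cannot overflow an int.
        return Math.min(maxSteps, (long) numRegions * (long) stepsPerRegion * (long) numServers);
    }
}
```

With 50 regions on 5 servers and 100 steps per region, the loop runs 50 * 100 * 5 = 25000 times. With 1000 regions on 20 servers the product is 2000000, so the cap of 1000000 applies.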

RandomRegionPicker, LoadPicker, or LocalityPicker performs one swap or move, and then the score is recalculated. If the score is not lower than before, the change is reverted.

// Keep track of servers to iterate through them.
Cluster cluster = new Cluster(clusterState, loads, regionFinder);
// Calculate the current cost
double currentCost = computeCost(cluster, Double.MAX_VALUE);
double initCost = currentCost;
double newCost = currentCost;
for (long step = 0; step < computedMaxSteps; step++) {
    // Randomly choose one of the pickers
    int pickerIdx = RANDOM.nextInt(pickers.length);
    RegionPicker p = pickers[pickerIdx];
    // Use the picker to choose two <server, region> pairs from the cluster
    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
    int leftServer = picks.getFirst().getFirst();
    int leftRegion = picks.getFirst().getSecond();
    int rightServer = picks.getSecond().getFirst();
    int rightRegion = picks.getSecond().getSecond();
    cluster.moveOrSwapRegion(leftServer,
        rightServer,
        leftRegion,
        rightRegion);
    // After the move or swap, recompute the cost
    newCost = computeCost(cluster, currentCost);
    // Should this be kept?
    if (newCost < currentCost) {
        currentCost = newCost;
    } else {
        // The operation did not lower the cost, so undo it.
        cluster.moveOrSwapRegion(leftServer,
            rightServer,
            rightRegion,
            leftRegion);
    }
}
if (initCost > currentCost) {
    // A better balanced layout has been found.
    List<RegionPlan> plans = createRegionPlans(cluster);
    return plans;
}
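
The loop above is a stochastic search that keeps a random change only when it lowers the cost. The toy version below shows that pattern end to end under heavy simplification: the cluster is reduced to an array of region counts, the only operation is moving one region, and the cost is the summed distance from the mean. `StochasticBalanceSketch` and its methods are hypothetical names, not HBase code.

```java
import java.util.Random;

// Hypothetical toy search over region counts only; not the HBase implementation.
final class StochasticBalanceSketch {
    /** Sum of each server's distance from the mean region count. */
    static double cost(int[] counts) {
        double total = 0;
        for (int c : counts) {
            total += c;
        }
        double mean = total / counts.length;
        double cost = 0;
        for (int c : counts) {
            cost += Math.abs(c - mean);
        }
        return cost;
    }

    /** Tries random single-region moves and keeps only those that lower the cost. */
    static int[] balance(int[] initialCounts, long maxSteps, Random random) {
        int[] counts = initialCounts.clone();
        double currentCost = cost(counts);
        for (long step = 0; step < maxSteps; step++) {
            int from = random.nextInt(counts.length);
            int to = random.nextInt(counts.length);
            if (from == to || counts[from] == 0) {
                continue; // nothing to move
            }
            counts[from]--;
            counts[to]++;
            double newCost = cost(counts);
            if (newCost < currentCost) {
                currentCost = newCost; // keep the improvement
            } else {
                counts[from]++;        // undo the move
                counts[to]--;
            }
        }
        return counts;
    }
}
```

A call such as `StochasticBalanceSketch.balance(new int[] {30, 0, 0}, 10000, new Random(42))` starts from a fully skewed layout. Because rejected moves are undone, the cost never increases and the total number of regions stays at 30.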

Steps 7, 8, and 9 repeat until the loop ends, and the resulting region plans are handed over to the AssignmentManager, which actually performs the region migrations. A RegionPlan has the following format:

RegionPlan rp = new RegionPlan(region, initialServer, newServer); // The region on initialServer is to be migrated to newServer
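
One way to picture how such plans come out of the virtual cluster is to compare each region's server before and after the search and emit a plan for every region that changed. The sketch below does that on plain integer indices. `RegionPlanSketch` and its nested `Plan` class are hypothetical stand-ins, and this is not necessarily how `createRegionPlans` is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: derives migration plans by diffing two assignments.
final class RegionPlanSketch {
    /** Stand-in for a region plan: region index, source server, destination server. */
    static final class Plan {
        final int region;
        final int source;
        final int destination;

        Plan(int region, int source, int destination) {
            this.region = region;
            this.source = source;
            this.destination = destination;
        }
    }

    /** initial[r] and balanced[r] are the servers hosting region r before and after the search. */
    static List<Plan> createPlans(int[] initial, int[] balanced) {
        List<Plan> plans = new ArrayList<>();
        for (int r = 0; r < initial.length; r++) {
            if (initial[r] != balanced[r]) {
                plans.add(new Plan(r, initial[r], balanced[r]));
            }
        }
        return plans;
    }
}
```

For an initial assignment {0, 0, 0, 1} and a balanced assignment {0, 1, 0, 1}, a single plan is produced: region 1 moves from server 0 to server 1.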

At this point, the load balancing algorithm is complete. In HBase 0.94, the default load balancing algorithm is the SimpleLoadBalancer class. The main idea of its balanceCluster method is to even out the number of regions per region server, so it considers only a single dimension. In version 0.96, StochasticLoadBalancer, which takes more dimensions into account, became the default load balancing algorithm. The comments on the patch at https://issues.apache.org/jira/browse/HBASE-5959 show how StochasticLoadBalancer was submitted.

Posted on Sat, 13 Jul 2019 15:15:04 -0700 by phpnewbie8