Yarn questions

Some common questions about Yarn

Three Scheduling Strategies of Yarn

   Ideally, an application's request for Yarn resources would be satisfied immediately, but in practice resources are limited. In a busy cluster especially, an application often has to wait for some time before its request can be served. In Yarn, the Scheduler is responsible for allocating resources to applications, and there are three kinds of schedulers: the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.

  1. FIFO Scheduler
    The FIFO Scheduler arranges applications in a queue in the order they are submitted: a first-in, first-out queue. When allocating resources, it first serves the application at the head of the queue; only after that application's requirements are met does it move on to the next one, and so on.
    The FIFO Scheduler is the simplest and easiest scheduler to understand. It needs no configuration, but it is not suitable for shared clusters: a large application can occupy all cluster resources and block every other application. In a shared cluster, the Capacity Scheduler or Fair Scheduler is a better fit, since both allow large and small tasks to obtain some share of system resources while submitted.

    As the figure illustrates, under the FIFO scheduler small tasks are blocked behind large tasks.
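The blocking behavior can be sketched with a toy model (illustrative only, not Hadoop code, and all names here are invented): jobs run one at a time in submission order, so a tiny job submitted behind a huge one finishes only after the huge one completes.

```java
// Toy FIFO model (illustrative only): the cluster runs one job at a time,
// strictly in submission order.
public class FifoSketch {
    /** Finish time of each job when jobs run back-to-back in submission order. */
    public static int[] finishTimes(int[] runTimes) {
        int[] finish = new int[runTimes.length];
        int clock = 0;
        for (int i = 0; i < runTimes.length; i++) {
            clock += runTimes[i];   // job i waits for every earlier job to complete
            finish[i] = clock;
        }
        return finish;
    }
}
```

A small job that needs 2 units of time, submitted after a 100-unit job, finishes only at time 102.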

  2. Capacity Scheduler
    The Capacity Scheduler lets multiple organizations share a cluster by dividing it into queues, each of which is guaranteed a configured fraction of the cluster's capacity; within that capacity, each organization's jobs run much as on a dedicated (smaller) cluster. To prevent the job tasks of a single user from monopolizing the resources in a queue, the scheduler limits the resources that one user's job tasks may occupy. When assigning a new job task, it first computes, for each queue, the ratio between the number of running tasks and the amount of resources the queue is entitled to, and selects the queue with the lowest ratio. Within that queue, job tasks are then ordered by priority and submission time, subject to the user's resource and memory limits. Multiple queues execute concurrently, each in its own task order.
    The Capacity scheduler places Containers on the most suitable nodes based on information reported by the NodeManagers. It works best when the workload is predictable, because predictability helps in setting minimum capacities. For the scheduler to work effectively, each queue should be assigned a minimum capacity that is smaller than its maximum expected load. Within each queue, multiple applications are scheduled with a hierarchical FIFO, similar to how a standalone FIFO Scheduler behaves.
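The queue-selection rule described above — pick the queue whose running-task count is smallest relative to its allocated capacity — can be sketched as follows. This is an illustrative model with invented names, not the actual CapacityScheduler code.

```java
public class CapacityQueuePick {
    /**
     * Pick the index of the queue with the lowest ratio of running tasks to
     * configured capacity. capacity[i] is the fraction of the cluster assigned
     * to queue i (must be > 0).
     */
    public static int pickQueue(int[] runningTasks, double[] capacity) {
        int best = -1;
        double bestRatio = Double.POSITIVE_INFINITY;
        for (int i = 0; i < runningTasks.length; i++) {
            double ratio = runningTasks[i] / capacity[i]; // load relative to entitlement
            if (ratio < bestRatio) {
                bestRatio = ratio;
                best = i;
            }
        }
        return best;
    }
}
```

Two queues with equal capacity but unequal load: the less-loaded one receives the next task.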

  3. Fair Scheduler
    The Fair Scheduler aims to give every application, over time, a fair share of the cluster's resources. It supports multiple queues, each of which can be configured with certain resources, and the job tasks within a queue share the queue's resources fairly. Job tasks in a queue are allocated resources according to priority: the higher the priority, the more resources are allocated, but to preserve fairness every job task receives some resources. Priority is determined by the gap between the amount of resources a job task should ideally receive and the amount it actually holds: the larger the gap, the higher the priority.

    As shown in the figure, when the first (big) job is submitted it is the only job running, so it obtains all cluster resources; when a second (small) job is submitted, the Fair scheduler allocates half of the resources to it, so the two jobs share the cluster fairly.
    Note that in the Fair scheduler there is some delay between the second job's submission and its acquisition of resources, because it must wait for the first job to release the Containers it occupies. Once the small job finishes, it releases its resources, and the big job again obtains all of them. The net effect is that the Fair scheduler achieves high resource utilization while still ensuring that small jobs complete promptly.
    The Fair scheduler supports preemption: it can reclaim Containers from an ApplicationMaster. That is, when a single job is running it holds all cluster resources; when another job is submitted, the scheduler reclaims Containers and allocates an appropriate amount of resources to the new job (how much depends on the size of the job, for the sake of fairness). During this process the later-submitted job experiences some delay in obtaining resources, because it must wait for the earlier job's Containers to be returned. The difference between the Fair scheduler and the Capacity scheduler is that the Fair scheduler does not need to reserve resources in advance; it adjusts the division of resources dynamically as jobs are submitted.
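The "half of the resources" behavior falls out of the instantaneous fair share: the cluster is split evenly among the running applications. A minimal sketch, reduced to a single memory dimension and ignoring weights and minimum shares (names invented):

```java
public class FairShareSketch {
    /** Instantaneous fair share per running app: an even split of cluster memory. */
    public static int instantaneousFairShare(int clusterMemMb, int runningApps) {
        if (runningApps == 0) {
            return 0;                       // nothing running, nothing to share
        }
        return clusterMemMb / runningApps;  // weights and min shares ignored in this sketch
    }
}
```

With one app running, it gets the whole cluster; when a second app arrives, each share drops to half.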

Yarn preemption

During FairScheduler initialization, a daemon thread named updateThread is created:

private void initScheduler(Configuration conf) throws IOException {
    synchronized (this) {
      // ...elided...
      // Create the update thread, which monitors queue status and performs preemption when needed
      updateThread = new UpdateThread();
      updateThread.setName("FairSchedulerUpdateThread");
      updateThread.setDaemon(true);
      // ...elided...
    }
  }
 private class UpdateThread extends Thread {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(updateInterval);
          long start = getClock().getTime();
          update();
          preemptTasksIfNecessary();
          long duration = getClock().getTime() - start;
          fsOpDurations.addUpdateThreadRunDuration(duration);
        } catch (InterruptedException ie) {
          LOG.warn("Update thread interrupted. Exiting.");
          return;
        } catch (Exception e) {
          LOG.error("Exception in fair scheduler UpdateThread", e);
        }
      }
    }
  }

This thread is responsible for continuously computing and preempting the resources the cluster needs; the preemption work happens in the UpdateThread's preemptTasksIfNecessary() method:

  /**
 * Check every schedulable that is short of resources, whether because it has been below its
 * minShare for longer than minSharePreemptionTimeout, or because it has been below its
 * fairShare for longer than fairSharePreemptionTimeout. After summing the deficits of all
 * such schedulables, start trying to preempt resources.
   */
  protected synchronized void preemptTasksIfNecessary() {
    if (!shouldAttemptPreemption()) { //Check whether the cluster allows preemption
      return;
    }
    long curTime = getClock().getTime();
    if (curTime - lastPreemptCheckTime < preemptionInterval) {
      return; // Not time to preempt yet; wait for the next check
    }
    lastPreemptCheckTime = curTime;
    //The initial preemption parameter is none, that is, nothing is preempted
    Resource resToPreempt = Resources.clone(Resources.none());
    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
      //Calculate the resources that need to be preempted by all leaf queues and add them to the resource variable resToPreempt
      Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); 
    }
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
        Resources.none())) { // If the resources to preempt are greater than Resources.none(), i.e. greater than 0
      preemptResources(resToPreempt); // We know how much needs to be preempted, so start preempting
    }
    }
  }

shouldAttemptPreemption() judges, at the level of the whole cluster, whether to attempt to preempt resources; if the cluster as a whole does not meet the preemption conditions, preemption is not allowed:

 private boolean shouldAttemptPreemption() {
    if (preemptionEnabled) {//First check whether the configuration file is open for preemption
      return (preemptionUtilizationThreshold < Math.max(
          (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
          (float) rootMetrics.getAllocatedVirtualCores() /
              clusterResource.getVirtualCores()));
    }
    return false;
  }

There are two main criteria in shouldAttemptPreemption():

  • Whether preemption is enabled: that is, whether yarn.scheduler.fair.preemption is set to true
  • Whether the overall cluster resource utilization exceeds the configured value of yarn.scheduler.fair.preemption.cluster-utilization-threshold

If all the above conditions are met, the relevant work of preemption can be carried out, including the calculation of resources to be preempted and preemption.
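The two criteria combine into a single gate. A standalone sketch of the same check, using plain Java in place of the Hadoop types (parameter names invented):

```java
public class PreemptionGate {
    /**
     * Mirrors the logic of shouldAttemptPreemption(): preemption is attempted only
     * if it is enabled and the busier of the two utilizations (memory, vcores)
     * exceeds the configured cluster-utilization threshold.
     */
    public static boolean shouldAttemptPreemption(boolean preemptionEnabled,
            float utilizationThreshold, long allocatedMb, long clusterMb,
            long allocatedVcores, long clusterVcores) {
        if (!preemptionEnabled) {
            return false;                              // yarn.scheduler.fair.preemption is off
        }
        float utilization = Math.max((float) allocatedMb / clusterMb,
                                     (float) allocatedVcores / clusterVcores);
        return utilizationThreshold < utilization;     // same comparison direction as the source
    }
}
```

Note that taking the max of the two utilizations means a cluster that is memory-saturated but vcore-idle (or vice versa) still qualifies for preemption.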
The resToPreempt() method computes the amount of resources that the given queue is entitled to preempt:

/**
 * Calculate the amount of resources this queue is allowed to preempt from other queues.
 * If the queue's usage has been below its minShare for longer than the preemption timeout,
 * the amount to preempt is the difference between its minShare and its current usage. If the
 * queue's resources have been below its fair share for longer than fairSharePreemptionTimeout,
 * the amount to preempt is enough to bring it up to its full fairShare. If both apply,
 * preempt the larger of the two amounts.
   */
  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
    long minShareTimeout = sched.getMinSharePreemptionTimeout();//minSharePreemptionTimeout
    long fairShareTimeout = sched.getFairSharePreemptionTimeout();//fairSharePreemptionTimeout
    Resource resDueToMinShare = Resources.none();//Total resources that need to be preempted because resources are lower than minShare
    Resource resDueToFairShare = Resources.none();//Total resources that need to be preempted because resources are lower than fairShare
    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { // The queue has been below minShare for longer than minSharePreemptionTimeout
         // Take the smaller of sched.getMinShare() and sched.getDemand(); demand is the queue's
         // total resource requirement, i.e. the resources needed by its waiting and running applications
      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
          sched.getMinShare(), sched.getDemand());
      // Take the larger of Resources.none() (i.e. 0) and Resources.subtract(target, sched.getResourceUsage()):
      // if the minimum target exceeds current usage, take the difference; otherwise take 0,
      // meaning minShare is already satisfied and no preemption is needed
      resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
    }

    if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { // The queue has been below its fair-share threshold for longer than fairSharePreemptionTimeout
        // Take the smaller of sched.getFairShare() and sched.getDemand(); demand is the queue's
        // total resource requirement, i.e. the resources needed by its waiting and running applications.
        // For example, if only 2G is demanded while the current fairShare is 2.5G, the target is 2G
      Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
          sched.getFairShare(), sched.getDemand());

      // Take the larger of Resources.none() (i.e. 0) and Resources.subtract(target, sched.getResourceUsage()):
      // if the fair-share target exceeds current usage, take the difference; otherwise take 0,
      // meaning the fair share is already satisfied and no preemption is needed
      resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
          Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
    }
    Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
        resDueToMinShare, resDueToFairShare);
    if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
        resToPreempt, Resources.none())) {
      String message = "Should preempt " + resToPreempt + " res for queue "
          + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
          + ", resDueToFairShare = " + resDueToFairShare;
      LOG.info(message);
    }
    return resToPreempt;
  }
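Reduced to a single memory dimension, the computation in resToPreempt() looks like this. It is an illustrative sketch with plain longs standing in for the Resource objects:

```java
public class ResToPreemptSketch {
    /**
     * Simplified resToPreempt(): returns the larger of the minShare deficit and the
     * fairShare deficit, each counted only after its own timeout has expired, and
     * each capped by the queue's actual demand.
     */
    public static long resToPreempt(long now,
            long lastTimeAtMinShare, long minShareTimeout,
            long lastTimeAtFairShareThreshold, long fairShareTimeout,
            long minShare, long fairShare, long demand, long usage) {
        long resDueToMinShare = 0;
        long resDueToFairShare = 0;
        if (now - lastTimeAtMinShare > minShareTimeout) {
            long target = Math.min(minShare, demand);          // never preempt beyond demand
            resDueToMinShare = Math.max(0, target - usage);    // 0 if minShare already met
        }
        if (now - lastTimeAtFairShareThreshold > fairShareTimeout) {
            long target = Math.min(fairShare, demand);
            resDueToFairShare = Math.max(0, target - usage);   // 0 if fairShare already met
        }
        return Math.max(resDueToMinShare, resDueToFairShare);  // preempt the larger deficit
    }
}
```

For a starved queue (minShare 2, fairShare 5, demand 10, usage 1, both timeouts expired), the fair-share deficit of 4 dominates the min-share deficit of 1.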

By Yarn's design, resource preemption forcibly deprives applications of resources and therefore carries some system overhead. Yarn thus waits patiently for a while before actually preempting, so that resources voluntarily released by other applications can be used directly and preemption is avoided as much as possible. Accordingly, two timeouts need to be configured in fair-scheduler.xml:

  • minSharePreemptionTimeout: if this much time passes and a schedulable still has not obtained its minShare resources, it may preempt
  • fairSharePreemptionTimeout: if this much time passes and a schedulable still has not obtained its fairShare resources, it may preempt

Once the total resources to preempt have been computed for the whole cluster, preemption can proceed. As Yarn's official FairScheduler documentation explains, the FairScheduler internally defines different policies that decide how to preempt resources: the fair policy, the DRF policy, and the FIFO policy. The default is the fair policy.

/**
   * Based on the computed resources that need to be preempted (the toPreempt argument), perform
   * the preemption. In each round we start from the root queue and descend level by level until
   * a candidate container is selected, respecting preemption priority throughout. The way a
   * victim is chosen depends on each queue's policy: under the fair or DRF policy, the child
   * schedulable that most exceeds its (instantaneous) fair share is chosen; under the FIFO
   * policy, the last application is chosen. Since an application usually holds multiple
   * containers, the containers within the chosen application are also prioritized for
   * preemption.
   */
  protected void preemptResources(Resource toPreempt) {
    long start = getClock().getTime();
    if (Resources.equals(toPreempt, Resources.none())) {
      return;
    }
     // warnedContainers holds containers that were found, in a previous round, to meet the
     // conditions for forcible preemption. When yarn finds that a container matches the
     // preemption rules it does not preempt immediately: it warns first and waits for a timeout,
     // giving the app a chance to release the container voluntarily. Only after the timeout
     // can the container be killed directly
    Iterator<RMContainer> warnedIter = warnedContainers.iterator();
    //To preempt represents resources that still need to be preempted
    while (warnedIter.hasNext()) {
      RMContainer container = warnedIter.next();
      if ((container.getState() == RMContainerState.RUNNING ||
              container.getState() == RMContainerState.ALLOCATED) &&
          Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
              toPreempt, Resources.none())) {
        warnOrKillContainer(container);
          Resources.subtractFrom(toPreempt, container.getContainer().getResource()); // A container has been claimed; subtract its resources from toPreempt
      } else {
        warnedIter.remove();
      }
    }
    try {
      // Reset preemptedResource for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.resetPreemptedResources();
      }
      // toPreempt is the amount still to be preempted; it shrinks round by round as containers are claimed
      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
          toPreempt, Resources.none())) { // As long as resources still need to be preempted
        RMContainer container =
            getQueueManager().getRootQueue().preemptContainer();
        if (container == null) {
          break;
        } else {
        //Find a container to be preempted. Also, warn or kill the container
          warnOrKillContainer(container);
          warnedContainers.add(container);
          //Recalculate remaining resources to be preempted
          Resources.subtractFrom(
              toPreempt, container.getContainer().getResource());
        }
      }
    } finally {
      // Clear preemptedResources for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.clearPreemptedResources();
      }
    }
    long duration = getClock().getTime() - start;
    fsOpDurations.addPreemptCallDuration(duration);
  }
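The shape of the second loop — shrink toPreempt one container at a time until it reaches zero or no candidate remains — can be isolated as follows. This is an illustrative sketch; the candidate list stands in for repeated calls to rootQueue.preemptContainer(), and the names are invented.

```java
import java.util.ArrayList;
import java.util.List;

public class PreemptLoopSketch {
    /**
     * Keep claiming candidate containers (here just their sizes, in the order the
     * policy would yield them) until enough resources are reclaimed or candidates
     * run out. Returns the sizes of the containers chosen.
     */
    public static List<Integer> reclaim(List<Integer> candidateSizes, int toPreempt) {
        List<Integer> chosen = new ArrayList<>();
        int i = 0;
        while (toPreempt > 0 && i < candidateSizes.size()) {
            int size = candidateSizes.get(i++); // stand-in for rootQueue.preemptContainer()
            chosen.add(size);                   // warnOrKillContainer + warnedContainers.add
            toPreempt -= size;                  // shrink the remaining preemption target
        }
        return chosen;
    }
}
```

Asking for 6 units with 4-unit candidates claims two containers: the loop overshoots rather than undershoots the target.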

In each round of preemption, all warnedContainers will be checked and processed through the method warnOrKillContainer.

  protected void warnOrKillContainer(RMContainer container) {
    ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
    FSAppAttempt app = getSchedulerApp(appAttemptId);
    FSLeafQueue queue = app.getQueue();
    LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
        "res=" + container.getContainer().getResource() +
        ") from queue " + queue.getName());

    Long time = app.getContainerPreemptionTime(container);

    if (time != null) {
      // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
      // proceed with kill
      //If the container has been marked to be preempted before, and the time has exceeded the maxWaitTimeBeforeKill, then the container can be killed directly
      if (time + waitTimeBeforeKill < getClock().getTime()) {
        ContainerStatus status =
          SchedulerUtils.createPreemptedContainerStatus(
            container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);

        // TODO: Not sure if this ever actually adds this to the list of cleanup
        // containers on the RMNode (see SchedulerNode.releaseContainer()).
        completedContainer(container, status, RMContainerEventType.KILL); //Perform cleanup
        LOG.info("Killing container" + container +
            " (after waiting for premption for " +
            (getClock().getTime() - time) + "ms)");
      }
    } else {
        // Mark the container as a preemption candidate: the so-called container warning. In a
        // following round the container is checked against maxWaitTimeBeforeKill; once it has
        // exceeded that wait, it can be killed directly.
      // track the request in the FSAppAttempt itself
      app.addPreemption(container, getClock().getTime());
    }
  }

As the method name suggests, a container meets one of two fates, across three cases:

  • Kill: the container was marked for preemption earlier and more than waitTimeBeforeKill has elapsed since the mark, yet its ApplicationMaster still has not released it voluntarily. Since the owner was warned waitTimeBeforeKill ago, the FairScheduler loses patience and kills the container directly.
  • Reprieve: the container was marked for preemption earlier, but less than waitTimeBeforeKill has elapsed, so it escapes this round and will be judged again next time.
  • Mark and warn: the container has never been marked for preemption, so it is marked now and the time is recorded; the next time the updateThread runs, the container is either killed or reprieved again.
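The three cases reduce to a tiny state machine keyed by the time of the first warning. A self-contained sketch (an invented class, not Hadoop's API):

```java
import java.util.HashMap;
import java.util.Map;

public class WarnOrKillSketch {
    private final Map<String, Long> warnedAt = new HashMap<>();
    private final long waitTimeBeforeKill;

    public WarnOrKillSketch(long waitTimeBeforeKill) {
        this.waitTimeBeforeKill = waitTimeBeforeKill;
    }

    /**
     * "WARN" on first sight (mark the container and record the time), "WAIT" while
     * still inside the grace period, "KILL" once the grace period has expired.
     */
    public String warnOrKill(String containerId, long now) {
        Long warnedTime = warnedAt.get(containerId);
        if (warnedTime == null) {
            warnedAt.put(containerId, now); // give the AM a chance to release it voluntarily
            return "WARN";
        }
        return (warnedTime + waitTimeBeforeKill < now) ? "KILL" : "WAIT";
    }
}
```

With a 100 ms grace period, a container warned at t=0 is reprieved at t=50 and killed at t=200.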

The completedContainer(container, status, RMContainerEventType.KILL) call is a typical state-machine transition. The current event is RMContainerEventType.KILL, i.e. a kill event occurs; the ResourceManager's container implementation, RMContainerImpl, then derives the target state from its own current state and the kill event.

   If the resources covered by the warnedContainers are still less than toPreempt, additional containers must be selected from the queues for preemption. The selection rule is the specific Policy defined by each queue. This logic is in the preemptResources() method:

 try {
      // Reset preemptedResource for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.resetPreemptedResources();
      }
      // toPreempt is the amount still to be preempted; it shrinks round by round as containers are claimed
      while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
          toPreempt, Resources.none())) { // As long as resources still need to be preempted
          //Select a container to be preempted according to the Policy requirements of the specific queue
        RMContainer container =
            getQueueManager().getRootQueue().preemptContainer();  
        if (container == null) {
          break;
        } else {
          warnOrKillContainer(container);
          // Add the container to the warning list; each round checks whether it has been released
          // or preempted. If it is neither preempted nor voluntarily released within the allowed
          // time, it can be killed outright
          warnedContainers.add(container);
          Resources.subtractFrom(
              toPreempt, container.getContainer().getResource());
        }
      }
    } finally {
      // Clear preemptedResources for each app
      for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
        queue.clearPreemptedResources();
      }
    }

Starting from RMContainer container = getQueueManager().getRootQueue().preemptContainer();, let's see how the specific Policy decides which container to preempt. We take FairScheduler's default Policy, FairSharePolicy, for the analysis. The process is a depth-first walk down the queue tree, descending level by level until a container is found to preempt.

FSParentQueue.preemptContainer()
  /**
   * Starting from the root queue, find a container that can be preempted. The decision and
   * traversal form a recursive process: beginning at the root queue, each queue lets its
   * chosen child at the next level decide which queue, application, or container gives up
   * resources. In the end a LeafQueue selects an Application, and the Application selects
   * a Container.
   */
  @Override
  public RMContainer preemptContainer() {
    RMContainer toBePreempted = null;

    // Find the childQueue which is most over fair share
    FSQueue candidateQueue = null;
    Comparator<Schedulable> comparator = policy.getComparator();
    //Select a queue that should be preempted from all its sub queues
    for (FSQueue queue : childQueues) {
      if (candidateQueue == null ||
          comparator.compare(queue, candidateQueue) > 0) {
        candidateQueue = queue;
      }
    }

    // Let the selected queue choose which of its containers to preempt
    // After selecting the queue to be preempted from, that queue itself decides which container to give up, via a recursive call
    if (candidateQueue != null) {
      toBePreempted = candidateQueue.preemptContainer();
    }
    return toBePreempted;
  }

As FSParentQueue.preemptContainer() shows, the search for a container to preempt starts from the root of the queue tree and proceeds depth-first. The resource comparator used by FairSharePolicy is DefaultResourceCalculator, and it is easy to see from DefaultResourceCalculator that only memory is considered when comparing resource sizes; vCores are ignored. The recursion next reaches FSLeafQueue.preemptContainer(); a LeafQueue is a queue with no sub-queues beneath it.

FSLeafQueue.preemptContainer()
@Override
  public RMContainer preemptContainer() {
    RMContainer toBePreempted = null;

    // If this queue is not over its fair share, reject
    if (!preemptContainerPreCheck()) {
      return toBePreempted;
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Queue " + getName() + " is going to preempt a container " +
          "from its applications.");
    }

    // Choose the app that is most over fair share
    Comparator<Schedulable> comparator = policy.getComparator();
    FSAppAttempt candidateSched = null;
    readLock.lock();
    try {
      // From all applications in the leaf queue, select the one that should most give up resources
      // Under the default FairSharePolicy, the criterion is the application's current resource usage:
      // the more resources it holds, the more likely it is to be selected
      for (FSAppAttempt sched : runnableApps) {
        if (candidateSched == null ||
            comparator.compare(sched, candidateSched) > 0) {
          candidateSched = sched;
        }
      }
    } finally {
      readLock.unlock();
    }

    // Preempt from the selected app
    if (candidateSched != null) {
      // Because this is a leaf queue, candidateSched must be an app, i.e. an FSAppAttempt object
      toBePreempted = candidateSched.preemptContainer();
    }
    return toBePreempted;
  }

The preemption logic of FSLeafQueue is almost the same as FSParentQueue's: a depth-first traversal by recursion. The only difference is that the children of an FSParentQueue are FSParentQueues or FSLeafQueues, while the children of an FSLeafQueue are FSAppAttempts.

FSAppAttempt.preemptContainer()
  /**
   * According to the priority, select a container from all containers of the application to be preempted
   */
  @Override
  public RMContainer preemptContainer() {
    //ellipsis
    RMContainer toBePreempted = null;
    // Iterate over all of the application's running containers
    for (RMContainer container : getLiveContainers()) {
    //Use the comparator RMContainerComparator to select a container that should be preempted most
      if (!getPreemptionContainers().contains(container) &&
          (toBePreempted == null ||
              comparator.compare(toBePreempted, container) > 0)) {
        toBePreempted = container;
      }
    }
    return toBePreempted;
  }
}

FSAppAttempt decides which of its containers is handed over for preemption using the comparator RMContainerComparator. The comparator's code is simple, so it is quoted in full:

  static class RMContainerComparator implements Comparator<RMContainer>,
      Serializable {
    @Override
    public int compare(RMContainer c1, RMContainer c2) {
      int ret = c1.getContainer().getPriority().compareTo(
          c2.getContainer().getPriority());
      if (ret == 0) {
        return c2.getContainerId().compareTo(c1.getContainerId());
      }
      return ret;
    }
  }

Combined with the selection loop above (which keeps the minimum under the comparator), the rule is: compare priorities and pick a container with lower priority; if the priorities are equal, the reversed containerId comparison means the container with the larger id, i.e. the one allocated later, is picked.
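The interplay between the comparator and the keep-the-minimum loop can be modeled with plain (priority-key, id) pairs. This is an illustrative re-implementation, not the Hadoop classes; the priority is treated as an opaque ordered key exactly as the snippet compares it.

```java
import java.util.Comparator;
import java.util.List;

public class PreemptPickSketch {
    /** Same shape as RMContainerComparator, over (priorityKey, id) pairs. */
    static final Comparator<long[]> CMP = (c1, c2) -> {
        int ret = Long.compare(c1[0], c2[0]);                  // priority key first
        return ret != 0 ? ret : Long.compare(c2[1], c1[1]);    // tie: REVERSED id order
    };

    /** Same shape as the loop in preemptContainer(): keep the minimum under CMP. */
    public static long[] pick(List<long[]> containers) {
        long[] toBePreempted = null;
        for (long[] c : containers) {
            if (toBePreempted == null || CMP.compare(toBePreempted, c) > 0) {
                toBePreempted = c;
            }
        }
        return toBePreempted;
    }
}
```

With equal priority keys, the reversed id comparison makes the pair with the larger id the minimum, so it is the one picked for preemption.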
   Here you can see the elegance of Yarn's queue design. ParentQueues, LeafQueues, and Applications sit at different levels of a tree, but they share the same nature: all are abstracted as Schedulable, so all must implement the preemptContainer() method. Deciding which container is preempted can therefore be done recursively: a ParentQueue delegates the decision to one of its child LeafQueues or ParentQueues, a LeafQueue delegates to one of its Applications, and the Application determines which container is preempted according to container priority.
    From the overall logic of the code above, we can see that when Yarn performs resource preemption, it calculates how many resources must be preempted over the scope of the whole cluster, rather than computing what any single application needs.


Posted on Sat, 01 Feb 2020 23:47:07 -0800 by ShadowX