Talk about Client Producer Credits for artemis

order

This article mainly studies Client Producer Credits of artemis

ClientProducerCredits

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCredits.java

public interface ClientProducerCredits {

   void acquireCredits(int credits) throws ActiveMQException;

   void receiveCredits(int credits);

   void receiveFailCredits(int credits);

   boolean isBlocked();

   void init(SessionContext sessionContext);

   void reset();

   void close();

   void incrementRefCount();

   int decrementRefCount();

   void releaseOutstanding();

   SimpleString getAddress();
}
  • The ClientProducerCredits interface defines the acquireCredits, receiveCredits, receiveFailCredits, isBlocked, init, reset, close, incrementRefCount, decrementRefCount, releaseOutstanding, getAddress methods

AbstractProducerCreditsImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/AbstractProducerCreditsImpl.java

public abstract class AbstractProducerCreditsImpl implements ClientProducerCredits {

   protected int pendingCredits;

   private final int windowSize;

   protected volatile boolean closed;

   protected boolean blocked;

   protected final SimpleString address;

   private final ClientSessionInternal session;

   protected int arriving;

   private int refCount;

   protected boolean serverRespondedWithFail;

   protected SessionContext sessionContext;

   public AbstractProducerCreditsImpl(final ClientSessionInternal session,
                                      final SimpleString address,
                                      final int windowSize) {
      this.session = session;

      this.address = address;

      this.windowSize = windowSize / 2;
   }

   @Override
   public SimpleString getAddress() {
      return address;
   }

   @Override
   public void init(SessionContext sessionContext) {
      // We initial request twice as many credits as we request in subsequent requests
      // This allows the producer to keep sending as more arrive, minimising pauses
      checkCredits(windowSize);

      this.sessionContext = sessionContext;

      this.sessionContext.linkFlowControl(address, this);
   }

   @Override
   public void acquireCredits(final int credits) throws ActiveMQException {
      checkCredits(credits);

      actualAcquire(credits);

      afterAcquired(credits);

   }

   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
      // check to see if the blocking mode is FAIL on the server
      synchronized (this) {
         pendingCredits -= credits;
      }
   }

   protected abstract void actualAcquire(int credits);

   @Override
   public boolean isBlocked() {
      return blocked;
   }

   @Override
   public void receiveFailCredits(final int credits) {
      serverRespondedWithFail = true;
      // receive credits like normal to keep the sender from blocking
      receiveCredits(credits);
   }


   @Override
   public void receiveCredits(final int credits) {
      synchronized (this) {
         arriving -= credits;
      }
   }


   @Override
   public synchronized void reset() {
      // Any pendingCredits credits from before failover won't arrive, so we re-initialise

      int beforeFailure = pendingCredits;

      pendingCredits = 0;
      arriving = 0;

      // If we are waiting for more credits than what's configured, then we need to use what we tried before
      // otherwise the client may starve as the credit will never arrive
      checkCredits(Math.max(windowSize * 2, beforeFailure));
   }

   @Override
   public void close() {
      // Closing a producer that is blocking should make it return
      closed = true;
   }

   @Override
   public synchronized void incrementRefCount() {
      refCount++;
   }

   @Override
   public synchronized int decrementRefCount() {
      return --refCount;
   }

   public abstract int getBalance();

   protected void checkCredits(final int credits) {
      int needed = Math.max(credits, windowSize);

      int toRequest = -1;

      synchronized (this) {
         if (getBalance() + arriving < needed) {
            toRequest = needed - arriving;

            pendingCredits += toRequest;
            arriving += toRequest;
         }
      }

      if (toRequest != -1) {
         requestCredits(toRequest);
      }
   }

   private void requestCredits(final int credits) {
      session.sendProducerCreditsMessage(credits, address);
   }
}
  • AbstractProducerCreditsImpl implements a partial method of ClientProducerCredits; its constructor receives session, address, windowSize parameters; its init method first executes checkCredits, then sessionContext.linkFlowControl(address, this)
  • The acquireCredits method first executes checkCredits, executes the actualAcquire and afterAcquired methods; the afterAcquired method subtracts credits from pendingCredits; the receiveFailCredits method sets serverRespondedWithFail to true, and then executes the receiveCredits method; the receiveCredits method subtracts credits from arriving; the reset method resets pendingCredits, arriving, and thenExecute checkCredits method
  • The checkCredits method first evaluates needed, then toRequest updates pendingCredits and arriving; when toRequest is not -1, the requestCredits method is executed; the requestCredits method is session.sendProducerCreditsMessage(credits, address) method

ClientProducerCreditsImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerCreditsImpl.java

public class ClientProducerCreditsImpl extends AbstractProducerCreditsImpl {


   private final Semaphore semaphore;

   public ClientProducerCreditsImpl(ClientSessionInternal session, SimpleString address, int windowSize) {
      super(session, address, windowSize);


      // Doesn't need to be fair since session is single threaded
      semaphore = new Semaphore(0, false);

   }


   @Override
   protected void afterAcquired(int credits) throws ActiveMQAddressFullException {
      // check to see if the blocking mode is FAIL on the server
      synchronized (this) {
         super.afterAcquired(credits);

         if (serverRespondedWithFail) {
            serverRespondedWithFail = false;

            // remove existing credits to force the client to ask the server for more on the next send
            semaphore.drainPermits();
            pendingCredits = 0;
            arriving = 0;

            throw ActiveMQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits);
         }
      }
   }

   @Override
   protected void actualAcquire(int credits) {

      boolean tryAcquire;
      synchronized (this) {
         tryAcquire = semaphore.tryAcquire(credits);
      }

      if (!tryAcquire && !closed) {
         this.blocked = true;
         try {
            while (!semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS)) {
               // I'm using string concatenation here in case address is null
               // better getting a "null" string than a NPE
               ActiveMQClientLogger.LOGGER.outOfCreditOnFlowControl("" + address);
            }
         } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ActiveMQInterruptedException(interrupted);
         } finally {
            this.blocked = false;
         }
      }
   }


   @Override
   public synchronized void reset() {
      // Any pendingCredits credits from before failover won't arrive, so we re-initialise

      semaphore.drainPermits();

      super.reset();
   }


   @Override
   public void close() {
      super.close();

      // Closing a producer that is blocking should make it return
      semaphore.release(Integer.MAX_VALUE / 2);
   }

   @Override
   public void receiveCredits(final int credits) {
      synchronized (this) {
         super.receiveCredits(credits);
      }

      semaphore.release(credits);
   }


   @Override
   public synchronized void releaseOutstanding() {
      semaphore.drainPermits();
   }

   @Override
   public int getBalance() {
      return semaphore.availablePermits();
   }


}
  • ClientProducerCreditsImpl inherits AbstractProducerCreditsImpl, and its constructor creates permits of 0 and fair s of false; the afterAcquired method executes semaphore.drainPermits() when serverRespondedWithFail is true and throws ActiveMQClientMessageBundle.BUNDLE.addressIsFull(address.toString(), credits)
  • The actualAcquire method first executes semaphore.tryAcquire(credits), sets blocked to true for false, then while s through semaphore.tryAcquire(credits, 10, TimeUnit.SECONDS) until true, and finally finally sets blocked to false for final
  • The receiveCredits method mainly executes semaphore.release(credits); the getBalance method returns semaphore.availablePermits(); the release Outstanding executes semaphore.release(credits); the reset method executes semaphore.drainPermits(); and the close method executes semaphore.release (Integer.MAX_VALUE/2)

Summary

  • AbstractProducerCreditsImpl implements a partial method of ClientProducerCredits; its constructor receives session, address, windowSize parameters; its init method first executes checkCredits, then sessionContext.linkFlowControl(address, this)
  • The acquireCredits method first executes checkCredits, executes the actualAcquire and afterAcquired methods; the afterAcquired method subtracts credits from pendingCredits; the receiveFailCredits method sets serverRespondedWithFail to true, and then executes the receiveCredits method; the receiveCredits method subtracts credits from arriving; the reset method resets pendingCredits, arriving, and thenExecute checkCredits method
  • The checkCredits method first evaluates needed, then toRequest updates pendingCredits and arriving; when toRequest is not -1, the requestCredits method is executed; the requestCredits method is session.sendProducerCreditsMessage(credits, address) method

doc

Tags: Programming Session Java Apache

Posted on Tue, 14 Jan 2020 08:26:03 -0800 by Jabop