Redisson Source Parsing, How to Use Redis to Implement Distributed Relockable

When I first started using Redisson's api, I thought wow, this API is so awesome that it even has distributed re-entrainable locks. Just recently, I studied Redisson's source code and shared it with you

Preface

First, let's review how ReentrantLock is implemented in Java.

Here I would like to briefly introduce the idea of ReentrantLock implementation.

  • Lock identification: Using the AQS state variable as lock identification, using Java's CAS to ensure thread security when multiple threads compete for locks
  • Queue: Threads that are not competing for a lock enter the AQS queue and hang up, awakening (or timed out) while waiting to be unlocked

How to Design Distributed Reentrant Locks

First lock identification, which is easy to implement in Redis, can use lock name as key, the current thread generates a uuid as value, plus the Redis single-threaded model, to achieve thread-safe lock competition

This is also mentioned in the previous blog, you can refer to Correct implementation of Redis distributed locks

But how do you make a queue based on Redis that suspends wake-up threads like Java?That's what I never thought before I looked at the source code...

So how does Redisson do it?

Answer: Take advantage of Redis's publishing subscription, plus Java's Smarphore (semaphores, little buddies who don't know Semaphore can go to Google for a moment)

Redisson Distributed Lock Implementation

Lock Identity: Hash data structure, key is the name of the lock, filed "Unique Identity" of the current competing lock successful thread, number of value reentrances

Queue: All threads that fail to compete for a lock subscribe to the unlock event for the current lock and use Semaphore to suspend and wake the thread

Source Code Analysis

Let's look at the source code for the tryLock method

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // Attempting to acquire a lock, returning null indicates successful acquisition, and returning the release time of the current lock if acquisition fails
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        // Failed to acquire lock if waiting time is exceeded at this time
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        // Subscription Unlock Event
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // Wait for the subscription to succeed and wake up the current thread after success
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // Judge again whether the time-out has expired
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                // Attempting to acquire a lock
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    // Waiting for unlock message, here using Semaphore, permits=0 when lock is not released, thread is suspended
                    // When publishing an unlock message, release() permits=1 for the current Semaphore object
                    // All clients will have a thread awakened to try to compete for a lock
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

tryAcquire(leaseTime, unit, threadId); this method we will analyze below, and now we just need to know that this method is used to acquire locks

By this time we can clear up Redisson's re-entrainable mind

  1. Acquire locks
  2. Subscribe to unlock events if lock acquisition fails
  3. Then there is an infinite loop
while(true) {
  // Attempting to acquire a lock

  // Determine if timeout occurs

  // Waiting to unlock message release semaphore 
  //(At this point, each Java client may have multiple threads suspended, but only one will be awakened)

  // Determine if timeout occurs
}

By using semaphores, we can reasonably control thread's competition for locks, and rationally utilize system resources, so to speak, the grey Ness

Note:
!await(subscribeFuture, time, TimeUnit.MILLISECONDS), many blogs have misinterpreted it. This is not to wait for the unlock message to be published. As long as the subscription event succeeds, it will be executed downwards. The real waiting for the unlock message is getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

You probably don't believe it here. Why am I right? Just debug and you know

tryLockInnerAsync

tryAcquire relies on tryLockInnerAsync internally to implement logic for acquiring locks, so let's look at the source code

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  // Is there a lock
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                       // Create if not present
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      // Set expiration time
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      // Competition lock successfully returned null
                      "return nil; " +
                  "end; " +
                   // If the lock has already been acquired by the current thread
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                       // Reentries plus 1
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  // Lock is acquired by another thread, returning the expiration time of the lock
                  "return redis.call('pttl', KEYS[1]);",

                    // The following three parameters are KEYS[1], ARGV[1], ARGV[2]
                    // name of lock, release time of lock, unique identification of current thread
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

tryLockInnerAsync takes advantage of lua scripting and Redis single-threaded features to compete for locks

Here you can see the structure of the lock, as we mentioned above, Hash data structure, key is the name of the lock, filed is the "unique identity" of the current successful thread competing for the lock, and value reentrancies

unlockInnerAsync

Next let's look at the core code for unlocking

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // Use the lock's name and thread unique identity to determine if such a key-value pair exists
                // Bell unlocking must also be a bell person. If it does not exist, it does not have the right to unlock and return null
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                // Unlock logic
                // Number of bursts-1
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                // If greater than 0 means that the current thread re-entry lock cannot be unlocked multiple times, update the lock's validity time
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    // Unlock, delete key
                    "redis.call('del', KEYS[1]); " +
                    // Publish unlock message
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                // KEYS[1],KEYS[2]
                // Lock name, Channel for publishing subscriptions
                Arrays.<Object>asList(getName(), getChannelName()), 
                // ARGV[1] ~ ARGV[3]
                // Unlock message, release time, current thread unique identification
                LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

When an unlock message is published, onMessage is called to LockPubSub s to release the semaphore and wake up the thread waiting for the lock

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long UNLOCK_MESSAGE = 0L;
    public static final Long READ_UNLOCK_MESSAGE = 1L;

    public LockPubSub(PublishSubscribeService service) {
        super(service);
    }
    
    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(UNLOCK_MESSAGE)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            // Release semaphore
            value.getLatch().release();
        } else if (message.equals(READ_UNLOCK_MESSAGE)) {
            while (true) {
                Runnable runnableToExecute = value.getListeners().poll();
                if (runnableToExecute == null) {
                    break;
                }
                runnableToExecute.run();
            }

            value.getLatch().release(value.getLatch().getQueueLength());
        }
    }

}

Reference resources

Welcome to praise and forward.Your support is my greatest help

Tags: Java Redis Google

Posted on Wed, 04 Dec 2019 15:29:13 -0800 by sykowizard