Using Advanced Redis Features

Optimistic Locks and Transactions

  • Use WATCH to monitor a key;
  • Open a transaction with MULTI and queue the operations;
  • Commit the transaction with EXEC once the operations are queued;
  • If the watched key changes before the commit, EXEC aborts and none of the queued operations are applied.
/**
 * Test transactions and optimistic locks
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisTransactionTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Test transactions.
     * Simulated scenario: a user lists an item from their inventory for sale on the market.
     */
    @Test
    public void test() {
        Assert.assertTrue(listItem());
    }

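    boolean listItem() {
        // WATCH, MULTI and EXEC must run on the same connection, so wrap them in a SessionCallback
        // (SessionCallback and RedisOperations come from org.springframework.data.redis.core).
        List<Object> result = redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            @SuppressWarnings({"unchecked", "rawtypes"})
            public List<Object> execute(RedisOperations operations) {
                // Optimistic lock: watch the seller's inventory
                operations.watch("inventory:17");
                // If ItemN is not in the inventory there is nothing to sell, so end the process
                if (!Boolean.TRUE.equals(operations.opsForSet().isMember("inventory:17", "ItemN"))) {
                    operations.unwatch();
                    return null;
                }
                // Start the transaction
                operations.multi();
                // Add the item to the market
                operations.opsForZSet().add("market:", "ItemN", 9);
                // Remove the item from the inventory
                operations.opsForSet().remove("inventory:17", "ItemN");
                // Execute the transaction
                return operations.exec();
            }
        });
        // A null or empty result means the transaction was not applied
        return result != null && !result.isEmpty();
    }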

}
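
The watch, queue, and commit steps above can also be modeled without a Redis server. The following is a minimal in-memory sketch, assuming a per-key version counter stands in for what WATCH tracks; `OptimisticStore`, `Session`, and `queueSet` are hypothetical names invented for illustration and are not part of Redis or Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of WATCH/MULTI/EXEC semantics; not a Redis client. */
class OptimisticStore {
    private final Map<String, String> data = new HashMap<>();
    private final Map<String, Long> versions = new HashMap<>();

    /** Writes a value and bumps the key's version so that watchers can detect the change. */
    synchronized void set(String key, String value) {
        data.put(key, value);
        versions.merge(key, 1L, Long::sum);
    }

    synchronized String get(String key) {
        return data.get(key);
    }

    private synchronized long version(String key) {
        return versions.getOrDefault(key, 0L);
    }

    Session newSession() {
        return new Session();
    }

    /** One client's view: a watched key plus the writes queued since the transaction started. */
    class Session {
        private String watchedKey;
        private long watchedVersion;
        private final List<String[]> queued = new ArrayList<>();

        /** Remembers the key's current version (the WATCH step). */
        void watch(String key) {
            watchedKey = key;
            watchedVersion = version(key);
        }

        /** Queues a write without applying it (a command issued after MULTI). */
        void queueSet(String key, String value) {
            queued.add(new String[] {key, value});
        }

        /** Applies all queued writes together; returns false if the watched key changed. */
        boolean exec() {
            synchronized (OptimisticStore.this) {
                boolean unchanged = watchedKey == null || version(watchedKey) == watchedVersion;
                if (unchanged) {
                    for (String[] write : queued) {
                        set(write[0], write[1]);
                    }
                }
                queued.clear();
                watchedKey = null;
                return unchanged;
            }
        }
    }
}
```

If another writer calls `set` on the watched key between `watch` and `exec`, `exec` returns false and none of the queued writes are applied, so the caller can retry from the beginning.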

Expiration Time

Use key expiration to implement rate limiting.

test method

Create a key for each IP whose value is a counter, with an expiration time of 1 minute. Each request first checks whether the key exists; if it does, the counter is incremented. Once the counter exceeds the specified number, the limit has been reached. The disadvantage of this method is that requests can cluster around the window boundary: a client can send requests at the very end of the current minute and again at the very start of the next, exceeding the intended rate over that short interval.
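
The boundary problem is easier to see with an explicit clock. Below is a minimal in-memory sketch of the same fixed-window counter, assuming the caller passes the current time in milliseconds; `FixedWindowLimiter` and `allow` are hypothetical names, and the map entry plays the role of the Redis key and its TTL.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory fixed-window limiter; the map entry stands in for a Redis key with a TTL. */
class FixedWindowLimiter {
    private final int limit;
    private final long windowMillis;
    // ip -> {expiresAtMillis, count}
    private final Map<String, long[]> counters = new HashMap<>();

    FixedWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Counts one request at the given time and reports whether it is within the limit. */
    synchronized boolean allow(String ip, long nowMillis) {
        long[] counter = counters.get(ip);
        if (counter == null || nowMillis >= counter[0]) {
            // Key missing or expired: start a new counter with a fresh expiration time
            counter = new long[] {nowMillis + windowMillis, 0};
            counters.put(ip, counter);
        }
        counter[1]++;
        return counter[1] <= limit;
    }
}
```

With a limit of 2 per 60000 ms, requests at 0 and 59900 are allowed, the counter expires at 60000, and requests at 60000 and 60100 are allowed again, so three requests pass within 200 ms.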

test1 method

Create a List for each IP that stores request times. Each request first checks whether the length of the List is less than the specified number of requests. If it is, the limit has not been reached, and the current request time is pushed onto the List.

If the List already holds the specified number of entries, take the oldest request time in the List and compare it with the current time to determine whether the limit was exceeded within the specified period. If not, push the current time onto the List and drop the oldest entry.
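
The List-based approach can be sketched in memory as follows, assuming a deque per IP in place of the Redis List and an explicit clock value supplied by the caller; `SlidingWindowLimiter` and `allow` are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory sliding-window limiter; each deque stands in for a Redis List. */
class SlidingWindowLimiter {
    private final int limit;
    private final long windowMillis;
    private final Map<String, Deque<Long>> logs = new HashMap<>();

    SlidingWindowLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Records the request time if allowed; newest entries sit at the head, oldest at the tail. */
    synchronized boolean allow(String ip, long nowMillis) {
        Deque<Long> log = logs.computeIfAbsent(ip, k -> new ArrayDeque<>());
        if (log.size() < limit) {
            // Fewer than `limit` requests recorded: always allowed
            log.addFirst(nowMillis);
            return true;
        }
        long oldest = log.peekLast();
        if (nowMillis - oldest < windowMillis) {
            // `limit` requests already happened within the window
            return false;
        }
        // The oldest entry fell out of the window: record the new time and drop the oldest
        log.addFirst(nowMillis);
        log.removeLast();
        return true;
    }
}
```

Unlike the counter with a fixed expiration, a request at 60100 is rejected here when earlier requests were recorded at 59900 and 60000, because the window is measured from the oldest recorded request rather than from a fixed boundary.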

/**
 * Test expiration time
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisExpireTest {

    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Limit the access frequency per IP: at most two requests per minute
     */
    @Test
    public void test() {
        String key = "rate.limiting:localhost";
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // Check whether the key exists
        if (redisTemplate.countExistingKeys(keys) > 0) {
            // The key exists: increment the access counter for this IP
            long times = redisTemplate.opsForValue().increment(key);
            // More than 2 requests in the window: report that the limit was exceeded
            if (times > 2) {
                System.out.println("Access frequency exceeded.");
            }
        } else {
            // The key does not exist: create a counter for this IP
            redisTemplate.opsForValue().increment(key);
            // Expire the counter after 60 seconds.
            // Requests just before the key expires and just after it is recreated are counted separately,
            // so a short burst straddling the boundary can exceed the limit
            redisTemplate.expire(key, 60, TimeUnit.SECONDS);
        }
    }

    @Test
    public void test1() {
        String key = "1rate.limiting:localhost";
        long len = redisTemplate.opsForList().size(key);
        // Allow at most two requests per minute
        if (len < 2) {
            // Record the current time
            redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
        } else {
            // Read the oldest recorded time (the tail of the list)
            long time = (long) redisTemplate.opsForList().index(key, -1);
            // Two requests are already recorded and the oldest one is less than one minute old
            if (System.currentTimeMillis() - time < 60000) {
                System.out.println("Access frequency exceeded.");
            } else {
                // Record the current time
                redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
                // Trim the list to the two most recent entries
                redisTemplate.opsForList().trim(key, 0, 1);
            }
        }
    }
}

Sorting with SORT

  • sort: the test method sorts a List, Set, or ZSet;
  • by: the test1 method sorts by external values, using the BY parameter to reference String keys or Hash fields;
  • get: the test2 method returns the values named by the GET parameters, which can be fields of the object used for BY, or # to return the sorted element itself.
/**
 * Test Sort
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisSortTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Plain SORT
     */
    @Test
    public void test() {
        SortQuery<String> query = SortQueryBuilder.sort("mylist").build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * SORT with BY
     */
    @Test
    public void test1() {
        SortQuery<String> query = SortQueryBuilder.sort("sortbylist")
                .by("itemscore:*")
                .order(SortParameters.Order.DESC)
                .limit(0, -1).build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * SORT with BY and GET
     */
    @Test
    public void test2() {
        SortQuery<String> query = SortQueryBuilder.sort("tag:ruby:posts")
                .by("post:*->time")
                .order(SortParameters.Order.DESC)
                .limit(0, -1)
                .get("post:*->title")
                .get("post:*->time")
                .get("#")
                .build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }


}
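
How the BY and GET patterns in test1 and test2 are resolved can be illustrated with plain Java collections. This is a rough sketch, assuming hashes are held in a nested map and only the `key->field` and `#` pattern forms are modeled; `SortSketch`, `lookup`, and `sort` are hypothetical helpers, not part of Spring Data Redis.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of SORT ... BY ... GET over hash fields. */
class SortSketch {

    /** Resolves a pattern such as "post:*->time" for one element; "#" yields the element itself. */
    static String lookup(Map<String, Map<String, String>> hashes, String pattern, String element) {
        if ("#".equals(pattern)) {
            return element;
        }
        String[] parts = pattern.split("->", 2);
        if (parts.length < 2) {
            return null; // plain String keys are not modeled in this sketch
        }
        // Substitute the element for the first '*' in the key part of the pattern
        int star = parts[0].indexOf('*');
        String key = star < 0 ? parts[0]
                : parts[0].substring(0, star) + element + parts[0].substring(star + 1);
        Map<String, String> hash = hashes.get(key);
        return hash == null ? null : hash.get(parts[1]);
    }

    /** Numeric sort weight for an element; a missing weight is treated as 0. */
    static double weight(Map<String, Map<String, String>> hashes, String by, String element) {
        String value = lookup(hashes, by, element);
        return value == null ? 0.0 : Double.parseDouble(value);
    }

    /** Sorts elements by the BY pattern, then emits one value per GET pattern for each element. */
    static List<String> sort(List<String> elements, Map<String, Map<String, String>> hashes,
                             String by, boolean descending, String... gets) {
        List<String> sorted = new ArrayList<>(elements);
        Comparator<String> byWeight = Comparator.comparingDouble(e -> weight(hashes, by, e));
        sorted.sort(descending ? byWeight.reversed() : byWeight);
        List<String> result = new ArrayList<>();
        for (String element : sorted) {
            for (String get : gets) {
                result.add(lookup(hashes, get, element));
            }
        }
        return result;
    }
}
```

As with test2, the result is a flat list: for each sorted element, the values of the GET patterns appear in the order they were given.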

Blocking Queue

Data in the queue is consumed with the BRPOP command. If no element becomes available within the specified blocking time, the call gives up and returns an empty result.

BRPOP can watch several queues at once and checks them from left to right, so listing the queues in priority order gives you a priority queue.
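
The left-to-right check is what produces the priority behavior. The sketch below models it in memory, assuming a single monitor guards all queues; `MultiQueue`, `lpush`, and `brpop` are hypothetical methods that only imitate the Redis commands of the same name. In this sketch a timeout of 0 returns immediately, whereas Redis treats 0 as "block forever".

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of LPUSH/BRPOP over several named queues. */
class MultiQueue {
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Pushes a value onto the head of the queue and wakes up any waiting consumers. */
    synchronized void lpush(String key, String value) {
        queues.computeIfAbsent(key, k -> new ArrayDeque<>()).addFirst(value);
        notifyAll();
    }

    /**
     * Pops from the tail of the first non-empty queue, checking keys from left to right.
     * Returns {key, value}, or null if nothing arrived before the timeout or the thread was interrupted.
     */
    synchronized String[] brpop(long timeoutMillis, String... keys) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            for (String key : keys) {
                Deque<String> queue = queues.get(key);
                if (queue != null && !queue.isEmpty()) {
                    return new String[] {key, queue.removeLast()};
                }
            }
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return null;
            }
            try {
                wait(remainingMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }
}
```

Calling `brpop(timeout, "high", "low")` always drains "high" before touching "low", regardless of the order in which the elements were pushed.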

/**
 * Blocking Queue
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisQueueTest {

    private static Lock lock = new ReentrantLock();

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Test
    public void test() throws InterruptedException {
        lock.lockInterruptibly();
        RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
        RedisConnection connection = connectionFactory.getConnection();

        try {
            while (true) {
                List<byte[]> results = connection.bRPop(60, "queue".getBytes());
                if (!CollectionUtils.isEmpty(results)) {
                    for (byte[] bytes : results) {
                        System.out.println(new String(bytes));
                    }
                }
            }
        } finally {
            lock.unlock();
            RedisConnectionUtils.releaseConnection(connection, connectionFactory);
        }

    }

}

Publish/Subscribe

Register a listener that subscribes to the message channel.

    /**
     * Configure consumers with parameters that implement MessageListener.
     */
    @Bean
    public MessageListenerAdapter adapter(RedisMessage message) {
        // If RedisMessage does not implement the MessageListener interface (onMessage), the second argument must match the name of the RedisMessage method that handles the message
        return new MessageListenerAdapter(message, "message");
    }

    /**
     * Configure the listening container.
     */
    @Bean
    public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //Listen for the corresponding channel
        container.addMessageListener(adapter, new PatternTopic("message"));
        return container;
    }

Message publisher

/**
 * Test publish/subscribe
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisMQTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Publish a message to the Redis channel.
     */
    @Test
    public void test() {
        redisTemplate.convertAndSend("message", "hello world");
    }

}

Message consumer

/**
 * Consume messages published to Redis
 */
@Component
public class RedisMessage implements MessageListener {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
        String msg = serializer.deserialize(message.getBody());
        System.out.println("The message received is:" + msg);
    }

}
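
One property of publish/subscribe worth keeping in mind is that a message is delivered only to the listeners subscribed at the moment it is published; it is not stored for later subscribers. The following in-memory sketch illustrates this, assuming exact channel-name matching only; `Broker`, `subscribe`, and `publish` are hypothetical names and not a Redis client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory broker that mimics fire-and-forget publish/subscribe. */
class Broker {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    /** Registers a listener for one channel (exact name match only in this sketch). */
    synchronized void subscribe(String channel, Consumer<String> listener) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(listener);
    }

    /** Delivers the message to current subscribers and returns how many received it. */
    synchronized int publish(String channel, String message) {
        List<Consumer<String>> listeners =
                subscribers.getOrDefault(channel, Collections.emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        // Nothing is stored: a listener that subscribes later never sees this message
        return listeners.size();
    }
}
```

A publish with no subscribers returns 0 and the message is lost, which is why the listening container has to be running before the publisher test sends its message.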

Pipeline

When many commands are sent to Redis as a batch, a pipeline reduces the cost of the repeated network round trips: the commands are sent together and the replies are read back together.
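
The saving comes from the number of round trips rather than from the commands themselves. The toy model below makes that visible, assuming each call to `sendIncrBatch` represents one network round trip; `FakeRedis` is a hypothetical stand-in that only counts round trips and is not a real connection.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a server connection that counts network round trips. */
class FakeRedis {
    private long counter;
    private int roundTrips;

    /** Sends a batch of INCR commands in one round trip and returns one reply per command. */
    List<Long> sendIncrBatch(int commands) {
        roundTrips++;
        List<Long> replies = new ArrayList<>(commands);
        for (int i = 0; i < commands; i++) {
            replies.add(++counter);
        }
        return replies;
    }

    int roundTrips() {
        return roundTrips;
    }
}
```

Issuing 1000 increments one at a time costs 1000 round trips in this model, while a single batch of 1000 costs one and still yields 1000 replies, one per command.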

package cc.xuepeng.ch06;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;

/**
 * Test pipelining
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisPipelineTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Increment a counter 1000 times in a single pipeline.
     */
    @Test
    public void test() {
        List<Object> redisResult = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                for (int i = 0; i < 1000; i++) {
                    redisConnection.incr("incr:test".getBytes());
                }
                return null;
            }
        });
        System.out.println(redisResult);
    }

}

Tags: Programming Redis less Junit Ruby

Posted on Mon, 23 Dec 2019 10:21:11 -0800 by dvdflashbacks