C# queue learning notes: RabbitMQ delay queue

I. Introduction

Delay queues show up in many everyday apps. For example, the mobile Taobao app often hands out limited-time red packets (coupons) that are valid for a few hours or for 24 hours; if a red packet is not spent before the countdown ends, it automatically becomes invalid. In terms of a RabbitMQ delay queue: when you receive a limited-time red packet, Taobao sends a delayed message to a queue. If the red packet is consumed within the specified time, everything proceeds normally; otherwise the message expires according to its TTL (time to live) and the red packet is automatically invalidated.

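In RabbitMQ, there are two ways to implement a delay queue: set the TTL on the queue, so that every message in the queue gets the same TTL, or set the TTL on each individual message, so that different messages can have different TTLs.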

II. Examples

2.1 Sender (producer)

Create a new console project Send and add a RabbitMQConfig class.

    /// <summary>
    /// Static connection settings shared by the RabbitMQ samples.
    /// Every property is given its sample value by an initializer and can be
    /// overwritten at runtime through its public setter.
    /// </summary>
    class RabbitMQConfig
    {
        /// <summary>Host name or IP address of the broker.</summary>
        public static string Host { get; set; } = "192.168.2.242";

        /// <summary>Virtual host to connect to.</summary>
        public static string VirtualHost { get; set; } = "/";

        /// <summary>User name used to authenticate.</summary>
        public static string UserName { get; set; } = "hello";

        /// <summary>Password used to authenticate.</summary>
        public static string Password { get; set; } = "world";

        /// <summary>TCP port of the broker.</summary>
        public static int Port { get; set; } = 5672;
    }
RabbitMQConfig.cs
    /// <summary>
    /// Producer sample: publishes messages that expire and are then
    /// dead-lettered, which is how a delay queue is emulated with RabbitMQ.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Prints the menu, reads the user's choice and runs the matching producer.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("C# RabbitMQ There are two ways to implement delay queues:");
            Console.WriteLine("1,To implement delay queue based on queue mode, press 1 to start production.");
            Console.WriteLine("2,To implement delay queue based on message mode, press 2 to start production.");

            // ReadLine returns null at end of input; Trim tolerates stray whitespace.
            string chooseChar = Console.ReadLine()?.Trim();
            if (chooseChar == "1")
            {
                DelayMessagePublishByQueueExpires();
            }
            else if (chooseChar == "2")
            {
                DelayMessagePublishByMessageTTL();
            }
            else
            {
                // Previously an unknown choice was ignored without any feedback.
                Console.WriteLine("Unrecognized choice; nothing was published.");
            }
            Console.ReadLine();
        }

        /// <summary>
        /// Builds a connection factory from the shared <see cref="RabbitMQConfig"/> settings.
        /// </summary>
        /// <returns>A factory configured with host, port, virtual host and credentials.</returns>
        private static ConnectionFactory CreateConnectionFactory()
        {
            return new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };
        }

        /// <summary>
        /// Delay queue implemented at queue level: the TTL (time to live) is a
        /// queue argument, so every message in the queue gets the same TTL.
        /// </summary>
        private static void DelayMessagePublishByQueueExpires()
        {
            const string MessagePrefix = "message_";
            const string QueueName = "delay1";
            const int PublishMessageCount = 6;
            // RabbitMQ expects both of these arguments in milliseconds.
            const int QueueExpiryMilliseconds = 1000 * 30;
            const int MessageTtlMilliseconds = 1000 * 10;
            const int PublishIntervalMilliseconds = 1000 * 2;

            var factory = CreateConnectionFactory();

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // NOTE: "x-expires" is not a message TTL. It is the idle time after
                // which the broker deletes the whole queue. The "smaller value wins"
                // rule applies to "x-message-ttl" versus a per-message expiration.
                var queueArguments = new Dictionary<string, object>
                {
                    { "x-expires", QueueExpiryMilliseconds },//Queue is deleted after being unused for this long
                    { "x-message-ttl", MessageTtlMilliseconds },//Message expiration time
                    { "x-dead-letter-exchange", "dead exchange 1" },//Exchange that expired messages are routed to
                    { "x-dead-letter-routing-key", "dead routing key 1" }//Routing key used when dead-lettering
                };

                // NOTE(review): this sample does not declare the dead-letter exchange.
                // The Receive sample does, so start the receiver first; otherwise
                // expired messages have nowhere to be routed.
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);

                // Publish through the default exchange, using the queue name as routing key.
                for (int i = 0; i < PublishMessageCount; i++)
                {
                    var message = MessagePrefix + i.ToString();
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: null, body: body);

                    // Log before sleeping so the timestamp matches the actual publish time.
                    Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageTtlMilliseconds / 1000}");
                    Thread.Sleep(PublishIntervalMilliseconds);
                }
            }
        }

        /// <summary>
        /// Delay queue implemented at message level: each message carries its
        /// own TTL, so messages in the same queue can expire at different times.
        /// </summary>
        private static void DelayMessagePublishByMessageTTL()
        {
            const string MessagePrefix = "message_";
            const string QueueName = "delay2";
            const int PublishMessageCount = 6;

            var factory = CreateConnectionFactory();

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                var queueArguments = new Dictionary<string, object>
                {
                    { "x-dead-letter-exchange", "dead exchange 2" },//Exchange that expired messages are routed to
                    { "x-dead-letter-routing-key", "dead routing key 2" }//Routing key used when dead-lettering
                };

                // NOTE(review): as above, the dead-letter exchange is declared by the
                // Receive sample, so the receiver should be running first.
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: queueArguments);

                for (int i = 0; i < PublishMessageCount; i++)
                {
                    // Message i lives for i seconds; the value is in milliseconds.
                    int messageTtlMilliseconds = i * 1000;

                    var properties = channel.CreateBasicProperties();
                    // Expiration is a string on the wire; format it culture-independently.
                    properties.Expiration = messageTtlMilliseconds.ToString(System.Globalization.CultureInfo.InvariantCulture);

                    var message = MessagePrefix + i.ToString();
                    var body = Encoding.UTF8.GetBytes(message);
                    channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: properties, body: body);
                    Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {messageTtlMilliseconds / 1000}");
                }
            }
        }
    }
Program.cs

2.2 receiver (consumer)

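Create a new console project named Receive. Then hold down the Alt key and drag the RabbitMQConfig class from the Send project into the Receive project; this adds it as a shortcut (linked file) rather than a copy, so both projects share the same configuration.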

    /// <summary>
    /// Consumer sample: receives the messages that expired in the delay queues
    /// and were re-routed through the dead-letter exchanges.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Prints the menu, reads the user's choice and runs the matching consumer.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine("C# RabbitMQ There are two ways to implement delay queues:");
            Console.WriteLine("1,To implement delay queue based on queue mode, please press 1 to start consumption.");
            Console.WriteLine("2,To implement delay queue based on message mode, please press 2 to start consumption.");

            // ReadLine returns null at end of input; Trim tolerates stray whitespace.
            string chooseChar = Console.ReadLine()?.Trim();
            if (chooseChar == "1")
            {
                DelayMessageConsumeByQueueExpires();
            }
            else if (chooseChar == "2")
            {
                DelayMessageConsumeByMessageTTL();
            }
            else
            {
                // Previously an unknown choice was ignored without any feedback.
                Console.WriteLine("Unrecognized choice; nothing will be consumed.");
            }
            Console.ReadLine();
        }

        /// <summary>
        /// Consumes the messages dead-lettered by the queue-level TTL sample.
        /// </summary>
        public static void DelayMessageConsumeByQueueExpires()
        {
            ConsumeDeadLetters("dead exchange 1", "dead routing key 1");
        }

        /// <summary>
        /// Consumes the messages dead-lettered by the per-message TTL sample.
        /// </summary>
        public static void DelayMessageConsumeByMessageTTL()
        {
            ConsumeDeadLetters("dead exchange 2", "dead routing key 2");
        }

        /// <summary>
        /// Declares a direct exchange, binds a queue with a broker-generated name
        /// to it, and prints every received message until a key is pressed.
        /// </summary>
        /// <param name="deadLetterExchange">Name of the exchange to declare and bind to.</param>
        /// <param name="deadLetterRoutingKey">Routing key used for the binding.</param>
        private static void ConsumeDeadLetters(string deadLetterExchange, string deadLetterRoutingKey)
        {
            var factory = new ConnectionFactory()
            {
                HostName = RabbitMQConfig.Host,
                Port = RabbitMQConfig.Port,
                VirtualHost = RabbitMQConfig.VirtualHost,
                UserName = RabbitMQConfig.UserName,
                Password = RabbitMQConfig.Password,
                Protocol = Protocols.DefaultProtocol
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: deadLetterExchange, type: "direct");

                // Parameterless QueueDeclare lets the broker pick the queue name.
                string queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName, exchange: deadLetterExchange, routingKey: deadLetterRoutingKey);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body);
                    Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                };

                // noAck: true, so no explicit acknowledgement is sent by this code.
                channel.BasicConsume(queue: queueName, noAck: true, consumer: consumer);

                // Keep the channel and connection open until the user presses a key.
                Console.ReadKey();
            }
        }
    }
Program.cs

2.3 operation results

-----------------------------------------------------------------------------------------------------------

Tags: C# encoding RabbitMQ Mobile

Posted on Sat, 11 Apr 2020 07:09:58 -0700 by frao_0