RabbitMQ tutorial (PHP): implementing delayed messages

PHP can implement delayed message delivery by using the RabbitMQ delayed message exchange plug-in (rabbitmq_delayed_message_exchange).

1. Installation

  • 3.6.x download address
  • 3.7.x download address

After downloading, extract the plug-in and copy it into the plugins directory of the RabbitMQ server. For a Linux deployment (Debian/RPM) this is /usr/local/rabbitmq/plugins; on Windows it is <installation directory>\rabbitmq_server-<version>\plugins.

2. Enable plug-ins

Enable the plug-in with the rabbitmq-plugins enable command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

The output is as follows:

The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

The installed plug-ins can be listed with rabbitmq-plugins list, for example:

[ ] rabbitmq_delayed_message_exchange 20171215-3.6.x

3. Mechanism explanation

Once the plug-in is enabled, a new exchange type, x-delayed-message, becomes available. An exchange of this type supports delayed delivery: when it receives a message, it does not route it to the target queue immediately. Instead it stores the message in a Mnesia table (Mnesia is a distributed database) and keeps track of the message's delay, which the publisher sets in the x-delay header. When the delivery time is reached, the message is routed to the target queue according to the exchange type named in the x-delayed-type argument.
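The store-then-release behaviour described above can be pictured with a small in-memory model. This is only an illustrative sketch and not how the plug-in is implemented: the class DelayedExchangeModel and its methods publish() and releaseDue() are hypothetical names, and time is passed in explicitly as milliseconds so that the example is deterministic.

```php
<?php
// Toy in-memory model of a delayed exchange (NOT the plug-in's real code).
// DelayedExchangeModel, publish() and releaseDue() are hypothetical names.

final class DelayedExchangeModel
{
    /** Messages that have been received but are not yet due. */
    private array $pending = [];

    /** Store the message together with the time at which it becomes deliverable. */
    public function publish(string $body, string $routingKey, int $delayMs, int $nowMs): void
    {
        $this->pending[] = [
            'dueAtMs'    => $nowMs + max(0, $delayMs),
            'routingKey' => $routingKey,
            'body'       => $body,
        ];
    }

    /** Return (and forget) every message whose delivery time has been reached. */
    public function releaseDue(int $nowMs): array
    {
        $due = [];
        $stillPending = [];
        foreach ($this->pending as $message) {
            if ($message['dueAtMs'] <= $nowMs) {
                $due[] = $message;
            } else {
                $stillPending[] = $message;
            }
        }
        $this->pending = $stillPending;

        // Hand the due messages over in order of their delivery time.
        usort($due, fn(array $a, array $b): int => $a['dueAtMs'] <=> $b['dueAtMs']);

        return $due;
    }
}

$exchange = new DelayedExchangeModel();
$exchange->publish('slow', 'delayed_route_test', 5000, 0); // due at t = 5000 ms
$exchange->publish('fast', 'delayed_route_test', 1000, 0); // due at t = 1000 ms

$atTwoSeconds = $exchange->releaseDue(2000); // only 'fast' is due
$atSixSeconds = $exchange->releaseDue(6000); // now 'slow' is due as well

echo $atTwoSeconds[0]['body'], PHP_EOL;
echo $atSixSeconds[0]['body'], PHP_EOL;
```

The point of the model is that the order of delivery follows the delay, not the order of publishing: the message published second is released first because its delay is shorter.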

 

4. PHP implementation

Consumer delay_consumer2.php:

<?php

//header('Content-Type:text/html;charset=utf8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);
$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp'));

//exit();

try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Connection failed');
        //TODO logging
        echo 'rabbit-mq Connection error:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO logging
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
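    //$exchange->setFlags(AMQP_DURABLE); // Optional: make the exchange survive a broker restart. (To only check for an existing exchange, use AMQP_PASSIVE instead: the declaration then throws an exception if the exchange does not exist, which is usually done on the consumer side.)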
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message type
    /* RabbitMQ has three commonly used exchange types: fanout, direct, and topic.

      fanout: delivers every message sent to the exchange to all queues bound to it.

      direct: delivers a message to the queues whose binding key exactly matches the routing key.

      topic: delivers a message to the queues whose binding key pattern matches the routing key. */
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //binding
    $queue->bind($params['exchangeName'], $params['routeKey']);
} catch(Exception $e) {
    echo $e->getMessage();
    exit();
}

// Callback for $queue->consume(): php-amqp passes in the envelope and the queue it came from,
// so no global variable is needed.
function callback(AMQPEnvelope $message, AMQPQueue $queue) {
    $body = $message->getBody();
    echo 'Receiving time:' . date('Y-m-d H:i:s') . PHP_EOL;
    echo 'Received content:' . $body . PHP_EOL;
    // Acknowledge only after processing has finished, so that the message
    // is not lost if the consumer crashes while handling it.
    $queue->ack($message->getDeliveryTag());
}


// Note: an AMQPQueue object offers two ways to fetch messages, consume() and get().
// consume() blocks: it waits while the queue is empty, which suits a long-running worker loop.
// get() does not block: it returns a message if one is available and false otherwise.
// Because consume() blocks synchronously, the script stays resident in memory, so run it from
// the command line rather than calling it through nginx or Apache.
$action = '2';

if ($action == '1') {
    // Mode 1: blocking. consume() does not return while it waits for messages.
    $queue->consume('callback');
} else {
    // Mode 2: non-blocking polling with get()
    $start = time();
    while (true) {
        $message = $queue->get();
        if (!empty($message)) {
            echo 'Receiving time:' . date('Y-m-d H:i:s') . PHP_EOL;
            echo 'Received content:' . $message->getBody() . PHP_EOL;
            $queue->ack($message->getDeliveryTag()); // acknowledge: the message has been consumed
            $end = time();
            echo 'Run time:' . ($end - $start) . ' seconds' . PHP_EOL;
        } else {
            usleep(100000); // no message yet: pause 0.1 s instead of spinning the CPU
        }
    }
}
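The polling mode above runs forever. A bounded variant can be sketched as a small helper that stops after a number of consecutive empty polls. This is a minimal sketch under stated assumptions: pollForMessages() and its parameters are hypothetical names, not part of the amqp extension, and the fetch function used here is a stand-in for $queue->get() so that the example runs without a broker.

```php
<?php
// Hypothetical helper (not part of php-amqp): poll a non-blocking fetch function,
// sleeping between empty polls, and give up after $maxEmptyPolls empty polls in a row.
function pollForMessages(
    callable $fetch,
    callable $handle,
    int $maxMessages,
    int $maxEmptyPolls,
    int $idleSleepMicros = 200000
): int {
    $handled = 0;
    $emptyPolls = 0;

    while ($handled < $maxMessages && $emptyPolls < $maxEmptyPolls) {
        $message = $fetch();
        if ($message === false || $message === null) {
            // Nothing available: count the miss and wait before trying again.
            $emptyPolls++;
            usleep($idleSleepMicros);
            continue;
        }
        $emptyPolls = 0;
        $handle($message);
        $handled++;
    }

    return $handled;
}

// Stand-in for $queue->get(): two empty polls, one message, then nothing.
$fake = [false, false, 'hello', false];
$fetch = function () use (&$fake) {
    return array_shift($fake) ?? false;
};

$received = [];
$handle = function ($message) use (&$received) {
    $received[] = $message;
};

// Sleep time is 0 here only to keep the demonstration fast.
$count = pollForMessages($fetch, $handle, 10, 3, 0);
echo 'Handled ', $count, ' message(s)', PHP_EOL;
```

With a real queue, the fetch function would wrap $queue->get() and the handler would process the envelope and then call $queue->ack() with its delivery tag, as the consumer script does.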

Producer delay_publisher2.php:

<?php

//header('Content-Type:text/html;charset=utf-8;');

$params = array(
    'exchangeName' => 'delayed_exchange_test',
    'queueName' => 'delayed_queue_test',
    'routeKey' => 'delayed_route_test',
);

$connectConfig = array(
    'host' => 'localhost',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
);

//var_dump(extension_loaded('amqp')); // check whether the amqp extension is loaded
//exit();
try {
    $conn = new AMQPConnection($connectConfig);
    $conn->connect();
    if (!$conn->isConnected()) {
        //die('Connection failed');
        //TODO logging
        echo 'rabbit-mq Connection error:', json_encode($connectConfig);
        exit();
    }
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) {
        // die('Connection through channel failed');
        //TODO logging
        echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
        exit();
    }
    $exchange = new AMQPExchange($channel);
    $exchange->setName($params['exchangeName']);
    $exchange->setType('x-delayed-message'); //x-delayed-message type
    /* RabbitMQ has three commonly used exchange types: fanout, direct, and topic.

      fanout: delivers every message sent to the exchange to all queues bound to it.

      direct: delivers a message to the queues whose binding key exactly matches the routing key.

      topic: delivers a message to the queues whose binding key pattern matches the routing key. */
    $exchange->setArgument('x-delayed-type','direct');
    $exchange->declareExchange();

    //$channel->startTransaction();
    // RabbitMQ does not allow a queue to be redeclared under the same name with different settings; doing so raises an error
    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    // Bind the queue to the exchange
    $queue->bind($params['exchangeName'], $params['routeKey']);
    //$channel->commitTransaction();
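} catch(Exception $e) {
    // Do not continue with a half-initialised exchange: report the error and stop
    echo $e->getMessage();
    exit();
}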

for ($i = 5; $i > 0; $i--) {
    // Build and publish one message; the x-delay header is given in milliseconds
    echo 'Sent:' . date('Y-m-d H:i:s') . PHP_EOL;
    echo 'i=' . $i . ', delay ' . $i . ' second(s)' . PHP_EOL;
    $message = json_encode(['order_id' => time(), 'i' => $i]);
    $exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers' => ['x-delay' => 1000 * $i]]);
    sleep(2);
}
$conn->disconnect();
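The attributes array passed to publish() is where the delay is set, so it can be convenient to build it in one place. The following is a minimal sketch: delayedPublishAttributes() is a hypothetical helper, not part of the amqp extension, and rejecting negative delays is this sketch's own choice rather than documented plug-in behaviour. The optional delivery_mode value of 2 marks the message as persistent.

```php
<?php
// Hypothetical helper (not part of php-amqp): build the attributes array for
// AMQPExchange::publish() with the delay expressed in milliseconds.
function delayedPublishAttributes(int $delayMs, bool $persistent = false): array
{
    if ($delayMs < 0) {
        // Assumption of this sketch: treat a negative delay as a programming error.
        throw new InvalidArgumentException('x-delay must not be negative, got ' . $delayMs);
    }

    $attributes = ['headers' => ['x-delay' => $delayMs]];
    if ($persistent) {
        $attributes['delivery_mode'] = 2; // 2 = persistent message
    }

    return $attributes;
}

$attributes = delayedPublishAttributes(3 * 1000);
echo json_encode($attributes), PHP_EOL; // prints {"headers":{"x-delay":3000}}
```

In the producer loop, the result could be passed as the fourth argument of $exchange->publish() in place of the inline array.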

In the consumer, the key lines are the ones that set the exchange type:

$exchange->setType('x-delayed-message'); //x-delayed-message type
$exchange->setArgument('x-delayed-type','direct');

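In the producer, the exchange is declared in the same way, and each message carries its delay in the x-delay header (in milliseconds):

$exchange = new AMQPExchange($channel);
$exchange->setName($params['exchangeName']);
$exchange->setType('x-delayed-message'); //x-delayed-message type
$exchange->setArgument('x-delayed-type','direct');
$exchange->declareExchange();
$exchange->publish($message, $params['routeKey'], AMQP_NOPARAM, ['headers'=>['x-delay'=> 1000*$i]]);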

Usage: first run delay_consumer2.php, then run delay_publisher2.php.



Posted on Thu, 16 Apr 2020 07:48:59 -0700 by Bac