Laravel 5.5 source code - how RedisQueue is implemented

Basic functions of queue:

1. Immediate execution
2. Delayed execution
3. Guaranteed to execute at least once: yes
4. Guaranteed to execute at most once: no

 

Data structure used:

  List, Sorted Sets

Delayed execution mechanism:
1. First, put the data into the sorted set queues:queue_000:delayed
2. When pop is executed, a Lua script runs and rpushes the data that has become executable from the sorted set queues:queue_000:delayed onto the list queues:queue_000

Mechanisms to ensure successful implementation:
1. Put the data that is about to be executed into the sorted set queues:queue_000:reserved
2. When pop is executed, a Lua script runs and rpushes the data that has become executable from the sorted set queues:queue_000:reserved onto the list queues:queue_000
3. Once the task has executed successfully, delete the previously stored data from the sorted set queues:queue_000:reserved

  

 

class RedisQueue extends Queue implements QueueContract
{
    /**
     * Append a raw payload to the tail of the queue's Redis list.
     *
     * @param  string  $payload  Standardized, JSON-formatted job data.
     * @param  string|null  $queue  Queue name; resolved to a Redis key by getQueue().
     * @param  array  $options  Not used by this method.
     * @return mixed  The "id" entry of the decoded payload, or null when it is absent.
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $connection = $this->getConnection();

        // Jobs meant to run immediately go straight onto the list
        // (for example queues:queue_000).
        $connection->rpush($this->getQueue($queue), $payload);

        $decoded = json_decode($payload, true);

        return $decoded['id'] ?? null;
    }


    /**
     * Store a raw payload in the queue's ":delayed" sorted set.
     *
     * @param  mixed  $delay  Delay specification, handed to availableAt() to obtain the score.
     * @param  string  $payload  Standardized, JSON-formatted job data.
     * @param  string|null  $queue  Queue name; resolved to a Redis key by getQueue().
     * @return mixed  The "id" entry of the decoded payload, or null when it is absent.
     */
    protected function laterRaw($delay, $payload, $queue = null)
    {
        $connection = $this->getConnection();

        // Sorted set holding delayed jobs (for example queues:queue_000:delayed);
        // the score is whatever availableAt() yields for the requested delay.
        $delayedKey = $this->getQueue($queue).':delayed';
        $score = $this->availableAt($delay);

        $connection->zadd($delayedKey, $score, $payload);

        $decoded = json_decode($payload, true);

        return $decoded['id'] ?? null;
    }


    /**
     * Take the next job off the queue.
     *
     * @param  string|null  $queue  Queue name; falls back to $this->default for the job's queue.
     * @return RedisJob|null  A job wrapper when a job was reserved, otherwise null.
     */
    public function pop($queue = null)
    {
        $prefixed = $this->getQueue($queue);

        // NOTE(review): migrate() is defined elsewhere; per the accompanying notes it
        // runs a Lua script that rpushes the now-executable entries of the ":delayed"
        // and ":reserved" sorted sets back onto the list for this queue.
        $this->migrate($prefixed);

        // NOTE(review): retrieveNextJob() is defined elsewhere; per the accompanying
        // notes it runs a Lua script that lpops from the list, increments attempts,
        // sets a timeout and stores the result in the ":reserved" sorted set.
        list($job, $reserved) = $this->retrieveNextJob($prefixed);

        // Nothing was reserved, so there is no job to hand back.
        if (! $reserved) {
            return null;
        }

        return new RedisJob(
            $this->container,
            $this,
            $job,
            $reserved,
            $this->connectionName,
            $queue ?: $this->default
        );
    }

}

Tags: PHP JSON

Posted on Mon, 02 Dec 2019 12:38:29 -0800 by ole968