Listener   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 100
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 10
eloc 29
c 1
b 0
f 0
dl 0
loc 100
rs 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A removeMessagesFromQueue() 0 13 2
A getMessagesFromQueue() 0 14 2
A moveMessagesToDeadQueue() 0 6 1
A restoreMessagesFromProcessingQueue() 0 6 1
A processMessages() 0 11 4
1
<?php
2
3
namespace LeoCarmo\RedisQueue;
4
5
use LeoCarmo\RedisQueue\Exceptions\QueueWithoutConnectionException;
6
7
class Listener extends Queue
8
{
9
10
    /**
11
     * @param string $queue
12
     * @param string $worker
13
     *
14
     * @throws QueueWithoutConnectionException
15
     */
16
    public static function restoreMessagesFromProcessingQueue(string $queue, string $worker)
17
    {
18
        self::moveMessages(
19
            $queue,
20
            self::getProcessingQueueName($queue, $worker),
21
            $queue
22
        );
23
    }
24
25
    /**
26
     * @param string $queue
27
     * @param string $worker
28
     * @param int $quantity
29
     * @param callable $callback
30
     *
31
     * @throws QueueWithoutConnectionException
32
     */
33
    public static function processMessages(string $queue, string $worker, int $quantity, callable $callback)
34
    {
35
        if ($events = self::getMessagesFromQueue($queue, $worker, $quantity)) {
36
            try {
37
                $callback(
38
                    $quantity === 1 ? $events[0] : $events
39
                );
40
41
                self::removeMessagesFromQueue($queue, $worker, $events);
42
            } catch (\Throwable $e) {
43
                self::moveMessagesToDeadQueue($queue, $worker);
44
            }
45
        }
46
    }
47
48
    /**
49
     * @param string $queue
50
     * @param string $worker
51
     * @param int $quantity
52
     * @return array
53
     *
54
     * @throws QueueWithoutConnectionException
55
     */
56
    protected static function getMessagesFromQueue(string $queue, string $worker, int $quantity)
57
    {
58
        $clientMulti = self::client($queue)->multi();
59
60
        for ($i = 0; $i < $quantity; $i++) {
61
            $clientMulti->brpoplpush(
62
                $queue,
63
                self::getProcessingQueueName($queue, $worker),
64
                0
65
            );
66
        }
67
68
        return array_filter(
69
            $clientMulti->exec()
70
        );
71
    }
72
73
    /**
74
     * @param string $queue
75
     * @param string $worker
76
     * @param $events
77
     *
78
     * @throws QueueWithoutConnectionException
79
     */
80
    protected static function removeMessagesFromQueue(string $queue, string $worker, $events)
81
    {
82
        $clientMulti = self::client($queue)->multi();
83
84
        foreach ($events as $value) {
85
            $clientMulti->lRem(
86
                self::getProcessingQueueName($queue, $worker),
87
                $value,
88
                1
89
            );
90
        }
91
92
        $clientMulti->exec();
93
    }
94
95
    /**
96
     * @param string $queue
97
     * @param string $worker
98
     *
99
     * @throws QueueWithoutConnectionException
100
     */
101
    protected static function moveMessagesToDeadQueue(string $queue, string $worker)
102
    {
103
        self::moveMessages(
104
            $queue,
105
            self::getProcessingQueueName($queue, $worker),
106
            self::getDeadQueueName($queue)
107
        );
108
    }
109
}
110