PredisUniquePullCommandQueue::publish()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 4
cts 4
cp 1
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
declare(strict_types=1);
3
4
/**
5
 * GpsLab component.
6
 *
7
 * @author    Peter Gribanov <[email protected]>
8
 * @copyright Copyright (c) 2011, Peter Gribanov
9
 * @license   http://opensource.org/licenses/MIT
10
 */
11
12
namespace GpsLab\Component\Command\Queue\Pull;
13
14
use GpsLab\Component\Command\Command;
15
use GpsLab\Component\Command\Queue\Serializer\Serializer;
16
use Predis\Client;
17
use Psr\Log\LoggerInterface;
18
19
/**
 * Pull command queue stored in a Redis list and accessed through Predis.
 *
 * Commands are serialized before being stored. On publish, every existing
 * entry equal to the serialized command is removed from the list before the
 * command is appended, so the list holds at most one copy of a given
 * serialized command.
 */
class PredisUniquePullCommandQueue implements PullCommandQueue
{
    /**
     * @var Client
     */
    private $client;

    /**
     * @var Serializer
     */
    private $serializer;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Key of the Redis list used as the queue.
     *
     * @var string
     */
    private $queue_name;

    /**
     * @param Client          $client     Predis client used for all list operations
     * @param Serializer      $serializer Converts commands to and from the stored value
     * @param LoggerInterface $logger     Receives a critical record when a stored value cannot be restored
     * @param string          $queue_name Key of the Redis list used as the queue
     */
    public function __construct(Client $client, Serializer $serializer, LoggerInterface $logger, string $queue_name)
    {
        $this->client = $client;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->queue_name = $queue_name;
    }

    /**
     * Publish command to queue.
     *
     * If the same serialized command is already queued, it is removed first
     * and then appended to the tail again. The removal and the push are two
     * separate client calls.
     *
     * @param Command $command
     *
     * @return bool Result of the push, cast to boolean
     */
    public function publish(Command $command): bool
    {
        $value = $this->serializer->serialize($command);

        // drop every existing copy of this command (count 0 = all matches),
        // then publish it again at the tail
        $this->client->lrem($this->queue_name, 0, $value);

        return (bool) $this->client->rpush($this->queue_name, [$value]);
    }

    /**
     * Pop command from queue. Return NULL if queue is empty.
     *
     * NULL is also returned when the popped value cannot be turned back into
     * a Command. In that case the failure is logged as critical and the raw
     * value is pushed back to the tail of the queue.
     *
     * @return Command|null
     */
    public function pull(): ?Command
    {
        $value = $this->client->lpop($this->queue_name);

        if (!$value) {
            return null;
        }

        try {
            $command = $this->serializer->deserialize($value);

            if ($command instanceof Command) {
                return $command;
            }

            // deserialize() may hand back a non-object; get_class() would then
            // raise a TypeError on PHP 8, which the catch below does not handle
            // and the popped value would be lost
            $actual_type = is_object($command) ? get_class($command) : gettype($command);

            throw new \RuntimeException(sprintf('The denormalization command is expected "%s", got "%s" inside.', Command::class, $actual_type));
        } catch (\Exception $e) {
            // this is a critical error that needs a quick reaction
            $this->logger->critical('Failed denormalize a command in the Redis queue', [$value, $e->getMessage()]);

            // put the raw value back at the tail so it can be retried later
            $this->client->rpush($this->queue_name, [$value]);

            return null;
        }
    }
}
105