Completed
Push — master ( 53c691...aee4b3 )
by Peter
02:49
created

PredisUniquePullCommandQueue::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 13
c 0
b 0
f 0
ccs 7
cts 7
cp 1
rs 9.4285
cc 2
eloc 11
nc 2
nop 5
crap 2
1
<?php
2
3
/**
4
 * GpsLab component.
5
 *
6
 * @author    Peter Gribanov <[email protected]>
7
 * @copyright Copyright (c) 2011, Peter Gribanov
8
 * @license   http://opensource.org/licenses/MIT
9
 */
10
11
namespace GpsLab\Component\Command\Queue\Pull;
12
13
use GpsLab\Component\Command\Command;
14
use Predis\Client;
15
use Psr\Log\LoggerInterface;
16
use Symfony\Component\Serializer\SerializerInterface;
17
18
/**
 * Pull command queue stored in a Redis list and accessed through a Predis client.
 *
 * Before a command is pushed, every list entry equal to its serialized form is
 * removed, so publishing the same serialized command again moves it to the end
 * of the list instead of adding a second copy.
 */
class PredisUniquePullCommandQueue implements PullCommandQueue
{
    const DEFAULT_FORMAT = PredisPullCommandQueue::DEFAULT_FORMAT;

    /**
     * Predis client used for all list operations.
     *
     * @var Client
     */
    private $client;

    /**
     * Converts commands to and from their stored string form.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Receives a critical record when a stored value cannot be deserialized.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Key of the Redis list that holds the queue.
     *
     * @var string
     */
    private $queue_name = '';

    /**
     * Serialization format passed to the serializer.
     *
     * @var string
     */
    private $format = '';

    /**
     * @param Client              $client     Predis client
     * @param SerializerInterface $serializer command serializer
     * @param LoggerInterface     $logger     logger for deserialization failures
     * @param string              $queue_name key of the Redis list
     * @param string|null         $format     serialization format; any empty value selects DEFAULT_FORMAT
     */
    public function __construct(
        Client $client,
        SerializerInterface $serializer,
        LoggerInterface $logger,
        $queue_name,
        $format = null
    ) {
        // fall back to the default format when none (or an empty one) is given
        if (!$format) {
            $format = self::DEFAULT_FORMAT;
        }

        $this->format = $format;
        $this->queue_name = $queue_name;
        $this->logger = $logger;
        $this->serializer = $serializer;
        $this->client = $client;
    }

    /**
     * Publish command to queue.
     *
     * Entries equal to the serialized command are removed first and the command
     * is then appended. The removal and the push are two separate client calls.
     *
     * @param Command $command
     *
     * @return bool result of the push cast to boolean
     */
    public function publish(Command $command)
    {
        $payload = $this->serializer->serialize($command, $this->format);

        // count 0: drop every entry equal to the payload before re-adding it
        $this->client->lrem($this->queue_name, 0, $payload);

        $pushed = $this->client->rpush($this->queue_name, [$payload]);

        return (bool) $pushed;
    }

    /**
     * Pop command from queue.
     *
     * Returns NULL when nothing usable was popped or when the popped value
     * could not be deserialized. In the latter case the failure is logged as
     * critical and the raw value is pushed back to the queue.
     *
     * @return Command|null
     */
    public function pull()
    {
        $payload = $this->client->lpop($this->queue_name);

        if ($payload) {
            try {
                return $this->serializer->deserialize($payload, Command::class, $this->format);
            } catch (\Exception $error) {
                // a broken entry in the queue needs immediate attention
                $this->logger->critical(
                    'Failed denormalize a command in the Redis queue',
                    [$payload, $error->getMessage()]
                );

                // return the raw value to the queue so it can be retried later
                $this->client->rpush($this->queue_name, [$payload]);
            }
        }

        return null;
    }
}
112