Completed
Push — master ( 129581...a310fa )
by Damian
03:27
created

RedisQueue::peek()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Predis\ClientInterface;
11
use Throwable;
12
13
final class RedisQueue implements Queue
14
{
15
    use HasFallbackSerializer;
16
17
    /**
18
     * @var ClientInterface
19
     */
20
    private $client;
21
22
    /**
23
     * @var SerializerInterface
24
     */
25
    private $serializer;
26
27
    /**
28
     * @var string
29
     */
30
    private $queueName;
31
32 6
    public function __construct(ClientInterface $client, string $queueName, ?SerializerInterface $serializer = null)
33
    {
34 6
        $this->client = $client;
35 6
        $this->queueName = $queueName;
36 6
        $this->serializer = $this->fallbackSerializer($serializer);
37 6
    }
38
39 5
    public function add(Envelope $envelope): bool
40
    {
41 5
        if (!$this->offer($envelope)) {
42
            throw new IllegalStateException("Could not write to redis");
43
        }
44
45 5
        return true;
46
    }
47
48 6
    public function offer(Envelope $envelope): bool
49
    {
50 6
        $serialized = $this->serializer->serialize($envelope, 'json');
51
52 6
        return (bool)$this->client->rpush(
53 6
            $this->queueName,
54 6
            [$serialized]
55
        );
56
    }
57
58 1
    public function remove(): Envelope
59
    {
60 1
        $element = $this->poll();
61
62 1
        if (!$element) {
63
            throw new NoSuchElementException();
64
        }
65
66 1
        return $element;
67
    }
68
69 2
    public function poll(): ?Envelope
70
    {
71
        try {
72 2
            $serialized = $this->client->lpop($this->queueName);
73
        } catch (Throwable $e) {
74
            throw new IllegalStateException("Predis connection error", 0, $e);
75
        }
76
77 2
        if (empty($serialized)) {
78 1
            return null;
79
        }
80
81 2
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
82
    }
83
84 1
    public function element(): Envelope
85
    {
86 1
        $element = $this->peek();
87
88 1
        if (!$element) {
89
            throw new NoSuchElementException();
90
        }
91
92 1
        return $element;
93
    }
94
95 2
    public function peek(): ?Envelope
96
    {
97 2
        $serialized = $this->client->lrange($this->queueName, 0, 0)[0];
98
99 2
        if (empty($serialized)) {
100
            return null;
101
        }
102
103 2
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
104
    }
105
}
106