Completed
Branch master (339a46)
by Damian
04:14
created

RedisQueue::remove()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Predis\ClientInterface;
11
use Throwable;
12
13
final class RedisQueue implements Queue
14
{
15
    use HasFallbackSerializer;
16
    use HasDefaultRemoveAndElement;
17
18
    /**
19
     * @var ClientInterface
20
     */
21
    private $client;
22
23
    /**
24
     * @var SerializerInterface
25
     */
26
    private $serializer;
27
28
    /**
29
     * @var string
30
     */
31
    private $queueName;
32
33 6
    public function __construct(ClientInterface $client, string $queueName, ?SerializerInterface $serializer = null)
34
    {
35 6
        $this->client = $client;
36 6
        $this->queueName = $queueName;
37 6
        $this->serializer = $this->fallbackSerializer($serializer);
38 6
    }
39
40 5
    public function add(Envelope $envelope): bool
41
    {
42 5
        if (!$this->offer($envelope)) {
43
            throw new IllegalStateException("Could not write to redis");
44
        }
45
46 5
        return true;
47
    }
48
49 6
    public function offer(Envelope $envelope): bool
50
    {
51 6
        $serialized = $this->serializer->serialize($envelope, 'json');
52
53 6
        return (bool)$this->client->rpush(
54 6
            $this->queueName,
55 6
            [$serialized]
56
        );
57
    }
58
59 2
    public function poll(): ?Envelope
60
    {
61
        try {
62 2
            $serialized = $this->client->lpop($this->queueName);
63
        } catch (Throwable $e) {
64
            throw new IllegalStateException("Predis connection error", 0, $e);
65
        }
66
67 2
        if (empty($serialized)) {
68 1
            return null;
69
        }
70
71 2
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
72
    }
73
74 2
    public function peek(): ?Envelope
75
    {
76 2
        $serialized = $this->client->lrange($this->queueName, 0, 0)[0];
77
78 2
        if (empty($serialized)) {
79
            return null;
80
        }
81
82 2
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
83
    }
84
}
85