Completed
Branch master (339a46)
by Damian
04:14
created

RedisQueue   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 70
Duplicated Lines 0 %

Test Coverage

Coverage 84.62%

Importance

Changes 0
Metric Value
eloc 27
dl 0
loc 70
ccs 22
cts 26
cp 0.8462
rs 10
c 0
b 0
f 0
wmc 9

5 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A peek() 0 9 2
A add() 0 7 2
A offer() 0 7 1
A poll() 0 13 3
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Predis\ClientInterface;
11
use Throwable;
12
13
/**
 * Queue driver that keeps JSON-serialized envelopes in a single Redis list.
 *
 * New envelopes are appended to the tail of the list (RPUSH) and consumed
 * from its head (LPOP / LRANGE 0 0), so elements leave the queue in the
 * order they were added.
 *
 * NOTE(review): remove() and element() are presumably supplied by the
 * HasDefaultRemoveAndElement trait, and fallbackSerializer() by
 * HasFallbackSerializer; neither trait is visible here - confirm.
 */
final class RedisQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /**
     * Redis connection used for every list operation.
     *
     * @var ClientInterface
     */
    private $client;

    /**
     * Serializer used to turn envelopes into JSON and back.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Key of the Redis list backing this queue.
     *
     * @var string
     */
    private $queueName;

    /**
     * @param ClientInterface          $client     Redis client the queue talks to.
     * @param string                   $queueName  Key of the Redis list holding the envelopes.
     * @param SerializerInterface|null $serializer Optional serializer; the value (including null)
     *                                             is passed through fallbackSerializer() to obtain
     *                                             the instance actually used.
     */
    public function __construct(ClientInterface $client, string $queueName, ?SerializerInterface $serializer = null)
    {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Appends an envelope to the queue, throwing when the write is rejected.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool Always true; a failed write is reported by exception instead.
     *
     * @throws IllegalStateException When offer() reports that nothing was written.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException("Could not write to redis");
        }

        return true;
    }

    /**
     * Appends an envelope to the queue, reporting failure through the return value.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool The RPUSH reply cast to bool (false when the reply is falsy).
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');

        return (bool)$this->client->rpush(
            $this->queueName,
            [$serialized]
        );
    }

    /**
     * Removes and returns the envelope at the head of the queue.
     *
     * @return Envelope|null The deserialized head element, or null when LPOP yields an empty value.
     *
     * @throws IllegalStateException When the LPOP call throws; the original error is kept as "previous".
     */
    public function poll(): ?Envelope
    {
        try {
            $serialized = $this->client->lpop($this->queueName);
        } catch (Throwable $e) {
            throw new IllegalStateException("Predis connection error", 0, $e);
        }

        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
    }

    /**
     * Returns the envelope at the head of the queue without removing it.
     *
     * @return Envelope|null The deserialized head element, or null when the queue is empty.
     */
    public function peek(): ?Envelope
    {
        $head = $this->client->lrange($this->queueName, 0, 0);

        // LRANGE on an empty or missing list yields an empty array; reading
        // offset 0 directly would raise "Undefined offset / array key 0"
        // instead of quietly reporting an empty queue.
        $serialized = $head[0] ?? null;

        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
    }
}
85