Test Failed
Branch feature/beanstalkd-driver (0ecf85)
by Damian
03:05
created

BeanstalkdQueue::peek()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nc 3
nop 0
dl 0
loc 17
rs 9.9666
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
 * Queue driver that stores envelopes as JSON payloads in a beanstalkd tube.
 *
 * Every operation goes through the injected Pheanstalk client; the tube name
 * passed to the constructor is used both for producing and for consuming.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;

    /**
     * Client used to talk to the beanstalkd server.
     *
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * Serializer used to convert envelopes to and from JSON.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Name of the beanstalkd tube backing this queue.
     *
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Client used for all beanstalkd commands.
     * @param string                   $queueName  Tube to read from and write to.
     * @param SerializerInterface|null $serializer Optional serializer; when null the value is
     *                                             resolved by HasFallbackSerializer::fallbackSerializer()
     *                                             (presumably a default serializer - confirm in the trait).
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Inserts the envelope into the queue, throwing when it cannot be stored.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool Always true when the method returns.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not stored.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it into the configured tube.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool The client's put() result cast to bool.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the head of the queue.
     *
     * @return Envelope The envelope that was at the head of the queue.
     *
     * @throws NoSuchElementException When poll() yields no envelope.
     */
    public function remove(): Envelope
    {
        $element = $this->poll();

        if (!$element) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves and removes the head of the queue without blocking.
     *
     * @return Envelope|null The head envelope, or null when no job is ready
     *                       or the job carries an empty payload.
     */
    public function poll(): ?Envelope
    {
        // A zero timeout makes the reserve return immediately when nothing is ready.
        $job = $this->client
            ->watch($this->queueName)
            ->reserveWithTimeout(0);

        if (null === $job) {
            return null;
        }

        $payload = $job->getData();

        // A reserved job is only removed from the tube once it is deleted;
        // otherwise beanstalkd hands it out again after its TTR expires.
        $this->client->delete($job);

        return $this->decode($payload);
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @return Envelope The envelope currently at the head of the queue.
     *
     * @throws NoSuchElementException When peek() yields no envelope.
     */
    public function element(): Envelope
    {
        $element = $this->peek();

        if (!$element) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @return Envelope|null The head envelope, or null when no job is ready
     *                       or the job carries an empty payload.
     */
    public function peek(): ?Envelope
    {
        // peek-ready looks at the tube currently in *use*, not at the watch
        // list, so the tube has to be selected with useTube().
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (null === $job) {
            return null;
        }

        return $this->decode($job->getData());
    }

    /**
     * Turns a raw job payload back into an envelope.
     *
     * @param string $payload JSON payload read from a beanstalkd job.
     *
     * @return Envelope|null The deserialized envelope, or null for an empty payload.
     */
    private function decode(string $payload): ?Envelope
    {
        if (empty($payload)) {
            return null;
        }

        return $this->serializer->deserialize($payload, Envelope::class, 'json');
    }
}
124