Test Failed
Pull Request — master (#1)
by
unknown
03:30
created

BeanstalkdQueue::element()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Driver;
4
5
use Initx\Envelope;
6
use Initx\Exception\IllegalStateException;
7
use Initx\Exception\NoSuchElementException;
8
use Initx\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
13
 * Class BeanstalkdQueue
14
 *
15
 * @package Initx\Driver
16
 */
17
final class BeanstalkdQueue implements Queue
18
{
19
    use HasFallbackSerializer;
20
21
    /**
22
     * @var PheanstalkInterface
23
     */
24
    private $client;
25
26
    /**
27
     * @var SerializerInterface
28
     */
29
    private $serializer;
30
31
    /**
32
     * @var string
33
     */
34
    private $queueName;
35
36
    /**
37
     * BeanstalkdQueue constructor.
38
     *
39
     * @param PheanstalkInterface      $client
40
     * @param string                   $queueName
41
     * @param SerializerInterface|null $serializer
42
     */
43
    public function __construct(
44
        PheanstalkInterface $client,
45
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
46
        ?SerializerInterface $serializer = null
47
    ) {
48
        $this->client = $client;
49
        $this->queueName = $queueName;
50
        $this->serializer = $this->fallbackSerializer($serializer);
51
    }
52
53
    /**
54
     * Inserts an element if possible, otherwise throwing exception.
55
     *
56
     * @param Envelope $envelope
57
     * @return bool
58
     * @throws IllegalStateException
59
     */
60
    public function add(Envelope $envelope): bool
61
    {
62
        if (!$this->offer($envelope)) {
63
            throw new IllegalStateException('Could not write to redis');
64
        }
65
66
        return true;
67
    }
68
69
    /**
70
     * Inserts an element if possible, otherwise returning false.
71
     *
72
     * @param Envelope $envelope
73
     * @return bool
74
     */
75
    public function offer(Envelope $envelope): bool
76
    {
77
        $serialized = $this->serializer->serialize($envelope, 'json');
78
79
        return (bool)$this->client
80
            ->useTube($this->queueName)
81
            ->put($serialized);
82
    }
83
84
    /**
85
     * Remove and return head of queue, otherwise throwing exception.
86
     *
87
     * @return Envelope
88
     * @throws NoSuchElementException
89
     */
90
    public function remove(): Envelope
91
    {
92
        $element = $this->poll();
93
94
        if (!$element) {
95
            throw new NoSuchElementException();
96
        }
97
98
        return $element;
99
    }
100
101
    /**
102
     * Remove and return head of queue, otherwise returning null.
103
     *
104
     * @return Envelope | null
105
     */
106
    public function poll(): ?Envelope
107
    {
108
        $job = $this->client
109
            ->watch($this->queueName)
110
            ->reserveWithTimeout(0);
111
112
        if (empty($job)) {
113
            return null;
114
        }
115
116
        $serialized = $job->getData();
117
118
        if (empty($serialized)) {
119
            return null;
120
        }
121
122
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
123
    }
124
125
    /**
126
     * Return but do not remove head of queue, otherwise throwing exception.
127
     *
128
     * @return Envelope
129
     * @throws NoSuchElementException
130
     */
131
    public function element(): Envelope
132
    {
133
        $element = $this->peek();
134
135
        if (!$element) {
136
            throw new NoSuchElementException();
137
        }
138
139
        return $element;
140
    }
141
142
    /**
143
     * Return but do not remove head of queue, otherwise returning null.
144
     *
145
     * @return Envelope | null
146
     */
147
    public function peek(): ?Envelope
148
    {
149
        $job = $this->client
150
            ->watch($this->queueName)
151
            ->peekReady();
152
153
        if (empty($job)) {
154
            return null;
155
        }
156
157
        $serialized = $job->getData();
158
159
        if (empty($serialized)) {
160
            return null;
161
        }
162
163
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
164
    }
165
}
166