Test Failed
Pull Request — master (#1)
by
unknown
03:28
created

BeanstalkdQueue   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 144
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 41
dl 0
loc 144
rs 10
c 0
b 0
f 0
wmc 14

7 Methods

Rating   Name   Duplication   Size   Complexity  
A peek() 0 17 3
A __construct() 0 5 1
A offer() 0 7 1
A remove() 0 9 2
A add() 0 7 2
A poll() 0 17 3
A element() 0 9 2
1
<?php declare(strict_types=1);
2
3
namespace Initx\Driver;
4
5
use Initx\Envelope;
6
use Initx\Exception\IllegalStateException;
7
use Initx\Exception\NoSuchElementException;
8
use Initx\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
13
 * Class BeanstalkdQueue
14
 *
15
 * @package Initx\Driver
16
 */
17
final class BeanstalkdQueue implements Queue
18
{
19
    use HasFallbackSerializer;
20
21
    /**
22
     * @var PheanstalkInterface
23
     */
24
    private $client;
25
26
    /**
27
     * @var SerializerInterface
28
     */
29
    private $serializer;
30
31
    /**
32
     * @var string
33
     */
34
    private $queueName;
35
36
    /**
37
     * BeanstalkdQueue constructor.
38
     *
39
     * @param PheanstalkInterface      $client
40
     * @param string                   $queueName
41
     * @param SerializerInterface|null $serializer
42
     */
43
    public function __construct(PheanstalkInterface $client, string $queueName = PheanstalkInterface::DEFAULT_TUBE, ?SerializerInterface $serializer = null)
44
    {
45
        $this->client = $client;
46
        $this->queueName = $queueName;
47
        $this->serializer = $this->fallbackSerializer($serializer);
48
    }
49
50
    /**
51
     * Inserts an element if possible, otherwise throwing exception.
52
     *
53
     * @param Envelope $envelope
54
     * @return bool
55
     * @throws IllegalStateException
56
     */
57
    public function add(Envelope $envelope): bool
58
    {
59
        if (!$this->offer($envelope)) {
60
            throw new IllegalStateException('Could not write to redis');
61
        }
62
63
        return true;
64
    }
65
66
    /**
67
     * Inserts an element if possible, otherwise returning false.
68
     *
69
     * @param Envelope $envelope
70
     * @return bool
71
     */
72
    public function offer(Envelope $envelope): bool
73
    {
74
        $serialized = $this->serializer->serialize($envelope, 'json');
75
76
        return (bool)$this->client
77
            ->useTube($this->queueName)
78
            ->put($serialized);
79
    }
80
81
    /**
82
     * Remove and return head of queue, otherwise throwing exception.
83
     *
84
     * @return Envelope
85
     * @throws NoSuchElementException
86
     */
87
    public function remove(): Envelope
88
    {
89
        $element = $this->poll();
90
91
        if (!$element) {
92
            throw new NoSuchElementException();
93
        }
94
95
        return $element;
96
    }
97
98
    /**
99
     * Remove and return head of queue, otherwise returning null.
100
     *
101
     * @return Envelope | null
102
     */
103
    public function poll(): ?Envelope
104
    {
105
        $job = $this->client
106
            ->watch($this->queueName)
107
            ->reserveWithTimeout(0);
108
109
        if (empty($job)) {
110
            return null;
111
        }
112
113
        $serialized = $job->getData();
114
115
        if (empty($serialized)) {
116
            return null;
117
        }
118
119
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
120
    }
121
122
    /**
123
     * Return but do not remove head of queue, otherwise throwing exception.
124
     *
125
     * @return Envelope
126
     * @throws NoSuchElementException
127
     */
128
    public function element(): Envelope
129
    {
130
        $element = $this->peek();
131
132
        if (!$element) {
133
            throw new NoSuchElementException();
134
        }
135
136
        return $element;
137
    }
138
139
    /**
140
     * Return but do not remove head of queue, otherwise returning null.
141
     *
142
     * @return Envelope | null
143
     */
144
    public function peek(): ?Envelope
145
    {
146
        $job = $this->client
147
            ->watch($this->queueName)
148
            ->peekReady();
149
150
        if (empty($job)) {
151
            return null;
152
        }
153
154
        $serialized = $job->getData();
155
156
        if (empty($serialized)) {
157
            return null;
158
        }
159
160
        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
161
    }
162
}
163