Passed
Pull Request — master (#1)
by
unknown
02:58
created

BeanstalkdQueue::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 3
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
 * Queue driver that stores serialized envelopes as jobs in a beanstalkd tube.
 *
 * Envelopes are serialized to JSON before being put into the tube and
 * deserialized back into Envelope instances when read.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;

    /**
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Beanstalkd client used for every operation.
     * @param string                   $queueName  Name of the tube backing this queue.
     * @param SerializerInterface|null $serializer Serializer to use; when null the value returned
     *                                             by fallbackSerializer() is used instead.
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Inserts the envelope into the queue, throwing when it could not be written.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not stored.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it into the configured tube.
     *
     * @return bool The result of the put call cast to bool.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the head of the queue.
     *
     * @throws NoSuchElementException When poll() yields no envelope.
     */
    public function remove(): Envelope
    {
        $element = $this->poll();

        if (!$element) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves and removes the head of the queue, or returns null when no job is ready.
     */
    public function poll(): ?Envelope
    {
        // watchOnly() rather than watch(): watch() only adds to the watch list, so the
        // default tube would stay watched and a job from another tube could be reserved.
        $job = $this->client
            ->watchOnly($this->queueName)
            ->reserveWithTimeout(0);

        if (!$job) {
            return null;
        }

        $serialized = $job->getData();

        // A reserved job goes back to the ready state once its TTR expires; it has to be
        // deleted explicitly for poll() to actually remove it from the queue.
        $this->client->delete($job);

        return $this->envelopeFromPayload($serialized);
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @throws NoSuchElementException When peek() yields no envelope.
     */
    public function element(): Envelope
    {
        $element = $this->peek();

        if (!$element) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves, but does not remove, the head of the queue, or returns null when no job is ready.
     */
    public function peek(): ?Envelope
    {
        // peek-ready inspects the currently *used* tube, not the watch list, so the tube
        // has to be selected with useTube() for the right queue to be inspected.
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (!$job) {
            return null;
        }

        return $this->envelopeFromPayload($job->getData());
    }

    /**
     * Turns a raw job payload into an Envelope; an empty payload yields null.
     */
    private function envelopeFromPayload(string $serialized): ?Envelope
    {
        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
    }
}
119