Completed
Branch master (339a46)
by Damian
04:14
created

BeanstalkdQueue::element()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
cc 2
eloc 4
nc 2
nop 0
dl 0
loc 9
ccs 4
cts 5
cp 0.8
crap 2.032
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
 * Queue driver that stores envelopes as JSON payloads in a single beanstalkd tube.
 *
 * All communication goes through the injected Pheanstalk client; the tube is
 * (re)selected on every operation so the instance does not depend on whatever
 * tube state a shared client was left in.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /**
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Name of the beanstalkd tube this queue reads from and writes to.
     *
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Client used for every beanstalkd command.
     * @param string                   $queueName  Tube name; defaults to Pheanstalk's default tube.
     * @param SerializerInterface|null $serializer Serializer for envelopes; when null the value is
     *                                             resolved by fallbackSerializer() (presumably
     *                                             provided by HasFallbackSerializer).
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Appends the envelope to the tube, failing loudly when it cannot be written.
     *
     * @return bool Always true when no exception is thrown.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not stored.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it into the tube.
     *
     * @return bool The result of the client's put() call cast to bool.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the next ready envelope, without blocking.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or the
     *                       job carries an empty payload.
     */
    public function poll(): ?Envelope
    {
        // watchOnly() rather than watch(): watch() only adds to the watch list, so the
        // default tube (or tubes watched earlier on a shared client) could hand us
        // jobs that do not belong to this queue.
        $job = $this->client
            ->watchOnly($this->queueName)
            ->reserveWithTimeout(0);

        if (null === $job) {
            return null;
        }

        $serialized = $job->getData();

        // A reserved job is only hidden temporarily; it has to be deleted explicitly,
        // otherwise beanstalkd releases it back to the ready queue once its TTR expires
        // and the element would never actually leave the queue.
        $this->client->delete($job);

        return $this->envelopeFromPayload($serialized);
    }

    /**
     * Retrieves the next ready envelope without removing it from the tube.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or the
     *                       job carries an empty payload.
     */
    public function peek(): ?Envelope
    {
        // beanstalkd's peek-ready command inspects the currently *used* tube and
        // ignores the watch list, so the tube must be selected with useTube().
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (null === $job) {
            return null;
        }

        return $this->envelopeFromPayload($job->getData());
    }

    /**
     * Turns a raw job payload into an envelope.
     *
     * @return Envelope|null Null when the payload is empty, the deserialized envelope otherwise.
     */
    private function envelopeFromPayload(string $serialized): ?Envelope
    {
        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
    }
}
98