BeanstalkdQueue::peek()   A
last analyzed

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 3.072

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 9
c 1
b 0
f 0
nc 3
nop 0
dl 0
loc 17
ccs 8
cts 10
cp 0.8
crap 3.072
rs 9.9666
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Queue;
8
use JMS\Serializer\SerializerInterface;
9
use Pheanstalk\Contract\PheanstalkInterface;
10
11
/**
 * Queue driver that stores envelopes in a single beanstalkd tube via Pheanstalk.
 *
 * Envelopes are serialized to JSON before being put on the tube and are
 * deserialized back into Envelope instances when read.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /**
     * Serialization format used for every payload written to or read from the tube.
     */
    private const FORMAT = 'json';

    /**
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Connection used for all tube operations.
     * @param string                   $queueName  Tube name; defaults to Pheanstalk's default tube.
     * @param SerializerInterface|null $serializer Optional serializer; the value is passed through
     *                                             fallbackSerializer() (presumably substituting a
     *                                             default when null - confirm in the trait).
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Inserts the envelope, throwing when the insert is reported as failed.
     *
     * @throws IllegalStateException When offer() returns false.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it on this queue's tube.
     *
     * @return bool The result of put() cast to bool.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, self::FORMAT);

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the next ready envelope without blocking.
     *
     * @return Envelope|null Null when no job is ready or the job payload is empty.
     */
    public function poll(): ?Envelope
    {
        // watchOnly() rather than watch(): watch() only adds to the watch list,
        // so the connection could otherwise reserve a job from another tube
        // (e.g. the default one) that is still being watched.
        $job = $this->client
            ->watchOnly($this->queueName)
            ->reserveWithTimeout(0);

        if (empty($job)) {
            return null;
        }

        $serialized = $job->getData();

        // A reserved job returns to the ready state once its TTR expires;
        // delete it so that poll() really removes the head of the queue.
        $this->client->delete($job);

        return $this->decode($serialized);
    }

    /**
     * Retrieves the next ready envelope without removing it.
     *
     * @return Envelope|null Null when no job is ready or the job payload is empty.
     */
    public function peek(): ?Envelope
    {
        // peek-ready inspects the currently *used* tube, not the watch list,
        // so the tube has to be selected with useTube().
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (empty($job)) {
            return null;
        }

        return $this->decode($job->getData());
    }

    /**
     * Turns a raw job payload into an Envelope.
     *
     * @return Envelope|null Null when the payload is empty in the sense of empty().
     */
    private function decode(string $serialized): ?Envelope
    {
        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, self::FORMAT);
    }
}
97