BeanstalkdQueue   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 84
Duplicated Lines 0 %

Test Coverage

Coverage 88.24%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 34
c 1
b 0
f 0
dl 0
loc 84
ccs 30
cts 34
cp 0.8824
rs 10
wmc 10

5 Methods

Rating   Name   Duplication   Size   Complexity  
A poll() 0 17 3
A add() 0 7 2
A peek() 0 17 3
A __construct() 0 8 1
A offer() 0 7 1
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Queue;
8
use JMS\Serializer\SerializerInterface;
9
use Pheanstalk\Contract\PheanstalkInterface;
10
11
/**
 * Queue driver that stores envelopes as JSON payloads in a single beanstalkd
 * tube, talking to the server through a Pheanstalk client.
 *
 * NOTE(review): fallbackSerializer() is presumably supplied by the
 * HasFallbackSerializer trait, and remove()/element() by
 * HasDefaultRemoveAndElement — neither trait is visible here; confirm.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;
    use HasDefaultRemoveAndElement;

    /**
     * Client used for every beanstalkd command issued by this queue.
     *
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * Converts envelopes to and from their JSON representation.
     *
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * Name of the beanstalkd tube backing this queue.
     *
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Client to send commands through.
     * @param string                   $queueName  Tube to read from and write to.
     * @param SerializerInterface|null $serializer Serializer to use; when null the
     *                                             value returned by fallbackSerializer()
     *                                             is used instead.
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Appends an envelope to the queue, failing loudly when it cannot be stored.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool Always true; failure is reported by exception.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not stored.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it into this queue's tube.
     *
     * @param Envelope $envelope Envelope to enqueue.
     *
     * @return bool The client's put() result cast to bool.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, 'json');

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the next ready envelope, without blocking.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or
     *                       the job carried an empty payload.
     */
    public function poll(): ?Envelope
    {
        // watchOnly() instead of watch(): watch() only adds to the watch list,
        // so the default tube (and tubes watched earlier on a shared client)
        // could otherwise hand us a job that belongs to another queue.
        $job = $this->client
            ->watchOnly($this->queueName)
            ->reserveWithTimeout(0);

        if (empty($job)) {
            return null;
        }

        $serialized = $job->getData();

        // A reserved job is only held until its TTR expires and then becomes
        // ready again; delete it so that poll() really removes the head.
        // This also discards jobs with an empty payload rather than leaving
        // them to be re-delivered forever.
        $this->client->delete($job);

        return $this->toEnvelope($serialized);
    }

    /**
     * Retrieves, but does not remove, the next ready envelope.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or
     *                       the job carried an empty payload.
     */
    public function peek(): ?Envelope
    {
        // peek-ready inspects the currently *used* tube, not the watch list,
        // so the tube has to be selected with useTube().
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (empty($job)) {
            return null;
        }

        return $this->toEnvelope($job->getData());
    }

    /**
     * Turns a raw job payload into an envelope.
     *
     * @param mixed $serialized Payload as returned by the job's getData().
     *
     * @return Envelope|null Null when the payload is empty, the deserialized
     *                       envelope otherwise.
     */
    private function toEnvelope($serialized): ?Envelope
    {
        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, 'json');
    }
}
97