Completed
Branch feature/beanstalkd-driver (0ecf85)
by Damian
03:58
created

BeanstalkdQueue::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 3
dl 0
loc 8
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
3
namespace Initx\Querabilis\Driver;
4
5
use Initx\Querabilis\Envelope;
6
use Initx\Querabilis\Exception\IllegalStateException;
7
use Initx\Querabilis\Exception\NoSuchElementException;
8
use Initx\Querabilis\Queue;
9
use JMS\Serializer\SerializerInterface;
10
use Pheanstalk\Contract\PheanstalkInterface;
11
12
/**
 * Queue driver that stores envelopes as JSON payloads in a single Beanstalkd
 * tube, talking to the server through a Pheanstalk client.
 *
 * Producer-side calls (`offer`, `add`) and `peek`/`element` act on the tube
 * selected with `useTube()`; the consuming call (`poll`/`remove`) acts on the
 * watch list, which is narrowed to this queue's tube before reserving.
 */
final class BeanstalkdQueue implements Queue
{
    use HasFallbackSerializer;

    /**
     * Serialization format passed to the serializer for every payload.
     */
    private const FORMAT = 'json';

    /**
     * @var PheanstalkInterface
     */
    private $client;

    /**
     * @var SerializerInterface
     */
    private $serializer;

    /**
     * @var string
     */
    private $queueName;

    /**
     * @param PheanstalkInterface      $client     Beanstalkd client used for all tube operations.
     * @param string                   $queueName  Name of the tube backing this queue.
     * @param SerializerInterface|null $serializer Serializer for envelopes; when null the value is
     *                                             resolved by HasFallbackSerializer::fallbackSerializer()
     *                                             (presumably a default serializer - confirm in the trait).
     */
    public function __construct(
        PheanstalkInterface $client,
        string $queueName = PheanstalkInterface::DEFAULT_TUBE,
        ?SerializerInterface $serializer = null
    ) {
        $this->client = $client;
        $this->queueName = $queueName;
        $this->serializer = $this->fallbackSerializer($serializer);
    }

    /**
     * Inserts the envelope, throwing instead of returning false on failure.
     *
     * @param Envelope $envelope
     *
     * @return bool Always true when no exception is thrown.
     *
     * @throws IllegalStateException When offer() reports that the envelope was not written.
     */
    public function add(Envelope $envelope): bool
    {
        if (!$this->offer($envelope)) {
            throw new IllegalStateException('Could not write to beanstalkd');
        }

        return true;
    }

    /**
     * Serializes the envelope to JSON and puts it on this queue's tube.
     *
     * @param Envelope $envelope
     *
     * @return bool Boolean cast of whatever the client's put() returns.
     */
    public function offer(Envelope $envelope): bool
    {
        $serialized = $this->serializer->serialize($envelope, self::FORMAT);

        return (bool)$this->client
            ->useTube($this->queueName)
            ->put($serialized);
    }

    /**
     * Retrieves and removes the head of the queue.
     *
     * @return Envelope
     *
     * @throws NoSuchElementException When poll() yields no envelope.
     */
    public function remove(): Envelope
    {
        $element = $this->poll();

        if ($element === null) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves and removes the head of the queue without blocking.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or its payload is empty.
     */
    public function poll(): ?Envelope
    {
        // watchOnly() rather than watch(): watch() only adds to the watch list,
        // so jobs from other watched tubes (e.g. "default") could be reserved.
        $job = $this->client
            ->watchOnly($this->queueName)
            ->reserveWithTimeout(0);

        if (empty($job)) {
            return null;
        }

        // A reserved job is only held for its TTR and then becomes ready again;
        // it has to be deleted explicitly for poll() to actually remove it.
        // Deleting before decoding also keeps an undecodable payload from being
        // redelivered forever.
        $this->client->delete($job);

        return $this->toEnvelope($job);
    }

    /**
     * Retrieves, but does not remove, the head of the queue.
     *
     * @return Envelope
     *
     * @throws NoSuchElementException When peek() yields no envelope.
     */
    public function element(): Envelope
    {
        $element = $this->peek();

        if ($element === null) {
            throw new NoSuchElementException();
        }

        return $element;
    }

    /**
     * Retrieves, but does not remove, the next ready job of the queue.
     *
     * @return Envelope|null The next envelope, or null when no job is ready or its payload is empty.
     */
    public function peek(): ?Envelope
    {
        // peek-ready inspects the currently *used* tube, not the watch list,
        // so the tube must be selected with useTube().
        $job = $this->client
            ->useTube($this->queueName)
            ->peekReady();

        if (empty($job)) {
            return null;
        }

        return $this->toEnvelope($job);
    }

    /**
     * Decodes the JSON payload carried by a job into an envelope.
     *
     * @param object $job Job returned by the client; only getData() is used.
     *
     * @return Envelope|null Null when the job carries an empty payload.
     */
    private function toEnvelope($job): ?Envelope
    {
        $serialized = $job->getData();

        if (empty($serialized)) {
            return null;
        }

        return $this->serializer->deserialize($serialized, Envelope::class, self::FORMAT);
    }
}
124