Completed
Push — master ( 859c70...b3ba31 )
by Matthew
06:19
created

JobManager::checkChannelArgs()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 6
cts 6
cp 1
rs 9.6666
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 0
crap 3
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\PriorityJobManager;
6
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
7
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
8
use Dtc\QueueBundle\Exception\PriorityException;
9
use Dtc\QueueBundle\Exception\UnsupportedException;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Connection\AbstractConnection;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class JobManager extends PriorityJobManager
15
{
16
    /** @var AMQPChannel */
17
    protected $channel;
18
19
    /** @var AbstractConnection */
20
    protected $connection;
21
    protected $queueArgs;
22
    protected $exchangeArgs;
23
24
    protected $channelSetup = false;
25
26
    protected $hostname;
27
    protected $pid;
28
29 2
    /**
     * Captures the host name and process id of the running process.
     *
     * Both values are used later as the prefix of generated job ids
     * (see setJobId()).
     */
    public function __construct()
    {
        $detectedHost = gethostname();

        // gethostname() can yield a falsy value; fall back to an empty string.
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();
    }
34
35
    /**
     * Stores the positional arguments that performChannelSetup() passes to
     * the channel's exchange_declare() call.
     *
     * @param string $exchange   exchange name (also used when binding and publishing)
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        // Order matters: the list is forwarded as-is as positional arguments.
        $arguments = [$exchange, $type, $passive, $durable, $autoDelete];

        $this->exchangeArgs = $arguments;
    }
46
47
    /**
     * Stores the positional arguments that performChannelSetup() passes to
     * the channel's queue_declare() call, then validates the configured
     * max priority.
     *
     * Note that the queue arguments are stored before validation runs, so
     * they remain set even when an exception is thrown.
     *
     * @param string $queue      queue name (also used when binding and fetching)
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when the max priority is not made up of
     *                           decimal digits only, or does not survive a
     *                           round trip through intval()
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriorityText = strval($this->maxPriority);

        // Rejects anything that is not a plain run of digits (including '').
        if (!ctype_digit($maxPriorityText)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }

        // A digit string that changes when cast to int did not fit the int range.
        if ($maxPriorityText !== strval(intval($this->maxPriority))) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }
66
67 1
    /**
     * Stores the AMQP connection and opens the channel this manager works on.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $openedChannel = $this->connection->channel();
        $this->channel = $openedChannel;
    }
72
73
    /**
     * Returns the channel obtained in setAMQPConnection().
     *
     * @return AMQPChannel|null null until setAMQPConnection() has been called
     */
    public function getChannel()
    {
        $channel = $this->channel;

        return $channel;
    }
80
81 7
    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * The queue arguments are checked first, so that is the error reported
     * when both are missing.
     *
     * @throws ArgumentsNotSetException when either argument list is empty
     */
    protected function checkChannelArgs()
    {
        if (!$this->queueArgs) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }

        if (!$this->exchangeArgs) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }
90
91 1
    /**
     * Declares the exchange and the queue on the channel and binds them.
     *
     * When a (truthy) max priority is configured, two extra positional
     * arguments are appended to the queue_declare() call: `false` followed by
     * an arguments table carrying 'x-max-priority'.
     *
     * The extra arguments are added to a local copy only. Appending them to
     * $this->queueArgs would make them accumulate whenever this method runs
     * more than once (for example a retry after a failed declare, where
     * $channelSetup is still false), shifting the positional parameters of
     * queue_declare() on the next attempt.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Copy so the stored queue arguments stay exactly as setQueueArgs() left them.
        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Sixth positional argument; presumably php-amqplib's $nowait - confirm against the library.
            $declareArgs[] = false;
            // Seventh positional argument: the queue arguments table.
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }

        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }
101
102 7
    /**
     * Validates the configured arguments and performs the channel setup the
     * first time it is called.
     *
     * The argument check runs on every call; the declare/bind work is done
     * only while $channelSetup is still false.
     *
     * @throws ArgumentsNotSetException when queue or exchange args are missing
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }
111
112
    /**
     * Publishes the job as a message on the configured exchange.
     *
     * The channel is set up if needed, the job is validated and given an id
     * when it has none, and the message priority is set before publishing.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance that was passed in
     *
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     * @throws ArgumentsNotSetException  when queue or exchange args are missing
     * @throws PriorityException         when the job has a priority but the queue supports none
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();
        $this->validateSaveable($job);
        $this->setJobId($job);

        $message = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($message, $job);

        $exchangeName = $this->exchangeArgs[0];
        $this->channel->basic_publish($message, $exchangeName);

        return $job;
    }
137
138
    /**
     * Gives the job a unique id when it does not have one yet, since
     * RabbitMQ will not attach one.
     *
     * The id is built with uniqid() (more-entropy mode) using
     * "<hostname>-<pid>" as prefix.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $prefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($prefix, true));
    }
149
150
    /**
     * Copies the job's priority onto the AMQPMessage.
     *
     * Nothing is set when no max priority is configured (null).
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }
163
164 6
    /**
     * Delegates to the parent calculation and maps a null result to 0.
     *
     * @param mixed $priority value handed to the parent implementation
     *
     * @return mixed the parent's result, or 0 when the parent returned null
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }
173
174
    /**
     * Checks that the job can be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job carries a priority but no max priority is configured
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $jobPriority = $job->getPriority();
        if (null === $this->maxPriority && null !== $jobPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }
190
191 7
    /**
     * Rejects any getJob() filter other than the defaults.
     *
     * Only a null worker name, a null method name and $prioritize === true
     * are accepted.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException when any argument differs from its default
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $onlyDefaults = null === $workerName && null === $methodName && true === $prioritize;

        if (!$onlyDefaults) {
            throw new UnsupportedException('Unsupported');
        }
    }
197
198
    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired jobs reported by findJob() are skipped by fetching again until
     * a live job is found or no message is returned.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      passed on to the fetched job via setRunId()
     *
     * @return Job|null the fetched job, or null when no message was available
     *
     * @throws UnsupportedException     when a non-default filter argument is given
     * @throws ArgumentsNotSetException when queue or exchange args are missing
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        // Start "expired" so the loop body always runs at least once.
        $expiredJob = true;
        while ($expiredJob) {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        }

        return $job;
    }
213
214
    /**
     * Pulls a single message from the queue and turns it into a Job.
     *
     * When the job's expiry time lies in the past, the message is nack'ed,
     * $expiredJob is set to true and null is returned so the caller can try
     * the next message.
     *
     * @param bool  $expiredJob set to true (by reference) when an expired job was discarded
     * @param mixed $runId      run id stored on the job
     *
     * @return Job|null the job, or null when no message was available or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();
        $deliveryTag = $message->delivery_info['delivery_tag'];

        if ($hasExpired) {
            $expiredJob = true;
            $this->channel->basic_nack($deliveryTag);

            return null;
        }

        // Remember the tag so the message can be acknowledged in saveHistory().
        $job->setDeliveryTag($deliveryTag);

        return $job;
    }
241
242
    /**
     * Called upon completion of the job; acknowledges the job's message on
     * the channel using the delivery tag stored on the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!($job instanceof Job)) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $this->channel->basic_ack($job->getDeliveryTag());
    }
253
254 3
    /**
     * Closes the channel, if one was ever opened, when the manager is destroyed.
     */
    public function __destruct()
    {
        if (null === $this->channel) {
            return;
        }

        $this->channel->close();
    }
260
}
261