Completed
Pull Request — master (#30)
by Matthew
14:29
created

JobManager::setAMQPConnection()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 5
ccs 4
cts 4
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Manager\PriorityJobManager;
9
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
10
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
11
use Dtc\QueueBundle\Exception\PriorityException;
12
use Dtc\QueueBundle\Exception\UnsupportedException;
13
use Dtc\QueueBundle\Manager\RunManager;
14
use Dtc\QueueBundle\Manager\JobTimingManager;
15
use Dtc\QueueBundle\Util\Util;
16
use PhpAmqpLib\Channel\AMQPChannel;
17
use PhpAmqpLib\Connection\AbstractConnection;
18
use PhpAmqpLib\Message\AMQPMessage;
19
20
class JobManager extends PriorityJobManager
21
{
22
    /** @var AMQPChannel */
23
    protected $channel;
24
25
    /** @var AbstractConnection */
26
    protected $connection;
27
    protected $queueArgs;
28
    protected $exchangeArgs;
29
30
    protected $channelSetup = false;
31
32
    protected $hostname;
33
    protected $pid;
34
35 2
    /**
     * Remembers the host name and process id of the running process, then
     * hands the managers and the job class to the parent constructor.
     *
     * The host name falls back to an empty string when gethostname() yields
     * a falsy value.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param mixed            $jobClass         passed through unchanged to the parent constructor
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $detectedHost = gethostname();
        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();

        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }
41
42
    /**
     * Stores the exchange settings as a positional list.
     *
     * The list is later passed, in this order, as the arguments of the
     * channel's exchange_declare call; its first element is also used as the
     * exchange name when binding the queue and publishing.
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $positional = [
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
        ];

        $this->exchangeArgs = $positional;
    }
53
54
    /**
     * Stores the queue settings as a positional list and validates the
     * configured maximum priority.
     *
     * The queue arguments are stored before the priority is validated, so
     * they remain set even when a PriorityException is thrown.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when the max priority is not a string of digits,
     *                           or does not survive an integer round trip
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriorityText = strval($this->maxPriority);

        if (!ctype_digit($maxPriorityText)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }

        // A digit string that changes when cast to int and back does not fit in a PHP integer.
        $roundTripped = strval(intval($this->maxPriority));
        if ($roundTripped !== $maxPriorityText) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }
75
76 1
    /**
     * Keeps a reference to the AMQP connection and opens a channel on it.
     *
     * Note: the channel-setup flag is not changed by this method.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;

        $openedChannel = $this->connection->channel();
        $this->channel = $openedChannel;
    }
81
82
    /**
     * Returns the channel stored on this manager.
     *
     * The channel is assigned by setAMQPConnection(); before that call the
     * property holds no channel.
     *
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }
89
90
    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * The queue arguments are checked first, so their message wins when
     * both are missing.
     *
     * @throws ArgumentsNotSetException when either argument list is empty
     */
    protected function checkChannelArgs()
    {
        $requirements = [
            [$this->queueArgs, __METHOD__.': queue args need to be set via setQueueArgs(...)'],
            [$this->exchangeArgs, __METHOD__.': exchange args need to be set via setExchangeArgs(...)'],
        ];

        foreach ($requirements as $requirement) {
            if (!$requirement[0]) {
                throw new ArgumentsNotSetException($requirement[1]);
            }
        }
    }
102
103 1
    /**
     * Declares the exchange and the queue on the channel, then binds the
     * queue to the exchange.
     *
     * The stored exchange and queue arguments are passed positionally to
     * exchange_declare and queue_declare. When a max priority is configured,
     * two further positional arguments are added to the queue declaration:
     * `false`, followed by an arguments table carrying `x-max-priority`.
     *
     * Those extra arguments are appended to a local copy, never to
     * $this->queueArgs, so running this method more than once does not keep
     * growing the stored list and shifting the positional arguments.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Fills the positional slot that follows $autoDelete
            // (presumably php-amqplib's $nowait - confirm against the library).
            $declareArgs[] = false;
            // Queue arguments table: 'I' marks the value as a signed integer.
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);

        // Element 0 of each stored list is the queue / exchange name.
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }
113
114
    /**
     * Performs the one-time channel setup if it has not happened yet.
     *
     * The arguments are verified on every call; the actual declaration runs
     * only while the setup flag is still false, and the flag is raised once
     * it completes.
     *
     * @throws ArgumentsNotSetException when queue or exchange arguments are missing
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }
126
127
    /**
     * Validates a job, gives it an id if it has none and publishes it.
     *
     * The channel is set up before the job is validated and published.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be an instance of this namespace's Job class
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance that was passed in
     *
     * @throws ClassNotSubclassException when $job is not an instance of Job
     * @throws PriorityException
     * @throws ArgumentsNotSetException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        $isRabbitJob = $job instanceof Job;
        if (!$isRabbitJob) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);
        $this->publishJob($job);

        return $job;
    }
151
152 8
    /**
     * Wraps the job's message form in an AMQPMessage, applies the priority
     * and publishes it to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $amqpMessage = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($amqpMessage, $job);

        // Element 0 of the exchange arguments is the exchange name.
        $exchangeName = $this->exchangeArgs[0];
        $this->channel->basic_publish($amqpMessage, $exchangeName);
    }
159
160
    /**
     * Gives the job a unique id when it does not have one yet, since
     * RabbitMQ will not assign one.
     *
     * The id is produced by uniqid() with more entropy enabled, prefixed
     * with "<hostname>-<pid>". A job whose id is already truthy is left alone.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $idPrefix = $this->hostname.'-'.$this->pid;
        $job->setId(uniqid($idPrefix, true));
    }
171
172
    /**
     * Copies the job's priority onto the AMQP message.
     *
     * Nothing is set when no max priority is configured on this manager.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }
185
186 8
    /**
     * Delegates to the parent calculation and substitutes 0 for a null result.
     *
     * @param mixed $priority passed through unchanged to the parent implementation
     *
     * @return mixed the parent's result, or 0 when the parent returned null
     */
    protected function calculatePriority($priority)
    {
        $calculated = parent::calculatePriority($priority);

        return null === $calculated ? 0 : $calculated;
    }
195
196
    /**
     * Rejects jobs that cannot be saved by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job carries a priority but no max priority is configured
     * @throws ClassNotSubclassException when the job is not an instance of Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        $requestedPriority = $job->getPriority();
        if (null !== $requestedPriority && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        $isRabbitJob = $job instanceof Job;
        if (!$isRabbitJob) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }
212
213
    /**
     * Refuses job lookups this manager cannot honour.
     *
     * A lookup is rejected when a worker name or method name is given, or
     * when a max priority is configured and $prioritize is anything other
     * than strictly true.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        $filterRequested = null !== $workerName || null !== $methodName;
        $priorityBypassRequested = null !== $this->maxPriority && true !== $prioritize;

        if ($filterRequested || $priorityBypassRequested) {
            throw new UnsupportedException('Unsupported');
        }
    }
226
227
    /**
     * Fetches the next job from the queue, skipping over expired ones.
     *
     * findJob() is called repeatedly for as long as it reports that the
     * message it fetched was an expired job.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      handed to findJob(), which sets it on the fetched job
     *
     * @return Job|null the fetched job, or null when findJob() found no message
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        while (true) {
            // Reset before each attempt; findJob() raises it by reference.
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);

            if (!$expiredJob) {
                return $job;
            }
        }
    }
246
247
    /**
     * Pulls a single message from the queue and turns it into a job.
     *
     * When the job's expiry time lies in the past, the message is nacked,
     * an expired timing is recorded, $expiredJob is set to true and null is
     * returned. Otherwise the delivery tag is stored on the job and the job
     * is returned.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had expired
     * @param mixed $runId      stored on the job via setRunId()
     *
     * @return Job|null null when no message was available or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        $hasExpired = $expiresAt && $expiresAt->getTimestamp() < time();

        if (!$hasExpired) {
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);

            return $job;
        }

        $expiredJob = true;
        $this->channel->basic_nack($message->delivery_info['delivery_tag']);
        // NOTE(review): property name kept exactly as written; it is declared outside this view.
        $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

        return null;
    }
275
276 2 View Code Duplication
    /**
     * Puts a job back into its initial state and publishes it again.
     *
     * Status is set to new, message and start time are cleared, the retry
     * counter is incremented by one and the update time is refreshed before
     * the job is published.
     *
     * @param RetryableJob $job must be an instance of Job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when $job is not an instance of Job
     */
    protected function resetJob(RetryableJob $job)
    {
        $isRabbitJob = $job instanceof Job;
        if (!$isRabbitJob) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }

        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);

        $nextRetryCount = $job->getRetries() + 1;
        $job->setRetries($nextRetryCount);

        $now = Util::getMicrotimeDateTime();
        $job->setUpdatedAt($now);

        $this->publishJob($job);

        return true;
    }
290
291
    /**
     * Save history gets called upon completion of the job; here it
     * acknowledges the job's message on the channel.
     *
     * @param RetryableJob $job   must be an instance of Job
     * @param mixed        $retry not used by this implementation
     *
     * @throws ClassNotSubclassException when $job is not an instance of Job
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        $isRabbitJob = $job instanceof Job;
        if (!$isRabbitJob) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $this->channel->basic_ack($job->getDeliveryTag());
    }
302
303 2
    /**
     * Intentionally does nothing.
     */
    public function __destruct()
    {
        // The channel would be closed here via $this->channel->close(),
        // but there is some kind of problem trying to close it, so it is left open.
    }
307
}
308