Completed
Push — master ( 7fae80...8ffdf8 )
by Matthew
07:25
created

JobManager::resetJob()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 15
Code Lines 11

Duplication

Lines 15
Ratio 100 %

Code Coverage

Tests 10
CRAP Score 2.003

Importance

Changes 0
Metric Value
dl 15
loc 15
ccs 10
cts 11
cp 0.9091
rs 9.4285
c 0
b 0
f 0
nc 2
cc 2
eloc 11
nop 1
crap 2.003
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Manager\VerifyTrait;
6
use Dtc\QueueBundle\Model\BaseJob;
7
use Dtc\QueueBundle\Manager\JobIdTrait;
8
use Dtc\QueueBundle\Model\RetryableJob;
9
use Dtc\QueueBundle\Model\JobTiming;
10
use Dtc\QueueBundle\Manager\PriorityJobManager;
11
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
12
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
13
use Dtc\QueueBundle\Exception\PriorityException;
14
use Dtc\QueueBundle\Exception\UnsupportedException;
15
use Dtc\QueueBundle\Manager\RunManager;
16
use Dtc\QueueBundle\Manager\JobTimingManager;
17
use Dtc\QueueBundle\Util\Util;
18
use PhpAmqpLib\Channel\AMQPChannel;
19
use PhpAmqpLib\Connection\AbstractConnection;
20
use PhpAmqpLib\Message\AMQPMessage;
21
22
/**
 * Job manager that stores jobs as messages on a RabbitMQ (AMQP) queue.
 *
 * Jobs are published to the exchange configured through setExchangeArgs() and
 * fetched from the queue configured through setQueueArgs(). Both sets of
 * arguments must be supplied before any method that calls setupChannel().
 */
class JobManager extends PriorityJobManager
{
    use JobIdTrait;
    use VerifyTrait;

    /** @var AMQPChannel */
    protected $channel;

    /** @var AbstractConnection */
    protected $connection;

    /** @var array|null Positional arguments passed to AMQPChannel::queue_declare() */
    protected $queueArgs;

    /** @var array|null Positional arguments passed to AMQPChannel::exchange_declare() */
    protected $exchangeArgs;

    /** @var bool True once performChannelSetup() has completed successfully */
    protected $channelSetup = false;

    /** @var string Result of gethostname(), or '' when it is unavailable */
    protected $hostname;

    /** @var int|false Result of getmypid() */
    protected $pid;

    /**
     * Records the host name and process id, then delegates to the parent constructor.
     *
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }

    /**
     * Stores the positional arguments later passed to exchange_declare().
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the positional arguments later passed to queue_declare() and
     * validates the configured maximum priority.
     *
     * Note: the arguments are stored before validation runs, so they remain
     * set even when a PriorityException is thrown.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException when $this->maxPriority is not a string of digits,
     *                           or does not survive an int round-trip
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $this->queueArgs = $arguments;
        if (!ctype_digit(strval($this->maxPriority))) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // Round-trip through int: values beyond PHP_INT_MAX get clamped by intval()
        // and therefore no longer match their original string form.
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }

    /**
     * Stores the connection and opens a channel on it.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * @throws ArgumentsNotSetException
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue, then binds the queue to the exchange.
     *
     * When a max priority is configured, the queue arguments are extended with
     * the 'x-max-priority' table. The extended list is kept in $this->queueArgs
     * because getWaitingJobCount() re-declares the queue with the same arguments.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
        if ($this->maxPriority) {
            // Rebuild from the five base arguments stored by setQueueArgs() so that a
            // retry after a failed declare/bind does not append the extras a second time.
            $this->queueArgs = array_slice($this->queueArgs, 0, 5);
            // Sixth positional argument of queue_declare() (nowait in php-amqplib).
            $this->queueArgs[] = false;
            $this->queueArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Performs the one-time exchange/queue setup if it has not completed yet.
     *
     * @throws ArgumentsNotSetException
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if (!$this->channelSetup) {
            $this->performChannelSetup();
            $this->channelSetup = true;
        }
    }

    /**
     * Validates the job, assigns it an id and publishes it to the exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job
     *
     * @throws ClassNotSubclassException
     * @throws PriorityException
     * @throws ArgumentsNotSetException
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);

        $this->publishJob($job);

        return $job;
    }

    /**
     * Wraps the job in an AMQP message and publishes it to the configured exchange.
     *
     * @param Job $job
     */
    protected function publishJob(Job $job)
    {
        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
    }

    /**
     * Sets the priority of the AMQPMessage.
     *
     * The priority is only copied onto the message when a max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $this->maxPriority) {
            $priority = $job->getPriority();
            $msg->set('priority', $priority);
        }
    }

    /**
     * Delegates to the parent calculation, substituting 0 when it yields null.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $priority) {
            return 0;
        }

        return $priority;
    }

    /**
     * Checks that the job can be stored by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         when the job has a priority but no max priority is configured
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired jobs are discarded by findJob(); fetching is repeated until a
     * non-expired job is found or the queue yields no message.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId
     *
     * @return Job|null
     *
     * @throws UnsupportedException
     * @throws ArgumentsNotSetException
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and turns it into a Job.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had already expired
     * @param mixed $runId
     *
     * @return Job|null null when no message was available or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if ($message) {
            $job = new Job();
            $job->fromMessage($message->body);
            $job->setRunId($runId);

            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
                $expiredJob = true;
                // Negative-acknowledge the expired message; no requeue flag is passed.
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
                // NOTE(review): the property name ("Timinig") is declared outside this
                // class - keep the spelling in sync with that declaration.
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

                return null;
            }
            // Remember the delivery tag so the message can be acknowledged later.
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);

            return $job;
        }

        return null;
    }

    /**
     * Resets the job to a fresh state, bumps its retry counter and publishes it again.
     *
     * @param RetryableJob $job
     *
     * @return bool always true
     *
     * @throws \InvalidArgumentException when the job is not a RabbitMQ Job
     */
    protected function resetJob(RetryableJob $job)
    {
        if (!$job instanceof Job) {
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
        }
        $job->setStatus(BaseJob::STATUS_NEW);
        $job->setMessage(null);
        $job->setStartedAt(null);
        $job->setDeliveryTag(null);
        $job->setRetries($job->getRetries() + 1);
        $job->setUpdatedAt(Util::getMicrotimeDateTime());
        $this->publishJob($job);

        return true;
    }

    /**
     * Called upon completion of the job; acknowledges the job's message if it
     * carries a delivery tag.
     *
     * @param RetryableJob $job
     * @param bool         $retry not used by this implementation
     *
     * @throws ClassNotSubclassException when the job is not a RabbitMQ Job
     */
    protected function retryableSaveHistory(RetryableJob $job, $retry)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Expected \Dtc\QueueBundle\RabbitMQ\Job, got '.get_class($job));
        }
        $deliveryTag = $job->getDeliveryTag();
        if (null !== $deliveryTag) {
            $this->channel->basic_ack($deliveryTag);
        }
    }

    /**
     * Returns the number of messages waiting in the queue.
     *
     * Filtering by worker or method name is not supported.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     *
     * @return int
     *
     * @throws UnsupportedException     when $workerName or $methodName is given
     * @throws ArgumentsNotSetException
     */
    public function getWaitingJobCount($workerName = null, $methodName = null)
    {
        // Reject unsupported filters before touching the broker, as getJob() does.
        if ($workerName) {
            throw new UnsupportedException('Waiting Job Count by $workerName is not supported');
        }
        if ($methodName) {
            throw new UnsupportedException('Waiting Job Count by $methodName is not supported');
        }

        $this->setupChannel();

        // Re-declaring the queue returns its status; index 1 is read as the message count.
        $count = call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);

        return isset($count[1]) ? $count[1] : 0;
    }

    public function __destruct()
    {
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
    }
}
306