Completed
Pull Request — master (#30)
by Matthew
13:27 queued 10:46
created

JobManager   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 287
Duplicated Lines 10.45 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 0%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 44
lcom 1
cbo 11
dl 30
loc 287
ccs 0
cts 163
cp 0
rs 8.3396
c 3
b 0
f 0

20 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 2
A setExchangeArgs() 0 4 1
A setQueueArgs() 0 12 3
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
A checkChannelArgs() 0 9 3
A performChannelSetup() 0 10 2
A setupChannel() 0 9 2
A prioritySave() 0 15 2
A publishJob() 0 7 1
A setJobId() 3 6 2
A setMsgPriority() 0 7 2
A calculatePriority() 0 9 2
A validateSaveable() 10 10 4
B verifyGetJobArgs() 3 6 5
A getJob() 0 12 2
B findJob() 0 22 4
A resetJob() 14 14 2
A retryableSaveHistory() 0 10 2
A __destruct() 0 4 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complex Class

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\BaseJob;
6
use Dtc\QueueBundle\Model\RetryableJob;
7
use Dtc\QueueBundle\Model\JobTiming;
8
use Dtc\QueueBundle\Manager\PriorityJobManager;
9
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
10
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
11
use Dtc\QueueBundle\Exception\PriorityException;
12
use Dtc\QueueBundle\Exception\UnsupportedException;
13
use Dtc\QueueBundle\Manager\RunManager;
14
use Dtc\QueueBundle\Manager\JobTimingManager;
15
use PhpAmqpLib\Channel\AMQPChannel;
16
use PhpAmqpLib\Connection\AbstractConnection;
17
use PhpAmqpLib\Message\AMQPMessage;
18
19
class JobManager extends PriorityJobManager
20
{
21
    /** @var AMQPChannel */
22
    protected $channel;
23
24
    /** @var AbstractConnection */
25
    protected $connection;
26
    protected $queueArgs;
27
    protected $exchangeArgs;
28
29
    protected $channelSetup = false;
30
31
    protected $hostname;
32
    protected $pid;
33
34
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
35
    {
36
        $this->hostname = gethostname() ?: '';
37
        $this->pid = getmypid();
38
        parent::__construct($runManager, $jobTimingManager, $jobClass);
39
    }
40
41
    /**
42
     * @param string $exchange
43
     * @param string $type
44
     * @param bool   $passive
45
     * @param bool   $durable
46
     * @param bool   $autoDelete
47
     */
48
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
49
    {
50
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
51
    }
52
53
    /**
54
     * @param string $queue
55
     * @param bool   $passive
56
     * @param bool   $durable
57
     * @param bool   $exclusive
58
     * @param bool   $autoDelete
59
     *
60
     * @throws PriorityException
61
     */
62
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
63
    {
64
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];
65
66
        $this->queueArgs = $arguments;
67
        if (!ctype_digit(strval($this->maxPriority))) {
68
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
69
        }
70
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
71
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
72
        }
73
    }
74
75
    public function setAMQPConnection(AbstractConnection $connection)
76
    {
77
        $this->connection = $connection;
78
        $this->channel = $connection->channel();
79
    }
80
81
    /**
82
     * @return AMQPChannel
83
     */
84
    public function getChannel()
85
    {
86
        return $this->channel;
87
    }
88
89
    /**
90
     * @throws ArgumentsNotSetException
91
     */
92
    protected function checkChannelArgs()
93
    {
94
        if (empty($this->queueArgs)) {
95
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
96
        }
97
        if (empty($this->exchangeArgs)) {
98
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
99
        }
100
    }
101
102
    protected function performChannelSetup()
103
    {
104
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
105
        if ($this->maxPriority) {
106
            array_push($this->queueArgs, false);
107
            array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
108
        }
109
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
110
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
111
    }
112
113
    /**
114
     * @throws ArgumentsNotSetException
115
     */
116
    public function setupChannel()
117
    {
118
        $this->checkChannelArgs();
119
120
        if (!$this->channelSetup) {
121
            $this->performChannelSetup();
122
            $this->channelSetup = true;
123
        }
124
    }
125
126
    /**
127
     * @param \Dtc\QueueBundle\Model\Job $job
128
     *
129
     * @return \Dtc\QueueBundle\Model\Job
130
     *
131
     * @throws ClassNotSubclassException
132
     * @throws PriorityException
133
     * @throws ArgumentsNotSetException
134
     */
135
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
136
    {
137
        if (!$job instanceof Job) {
138
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
139
        }
140
141
        $this->setupChannel();
142
143
        $this->validateSaveable($job);
144
        $this->setJobId($job);
145
146
        $this->publishJob($job);
147
148
        return $job;
149
    }
150
151
    protected function publishJob(Job $job)
152
    {
153
        $msg = new AMQPMessage($job->toMessage());
154
        $this->setMsgPriority($msg, $job);
155
156
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
157
    }
158
159
    /**
160
     * Attach a unique id to a job since RabbitMQ will not.
161
     *
162
     * @param \Dtc\QueueBundle\Model\Job $job
163
     */
164
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
165
    {
166 View Code Duplication
        if (!$job->getId()) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
167
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
168
        }
169
    }
170
171
    /**
172
     * Sets the priority of the AMQPMessage.
173
     *
174
     * @param AMQPMessage                $msg
175
     * @param \Dtc\QueueBundle\Model\Job $job
176
     */
177
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
178
    {
179
        if (null !== $this->maxPriority) {
180
            $priority = $job->getPriority();
181
            $msg->set('priority', $priority);
182
        }
183
    }
184
185
    protected function calculatePriority($priority)
186
    {
187
        $priority = parent::calculatePriority($priority);
188
        if (null === $priority) {
189
            return 0;
190
        }
191
192
        return $priority;
193
    }
194
195
    /**
196
     * @param \Dtc\QueueBundle\Model\Job $job
197
     *
198
     * @throws PriorityException
199
     * @throws ClassNotSubclassException
200
     */
201 View Code Duplication
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
202
    {
203
        if (null !== $job->getPriority() && null === $this->maxPriority) {
204
            throw new PriorityException('This queue does not support priorities');
205
        }
206
207
        if (!$job instanceof Job) {
208
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
209
        }
210
    }
211
212
    /**
213
     * @param null $workerName
214
     * @param null $methodName
215
     * @param bool $prioritize
216
     *
217
     * @throws UnsupportedException
218
     */
219
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
220
    {
221 View Code Duplication
        if (null !== $workerName || null !== $methodName || (null !== $this->maxPriority && true !== $prioritize)) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
222
            throw new UnsupportedException('Unsupported');
223
        }
224
    }
225
226
    /**
227
     * @param string $workerName
228
     *
229
     * @throws UnsupportedException
230
     * @throws ArgumentsNotSetException
231
     */
232
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
233
    {
234
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
0 ignored issues
show
Bug introduced by
It seems like $workerName defined by parameter $workerName on line 232 can also be of type string; however, Dtc\QueueBundle\RabbitMQ...ger::verifyGetJobArgs() does only seem to accept null, maybe add an additional type check?

This check looks at variables that have been passed in as parameters and are passed out again to other methods.

If the outgoing method call has stricter type requirements than the method itself, an issue is raised.

An additional type check may prevent trouble.

Loading history...
235
        $this->setupChannel();
236
237
        do {
238
            $expiredJob = false;
239
            $job = $this->findJob($expiredJob, $runId);
240
        } while ($expiredJob);
241
242
        return $job;
243
    }
244
245
    /**
246
     * @param bool $expiredJob
247
     * @param $runId
248
     *
249
     * @return Job|null
250
     */
251
    protected function findJob(&$expiredJob, $runId)
252
    {
253
        $message = $this->channel->basic_get($this->queueArgs[0]);
254
        if ($message) {
255
            $job = new Job();
256
            $job->fromMessage($message->body);
257
            $job->setRunId($runId);
258
259
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
260
                $expiredJob = true;
261
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
262
                $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);
263
264
                return null;
265
            }
266
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);
267
268
            return $job;
269
        }
270
271
        return null;
272
    }
273
274 View Code Duplication
    protected function resetJob(RetryableJob $job)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
275
    {
276
        if (!$job instanceof Job) {
277
            throw new \InvalidArgumentException('$job must be instance of '.Job::class);
278
        }
279
        $job->setStatus(BaseJob::STATUS_NEW);
280
        $job->setMessage(null);
281
        $job->setStartedAt(null);
282
        $job->setRetries($job->getRetries() + 1);
283
        $job->setUpdatedAt(new \DateTime());
284
        $this->publishJob($job);
285
286
        return true;
287
    }
288
289
    // Save History get called upon completion of the job
290
    protected function retryableSaveHistory(RetryableJob $job, $retry)
291
    {
292
        if (!$job instanceof Job) {
293
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
294
        }
295
        $deliveryTag = $job->getDeliveryTag();
296
        $this->channel->basic_ack($deliveryTag);
297
298
        return;
299
    }
300
301
    public function __destruct()
302
    {
303
        // There's some kind of problem trying to close the channel, otherwise we'd call $this->channel->close() at this point.
304
    }
305
}
306