Completed
Push — master ( 5fe2ea...bba9f0 )
by Matthew
13:23
created

JobManager   B

Complexity

Total Complexity 41

Size/Duplication

Total Lines 249
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Test Coverage

Coverage 0%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 41
lcom 1
cbo 11
dl 0
loc 249
ccs 0
cts 149
cp 0
rs 8.2769
c 3
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 2
A setExchangeArgs() 0 4 1
A setQueueArgs() 0 12 3
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
A checkChannelArgs() 0 9 3
A performChannelSetup() 0 10 2
A setupChannel() 0 9 2
A prioritySave() 0 18 2
A setJobId() 0 6 2
A setMsgPriority() 0 7 2
A calculatePriority() 0 9 2
A validateSaveable() 0 10 4
A verifyGetJobArgs() 0 6 4
A getJob() 0 12 2
B findJob() 0 22 4
A saveHistory() 0 10 2
A __destruct() 0 6 2

How to fix   Complexity   

Complex Class

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also have a look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\JobTiming;
6
use Dtc\QueueBundle\Model\PriorityJobManager;
7
use Dtc\QueueBundle\Exception\ArgumentsNotSetException;
8
use Dtc\QueueBundle\Exception\ClassNotSubclassException;
9
use Dtc\QueueBundle\Exception\PriorityException;
10
use Dtc\QueueBundle\Exception\UnsupportedException;
11
use Dtc\QueueBundle\Model\RunManager;
12
use Dtc\QueueBundle\Model\JobTimingManager;
13
use PhpAmqpLib\Channel\AMQPChannel;
14
use PhpAmqpLib\Connection\AbstractConnection;
15
use PhpAmqpLib\Message\AMQPMessage;
16
17
/**
 * Job manager that publishes jobs to, and fetches jobs from, a RabbitMQ queue
 * through a php-amqplib channel.
 *
 * Usage order implied by the code: provide the exchange and queue arguments
 * (setExchangeArgs() / setQueueArgs()) and a connection (setAMQPConnection());
 * the exchange and queue are then declared lazily on the first save or fetch.
 */
class JobManager extends PriorityJobManager
{
    /** @var AMQPChannel */
    protected $channel;

    /** @var AbstractConnection */
    protected $connection;

    /** @var array|null Positional arguments for the channel's queue_declare(), as given to setQueueArgs() */
    protected $queueArgs;

    /** @var array|null Positional arguments for the channel's exchange_declare(), as given to setExchangeArgs() */
    protected $exchangeArgs;

    /** @var bool Whether the exchange/queue have been declared and bound on the current channel */
    protected $channelSetup = false;

    /** @var string Host name (empty string if it cannot be determined); used as part of generated job ids */
    protected $hostname;

    /** @var int|false Result of getmypid(); used as part of generated job ids */
    protected $pid;

    /**
     * @param RunManager       $runManager
     * @param JobTimingManager $jobTimingManager
     * @param string           $jobClass
     */
    public function __construct(RunManager $runManager, JobTimingManager $jobTimingManager, $jobClass)
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
        parent::__construct($runManager, $jobTimingManager, $jobClass);
    }

    /**
     * Stores the arguments later passed, in this order, to the channel's exchange_declare().
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the arguments later passed, in this order, to the channel's queue_declare(),
     * then validates the configured max priority.
     *
     * Note that a max priority that is not a string of digits (including an unset/null
     * value, which stringifies to '') is rejected here.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws PriorityException if the max priority is not a non-negative integer that fits in a PHP int
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $maxPriority = strval($this->maxPriority);
        if (!ctype_digit($maxPriority)) {
            throw new PriorityException('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // A digit string that does not survive the int round-trip overflowed PHP's integer range.
        if (strval(intval($this->maxPriority)) !== $maxPriority) {
            throw new PriorityException('Priority is higher than '.PHP_INT_MAX);
        }
    }

    /**
     * Stores the connection and opens a channel on it.
     *
     * The setup flag is reset because the newly opened channel has not had the
     * exchange/queue declared and bound through this manager yet.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->channelSetup = false;
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both the queue and the exchange arguments have been provided.
     *
     * @throws ArgumentsNotSetException if either set of arguments is empty
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new ArgumentsNotSetException(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue on the channel and binds the queue to the exchange.
     *
     * When a (truthy) max priority is configured, the queue is declared with an
     * 'x-max-priority' argument.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Build the declaration arguments on a copy: appending to $this->queueArgs itself
        // would alter the stored configuration and add the extras again on every repeated setup.
        $declareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Positional filler for the parameter that follows autoDelete
            // ($nowait in php-amqplib's queue_declare signature).
            $declareArgs[] = false;
            $declareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $declareArgs);

        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Declares and binds the exchange/queue once per channel; later calls only re-check the arguments.
     *
     * @throws ArgumentsNotSetException if the queue or exchange arguments have not been set
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if ($this->channelSetup) {
            return;
        }

        $this->performChannelSetup();
        $this->channelSetup = true;
    }

    /**
     * Publishes the job to the configured exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job instance, with an id assigned if it had none
     *
     * @throws ClassNotSubclassException if the job is not a RabbitMQ Job
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);

        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);

        return $job;
    }

    /**
     * Attach a unique id to a job since RabbitMQ will not.
     *
     * An id that is already set (truthy) is left untouched.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job->getId()) {
            return;
        }

        $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
    }

    /**
     * Sets the priority of the AMQPMessage from the job, but only when a max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null === $this->maxPriority) {
            return;
        }

        $msg->set('priority', $job->getPriority());
    }

    /**
     * Delegates to the parent calculation, substituting 0 when it yields null.
     *
     * @param int|null $priority
     *
     * @return int
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);

        return null === $priority ? 0 : $priority;
    }

    /**
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws PriorityException         if the job carries a priority but no max priority is configured
     * @throws ClassNotSubclassException if the job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new PriorityException('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new ClassNotSubclassException('Job needs to be instance of '.Job::class);
        }
    }

    /**
     * Rejects any getJob() filter: only the defaults (no worker name, no method name,
     * prioritize === true) are accepted.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws UnsupportedException if any argument differs from its default
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
            throw new UnsupportedException('Unsupported');
        }
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      run id recorded on the returned job
     *
     * @return Job|null null when the queue has no message available
     *
     * @throws UnsupportedException     if a worker name, method name or non-default prioritize is given
     * @throws ArgumentsNotSetException if the queue or exchange arguments have not been set
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        // Keep pulling messages for as long as the one just pulled turned out to be expired.
        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and turns it into a job.
     *
     * @param bool  $expiredJob set to true (by reference) when the pulled message was an expired job
     * @param mixed $runId      run id recorded on the job
     *
     * @return Job|null null when no message was available or the pulled job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if (!$message) {
            return null;
        }

        $deliveryTag = $message->delivery_info['delivery_tag'];

        $job = new Job();
        $job->fromMessage($message->body);
        $job->setRunId($runId);

        $expiresAt = $job->getExpiresAt();
        if ($expiresAt && $expiresAt->getTimestamp() < time()) {
            // Expired: negatively acknowledge the message, record the expiry and report it to the caller.
            $expiredJob = true;
            $this->channel->basic_nack($deliveryTag);
            $this->jobTiminigManager->recordTiming(JobTiming::STATUS_FINISHED_EXPIRED);

            return null;
        }

        // The delivery tag is kept on the job so saveHistory() can acknowledge the message later.
        $job->setDeliveryTag($deliveryTag);

        return $job;
    }

    /**
     * Save history gets called upon completion of the job; here it acknowledges the job's message.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws ClassNotSubclassException if the job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new ClassNotSubclassException("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
        }

        $this->channel->basic_ack($job->getDeliveryTag());
    }

    /**
     * Closes the channel, if one was ever opened.
     */
    public function __destruct()
    {
        if (null !== $this->channel) {
            $this->channel->close();
        }
    }
}
266