Completed
Push — master ( f30a66...783f1c )
by Matthew
08:20 queued 39s
created

JobManager   B

Complexity

Total Complexity 41

Size/Duplication

Total Lines 246
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 95.24%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 41
lcom 1
cbo 6
dl 0
loc 246
ccs 100
cts 105
cp 0.9524
rs 8.2769
c 2
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 2
A setExchangeArgs() 0 4 1
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
A setQueueArgs() 0 12 3
A checkChannelArgs() 0 9 3
A performChannelSetup() 0 10 2
A setupChannel() 0 9 2
A prioritySave() 0 18 2
A setJobId() 0 6 2
A setMsgPriority() 0 7 2
A calculatePriority() 0 9 2
A validateSaveable() 0 10 4
A verifyGetJobArgs() 0 6 4
A getJob() 0 12 2
A findJob() 0 21 4
A saveHistory() 0 10 2
A __destruct() 0 6 2

How to fix   Complexity   

Complex Class

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\PriorityJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class JobManager extends PriorityJobManager
11
{
12
    /** @var AMQPChannel */
13
    protected $channel;
14
15
    /** @var AbstractConnection */
16
    protected $connection;
17
    protected $queueArgs;
18
    protected $exchangeArgs;
19
20
    protected $channelSetup = false;
21
22
    protected $hostname;
23
    protected $pid;
24
25 2
    public function __construct()
26
    {
27 2
        $this->hostname = gethostname() ?: '';
28 2
        $this->pid = getmypid();
29 2
    }
30
31
    /**
32
     * @param string $exchange
33
     * @param string $type
34
     * @param bool   $passive
35
     * @param bool   $durable
36
     * @param bool   $autoDelete
37
     */
38 1
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
39
    {
40 1
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
41 1
    }
42
43
    /**
44
     * @param string $queue
45
     * @param bool   $passive
46
     * @param bool   $durable
47
     * @param bool   $exclusive
48
     * @param bool   $autoDelete
49
     */
50 1
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
51
    {
52 1
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];
53
54 1
        $this->queueArgs = $arguments;
55 1
        if (!ctype_digit(strval($this->maxPriority))) {
56 1
            throw new \Exception('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
57
        }
58 1
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
59
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
60
        }
61 1
    }
62
63 1
    public function setAMQPConnection(AbstractConnection $connection)
64
    {
65 1
        $this->connection = $connection;
66 1
        $this->channel = $connection->channel();
67 1
    }
68
69
    /**
70
     * @return AMQPChannel
71
     */
72
    public function getChannel()
73
    {
74
        return $this->channel;
75
    }
76
77 7
    protected function checkChannelArgs()
78
    {
79 7
        if (empty($this->queueArgs)) {
80 1
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
81
        }
82 7
        if (empty($this->exchangeArgs)) {
83 1
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
84
        }
85 7
    }
86
87 1
    protected function performChannelSetup()
88
    {
89 1
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
90 1
        if ($this->maxPriority) {
91 1
            array_push($this->queueArgs, false);
92 1
            array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
93
        }
94 1
        call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
95 1
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
96 1
    }
97
98 7
    public function setupChannel()
99
    {
100 7
        $this->checkChannelArgs();
101
102 7
        if (!$this->channelSetup) {
103 1
            $this->performChannelSetup();
104 1
            $this->channelSetup = true;
105
        }
106 7
    }
107
108
    /**
109
     * @param \Dtc\QueueBundle\Model\Job $job
110
     *
111
     * @return \Dtc\QueueBundle\Model\Job
112
     *
113
     * @throws \Exception
114
     */
115 6
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
116
    {
117 6
        if (!$job instanceof Job) {
118 1
            throw new \Exception('Must be derived from '.Job::class);
119
        }
120
121 6
        $this->setupChannel();
122
123 6
        $this->validateSaveable($job);
124 6
        $this->setJobId($job);
125
126 6
        $msg = new AMQPMessage($job->toMessage());
127 6
        $this->setMsgPriority($msg, $job);
128
129 6
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
130
131 6
        return $job;
132
    }
133
134
    /**
135
     * Attach a unique id to a job since RabbitMQ will not.
136
     *
137
     * @param \Dtc\QueueBundle\Model\Job $job
138
     */
139 6
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
140
    {
141 6
        if (!$job->getId()) {
142 6
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
143
        }
144 6
    }
145
146
    /**
147
     * Sets the priority of the AMQPMessage.
148
     *
149
     * @param AMQPMessage                $msg
150
     * @param \Dtc\QueueBundle\Model\Job $job
151
     */
152 6
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
153
    {
154 6
        if (null !== $this->maxPriority) {
155 6
            $priority = $job->getPriority();
156 6
            $msg->set('priority', $priority);
157
        }
158 6
    }
159
160 6
    protected function calculatePriority($priority)
161
    {
162 6
        $priority = parent::calculatePriority($priority);
163 6
        if (null === $priority) {
164 5
            return 0;
165
        }
166
167 3
        return $priority;
168
    }
169
170
    /**
171
     * @param \Dtc\QueueBundle\Model\Job $job
172
     *
173
     * @throws \Exception
174
     */
175 6
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
176
    {
177 6
        if (null !== $job->getPriority() && null === $this->maxPriority) {
178
            throw new \Exception('This queue does not support priorities');
179
        }
180
181 6
        if (!$job instanceof Job) {
182
            throw new \Exception('Job needs to be instance of '.Job::class);
183
        }
184 6
    }
185
186 7
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
187
    {
188 7
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
189 2
            throw new \Exception('Unsupported');
190
        }
191 6
    }
192
193
    /**
194
     * @param string $workerName
195
     */
196 7
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
197
    {
198 7
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
199 6
        $this->setupChannel();
200
201
        do {
202 6
            $expiredJob = false;
203 6
            $job = $this->findJob($expiredJob, $runId);
204 6
        } while ($expiredJob);
205
206 6
        return $job;
207
    }
208
209
    /**
210
     * @param bool $expiredJob
211
     * @param $runId
212
     *
213
     * @return Job|null
214
     */
215 6
    protected function findJob(&$expiredJob, $runId)
216
    {
217 6
        $message = $this->channel->basic_get($this->queueArgs[0]);
218 6
        if ($message) {
219 6
            $job = new Job();
220 6
            $job->fromMessage($message->body);
221 6
            $job->setRunId($runId);
222
223 6
            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
224 1
                $expiredJob = true;
225 1
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);
226
227 1
                return null;
228
            }
229 6
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);
230
231 6
            return $job;
232
        }
233
234 3
        return null;
235
    }
236
237
    // Save History get called upon completion of the job
238 1
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
239
    {
240 1
        if (!$job instanceof Job) {
241 1
            throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
242
        }
243 1
        $deliveryTag = $job->getDeliveryTag();
244 1
        $this->channel->basic_ack($deliveryTag);
245
246 1
        return;
247
    }
248
249 3
    public function __destruct()
250
    {
251 3
        if (null !== $this->channel) {
252 2
            $this->channel->close();
253
        }
254 3
    }
255
}
256