Completed
Push — master ( 1204eb...5574d8 )
by Matthew
05:57
created

JobManager   B

Complexity

Total Complexity 41

Size/Duplication

Total Lines 246
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 0%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 41
lcom 1
cbo 6
dl 0
loc 246
ccs 0
cts 147
cp 0
rs 8.2769
c 2
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 2
A setExchangeArgs() 0 4 1
A setQueueArgs() 0 12 3
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
A checkChannelArgs() 0 9 3
A performChannelSetup() 0 10 2
A setupChannel() 0 9 2
A prioritySave() 0 18 2
A setJobId() 0 6 2
A setMsgPriority() 0 7 2
A calculatePriority() 0 9 2
A validateSaveable() 0 10 4
A verifyGetJobArgs() 0 6 4
A getJob() 0 12 2
A findJob() 0 21 4
A saveHistory() 0 10 2
A __destruct() 0 6 2

How to fix   Complexity   

Complex Class

Complex classes like JobManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use JobManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\PriorityJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
/**
 * RabbitMQ-backed job manager.
 *
 * Publishes jobs to an AMQP exchange and fetches them back from the bound
 * queue through a php-amqplib channel. The exchange and queue are declared
 * lazily, once per instance, the first time a job is saved or fetched.
 */
class JobManager extends PriorityJobManager
{
    /** @var AMQPChannel Channel opened from the connection given to setAMQPConnection() */
    protected $channel;

    /** @var AbstractConnection */
    protected $connection;

    /** @var array|null Positional queue_declare() arguments stored by setQueueArgs() */
    protected $queueArgs;

    /** @var array|null Positional exchange_declare() arguments stored by setExchangeArgs() */
    protected $exchangeArgs;

    /** @var bool Whether the exchange/queue have already been declared and bound */
    protected $channelSetup = false;

    /** @var string Host name used as part of generated job ids ('' if it cannot be determined) */
    protected $hostname;

    /** @var int|false Process id used as part of generated job ids */
    protected $pid;

    /**
     * Captures the host name and process id that setJobId() uses to build ids.
     */
    public function __construct()
    {
        $this->hostname = gethostname() ?: '';
        $this->pid = getmypid();
    }

    /**
     * Stores the arguments later passed positionally to exchange_declare().
     *
     * @param string $exchange
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $this->exchangeArgs = [$exchange, $type, $passive, $durable, $autoDelete];
    }

    /**
     * Stores the arguments later passed positionally to queue_declare() and
     * validates the configured max priority.
     *
     * The max priority must be a string of digits that survives a round trip
     * through intval(); a null max priority casts to '' and therefore fails
     * the first check as well.
     *
     * @param string $queue
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     *
     * @throws \Exception if the max priority is not a non-negative integer or does not fit in an int
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete)
    {
        $arguments = [$queue, $passive, $durable, $exclusive, $autoDelete];

        $this->queueArgs = $arguments;
        if (!ctype_digit(strval($this->maxPriority))) {
            throw new \Exception('Max Priority ('.$this->maxPriority.') needs to be a non-negative integer');
        }
        // intval() clamps oversized values, so a mismatch after the round trip
        // means the digit string could not be represented as-is.
        if (strval(intval($this->maxPriority)) !== strval($this->maxPriority)) {
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
        }
    }

    /**
     * Stores the connection and opens the channel used for all operations.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
    }

    /**
     * @return AMQPChannel
     */
    public function getChannel()
    {
        return $this->channel;
    }

    /**
     * Ensures both the queue and exchange arguments have been configured.
     *
     * @throws \Exception if setQueueArgs() or setExchangeArgs() has not been called
     */
    protected function checkChannelArgs()
    {
        if (empty($this->queueArgs)) {
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }
    }

    /**
     * Declares the exchange and the queue, then binds the queue to the exchange.
     *
     * When a max priority is configured, the queue is declared with an
     * 'x-max-priority' argument.
     */
    protected function performChannelSetup()
    {
        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        // Build the declare arguments in a local copy so $this->queueArgs keeps
        // the shape stored by setQueueArgs() and running this method again
        // does not append the priority arguments a second time.
        $queueDeclareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Positional filler for the parameter preceding the arguments table.
            $queueDeclareArgs[] = false;
            $queueDeclareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $queueDeclareArgs);
        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
    }

    /**
     * Declares and binds the exchange/queue the first time it is called.
     *
     * The argument check runs on every call; the declaration only once.
     *
     * @throws \Exception if the queue or exchange arguments are missing
     */
    public function setupChannel()
    {
        $this->checkChannelArgs();

        if (!$this->channelSetup) {
            $this->performChannelSetup();
            $this->channelSetup = true;
        }
    }

    /**
     * Publishes a job to the configured exchange.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the same job, with an id assigned if it had none
     *
     * @throws \Exception if the job is not a RabbitMQ Job, the channel is not
     *                    configured, or the job is not saveable
     */
    public function prioritySave(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \Exception('Must be derived from '.Job::class);
        }

        $this->setupChannel();

        $this->validateSaveable($job);
        $this->setJobId($job);

        $msg = new AMQPMessage($job->toMessage());
        $this->setMsgPriority($msg, $job);

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);

        return $job;
    }

    /**
     * Attach a unique id to a job since RabbitMQ will not.
     *
     * An existing id is left untouched; otherwise one is generated from the
     * host name, process id and uniqid().
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setJobId(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job->getId()) {
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
        }
    }

    /**
     * Sets the priority of the AMQPMessage.
     *
     * Does nothing when no max priority is configured.
     *
     * @param AMQPMessage                $msg
     * @param \Dtc\QueueBundle\Model\Job $job
     */
    protected function setMsgPriority(AMQPMessage $msg, \Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $this->maxPriority) {
            $priority = $job->getPriority();
            $msg->set('priority', $priority);
        }
    }

    /**
     * Delegates to the parent calculation, substituting 0 for a null result.
     *
     * @param mixed $priority
     *
     * @return mixed the parent's result, or 0 when the parent returns null
     */
    protected function calculatePriority($priority)
    {
        $priority = parent::calculatePriority($priority);
        if (null === $priority) {
            return 0;
        }

        return $priority;
    }

    /**
     * Rejects jobs that cannot be published by this manager.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws \Exception if the job has a priority but no max priority is
     *                    configured, or the job is not a RabbitMQ Job
     */
    protected function validateSaveable(\Dtc\QueueBundle\Model\Job $job)
    {
        if (null !== $job->getPriority() && null === $this->maxPriority) {
            throw new \Exception('This queue does not support priorities');
        }

        if (!$job instanceof Job) {
            throw new \Exception('Job needs to be instance of '.Job::class);
        }
    }

    /**
     * Ensures getJob() was called with its default filter arguments only.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     *
     * @throws \Exception if any argument differs from its default
     */
    protected function verifyGetJobArgs($workerName = null, $methodName = null, $prioritize = true)
    {
        if (null !== $workerName || null !== $methodName || true !== $prioritize) {
            throw new \Exception('Unsupported');
        }
    }

    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired jobs are rejected by findJob() and skipped; the loop keeps
     * polling until a live job is found or the queue yields nothing.
     *
     * @param string|null $workerName must be null
     * @param string|null $methodName must be null
     * @param bool        $prioritize must be true
     * @param mixed       $runId      run id assigned to the returned job
     *
     * @return Job|null
     *
     * @throws \Exception if a filter argument is supplied or the channel is not configured
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $this->verifyGetJobArgs($workerName, $methodName, $prioritize);
        $this->setupChannel();

        do {
            $expiredJob = false;
            $job = $this->findJob($expiredJob, $runId);
        } while ($expiredJob);

        return $job;
    }

    /**
     * Pulls a single message from the queue and converts it to a Job.
     *
     * @param bool  $expiredJob set to true (by reference) when the fetched job had expired
     * @param mixed $runId      run id assigned to the job
     *
     * @return Job|null null when the queue returned no message or the job had expired
     */
    protected function findJob(&$expiredJob, $runId)
    {
        $message = $this->channel->basic_get($this->queueArgs[0]);
        if ($message) {
            $job = new Job();
            $job->fromMessage($message->body);
            $job->setRunId($runId);

            if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
                $expiredJob = true;
                // Negatively acknowledge the expired message instead of handing it out.
                $this->channel->basic_nack($message->delivery_info['delivery_tag']);

                return null;
            }
            // Keep the delivery tag so saveHistory() can acknowledge the message later.
            $job->setDeliveryTag($message->delivery_info['delivery_tag']);

            return $job;
        }

        return null;
    }

    /**
     * Acknowledges the job's message; called upon completion of the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @throws \Exception if the job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if (!$job instanceof Job) {
            throw new \Exception('Expected \Dtc\QueueBundle\RabbitMQ\Job, got '.get_class($job));
        }
        $deliveryTag = $job->getDeliveryTag();
        $this->channel->basic_ack($deliveryTag);
    }

    /**
     * Closes the channel, if one was opened.
     */
    public function __destruct()
    {
        if (null !== $this->channel) {
            $this->channel->close();
        }
    }
}
256