Test Setup Failed
Push — master ( 298fd3...56daff )
by Matthew
17:14
created

JobManager   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 146
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Importance

Changes 2
Bugs 0 Features 0
Metric Value
wmc 28
lcom 1
cbo 6
dl 0
loc 146
rs 10
c 2
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 2
A setExchangeArgs() 0 4 1
A setQueueArgs() 0 14 3
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
B setupChannel() 0 20 5
B save() 0 21 6
B getJob() 0 29 6
A saveHistory() 0 10 2
A __destruct() 0 4 1
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class JobManager extends AbstractJobManager
11
{
12
    /** @var AMQPChannel */
13
    protected $channel;
14
15
    /** @var AbstractConnection */
16
    protected $connection;
17
    protected $queueArgs;
18
    protected $exchangeArgs;
19
20
    protected $channelSetup = false;
21
22
    protected $hostname;
23
    protected $pid;
24
    protected $maxPriority;
25
26
    public function __construct()
27
    {
28
        $this->hostname = gethostname() ?: '';
29
        $this->pid = getmypid();
30
    }
31
32
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
0 ignored issues
show
Unused Code introduced by
The parameter $exchange is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $type is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $passive is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $durable is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $autoDelete is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
33
    {
34
        $this->exchangeArgs = func_get_args();
35
    }
36
37
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete, $maxPriority)
0 ignored issues
show
Unused Code introduced by
The parameter $queue is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $passive is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $durable is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $exclusive is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
Unused Code introduced by
The parameter $autoDelete is not used and could be removed.

This check looks from parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
38
    {
39
        $arguments = func_get_args();
40
        array_pop($arguments); // Pop off max priority
41
42
        $this->queueArgs = $arguments;
43
        if (!ctype_digit(strval($maxPriority))) {
44
            throw new \Exception('Max Priority needs to be a non-negative integer');
45
        }
46
        if (strval(intval($maxPriority)) !== strval($maxPriority)) {
47
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
48
        }
49
        $this->maxPriority = $maxPriority;
50
    }
51
52
    public function setAMQPConnection(AbstractConnection $connection)
53
    {
54
        $this->connection = $connection;
55
        $this->channel = $connection->channel();
56
    }
57
58
    /**
59
     * @return AMQPChannel
60
     */
61
    public function getChannel()
62
    {
63
        return $this->channel;
64
    }
65
66
    public function setupChannel()
67
    {
68
        if (!$this->queueArgs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->queueArgs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
69
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
70
        }
71
        if (!$this->exchangeArgs) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->exchangeArgs of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
72
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
73
        }
74
75
        if (!$this->channelSetup) {
76
            call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);
77
            if ($this->maxPriority) {
78
                array_push($this->queueArgs, false);
79
                array_push($this->queueArgs, ['x-max-priority' => ['I', intval($this->maxPriority)]]);
80
            }
81
            call_user_func_array([$this->channel, 'queue_declare'], $this->queueArgs);
82
            $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
83
            $this->channelSetup = true;
84
        }
85
    }
86
87
    public function save(\Dtc\QueueBundle\Model\Job $job)
88
    {
89
        $this->setupChannel();
90
        if (!$job->getId()) {
91
            $job->setId(uniqid($this->hostname.'-'.$this->pid, true));
92
        }
93
94
        if (null !== ($priority = $job->getPriority()) && !$this->maxPriority) {
95
            throw new \Exception('This queue does not support priorities');
96
        }
97
98
        $msg = new AMQPMessage($job->toMessage());
99
100
        if ($this->maxPriority) {
101
            $priority = null === $priority ? 0 : $this->maxPriority - $priority;
102
            $msg->set('priority', $priority);
103
        }
104
        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);
105
106
        return $job;
107
    }
108
109
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
110
    {
111
        if ($methodName) {
112
            throw new \Exception('Unsupported');
113
        }
114
115
        $this->setupChannel();
116
117
        $expiredJob = false;
118
        do {
119
            $message = $this->channel->basic_get($this->queueArgs[0]);
120
            if ($message) {
121
                $job = new Job();
122
                $job->fromMessage($message->body);
123
                $job->setRunId($runId);
124
125
                if (($expiresAt = $job->getExpiresAt()) && $expiresAt->getTimestamp() < time()) {
126
                    $expiredJob = true;
127
                    $this->channel->basic_nack($message->delivery_info['delivery_tag']);
128
                    continue;
129
                }
130
                $job->setDeliveryTag($message->delivery_info['delivery_tag']);
131
132
                return $job;
133
            }
134
        } while ($expiredJob);
135
136
        return null;
137
    }
138
139
    // Save History get called upon completion of the job
140
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
141
    {
142
        if (!$job instanceof Job) {
143
            throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
144
        }
145
        $deliveryTag = $job->getDeliveryTag();
146
        $this->channel->basic_ack($deliveryTag);
147
148
        return;
149
    }
150
151
    public function __destruct()
152
    {
153
        $this->channel->close();
154
    }
155
}
156