Completed
Push — master ( 58e792...f01f49 )
by Matthew
05:34
created

JobManager   A

Complexity

Total Complexity 28

Size/Duplication

Total Lines 163
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 40%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
wmc 28
lcom 1
cbo 6
dl 0
loc 163
ccs 30
cts 75
cp 0.4
rs 10
c 3
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 2
A setExchangeArgs() 0 4 1
A setQueueArgs() 0 13 3
A setAMQPConnection() 0 5 1
A getChannel() 0 4 1
B setupChannel() 0 20 5
B save() 0 21 6
B getJob() 0 29 6
A saveHistory() 0 10 2
A __destruct() 0 4 1
1
<?php
2
3
namespace Dtc\QueueBundle\RabbitMQ;
4
5
use Dtc\QueueBundle\Model\AbstractJobManager;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use PhpAmqpLib\Connection\AbstractConnection;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class JobManager extends AbstractJobManager
11
{
12
    /** @var AMQPChannel Channel obtained from the connection in setAMQPConnection(); not set before that call */
13
    protected $channel;
14
15
    /** @var AbstractConnection Connection handed to setAMQPConnection() */
16
    protected $connection;
17
    /** @var array Positional queue_declare() arguments stored by setQueueArgs(); element 0 is the queue name */
    protected $queueArgs;
18
    /** @var array Positional exchange_declare() arguments stored by setExchangeArgs(); element 0 is the exchange name */
    protected $exchangeArgs;
19
20
    /** @var bool Set to true once setupChannel() has declared and bound the exchange and queue */
    protected $channelSetup = false;
21
22
    /** @var string Value of gethostname(), or '' when that value is falsy; used as part of generated job ids */
    protected $hostname;
23
    /** @var mixed Value of getmypid(); used as part of generated job ids */
    protected $pid;
24
    /** @var mixed Max queue priority accepted by setQueueArgs(); a falsy value means priorities are not supported */
    protected $maxPriority;
25
26
    /**
     * Captures the host name and process id of the current PHP process.
     *
     * Both values are later combined into the prefix of generated job ids.
     * The host name falls back to an empty string whenever gethostname()
     * yields a falsy value.
     */
    public function __construct()
    {
        $detectedHost = gethostname();

        $this->hostname = $detectedHost ? $detectedHost : '';
        $this->pid = getmypid();
    }
31
32
    /**
     * Stores the positional arguments that are later passed to exchange_declare().
     *
     * Nothing is sent to the broker here; the values are only remembered.
     *
     * @param string $exchange   exchange name (first positional argument)
     * @param string $type
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $autoDelete
     */
    public function setExchangeArgs($exchange, $type, $passive, $durable, $autoDelete)
    {
        $declareArgs = [
            $exchange,
            $type,
            $passive,
            $durable,
            $autoDelete,
        ];

        $this->exchangeArgs = $declareArgs;
    }
43
44
    /**
     * Stores the positional arguments for queue_declare() and the queue's max priority.
     *
     * The priority is validated before anything is stored, so a rejected call
     * leaves any previously configured queue arguments untouched.
     *
     * @param string $queue       queue name (first positional argument)
     * @param bool   $passive
     * @param bool   $durable
     * @param bool   $exclusive
     * @param bool   $autoDelete
     * @param int    $maxPriority non-negative integer; a falsy value (0) means the queue has no priority support
     *
     * @throws \Exception when $maxPriority is not made up of digits only, or does not survive an int round trip
     */
    public function setQueueArgs($queue, $passive, $durable, $exclusive, $autoDelete, $maxPriority)
    {
        $priorityText = strval($maxPriority);

        if (!ctype_digit($priorityText)) {
            throw new \Exception('Max Priority needs to be a non-negative integer');
        }
        // The int round trip differs for values beyond PHP_INT_MAX (and for digit strings with leading zeros).
        if (strval(intval($maxPriority)) !== $priorityText) {
            throw new \Exception('Priority is higher than '.PHP_INT_MAX);
        }

        // Only commit state once the input is known to be valid.
        $this->queueArgs = [$queue, $passive, $durable, $exclusive, $autoDelete];
        $this->maxPriority = $maxPriority;
    }
65
66
    /**
     * Registers the AMQP connection and obtains the channel this manager works with.
     *
     * @param AbstractConnection $connection
     */
    public function setAMQPConnection(AbstractConnection $connection)
    {
        $this->connection = $connection;
        $this->channel = $this->connection->channel();
    }
71
72
    /**
     * Exposes the channel obtained in setAMQPConnection().
     *
     * @return AMQPChannel the stored channel (not set until a connection has been registered)
     */
    public function getChannel()
    {
        return $this->channel;
    }
79
80 5
    /**
     * Declares the exchange and the queue and binds them together, once per manager.
     *
     * After the first successful run further calls only re-check that the
     * arguments are present. The stored queue arguments are never modified:
     * the extra declare arguments needed for priorities are appended to a
     * local copy, so a declaration that throws can be retried without the
     * arguments piling up.
     *
     * @throws \Exception when the queue or exchange arguments have not been set
     */
    public function setupChannel()
    {
        if (empty($this->queueArgs)) {
            throw new \Exception(__METHOD__.': queue args need to be set via setQueueArgs(...)');
        }
        if (empty($this->exchangeArgs)) {
            throw new \Exception(__METHOD__.': exchange args need to be set via setExchangeArgs(...)');
        }

        if ($this->channelSetup) {
            return;
        }

        call_user_func_array([$this->channel, 'exchange_declare'], $this->exchangeArgs);

        $queueDeclareArgs = $this->queueArgs;
        if ($this->maxPriority) {
            // Sixth positional argument (false), then the arguments table carrying x-max-priority.
            $queueDeclareArgs[] = false;
            $queueDeclareArgs[] = ['x-max-priority' => ['I', intval($this->maxPriority)]];
        }
        call_user_func_array([$this->channel, 'queue_declare'], $queueDeclareArgs);

        $this->channel->queue_bind($this->queueArgs[0], $this->exchangeArgs[0]);
        $this->channelSetup = true;
    }
100
101 5
    /**
     * Publishes a job as a message on the configured exchange.
     *
     * A job without an id receives one built from the host name, the process
     * id and uniqid(). When the queue has a max priority, the message priority
     * is 0 for jobs without a priority and (max priority - job priority)
     * otherwise.
     *
     * NOTE(review): static analysis reports that toMessage() is not declared on
     * \Dtc\QueueBundle\Model\Job (presumably it is reached through __call or a
     * subclass). A "@method" annotation on the job class would document it —
     * confirm against the job classes.
     *
     * @param \Dtc\QueueBundle\Model\Job $job
     *
     * @return \Dtc\QueueBundle\Model\Job the job that was passed in
     *
     * @throws \Exception when the job carries a priority but the queue has no max priority
     */
    public function save(\Dtc\QueueBundle\Model\Job $job)
    {
        $this->setupChannel();

        if (!$job->getId()) {
            $idPrefix = $this->hostname.'-'.$this->pid;
            $job->setId(uniqid($idPrefix, true));
        }

        $priority = $job->getPriority();
        if (null !== $priority && !$this->maxPriority) {
            throw new \Exception('This queue does not support priorities');
        }

        $msg = new AMQPMessage($job->toMessage());

        if ($this->maxPriority) {
            if (null === $priority) {
                $priority = 0;
            } else {
                $priority = $this->maxPriority - $priority;
            }
            $msg->set('priority', $priority);
        }

        $this->channel->basic_publish($msg, $this->exchangeArgs[0]);

        return $job;
    }
122
123
    /**
     * Fetches the next non-expired job from the queue.
     *
     * Expired messages are rejected with basic_nack() and skipped. Polling
     * stops as soon as a usable job is found or basic_get() returns nothing,
     * so an expired message followed by an empty queue ends the loop instead
     * of polling forever.
     *
     * @param string $workerName not used by this implementation
     * @param string $methodName must be empty; selecting by method is unsupported
     * @param bool   $prioritize not used by this implementation
     * @param mixed  $runId      stored on the returned job via setRunId()
     *
     * @return Job|null the job, carrying its delivery tag, or null when no message is available
     *
     * @throws \Exception when $methodName is given
     */
    public function getJob($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        if ($methodName) {
            throw new \Exception('Unsupported');
        }

        $this->setupChannel();

        while ($message = $this->channel->basic_get($this->queueArgs[0])) {
            $deliveryTag = $message->delivery_info['delivery_tag'];

            $job = new Job();
            $job->fromMessage($message->body);
            $job->setRunId($runId);

            $expiresAt = $job->getExpiresAt();
            if ($expiresAt && $expiresAt->getTimestamp() < time()) {
                // Reject the expired message and try the next one.
                $this->channel->basic_nack($deliveryTag);
                continue;
            }

            $job->setDeliveryTag($deliveryTag);

            return $job;
        }

        return null;
    }
155
156
    /**
     * Acknowledges the job's message on the channel.
     *
     * Called upon completion of the job.
     *
     * @param \Dtc\QueueBundle\Model\Job $job must be a RabbitMQ Job, which carries the delivery tag
     *
     * @throws \Exception when $job is not a RabbitMQ Job
     */
    public function saveHistory(\Dtc\QueueBundle\Model\Job $job)
    {
        if ($job instanceof Job) {
            $this->channel->basic_ack($job->getDeliveryTag());

            return;
        }

        throw new \Exception("Expected \Dtc\QueueBundle\RabbitMQ\Job, got ".get_class($job));
    }
167
168
    /**
     * Closes the channel when the manager is destroyed.
     *
     * The channel is only assigned by setAMQPConnection(), so a manager that
     * never received a connection has nothing to close.
     */
    public function __destruct()
    {
        if (null !== $this->channel) {
            $this->channel->close();
        }
    }
172
}
173