Completed
Pull Request — master (#7)
by Klaus
02:27
created

RabbitAdapter::perform()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 19
rs 9.6333
c 0
b 0
f 0
cc 3
nc 2
nop 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Linio\Component\Queue\Adapter;
6
7
use Linio\Component\Queue\AdapterInterface;
8
use Linio\Component\Queue\Job;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AMQPConnection;
11
use PhpAmqpLib\Message\AMQPMessage;
12
13
class RabbitAdapter implements AdapterInterface
14
{
15
    /**
     * Channel used for every broker call made by this adapter.
     *
     * Stays null until getChannel() opens a connection on first use or
     * setChannel() supplies a channel explicitly.
     */
    protected ?AMQPChannel $channel = null;
0 ignored issues
show
Bug introduced by
This code did not parse for me. Apparently, there is an error somewhere around this line:

Syntax error, unexpected '?', expecting T_FUNCTION or T_CONST
Loading history...
16
    /**
     * Connection settings handed to the constructor.
     *
     * getChannel() reads the keys 'host', 'port', 'username', 'password'
     * and 'vhost' from this array when it has to open a connection.
     */
    protected array $config = [];
17
18
    /**
     * Stores the connection settings; no connection is opened here.
     *
     * @param array $config Settings later read by getChannel(): 'host', 'port',
     *                      'username', 'password' and 'vhost'.
     */
    public function __construct(array $config = [])
    {
        $this->config = $config;
    }
22
23
    /**
     * Publishes the job's payload to the queue named by the job.
     *
     * The queue is declared first, with its durable flag taken from
     * Job::isPersistent(), and the payload is then published through the
     * default exchange ('') using the queue name as routing key.
     *
     * NOTE(review): the second queue_declare() argument is php-amqplib's
     * `passive` flag; `true` appears to only verify that the queue exists
     * rather than create it — confirm this is intended.
     *
     * @param Job $job Job providing the queue name, persistence flag and payload.
     */
    public function add(Job $job): void
    {
        $channel = $this->getChannel();
        $queue = $job->getQueue();

        $channel->queue_declare($queue, true, $job->isPersistent(), false, false);

        // delivery_mode 2 is AMQP's persistent delivery mode; it is applied to
        // every message, independently of Job::isPersistent().
        $channel->basic_publish(
            new AMQPMessage($job->getPayload(), ['delivery_mode' => 2]),
            '',
            $queue
        );
    }
29
30
    /**
     * Consumes messages from the job's queue and runs the job for each one.
     *
     * For every delivered message the body is set as the job payload and
     * Job::perform() is invoked. A finished job acknowledges the message;
     * an unfinished one negatively acknowledges it with requeue enabled.
     * The method then keeps waiting on the channel for as long as the channel
     * has registered callbacks, so it blocks until consumption stops.
     *
     * NOTE(review): the second queue_declare() argument is php-amqplib's
     * `passive` flag; `true` appears to only verify that the queue exists
     * rather than create it — confirm this is intended.
     *
     * @param Job $job Job that names the queue and processes each payload.
     */
    public function perform(Job $job): void
    {
        $channel = $this->getChannel();
        $queue = $job->getQueue();

        $channel->queue_declare($queue, true, $job->isPersistent(), false, false);

        // Prefetch count of 1: presumably limits the consumer to one
        // unacknowledged message at a time — see php-amqplib basic_qos().
        $channel->basic_qos(0, 1, false);

        $onMessage = function ($message) use ($job): void {
            $job->setPayload($message->body);
            $job->perform();

            if (!$job->isFinished()) {
                // Not finished: reject this single message and ask the broker to requeue it.
                $this->getChannel()->basic_nack($message->delivery_info['delivery_tag'], false, true);

                return;
            }

            // Finished: acknowledge on the channel that delivered the message.
            $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        };

        $channel->basic_consume($queue, '', false, false, false, false, $onMessage);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }
49
50
    /**
     * Returns the channel, opening a connection on first use.
     *
     * When no channel has been set yet, a connection is created from the
     * 'host', 'port', 'username', 'password' and 'vhost' config entries and
     * the channel obtained from it is kept for subsequent calls.
     *
     * NOTE(review): AMQPConnection is, as far as I can tell, a deprecated
     * alias of AMQPStreamConnection in newer php-amqplib releases — confirm
     * against the installed library version.
     *
     * @return AMQPChannel The stored or freshly opened channel.
     */
    public function getChannel(): AMQPChannel
    {
        if ($this->channel) {
            return $this->channel;
        }

        $connection = new AMQPConnection(
            $this->config['host'],
            $this->config['port'],
            $this->config['username'],
            $this->config['password'],
            $this->config['vhost']
        );

        $this->channel = $connection->channel();

        return $this->channel;
    }
59
60
    /**
     * Replaces the channel used by this adapter.
     *
     * Once a channel is set, getChannel() returns it instead of opening a
     * connection from the stored config.
     *
     * @param AMQPChannel $channel Channel to use for subsequent broker calls.
     */
    public function setChannel(AMQPChannel $channel): void
    {
        $this->channel = $channel;
    }
64
}
65