Completed
Push — master ( e17880...ddcf44 )
by Markus
20s queued 10s
created

QueueClient::getChannel()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
c 0
b 0
f 0
dl 0
loc 7
rs 10
cc 2
nc 2
nop 0
1
<?php
2
3
namespace Jellyfish\QueueRabbitMq;
4
5
use Jellyfish\Queue\MessageInterface;
6
use Jellyfish\Queue\MessageMapperInterface;
7
use Jellyfish\Queue\QueueClientInterface;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Connection\AbstractConnection;
10
use PhpAmqpLib\Message\AMQPMessage;
11
12
class QueueClient implements QueueClientInterface
{
    /**
     * Broker connection from which the channel is opened and which is
     * closed again when this client is destroyed.
     *
     * @var \PhpAmqpLib\Connection\AbstractConnection
     */
    protected $connection;

    /**
     * Channel opened on first use by getChannel(); null until then.
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
     */
    protected $channel;

    /**
     * Converts messages to and from their JSON representation.
     *
     * @var \Jellyfish\Queue\MessageMapperInterface
     */
    protected $messageMapper;

    /**
     * Stores the collaborators; no channel is opened here.
     *
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     * @param \Jellyfish\Queue\MessageMapperInterface $messageMapper
     */
    public function __construct(AbstractConnection $connection, MessageMapperInterface $messageMapper)
    {
        $this->messageMapper = $messageMapper;
        $this->connection = $connection;
    }

    /**
     * Declares the queue (library default options) and fetches a single
     * message from it via basic_get.
     *
     * @param string $queueName
     *
     * @return \Jellyfish\Queue\MessageInterface|null The mapped message, or
     *         null when basic_get did not return an AMQPMessage.
     */
    public function receiveMessage(string $queueName): ?MessageInterface
    {
        $channel = $this->getChannel();
        $channel->queue_declare($queueName);

        // NOTE(review): the second argument is presumably php-amqplib's
        // "no_ack" flag, meaning the message is not acknowledged explicitly
        // afterwards - confirm against the installed library version.
        $fetched = $channel->basic_get($queueName, true);

        // An instanceof test is false for null as well, so this single check
        // covers both the "nothing returned" and the "unexpected type" case.
        if ($fetched instanceof AMQPMessage) {
            return $this->messageMapper->fromJson($fetched->getBody());
        }

        return null;
    }

    /**
     * Serializes the message to JSON, declares the queue (library default
     * options) and publishes the payload with an empty exchange name and the
     * queue name as routing key.
     *
     * @param string $queueName
     * @param \Jellyfish\Queue\MessageInterface $message
     *
     * @return \Jellyfish\Queue\QueueClientInterface This client, for chaining.
     */
    public function sendMessage(string $queueName, MessageInterface $message): QueueClientInterface
    {
        // Serialize before touching the channel so that a mapper failure
        // happens before any queue is declared.
        $payload = new AMQPMessage($this->messageMapper->toJson($message));

        $channel = $this->getChannel();
        $channel->queue_declare($queueName);
        $channel->basic_publish($payload, '', $queueName);

        return $this;
    }

    /**
     * Returns the channel, opening it on the connection the first time it is
     * requested and reusing that instance afterwards.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        if ($this->channel !== null) {
            return $this->channel;
        }

        $this->channel = $this->connection->channel();

        return $this->channel;
    }

    /**
     * Closes the connection when the client is destroyed, provided the
     * connection still reports itself as connected.
     */
    public function __destruct()
    {
        if (!$this->connection->isConnected()) {
            return;
        }

        $this->connection->close();
    }
}
92