Passed
Pull Request — master (#6)
by Daniel
04:58
created

QueueClient::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
1
<?php
2
3
namespace Jellyfish\QueueRabbitMq;
4
5
use Jellyfish\Queue\MessageMapperInterface;
6
use Jellyfish\Queue\QueueClientInterface;
7
use Jellyfish\Queue\MessageInterface;
8
use PhpAmqpLib\Connection\AbstractConnection;
9
use PhpAmqpLib\Message\AMQPMessage;
10
11
/**
 * Queue client backed by a php-amqplib connection.
 *
 * A single channel is opened on construction and reused for every send and
 * receive call; channel and connection are closed again on destruction.
 */
class QueueClient implements QueueClientInterface
{
    /**
     * @var \PhpAmqpLib\Connection\AbstractConnection
     */
    protected $connection;

    /**
     * Channel opened on $connection in the constructor.
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel
     */
    protected $channel;

    /**
     * Converts between message objects and their JSON representation.
     *
     * @var \Jellyfish\Queue\MessageMapperInterface
     */
    protected $messageMapper;

    /**
     * Stores the collaborators and opens the channel used by all operations.
     *
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     * @param \Jellyfish\Queue\MessageMapperInterface $messageMapper
     */
    public function __construct(AbstractConnection $connection, MessageMapperInterface $messageMapper)
    {
        $this->connection = $connection;
        $this->channel = $this->connection->channel();
        $this->messageMapper = $messageMapper;
    }

    /**
     * Fetches a single message from the given queue, if one is available.
     *
     * The queue is declared before fetching. The JSON payload carried in the
     * AMQP message body is handed to the message mapper.
     *
     * @param string $queueName
     *
     * @return \Jellyfish\Queue\MessageInterface|null Mapped message, or null when nothing was fetched.
     */
    public function receiveMessage(string $queueName): ?MessageInterface
    {
        $this->channel->queue_declare($queueName);

        // NOTE(review): basic_get() is called without the no_ack flag and the
        // message is never acknowledged here - confirm whether redelivery by
        // the broker is intended.
        $amqpMessage = $this->channel->basic_get($queueName);

        if (!($amqpMessage instanceof AMQPMessage)) {
            return null;
        }

        // basic_get() yields the AMQP envelope, not the payload; the mapper
        // needs the JSON string that sendMessage() stored as the body.
        return $this->messageMapper->fromJson($amqpMessage->getBody());
    }

    /**
     * Serializes the message to JSON and publishes it to the given queue.
     *
     * The queue is declared before publishing. The message is published via
     * the default exchange ('') with the queue name as routing key.
     *
     * @param string $queueName
     * @param \Jellyfish\Queue\MessageInterface $message
     *
     * @return \Jellyfish\Queue\QueueClientInterface This client, for chaining.
     */
    public function sendMessage(string $queueName, MessageInterface $message): QueueClientInterface
    {
        $messageAsJson = $this->messageMapper->toJson($message);

        $this->channel->queue_declare($queueName);
        $this->channel->basic_publish(new AMQPMessage($messageAsJson), '', $queueName);

        return $this;
    }

    /**
     * Closes the channel (when one was opened) and then the connection.
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            $this->channel->close();
        }

        $this->connection->close();
    }
}
82