Completed
Push — master ( 673fc6...a88edb )
by Oleg
03:40
created

RabbitMQ::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 7
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
cc 1
eloc 4
nc 1
nop 1
1
<?php /** MicroRabbitMQ */
2
3
namespace Micro\Queue\Drivers;
4
5
/**
6
 * RabbitMQ class file.
7
 *
8
 * @author Oleg Lunegov <[email protected]>
9
 * @link https://github.com/linpax/microphp-framework
10
 * @copyright Copyright (c) 2013 Oleg Lunegov
11
 * @license https://github.com/linpax/microphp-framework/blob/master/LICENSE
12
 * @package Micro
13
 * @subpackage Queue
14
 * @version 1.0
15
 * @since 1.0
16
 */
17
class RabbitMQ
18
{
19
    /** @var \AMQPConnection $connect Connect to broker */
20
    protected $connect;
21
    /** @var \AMQPChannel $channel Channel of connection */
22
    protected $channel;
23
24
25
    /**
26
     * Constructor RabbitMQ
27
     *
28
     * @access public
29
     *
30
     * @param array $params connect to broker
31
     *
32
     * @result void
33
     * @throws \AMQPConnectionException
34
     */
35
    public function __construct(array $params = [])
36
    {
37
        $this->connect = new \AMQPConnection($params);
38
        $this->connect->connect();
39
40
        $this->channel = new \AMQPChannel($this->connect);
41
    }
42
43
    /**
44
     * Close RabbitMQ
45
     *
46
     * @access public
47
     * @return void
48
     */
49
    public function __destruct()
50
    {
51
        $this->connect->disconnect();
52
    }
53
54
    /**
55
     * Send message
56
     *
57
     * @access public
58
     *
59
     * @param string $message message text
60
     * @param string $route name route
61
     * @param string $chat name chat room
62
     *
63
     * @return bool
64
     * @throws \AMQPConnectionException
65
     * @throws \AMQPChannelException
66
     * @throws \AMQPExchangeException
67
     */
68
    public function send($message, $route, $chat)
69
    {
70
        $exchange = new \AMQPExchange($this->channel);
71
        $exchange->setName($chat);
72
73
        return $exchange->publish($message, $route);
74
    }
75
76
    /**
77
     * Read current message
78
     *
79
     * @access public
80
     *
81
     * @param string $chat name chat room
82
     * @param string $route name route
83
     * @param string $nameReader name queue
84
     *
85
     * @return \AMQPEnvelope|bool
86
     * @throws \AMQPConnectionException
87
     * @throws \AMQPChannelException
88
     * @throws \AMQPQueueException
89
     */
90
    public function read($chat, $route, $nameReader = 'random')
91
    {
92
        $queue = new \AMQPQueue($this->channel);
93
        $queue->setName($nameReader);
94
        /** @noinspection PhpUndefinedMethodInspection */
95
        $queue->declare();
0 ignored issues
show
Bug introduced by
The method declare() does not exist on AMQPQueue. Did you maybe mean declareQueue()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
96
        $queue->bind($chat, $route);
97
98
        $envelop = $queue->get();
99
        if ($envelop) {
100
            $queue->ack($envelop->getDeliveryTag());
101
102
            return $envelop;
103
        }
104
105
        return false;
106
    }
107
108
    /**
109
     * Read all messages
110
     *
111
     * @access public
112
     *
113
     * @param string $chat name chat room
114
     * @param string $route name route
115
     * @param string $nameReader name queue
116
     *
117
     * @return array
118
     * @throws \AMQPConnectionException
119
     * @throws \AMQPQueueException
120
     * @throws \AMQPChannelException
121
     */
122
    public function readAll($chat, $route, $nameReader)
123
    {
124
        $queue = new \AMQPQueue($this->channel);
125
        $queue->setName($nameReader);
126
        /** @noinspection PhpUndefinedMethodInspection */
127
        $queue->declare();
0 ignored issues
show
Bug introduced by
The method declare() does not exist on AMQPQueue. Did you maybe mean declareQueue()?

This check marks calls to methods that do not seem to exist on an object.

This is most likely the result of a method being renamed without all references to it being renamed likewise.

Loading history...
128
        $queue->bind($chat, $route);
129
130
        $result = [];
131
        while ($envelop = $queue->get()) {
132
            $queue->ack($envelop->getDeliveryTag());
133
            $result[] = $envelop;
134
        }
135
136
        return $result;
137
    }
138
}
139