Issues (10)

src/AMQPConnection.php (2 issues)

Labels
Severity
1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp;
8
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Connection\AMQPSocketConnection;
12
use PhpAmqpLib\Connection\AMQPStreamConnection;
13
use PhpAmqpLib\Exception\AMQPChannelClosedException;
14
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
15
use Psr\Log\LoggerAwareInterface;
16
use Psr\Log\LoggerAwareTrait;
17
18
/**
19
 * Class AMQPConnection
20
 *
21
 * @package JokerProject\LaravelAliyunAmqp
22
 */
23
class AMQPConnection implements LoggerAwareInterface
24
{
25
    use LoggerAwareTrait;
26
27
    /**
28
     * @const array Default connections parameters
29
     */
30
    const DEFAULTS = [
31
        'hostname' => '127.0.0.1',
32
        'port' => 5672,
33
        'username' => 'guest',
34
        'password' => 'guest',
35
        'vhost' => '/',
36
37
        # whether the connection should be lazy
38
        'lazy' => true,
39
40
        # More info about timeouts can be found on https://www.rabbitmq.com/networking.html
41
        'read_write_timeout' => 3,   // default timeout for writing/reading (in seconds)
42
        'connect_timeout' => 3,
43
        'heartbeat' => 0,
44
        'keep_alive' => false,
45
        'access_key' => '',
46
        'access_secret' => '',
47
        'resource_owner_id' => '',
48
    ];
49
50
    /**
51
     * @var array
52
     */
53
    protected $connectionDetails = [];
54
55
    /**
56
     * @var string
57
     */
58
    protected $aliasName = '';
59
60
    /**
61
     * @var null|AbstractConnection
62
     */
63
    private $connection = null;
64
65
    /**
66
     * @var null|AMQPChannel
67
     */
68
    private $channel = null;
69
70
    /**
71
     * @var string
72
     */
73
    private $accessKey = '';
74
75
    /**
76
     * @var string
77
     */
78
    private $accessSecret = '';
79
80
    /**
81
     * @var string
82
     */
83
    private $resourceOwnerId = '';
84
85
    /**
86
     * @param string $aliasName
87
     * @param array  $connectionDetails
88
     *
89
     * @return AMQPConnection
90
     */
91
    public static function createConnection(string $aliasName, array $connectionDetails)
92
    {
93
        if ($diff = array_diff(array_keys($connectionDetails), array_keys(self::DEFAULTS))) {
94
            throw new \InvalidArgumentException(
95
                sprintf(
96
                    "Cannot create connection %s, received unknown arguments: %s!",
97
                    (string)$aliasName,
98
                    implode(', ', $diff)
99
                )
100
            );
101
        }
102
        return new static(
103
            $aliasName,
104
            array_merge(self::DEFAULTS, $connectionDetails)
105
        );
106
    }
107
108
    /**
109
     * AMQPConnection constructor.
110
     *
111
     * @param string $aliasName
112
     * @param array  $connectionDetails
113
     * @param string $accessKey
114
     * @param string $accessSecret
115
     * @param string $resourceOwnerId
116
     */
117
    public function __construct(
118
        string $aliasName,
119
        array $connectionDetails = []
120
    ) {
121
        $this->aliasName = $aliasName;
122
        if (array_key_exists('access_key', $connectionDetails)) {
123
            $this->accessKey = $connectionDetails['access_key'];
124
            $this->accessSecret = $connectionDetails['access_secret'];
125
            $this->resourceOwnerId = $connectionDetails['resource_owner_id'];
126
            if ($connectionDetails['access_key'] != ''
127
                && $connectionDetails['access_secret'] != ''
128
                && $connectionDetails['resource_owner_id'] != ''
129
            ) {
130
                $connectionDetails['username'] = $this->getUser();
131
                $connectionDetails['password'] = $this->getPassword();
132
            }
133
        }
134
        $this->connectionDetails = $connectionDetails;
135
        if (isset($connectionDetails['lazy']) && $connectionDetails['lazy'] === false) {
136
            $this->getConnection();
137
        }
138
    }
139
140
    /**
141
     * @return AbstractConnection
142
     */
143
    protected function getConnection(): AbstractConnection
144
    {
145
        if (is_null($this->connection)) {
146
            if (!isset($this->connection['type'])) {
147
                $this->connection['type'] = AMQPStreamConnection::class;
148
            }
149
            switch ($this->connection['type']) {
150
                case AMQPStreamConnection::class:
151
                case 'stream':
152
                    $type = AMQPStreamConnection::class;
153
                    break;
154
                default:
155
                    $type = AMQPSocketConnection::class;
156
            }
157
158
            $this->connection = $this->createConnectionByType($type);
159
        }
160
        return $this->connection;
161
    }
162
163
    /**
164
     * @param $type
165
     *
166
     * @return mixed
167
     */
168
    private function createConnectionByType($type)
169
    {
170
        $connectionDetails = $this->connectionDetails;
171
        if (array_key_exists('access_key', $connectionDetails)) {
172
            $this->accessKey = $connectionDetails['access_key'];
173
            $this->accessSecret = $connectionDetails['access_secret'];
174
            $this->resourceOwnerId = $connectionDetails['resource_owner_id'];
175
            if ($connectionDetails['access_key'] != ''
176
                && $connectionDetails['access_secret'] != ''
177
                && $connectionDetails['resource_owner_id'] != ''
178
            ) {
179
                $this->connectionDetails['username'] = $this->getUser();
180
                $this->connectionDetails['password'] = $this->getPassword();
181
            }
182
        }
183
        return new $type(
184
            $this->connectionDetails['hostname'],
185
            $this->connectionDetails['port'],
186
            $this->connectionDetails['username'],
187
            $this->connectionDetails['password'],
188
            $this->connectionDetails['vhost'],
189
            /** insist */
190
            false,
191
            /** login method */
192
            'AMQPLAIN',
193
            /** login_response */
194
            null,
195
            /** locale */
196
            'en_US',
197
            $this->connectionDetails['connect_timeout'],
198
            $this->connectionDetails['read_write_timeout'],
199
            null,
200
            $this->connectionDetails['keep_alive'],
201
            $this->connectionDetails['heartbeat']
202
        );
203
    }
204
205
    /**
206
     * Reconnect
207
     */
208
    public function reconnect()
209
    {
210
        try {
211
            if (!$this->connection->isConnected()) {
0 ignored issues
show
The method isConnected() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

211
            if (!$this->connection->/** @scrutinizer ignore-call */ isConnected()) {

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
212
                $this->connection = $this->getConnection();
213
            }
214
            if ($this->connection->channel()->is_open()) {
215
                $this->connection->channel()->close();
216
            }
217
            $this->channel = null;
218
            $this->getConnection()->reconnect();
219
        } catch (AMQPChannelClosedException $e) {
220
            $this->logger->info('channel was closed');
221
        } catch (AMQPConnectionClosedException $e){
222
            $this->getConnection()->reconnect();
223
            $this->logger->info('connection was closed');
224
        }
225
    }
226
227
    public function close()
228
    {
229
        if ($this->channel->is_open()) {
0 ignored issues
show
The method is_open() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

229
        if ($this->channel->/** @scrutinizer ignore-call */ is_open()) {

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
230
            $this->channel->close();
231
        }
232
        if ($this->connection->isConnected()) {
233
            try {
234
                $this->connection->close();
235
            } catch (\Exception $exception) {
236
                throw $exception;
237
            }
238
        }
239
    }
240
241
    /**
242
     * @return \PhpAmqpLib\Channel\AMQPChannel
243
     */
244
    public function getChannel()
245
    {
246
        if (is_null($this->channel)) {
247
            $this->channel = $this->getConnection()->channel();
248
        }
249
        return $this->channel;
250
    }
251
252
    /**
253
     * Retrieve the connection alias name
254
     *
255
     * @return string
256
     */
257
    public function getAliasName(): string
258
    {
259
        return $this->aliasName;
260
    }
261
262
263
    /**
264
     * getUser
265
     *
266
     * @return string
267
     */
268
    private function getUser()
269
    {
270
        $t = '0:' . $this->resourceOwnerId . ':' . $this->accessKey;
271
        return base64_encode($t);
272
    }
273
274
    /**
275
     * getPassword
276
     *
277
     * @return string
278
     */
279
    private function getPassword()
280
    {
281
        $ts = (int)(microtime(true) * 1000);
282
        $value = utf8_encode($this->accessSecret);
283
        $key = utf8_encode((string)$ts);
284
        $sig = strtoupper(hash_hmac('sha1', $value, $key, false));
285
        return base64_encode(utf8_encode($sig . ':' . $ts));
286
    }
287
}
288