1 | <?php |
||||
2 | /** |
||||
3 | * Author: Joker |
||||
4 | * Date: 2020-05-08 13:57 |
||||
5 | */ |
||||
6 | |||||
7 | namespace JokerProject\LaravelAliyunAmqp; |
||||
8 | |||||
9 | use PhpAmqpLib\Channel\AMQPChannel; |
||||
10 | use PhpAmqpLib\Connection\AbstractConnection; |
||||
11 | use PhpAmqpLib\Connection\AMQPSocketConnection; |
||||
12 | use PhpAmqpLib\Connection\AMQPStreamConnection; |
||||
13 | use PhpAmqpLib\Exception\AMQPChannelClosedException; |
||||
14 | use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
||||
15 | use Psr\Log\LoggerAwareInterface; |
||||
16 | use Psr\Log\LoggerAwareTrait; |
||||
17 | |||||
/**
 * Class AMQPConnection
 *
 * Lazily creates one php-amqplib connection and one channel from an array of
 * connection details. When the three Aliyun keys (`access_key`,
 * `access_secret`, `resource_owner_id`) are all non-empty, the AMQP username
 * and password are derived from them instead of being taken from the details.
 *
 * @package JokerProject\LaravelAliyunAmqp
 */
class AMQPConnection implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const array Default connections parameters
     */
    const DEFAULTS = [
        'hostname' => '127.0.0.1',
        'port' => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost' => '/',

        // whether the connection should be lazy
        'lazy' => true,

        // More info about timeouts can be found on https://www.rabbitmq.com/networking.html
        'read_write_timeout' => 3, // default timeout for writing/reading (in seconds)
        'connect_timeout' => 3,
        'heartbeat' => 0,
        'keep_alive' => false,
        'access_key' => '',
        'access_secret' => '',
        'resource_owner_id' => '',
    ];

    /**
     * @var array Effective connection details (defaults merged with caller values)
     */
    protected $connectionDetails = [];

    /**
     * @var string
     */
    protected $aliasName = '';

    /**
     * @var null|AbstractConnection
     */
    private $connection = null;

    /**
     * @var null|AMQPChannel
     */
    private $channel = null;

    /**
     * @var string
     */
    private $accessKey = '';

    /**
     * @var string
     */
    private $accessSecret = '';

    /**
     * @var string
     */
    private $resourceOwnerId = '';

    /**
     * Validate the given details against the known keys and build a connection wrapper.
     *
     * @param string $aliasName
     * @param array  $connectionDetails Only keys present in self::DEFAULTS are accepted
     *
     * @return AMQPConnection
     *
     * @throws \InvalidArgumentException When an unknown key is supplied
     */
    public static function createConnection(string $aliasName, array $connectionDetails)
    {
        $unknownKeys = array_diff(array_keys($connectionDetails), array_keys(self::DEFAULTS));
        if ($unknownKeys) {
            throw new \InvalidArgumentException(
                sprintf(
                    "Cannot create connection %s, received unknown arguments: %s!",
                    $aliasName,
                    implode(', ', $unknownKeys)
                )
            );
        }

        return new static(
            $aliasName,
            array_merge(self::DEFAULTS, $connectionDetails)
        );
    }

    /**
     * AMQPConnection constructor.
     *
     * Missing keys are filled from self::DEFAULTS so that a directly
     * constructed instance never reads an undefined index later on.
     * The connection is opened immediately only when `lazy` is exactly false.
     *
     * @param string $aliasName
     * @param array  $connectionDetails
     */
    public function __construct(
        string $aliasName,
        array $connectionDetails = []
    ) {
        $this->aliasName = $aliasName;
        $this->connectionDetails = $this->withAliyunCredentials(
            array_merge(self::DEFAULTS, $connectionDetails)
        );

        if ($this->connectionDetails['lazy'] === false) {
            $this->getConnection();
        }
    }

    /**
     * Return the underlying connection, creating it on first use.
     *
     * The implementation is selected by the optional `type` connection detail:
     * the stream class name or 'stream' selects AMQPStreamConnection (also the
     * default when `type` is absent); any other value selects AMQPSocketConnection.
     *
     * @return AbstractConnection
     */
    protected function getConnection(): AbstractConnection
    {
        if ($this->connection === null) {
            // Read the type from the details, never from $this->connection:
            // writing an offset on the null property would turn it into an
            // array and leave it cached if the connect attempt throws.
            $requested = $this->connectionDetails['type'] ?? AMQPStreamConnection::class;
            $isStream = in_array($requested, [AMQPStreamConnection::class, 'stream'], true);

            $this->connection = $this->createConnectionByType(
                $isStream ? AMQPStreamConnection::class : AMQPSocketConnection::class
            );
        }

        return $this->connection;
    }

    /**
     * Instantiate a connection of the given class from the stored details.
     *
     * Aliyun credentials are recomputed right before connecting because the
     * generated password embeds the current timestamp.
     *
     * @param string $type Fully qualified connection class name
     *
     * @return AbstractConnection
     */
    private function createConnectionByType($type)
    {
        $this->connectionDetails = $this->withAliyunCredentials($this->connectionDetails);
        $details = $this->connectionDetails;

        if ($type === AMQPSocketConnection::class) {
            // The socket connection takes separate read/write timeouts and has
            // no connect-timeout or stream-context argument, so the positional
            // list differs from the stream one below.
            return new AMQPSocketConnection(
                $details['hostname'],
                $details['port'],
                $details['username'],
                $details['password'],
                $details['vhost'],
                /** insist */
                false,
                /** login method */
                'AMQPLAIN',
                /** login_response */
                null,
                /** locale */
                'en_US',
                /** read timeout */
                $details['read_write_timeout'],
                $details['keep_alive'],
                /** write timeout */
                $details['read_write_timeout'],
                $details['heartbeat']
            );
        }

        return new $type(
            $details['hostname'],
            $details['port'],
            $details['username'],
            $details['password'],
            $details['vhost'],
            /** insist */
            false,
            /** login method */
            'AMQPLAIN',
            /** login_response */
            null,
            /** locale */
            'en_US',
            $details['connect_timeout'],
            $details['read_write_timeout'],
            /** stream context */
            null,
            $details['keep_alive'],
            $details['heartbeat']
        );
    }

    /**
     * Reconnect
     *
     * Closes the cached channel (if any), forgets it so that getChannel()
     * opens a fresh one, and re-establishes the underlying connection.
     * When no connection has been created yet, it is simply created.
     */
    public function reconnect()
    {
        if ($this->connection === null) {
            // Nothing to tear down: creating the connection is the reconnect.
            $this->channel = null;
            $this->getConnection();
            return;
        }

        try {
            // Only the cached channel is inspected; calling channel() on the
            // connection here would allocate a new channel just to close it.
            if ($this->channel !== null && $this->channel->is_open()) {
                $this->channel->close();
            }
        } catch (AMQPChannelClosedException $e) {
            $this->logInfo('channel was closed');
        } catch (AMQPConnectionClosedException $e) {
            $this->logInfo('connection was closed');
        }

        $this->channel = null;
        // NOTE(review): the library-level reconnect presumably reuses the
        // credentials given at construction time; confirm that is acceptable
        // for the timestamp-based Aliyun password.
        $this->connection->reconnect();
    }

    /**
     * Close the channel and the connection if they exist and are still open.
     *
     * Safe to call on an instance that never connected. Exceptions raised by
     * the library while closing propagate to the caller.
     */
    public function close()
    {
        if ($this->channel !== null && $this->channel->is_open()) {
            $this->channel->close();
        }

        if ($this->connection !== null && $this->connection->isConnected()) {
            $this->connection->close();
        }
    }

    /**
     * Return the cached channel, opening one on the (lazily created) connection if needed.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    public function getChannel()
    {
        if ($this->channel === null) {
            $this->channel = $this->getConnection()->channel();
        }

        return $this->channel;
    }

    /**
     * Retrieve the connection alias name
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * Remember the Aliyun keys found in $details and, when all three are
     * non-empty, replace `username` and `password` with generated values.
     *
     * @param array $details
     *
     * @return array The details, possibly with generated credentials
     */
    private function withAliyunCredentials(array $details): array
    {
        if (!array_key_exists('access_key', $details)) {
            return $details;
        }

        $this->accessKey = (string)$details['access_key'];
        $this->accessSecret = (string)($details['access_secret'] ?? '');
        $this->resourceOwnerId = (string)($details['resource_owner_id'] ?? '');

        if ($this->accessKey !== '' && $this->accessSecret !== '' && $this->resourceOwnerId !== '') {
            $details['username'] = $this->getUser();
            $details['password'] = $this->getPassword();
        }

        return $details;
    }

    /**
     * Log an informational message when a logger has been injected.
     *
     * The logger is optional: it stays null until setLogger() is called.
     *
     * @param string $message
     */
    private function logInfo(string $message)
    {
        if ($this->logger !== null) {
            $this->logger->info($message);
        }
    }

    /**
     * getUser
     *
     * Base64 of "0:<resource owner id>:<access key>".
     *
     * @return string
     */
    private function getUser()
    {
        return base64_encode('0:' . $this->resourceOwnerId . ':' . $this->accessKey);
    }

    /**
     * getPassword
     *
     * Base64 of "<SIGNATURE>:<timestamp>", where the timestamp is the current
     * time in milliseconds and SIGNATURE is the upper-cased hex HMAC-SHA1 of
     * the access secret keyed with that timestamp.
     *
     * @return string
     */
    private function getPassword()
    {
        $timestamp = (string)(int)(microtime(true) * 1000);
        // NOTE(review): utf8_encode() is deprecated as of PHP 8.2; it is kept
        // here so a non-ASCII secret keeps producing the same signature.
        $secret = utf8_encode($this->accessSecret);
        // Timestamp and hex signature are pure ASCII, so they need no re-encoding.
        $signature = strtoupper(hash_hmac('sha1', $secret, $timestamp, false));

        return base64_encode($signature . ':' . $timestamp);
    }
}
||||
288 |
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.
This is most likely a typographical error or the method has been renamed.