yuanxing008 /
laravel-aliyun-amqp
| 1 | <?php |
||||
| 2 | /** |
||||
| 3 | * Author: Joker |
||||
| 4 | * Date: 2020-05-08 13:57 |
||||
| 5 | */ |
||||
| 6 | |||||
| 7 | namespace JokerProject\LaravelAliyunAmqp; |
||||
| 8 | |||||
| 9 | use PhpAmqpLib\Channel\AMQPChannel; |
||||
| 10 | use PhpAmqpLib\Connection\AbstractConnection; |
||||
| 11 | use PhpAmqpLib\Connection\AMQPSocketConnection; |
||||
| 12 | use PhpAmqpLib\Connection\AMQPStreamConnection; |
||||
| 13 | use PhpAmqpLib\Exception\AMQPChannelClosedException; |
||||
| 14 | use PhpAmqpLib\Exception\AMQPConnectionClosedException; |
||||
| 15 | use Psr\Log\LoggerAwareInterface; |
||||
| 16 | use Psr\Log\LoggerAwareTrait; |
||||
| 17 | |||||
| 18 | /** |
||||
| 19 | * Class AMQPConnection |
||||
| 20 | * |
||||
| 21 | * @package JokerProject\LaravelAliyunAmqp |
||||
| 22 | */ |
||||
| 23 | class AMQPConnection implements LoggerAwareInterface |
||||
| 24 | { |
||||
| 25 | use LoggerAwareTrait; |
||||
| 26 | |||||
| 27 | /** |
||||
| 28 | * @const array Default connections parameters |
||||
| 29 | */ |
||||
| 30 | const DEFAULTS = [ |
||||
| 31 | 'hostname' => '127.0.0.1', |
||||
| 32 | 'port' => 5672, |
||||
| 33 | 'username' => 'guest', |
||||
| 34 | 'password' => 'guest', |
||||
| 35 | 'vhost' => '/', |
||||
| 36 | |||||
| 37 | # whether the connection should be lazy |
||||
| 38 | 'lazy' => true, |
||||
| 39 | |||||
| 40 | # More info about timeouts can be found on https://www.rabbitmq.com/networking.html |
||||
| 41 | 'read_write_timeout' => 3, // default timeout for writing/reading (in seconds) |
||||
| 42 | 'connect_timeout' => 3, |
||||
| 43 | 'heartbeat' => 0, |
||||
| 44 | 'keep_alive' => false, |
||||
| 45 | 'access_key' => '', |
||||
| 46 | 'access_secret' => '', |
||||
| 47 | 'resource_owner_id' => '', |
||||
| 48 | ]; |
||||
| 49 | |||||
| 50 | /** |
||||
| 51 | * @var array |
||||
| 52 | */ |
||||
| 53 | protected $connectionDetails = []; |
||||
| 54 | |||||
| 55 | /** |
||||
| 56 | * @var string |
||||
| 57 | */ |
||||
| 58 | protected $aliasName = ''; |
||||
| 59 | |||||
| 60 | /** |
||||
| 61 | * @var null|AbstractConnection |
||||
| 62 | */ |
||||
| 63 | private $connection = null; |
||||
| 64 | |||||
| 65 | /** |
||||
| 66 | * @var null|AMQPChannel |
||||
| 67 | */ |
||||
| 68 | private $channel = null; |
||||
| 69 | |||||
| 70 | /** |
||||
| 71 | * @var string |
||||
| 72 | */ |
||||
| 73 | private $accessKey = ''; |
||||
| 74 | |||||
| 75 | /** |
||||
| 76 | * @var string |
||||
| 77 | */ |
||||
| 78 | private $accessSecret = ''; |
||||
| 79 | |||||
| 80 | /** |
||||
| 81 | * @var string |
||||
| 82 | */ |
||||
| 83 | private $resourceOwnerId = ''; |
||||
| 84 | |||||
| 85 | /** |
||||
| 86 | * @param string $aliasName |
||||
| 87 | * @param array $connectionDetails |
||||
| 88 | * |
||||
| 89 | * @return AMQPConnection |
||||
| 90 | */ |
||||
| 91 | public static function createConnection(string $aliasName, array $connectionDetails) |
||||
| 92 | { |
||||
| 93 | if ($diff = array_diff(array_keys($connectionDetails), array_keys(self::DEFAULTS))) { |
||||
| 94 | throw new \InvalidArgumentException( |
||||
| 95 | sprintf( |
||||
| 96 | "Cannot create connection %s, received unknown arguments: %s!", |
||||
| 97 | (string)$aliasName, |
||||
| 98 | implode(', ', $diff) |
||||
| 99 | ) |
||||
| 100 | ); |
||||
| 101 | } |
||||
| 102 | return new static( |
||||
| 103 | $aliasName, |
||||
| 104 | array_merge(self::DEFAULTS, $connectionDetails) |
||||
| 105 | ); |
||||
| 106 | } |
||||
| 107 | |||||
| 108 | /** |
||||
| 109 | * AMQPConnection constructor. |
||||
| 110 | * |
||||
| 111 | * @param string $aliasName |
||||
| 112 | * @param array $connectionDetails |
||||
| 113 | * @param string $accessKey |
||||
| 114 | * @param string $accessSecret |
||||
| 115 | * @param string $resourceOwnerId |
||||
| 116 | */ |
||||
| 117 | public function __construct( |
||||
| 118 | string $aliasName, |
||||
| 119 | array $connectionDetails = [] |
||||
| 120 | ) { |
||||
| 121 | $this->aliasName = $aliasName; |
||||
| 122 | if (array_key_exists('access_key', $connectionDetails)) { |
||||
| 123 | $this->accessKey = $connectionDetails['access_key']; |
||||
| 124 | $this->accessSecret = $connectionDetails['access_secret']; |
||||
| 125 | $this->resourceOwnerId = $connectionDetails['resource_owner_id']; |
||||
| 126 | if ($connectionDetails['access_key'] != '' |
||||
| 127 | && $connectionDetails['access_secret'] != '' |
||||
| 128 | && $connectionDetails['resource_owner_id'] != '' |
||||
| 129 | ) { |
||||
| 130 | $connectionDetails['username'] = $this->getUser(); |
||||
| 131 | $connectionDetails['password'] = $this->getPassword(); |
||||
| 132 | } |
||||
| 133 | } |
||||
| 134 | $this->connectionDetails = $connectionDetails; |
||||
| 135 | if (isset($connectionDetails['lazy']) && $connectionDetails['lazy'] === false) { |
||||
| 136 | $this->getConnection(); |
||||
| 137 | } |
||||
| 138 | } |
||||
| 139 | |||||
| 140 | /** |
||||
| 141 | * @return AbstractConnection |
||||
| 142 | */ |
||||
| 143 | protected function getConnection(): AbstractConnection |
||||
| 144 | { |
||||
| 145 | if (is_null($this->connection)) { |
||||
| 146 | if (!isset($this->connection['type'])) { |
||||
| 147 | $this->connection['type'] = AMQPStreamConnection::class; |
||||
| 148 | } |
||||
| 149 | switch ($this->connection['type']) { |
||||
| 150 | case AMQPStreamConnection::class: |
||||
| 151 | case 'stream': |
||||
| 152 | $type = AMQPStreamConnection::class; |
||||
| 153 | break; |
||||
| 154 | default: |
||||
| 155 | $type = AMQPSocketConnection::class; |
||||
| 156 | } |
||||
| 157 | |||||
| 158 | $this->connection = $this->createConnectionByType($type); |
||||
| 159 | } |
||||
| 160 | return $this->connection; |
||||
| 161 | } |
||||
| 162 | |||||
| 163 | /** |
||||
| 164 | * @param $type |
||||
| 165 | * |
||||
| 166 | * @return mixed |
||||
| 167 | */ |
||||
| 168 | private function createConnectionByType($type) |
||||
| 169 | { |
||||
| 170 | $connectionDetails = $this->connectionDetails; |
||||
| 171 | if (array_key_exists('access_key', $connectionDetails)) { |
||||
| 172 | $this->accessKey = $connectionDetails['access_key']; |
||||
| 173 | $this->accessSecret = $connectionDetails['access_secret']; |
||||
| 174 | $this->resourceOwnerId = $connectionDetails['resource_owner_id']; |
||||
| 175 | if ($connectionDetails['access_key'] != '' |
||||
| 176 | && $connectionDetails['access_secret'] != '' |
||||
| 177 | && $connectionDetails['resource_owner_id'] != '' |
||||
| 178 | ) { |
||||
| 179 | $this->connectionDetails['username'] = $this->getUser(); |
||||
| 180 | $this->connectionDetails['password'] = $this->getPassword(); |
||||
| 181 | } |
||||
| 182 | } |
||||
| 183 | return new $type( |
||||
| 184 | $this->connectionDetails['hostname'], |
||||
| 185 | $this->connectionDetails['port'], |
||||
| 186 | $this->connectionDetails['username'], |
||||
| 187 | $this->connectionDetails['password'], |
||||
| 188 | $this->connectionDetails['vhost'], |
||||
| 189 | /** insist */ |
||||
| 190 | false, |
||||
| 191 | /** login method */ |
||||
| 192 | 'AMQPLAIN', |
||||
| 193 | /** login_response */ |
||||
| 194 | null, |
||||
| 195 | /** locale */ |
||||
| 196 | 'en_US', |
||||
| 197 | $this->connectionDetails['connect_timeout'], |
||||
| 198 | $this->connectionDetails['read_write_timeout'], |
||||
| 199 | null, |
||||
| 200 | $this->connectionDetails['keep_alive'], |
||||
| 201 | $this->connectionDetails['heartbeat'] |
||||
| 202 | ); |
||||
| 203 | } |
||||
| 204 | |||||
| 205 | /** |
||||
| 206 | * Reconnect |
||||
| 207 | */ |
||||
| 208 | public function reconnect() |
||||
| 209 | { |
||||
| 210 | try { |
||||
| 211 | if (!$this->connection->isConnected()) { |
||||
|
0 ignored issues
–
show
|
|||||
| 212 | $this->connection = $this->getConnection(); |
||||
| 213 | } |
||||
| 214 | if ($this->connection->channel()->is_open()) { |
||||
| 215 | $this->connection->channel()->close(); |
||||
| 216 | } |
||||
| 217 | $this->channel = null; |
||||
| 218 | $this->getConnection()->reconnect(); |
||||
| 219 | } catch (AMQPChannelClosedException $e) { |
||||
| 220 | $this->logger->info('channel was closed'); |
||||
| 221 | } catch (AMQPConnectionClosedException $e){ |
||||
| 222 | $this->getConnection()->reconnect(); |
||||
| 223 | $this->logger->info('connection was closed'); |
||||
| 224 | } |
||||
| 225 | } |
||||
| 226 | |||||
| 227 | public function close() |
||||
| 228 | { |
||||
| 229 | if ($this->channel->is_open()) { |
||||
|
0 ignored issues
–
show
The method
is_open() does not exist on null.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. Loading history...
|
|||||
| 230 | $this->channel->close(); |
||||
| 231 | } |
||||
| 232 | if ($this->connection->isConnected()) { |
||||
| 233 | try { |
||||
| 234 | $this->connection->close(); |
||||
| 235 | } catch (\Exception $exception) { |
||||
| 236 | throw $exception; |
||||
| 237 | } |
||||
| 238 | } |
||||
| 239 | } |
||||
| 240 | |||||
| 241 | /** |
||||
| 242 | * @return \PhpAmqpLib\Channel\AMQPChannel |
||||
| 243 | */ |
||||
| 244 | public function getChannel() |
||||
| 245 | { |
||||
| 246 | if (is_null($this->channel)) { |
||||
| 247 | $this->channel = $this->getConnection()->channel(); |
||||
| 248 | } |
||||
| 249 | return $this->channel; |
||||
| 250 | } |
||||
| 251 | |||||
| 252 | /** |
||||
| 253 | * Retrieve the connection alias name |
||||
| 254 | * |
||||
| 255 | * @return string |
||||
| 256 | */ |
||||
| 257 | public function getAliasName(): string |
||||
| 258 | { |
||||
| 259 | return $this->aliasName; |
||||
| 260 | } |
||||
| 261 | |||||
| 262 | |||||
| 263 | /** |
||||
| 264 | * getUser |
||||
| 265 | * |
||||
| 266 | * @return string |
||||
| 267 | */ |
||||
| 268 | private function getUser() |
||||
| 269 | { |
||||
| 270 | $t = '0:' . $this->resourceOwnerId . ':' . $this->accessKey; |
||||
| 271 | return base64_encode($t); |
||||
| 272 | } |
||||
| 273 | |||||
| 274 | /** |
||||
| 275 | * getPassword |
||||
| 276 | * |
||||
| 277 | * @return string |
||||
| 278 | */ |
||||
| 279 | private function getPassword() |
||||
| 280 | { |
||||
| 281 | $ts = (int)(microtime(true) * 1000); |
||||
| 282 | $value = utf8_encode($this->accessSecret); |
||||
| 283 | $key = utf8_encode((string)$ts); |
||||
| 284 | $sig = strtoupper(hash_hmac('sha1', $value, $key, false)); |
||||
| 285 | return base64_encode(utf8_encode($sig . ':' . $ts)); |
||||
| 286 | } |
||||
| 287 | } |
||||
| 288 |
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.
This is most likely a typographical error or the method has been renamed.