Test Failed
Push — master ( ea270b...d636ed )
by yufei
06:02 queued 02:40
created

AMQPConnection::reconnect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nc 1
nop 0
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp;
8
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Connection\AbstractConnection;
11
use PhpAmqpLib\Connection\AMQPSocketConnection;
12
use PhpAmqpLib\Connection\AMQPStreamConnection;
13
14
/**
15
 * Class AMQPConnection
16
 *
17
 * @package JokerProject\LaravelAliyunAmqp
18
 */
19
class AMQPConnection
{
    /**
     * @const array Default connection parameters; caller-supplied details are merged over these.
     */
    const DEFAULTS = [
        'hostname' => '127.0.0.1',
        'port' => 5672,
        'username' => 'guest',
        'password' => 'guest',
        'vhost' => '/',

        # transport: 'stream' (or the AMQPStreamConnection class name) selects the
        # stream connection, any other value selects the socket connection
        'type' => 'stream',

        # whether the connection should be lazy
        'lazy' => true,

        # More info about timeouts can be found on https://www.rabbitmq.com/networking.html
        'read_write_timeout' => 3,   // default timeout for writing/reading (in seconds)
        'connect_timeout' => 3,
        'heartbeat' => 0,
        'keep_alive' => false,

        # Aliyun credentials; when all three are non-empty they replace username/password
        'access_key' => '',
        'access_secret' => '',
        'resource_owner_id' => '',
    ];

    /**
     * @var array Effective connection options (defaults merged with the caller's values)
     */
    protected $connectionDetails = [];

    /**
     * @var string Alias under which this connection is known
     */
    protected $aliasName = '';

    /**
     * @var null|AbstractConnection Lazily created connection handle
     */
    private $connection = null;

    /**
     * @var null|AMQPChannel Lazily opened channel, cached until reconnect()
     */
    private $channel = null;

    /**
     * @var string
     */
    private $accessKey = '';

    /**
     * @var string
     */
    private $accessSecret = '';

    /**
     * @var string
     */
    private $resourceOwnerId = '';

    /**
     * Validate the option names and build a connection object.
     *
     * @param string $aliasName
     * @param array  $connectionDetails Options keyed like self::DEFAULTS
     *
     * @return AMQPConnection
     *
     * @throws \InvalidArgumentException When an option name is not present in self::DEFAULTS
     */
    public static function createConnection(string $aliasName, array $connectionDetails)
    {
        $unknown = array_diff(array_keys($connectionDetails), array_keys(self::DEFAULTS));
        if ($unknown) {
            throw new \InvalidArgumentException(
                sprintf(
                    "Cannot create connection %s, received unknown arguments: %s!",
                    $aliasName,
                    implode(', ', $unknown)
                )
            );
        }

        return new static(
            $aliasName,
            array_merge(self::DEFAULTS, $connectionDetails)
        );
    }

    /**
     * AMQPConnection constructor.
     *
     * Missing options are filled in from self::DEFAULTS. When `access_key`,
     * `access_secret` and `resource_owner_id` are all non-empty, the `username`
     * and `password` options are replaced by values derived from them. The
     * connection is opened immediately only when `lazy` is exactly false.
     *
     * @param string $aliasName
     * @param array  $connectionDetails
     */
    public function __construct(
        string $aliasName,
        array $connectionDetails = []
    ) {
        $this->aliasName = $aliasName;

        // Guarantee every known key exists so later reads never hit an undefined index.
        $connectionDetails = array_merge(self::DEFAULTS, $connectionDetails);

        $this->accessKey = (string)$connectionDetails['access_key'];
        $this->accessSecret = (string)$connectionDetails['access_secret'];
        $this->resourceOwnerId = (string)$connectionDetails['resource_owner_id'];

        if ($this->accessKey !== ''
            && $this->accessSecret !== ''
            && $this->resourceOwnerId !== ''
        ) {
            $connectionDetails['username'] = $this->getUser();
            $connectionDetails['password'] = $this->getPassword();
        }

        $this->connectionDetails = $connectionDetails;

        if ($connectionDetails['lazy'] === false) {
            $this->getConnection();
        }
    }

    /**
     * Return the connection handle, creating it on first use.
     *
     * @return AbstractConnection
     */
    protected function getConnection(): AbstractConnection
    {
        if ($this->connection === null) {
            // The transport is a connection *option*; it must not be stored on the handle itself.
            $requested = $this->connectionDetails['type'] ?? AMQPStreamConnection::class;

            $type = in_array($requested, [AMQPStreamConnection::class, 'stream'], true)
                ? AMQPStreamConnection::class
                : AMQPSocketConnection::class;

            $this->connection = $this->createConnectionByType($type);
        }

        return $this->connection;
    }

    /**
     * Instantiate the given connection class with the stored options.
     *
     * The stream and socket connection constructors share their first nine
     * parameters but differ afterwards, so the tail of the argument list is
     * built per transport.
     *
     * @param string $type Fully qualified connection class name
     *
     * @return mixed
     */
    private function createConnectionByType($type)
    {
        $details = $this->connectionDetails;

        $arguments = [
            $details['hostname'],
            $details['port'],
            $details['username'],
            $details['password'],
            $details['vhost'],
            /** insist */
            false,
            /** login method */
            'AMQPLAIN',
            /** login_response */
            null,
            /** locale */
            'en_US',
        ];

        if ($type === AMQPSocketConnection::class) {
            // read_timeout, keepalive, write_timeout, heartbeat
            $arguments[] = $details['read_write_timeout'];
            $arguments[] = $details['keep_alive'];
            $arguments[] = $details['read_write_timeout'];
            $arguments[] = $details['heartbeat'];
        } else {
            // connection_timeout, read_write_timeout, context, keepalive, heartbeat
            $arguments[] = $details['connect_timeout'];
            $arguments[] = $details['read_write_timeout'];
            $arguments[] = null;
            $arguments[] = $details['keep_alive'];
            $arguments[] = $details['heartbeat'];
        }

        return new $type(...$arguments);
    }

    /**
     * Reconnect
     *
     * Closes the cached channel (if any), forgets it, and asks the underlying
     * connection to reconnect. The next getChannel() call opens a fresh channel.
     */
    public function reconnect()
    {
        if ($this->channel !== null) {
            try {
                $this->channel->close();
            } catch (\Exception $e) {
                // Best effort: the link is usually already broken when a
                // reconnect is requested, and a failed close must not prevent it.
            }
            $this->channel = null;
        }

        $this->getConnection()->reconnect();
    }

    /**
     * Return the cached channel, opening one on first use.
     *
     * @return \PhpAmqpLib\Channel\AMQPChannel
     */
    public function getChannel()
    {
        if ($this->channel === null) {
            $this->channel = $this->getConnection()->channel();
        }

        return $this->channel;
    }

    /**
     * Retrieve the connection alias name
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * Build the user name from the stored credentials:
     * base64("0:<resource owner id>:<access key>").
     *
     * @return string
     */
    private function getUser(): string
    {
        return base64_encode('0:' . $this->resourceOwnerId . ':' . $this->accessKey);
    }

    /**
     * Build the password from the stored credentials:
     * base64("<SIGNATURE>:<timestamp>"), where the timestamp is the current
     * time in milliseconds and SIGNATURE is the upper-cased hex HMAC-SHA1 of
     * the access secret keyed with that timestamp.
     *
     * @return string
     */
    private function getPassword(): string
    {
        $timestamp = (int)(microtime(true) * 1000);

        // The timestamp and the hex signature are pure ASCII, so only the
        // secret needs the ISO-8859-1 -> UTF-8 conversion the original applied.
        $signature = strtoupper(
            hash_hmac('sha1', $this->latin1ToUtf8($this->accessSecret), (string)$timestamp, false)
        );

        return base64_encode($signature . ':' . $timestamp);
    }

    /**
     * Convert an ISO-8859-1 string to UTF-8.
     *
     * Uses mb_convert_encoding() when available because utf8_encode() is
     * deprecated as of PHP 8.2; falls back to utf8_encode() otherwise.
     *
     * @param string $value
     *
     * @return string
     */
    private function latin1ToUtf8(string $value): string
    {
        if (function_exists('mb_convert_encoding')) {
            return mb_convert_encoding($value, 'UTF-8', 'ISO-8859-1');
        }

        return utf8_encode($value);
    }
}
243