CMQClient::batch_send_message()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 4
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 8
ccs 0
cts 6
cp 0
crap 2
rs 10
1
<?php
2
3
namespace Freyo\LaravelQueueCMQ\Queue\Driver;
4
5
/**
 * Client that builds, signs and sends CMQ API actions over HTTP.
 *
 * Every public operation takes an associative array of request parameters,
 * sends it as a named action through the CMQHttp transport and validates the
 * reply with check_status(). Operations that return data hand back the decoded
 * JSON body (or one field of it).
 */
class CMQClient
{
    /** @var string Endpoint host with the scheme and one trailing slash removed. */
    private $host;

    /** @var string Credential id sent as the "SecretId" request parameter. */
    private $secretId;

    /** @var string Key used to sign each request. */
    private $secretKey;

    /** @var string Client identifier sent as the "RequestClient" request parameter. */
    private $version;

    /** @var CMQHttp Transport used to send requests. */
    private $http;

    /** @var string HTTP method used for requests and for the signature plain text. */
    private $method;

    /** @var string Request path handed to every RequestInternal. */
    private $URISEC = '/v2/index.php';

    /**
     * Signature algorithm name: 'HmacSHA1' or 'HmacSHA256'.
     *
     * This used to be an undeclared (dynamic) property, which PHP 8.2
     * deprecates. It is protected rather than private so that subclasses
     * overriding the protected request-building methods can still read it.
     *
     * @var string
     */
    protected $sign_method = 'HmacSHA1';

    /**
     * @param string $host      Endpoint URL; must start with http:// or https://.
     * @param string $secretId  Credential id.
     * @param string $secretKey Signing key.
     * @param string $version   Client identifier reported to the server.
     * @param string $method    HTTP method used for requests.
     *
     * @throws CMQClientParameterException When $host is not a usable http(s) endpoint.
     */
    public function __construct($host, $secretId, $secretKey, $version = 'SDK_PHP_1.3', $method = 'POST')
    {
        $this->process_host($host);
        $this->secretId = $secretId;
        $this->secretKey = $secretKey;
        $this->version = $version;
        $this->method = $method;
        // NOTE(review): the transport receives the raw $host (scheme included),
        // not the normalized $this->host -- confirm this is what CMQHttp expects.
        $this->http = new CMQHttp($host);
    }

    /**
     * Validates the endpoint and stores its host part in $this->host.
     *
     * The http:// or https:// prefix is removed, then a single trailing slash
     * is removed if present.
     *
     * @param string $host Endpoint URL.
     *
     * @throws CMQClientParameterException When the scheme is not http(s) or no host remains.
     */
    protected function process_host($host)
    {
        if (strpos($host, 'http://') === 0) {
            $prefixLength = 7;
        } elseif (strpos($host, 'https://') === 0) {
            $prefixLength = 8;
        } else {
            throw new CMQClientParameterException('Only support http(s) prototol. Invalid endpoint:'.$host);
        }

        // The cast normalizes the false that older PHP versions return from
        // substr() when nothing follows the scheme.
        $_host = (string) substr($host, $prefixLength);

        if (substr($_host, -1) === '/') {
            $_host = (string) substr($_host, 0, -1);
        }

        // Previously an endpoint such as "http://" indexed into an empty string
        // and silently produced an empty host.
        if ($_host === '') {
            throw new CMQClientParameterException('Endpoint host is empty. Invalid endpoint:'.$host);
        }

        $this->host = $_host;
    }

    /**
     * Selects the signature algorithm.
     *
     * Accepts the short names 'sha1' / 'sha256' and the canonical names
     * 'HmacSHA1' / 'HmacSHA256'. Earlier code mapped 'HmacSHA256' to HmacSHA1
     * and rejected 'HmacSHA1'; each canonical name now maps to itself.
     *
     * @param string $sign_method Algorithm name.
     *
     * @throws CMQClientParameterException When the name is not recognised.
     */
    public function set_sign_method($sign_method = 'sha1')
    {
        if ($sign_method == 'sha1' || $sign_method == 'HmacSHA1') {
            $this->sign_method = 'HmacSHA1';
        } elseif ($sign_method == 'sha256' || $sign_method == 'HmacSHA256') {
            $this->sign_method = 'HmacSHA256';
        } else {
            throw new CMQClientParameterException('Only support sign method HmasSHA256 or HmacSHA1 . Invalid sign method:'.$sign_method);
        }
    }

    /**
     * Sets the HTTP method used for subsequent requests.
     *
     * @param string $method HTTP method name.
     */
    public function set_method($method = 'POST')
    {
        $this->method = $method;
    }

    /**
     * Forwards the connection timeout to the HTTP transport.
     *
     * @param mixed $connection_timeout Value passed through unchanged.
     */
    public function set_connection_timeout($connection_timeout)
    {
        $this->http->set_connection_timeout($connection_timeout);
    }

    /**
     * Forwards the keep-alive setting to the HTTP transport.
     *
     * @param mixed $keep_alive Value passed through unchanged.
     */
    public function set_keep_alive($keep_alive)
    {
        $this->http->set_keep_alive($keep_alive);
    }

    /**
     * Builds a signed request for $action and sends it.
     *
     * If $params contains "UserpollingWaitSeconds", its integer value is passed
     * to the transport as the second argument of send_request(); otherwise 0 is
     * passed.
     *
     * @param string $action Action name.
     * @param array  $params Request parameters.
     *
     * @return mixed Whatever CMQHttp::send_request() returns.
     */
    protected function request($action, $params)
    {
        $req_inter = new RequestInternal($this->method, $this->URISEC);
        $this->build_req_inter($action, $params, $req_inter);

        $iTimeout = 0;

        if (array_key_exists('UserpollingWaitSeconds', $params)) {
            $iTimeout = (int) $params['UserpollingWaitSeconds'];
        }

        return $this->http->send_request($req_inter, $iTimeout);
    }

    /**
     * Fills $req_inter with the signed, URL-encoded request data and headers.
     *
     * "Action" and "RequestClient" are always overwritten. "SecretId", "Nonce",
     * "Timestamp" and "SignatureMethod" are only filled in when the caller did
     * not supply them. Assignment order is kept stable because it determines
     * the key order of the encoded query string.
     *
     * @param string          $action    Action name; its first letter is upper-cased.
     * @param array           $params    Caller-supplied parameters.
     * @param RequestInternal $req_inter Request object to populate.
     */
    protected function build_req_inter($action, $params, &$req_inter)
    {
        $_params = $params;
        $_params['Action'] = ucfirst($action);
        $_params['RequestClient'] = $this->version;

        if (!isset($_params['SecretId'])) {
            $_params['SecretId'] = $this->secretId;
        }

        if (!isset($_params['Nonce'])) {
            $_params['Nonce'] = (int) (microtime(true) * 1000) + rand(500000, 100000000);
        }

        if (!isset($_params['Timestamp'])) {
            $_params['Timestamp'] = time();
        }

        if (!isset($_params['SignatureMethod'])) {
            $_params['SignatureMethod'] = $this->sign_method;
        }

        $plainText = Signature::makeSignPlainText($_params, $this->method, $this->host, $req_inter->uri);
        // NOTE(review): signing always uses $this->sign_method, even when the
        // caller supplied a different "SignatureMethod" parameter -- confirm
        // that combination is never used.
        $_params['Signature'] = Signature::sign($plainText, $this->secretKey, $this->sign_method);

        $req_inter->data = http_build_query($_params);
        $this->build_header($req_inter);
    }

    /**
     * Appends the HTTP headers for a request.
     *
     * @param RequestInternal $req_inter Request object whose header list is extended.
     */
    protected function build_header(&$req_inter)
    {
        if ($this->http->is_keep_alive()) {
            $req_inter->header[] = 'Connection: Keep-Alive';
        }

        // An empty Expect header; presumably this stops the HTTP library from
        // sending "Expect: 100-continue" -- confirm against CMQHttp.
        $req_inter->header[] = 'Expect:';
    }

    /**
     * Throws unless the response is HTTP 200 with a JSON body whose code is 0.
     *
     * @param object $resp_inter Response exposing status, header and data.
     *
     * @throws CMQServerNetworkException When the HTTP status is not 200.
     * @throws CMQServerException        When the body is not a JSON object with
     *                                   a "code" field, or the code is non-zero.
     */
    protected function check_status($resp_inter)
    {
        if ($resp_inter->status != 200) {
            throw new CMQServerNetworkException($resp_inter->status, $resp_inter->header, $resp_inter->data);
        }

        $resp = json_decode($resp_inter->data, true);

        // json_decode() returns null for invalid JSON. Without this guard the
        // missing code compared equal to 0 and the reply counted as a success.
        if (!is_array($resp) || !isset($resp['code'])) {
            // -1 is a client-side sentinel: the server supplied no code.
            throw new CMQServerException(
                'Invalid response body: expected a JSON object with a "code" field.',
                null,
                -1,
                is_array($resp) ? $resp : array()
            );
        }

        $code = $resp['code'];
        $message = isset($resp['message']) ? $resp['message'] : '';
        $requestId = isset($resp['requestId']) ? $resp['requestId'] : null;

        if ($code != 0) {
            throw new CMQServerException($message, $requestId, $code, $resp);
        }
    }

    /**
     * Sends an action and validates the reply.
     *
     * @param string $action Action name.
     * @param array  $params Request parameters.
     *
     * @return mixed The validated response object.
     */
    private function invoke($action, $params)
    {
        $resp_inter = $this->request($action, $params);
        $this->check_status($resp_inter);

        return $resp_inter;
    }

    /**
     * Sends an action, validates the reply and returns its decoded JSON body.
     *
     * @param string $action Action name.
     * @param array  $params Request parameters.
     *
     * @return array Decoded response body.
     */
    private function invoke_decoded($action, $params)
    {
        return json_decode($this->invoke($action, $params)->data, true);
    }

    //===============================================queue operation===============================================

    /**
     * Sends the "CreateQueue" action.
     *
     * @param array $params Request parameters.
     */
    public function create_queue($params)
    {
        $this->invoke('CreateQueue', $params);
    }

    /**
     * Sends the "DeleteQueue" action.
     *
     * @param array $params Request parameters.
     */
    public function delete_queue($params)
    {
        $this->invoke('DeleteQueue', $params);
    }

    /**
     * Sends the "RewindQueue" action.
     *
     * @param array $params Request parameters.
     */
    public function rewindQueue($params)
    {
        $this->invoke('RewindQueue', $params);
    }

    /**
     * Sends the "ListQueue" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function list_queue($params)
    {
        return $this->invoke_decoded('ListQueue', $params);
    }

    /**
     * Sends the "SetQueueAttributes" action.
     *
     * @param array $params Request parameters.
     */
    public function set_queue_attributes($params)
    {
        $this->invoke('SetQueueAttributes', $params);
    }

    /**
     * Sends the "GetQueueAttributes" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function get_queue_attributes($params)
    {
        return $this->invoke_decoded('GetQueueAttributes', $params);
    }

    /**
     * Sends the "SendMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return mixed The "msgId" field of the response.
     */
    public function send_message($params)
    {
        $ret = $this->invoke_decoded('SendMessage', $params);

        return $ret['msgId'];
    }

    /**
     * Sends the "BatchSendMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return mixed The "msgList" field of the response.
     */
    public function batch_send_message($params)
    {
        $ret = $this->invoke_decoded('BatchSendMessage', $params);

        return $ret['msgList'];
    }

    /**
     * Sends the "ReceiveMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function receive_message($params)
    {
        return $this->invoke_decoded('ReceiveMessage', $params);
    }

    /**
     * Sends the "BatchReceiveMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return mixed The "msgInfoList" field of the response.
     */
    public function batch_receive_message($params)
    {
        $ret = $this->invoke_decoded('BatchReceiveMessage', $params);

        return $ret['msgInfoList'];
    }

    /**
     * Sends the "DeleteMessage" action.
     *
     * @param array $params Request parameters.
     */
    public function delete_message($params)
    {
        $this->invoke('DeleteMessage', $params);
    }

    /**
     * Sends the "BatchDeleteMessage" action.
     *
     * @param array $params Request parameters.
     */
    public function batch_delete_message($params)
    {
        $this->invoke('BatchDeleteMessage', $params);
    }

    //=============================================topic operation================================================

    /**
     * Sends the "CreateTopic" action.
     *
     * @param array $params Request parameters.
     */
    public function create_topic($params)
    {
        $this->invoke('CreateTopic', $params);
    }

    /**
     * Sends the "DeleteTopic" action.
     *
     * @param array $params Request parameters.
     */
    public function delete_topic($params)
    {
        $this->invoke('DeleteTopic', $params);
    }

    /**
     * Sends the "ListTopic" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function list_topic($params)
    {
        return $this->invoke_decoded('ListTopic', $params);
    }

    /**
     * Sends the "SetTopicAttributes" action.
     *
     * @param array $params Request parameters.
     */
    public function set_topic_attributes($params)
    {
        $this->invoke('SetTopicAttributes', $params);
    }

    /**
     * Sends the "GetTopicAttributes" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function get_topic_attributes($params)
    {
        return $this->invoke_decoded('GetTopicAttributes', $params);
    }

    /**
     * Sends the "PublishMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function publish_message($params)
    {
        return $this->invoke_decoded('PublishMessage', $params);
    }

    /**
     * Sends the "BatchPublishMessage" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function batch_publish_message($params)
    {
        return $this->invoke_decoded('BatchPublishMessage', $params);
    }

    //============================================subscription operation=============================================

    /**
     * Sends the "Subscribe" action.
     *
     * @param array $params Request parameters.
     */
    public function create_subscription($params)
    {
        $this->invoke('Subscribe', $params);
    }

    /**
     * Sends the "ClearSubscriptionFilterTags" action.
     *
     * @param array $params Request parameters.
     */
    public function clear_filterTags($params)
    {
        $this->invoke('ClearSubscriptionFilterTags', $params);
    }

    /**
     * Sends the "Unsubscribe" action.
     *
     * @param array $params Request parameters.
     */
    public function delete_subscription($params)
    {
        $this->invoke('Unsubscribe', $params);
    }

    /**
     * Sends the "GetSubscriptionAttributes" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function get_subscription_attributes($params)
    {
        return $this->invoke_decoded('GetSubscriptionAttributes', $params);
    }

    /**
     * Sends the "SetSubscriptionAttributes" action.
     *
     * @param array $params Request parameters.
     */
    public function set_subscription_attributes($params)
    {
        $this->invoke('SetSubscriptionAttributes', $params);
    }

    /**
     * Sends the "ListSubscriptionByTopic" action.
     *
     * @param array $params Request parameters.
     *
     * @return array Decoded response body.
     */
    public function list_subscription($params)
    {
        return $this->invoke_decoded('ListSubscriptionByTopic', $params);
    }
}
338