| 1 | <?php |
||
| 2 | |||
| 3 | namespace Freyo\LaravelQueueCMQ\Queue\Driver; |
||
| 4 | |||
/**
 * Wrapper around a CMQClient that addresses every request to one named queue.
 *
 * Each public method builds a parameter array containing `queueName` plus the
 * operation-specific fields and forwards it to the matching client method.
 * When encoding is enabled, message bodies are base64-encoded before sending
 * and base64-decoded after receiving.
 */
class Queue
{
    /** @var mixed Name of the queue sent as `queueName` with every request. */
    private $queue_name;

    /** @var CMQClient Client that performs the actual requests. */
    private $cmq_client;

    /** @var bool Whether message bodies are base64-encoded/decoded by this class. */
    private $encoding;

    /**
     * @param mixed     $queue_name Name of the queue to operate on.
     * @param CMQClient $cmq_client Client used to issue the requests.
     * @param bool      $encoding   Whether to base64-encode message bodies.
     */
    public function __construct($queue_name, CMQClient $cmq_client, $encoding = false)
    {
        $this->queue_name = $queue_name;
        $this->cmq_client = $cmq_client;
        $this->encoding = $encoding;
    }

    /**
     * @return mixed The queue name given to the constructor.
     */
    public function getQueueName()
    {
        return $this->queue_name;
    }

    /**
     * Set whether message bodies are base64-encoded.
     *
     * @param bool $encoding True to base64-encode bodies on send and decode them on receive.
     */
    public function set_encoding($encoding)
    {
        $this->encoding = $encoding;
    }

    /**
     * Create the queue.
     *
     * @param QueueMeta $queue_meta Attributes to create the queue with.
     */
    public function create($queue_meta)
    {
        $this->cmq_client->create_queue($this->attributeParams($queue_meta));
    }

    /**
     * Fetch the queue's attributes.
     *
     * @return QueueMeta Meta object populated from the client's response.
     */
    public function get_attributes()
    {
        $resp = $this->cmq_client->get_queue_attributes(['queueName' => $this->queue_name]);

        $queue_meta = new QueueMeta();
        // Seed with the local name; the response overrides it when it carries its own.
        $queue_meta->queueName = $this->queue_name;
        $this->__resp2meta__($queue_meta, $resp);

        return $queue_meta;
    }

    /**
     * Copy every attribute present in a response onto a meta object.
     *
     * Attributes that are missing (or null) in the response leave the
     * corresponding property of the meta object untouched.
     *
     * @param QueueMeta $queue_meta Object to populate (modified in place).
     * @param array     $resp       Response returned by the client.
     */
    protected function __resp2meta__($queue_meta, $resp)
    {
        $fields = [
            'queueName',
            'maxMsgHeapNum',
            'pollingWaitSeconds',
            'visibilityTimeout',
            'maxMsgSize',
            'msgRetentionSeconds',
            'createTime',
            'lastModifyTime',
            'activeMsgNum',
            'rewindSeconds',
            'inactiveMsgNum',
            'rewindmsgNum',
            'minMsgTime',
            'delayMsgNum',
        ];

        foreach ($fields as $field) {
            if (isset($resp[$field])) {
                $queue_meta->{$field} = $resp[$field];
            }
        }
    }

    /**
     * Update the queue's attributes.
     *
     * @param QueueMeta $queue_meta Attributes to apply to the queue.
     */
    public function set_attributes($queue_meta)
    {
        $this->cmq_client->set_queue_attributes($this->attributeParams($queue_meta));
    }

    /**
     * Ask the client to rewind the queue.
     *
     * @param mixed $backTrackingTime Value sent as `startConsumeTime`.
     *                                NOTE(review): presumably a timestamp to resume
     *                                consumption from; confirm against the CMQ API.
     */
    public function rewindQueue($backTrackingTime)
    {
        $params = [
            'queueName'        => $this->queue_name,
            'startConsumeTime' => $backTrackingTime,
        ];
        $this->cmq_client->rewindQueue($params);
    }

    /**
     * Delete the queue.
     */
    public function delete()
    {
        $this->cmq_client->delete_queue(['queueName' => $this->queue_name]);
    }

    /**
     * Send a single message.
     *
     * @param Message $message   Message whose `msgBody` is sent.
     * @param int     $delayTime Value sent as `delaySeconds`.
     *
     * @return Message New message object carrying the id returned by the client.
     */
    public function send_message($message, $delayTime = 0)
    {
        $params = [
            'queueName'    => $this->queue_name,
            'msgBody'      => $this->encodeBody($message->msgBody),
            'delaySeconds' => $delayTime,
        ];

        $sent = new Message();
        $sent->msgId = $this->cmq_client->send_message($params);

        return $sent;
    }

    /**
     * Send several messages in one request.
     *
     * @param Message[] $messages  Messages whose `msgBody` values are sent.
     * @param int       $delayTime Value sent as `delaySeconds` for the whole batch.
     *
     * @return Message[] One message object per entry returned by the client, each carrying its `msgId`.
     */
    public function batch_send_message($messages, $delayTime = 0)
    {
        $params = [
            'queueName'    => $this->queue_name,
            'delaySeconds' => $delayTime,
        ];

        // Bodies are passed as numbered keys starting at 1: msgBody.1, msgBody.2, ...
        $index = 1;
        foreach ($messages as $message) {
            $params['msgBody.'.$index] = $this->encodeBody($message->msgBody);
            $index++;
        }

        $sentMessages = [];
        foreach ($this->cmq_client->batch_send_message($params) as $entry) {
            $sent = new Message();
            $sent->msgId = $entry['msgId'];
            $sentMessages[] = $sent;
        }

        return $sentMessages;
    }

    /**
     * Consume a single message.
     *
     * @param int|null $polling_wait_seconds Long-polling time for this request, in seconds;
     *                                       null leaves `pollingWaitSeconds` out of the request.
     *
     * @return Message Message with its body, id, receipt handle and timing attributes.
     */
    public function receive_message($polling_wait_seconds = null)
    {
        $params = $this->withPollingWait(
            ['queueName' => $this->queue_name],
            $polling_wait_seconds
        );

        return $this->toMessage($this->cmq_client->receive_message($params));
    }

    /**
     * Consume several messages in one request.
     *
     * @param int      $num_of_msg           Maximum number of messages to fetch in this request.
     * @param int|null $polling_wait_seconds Long-polling time for this request, in seconds;
     *                                       null leaves `pollingWaitSeconds` out of the request.
     *
     * @return Message[] Messages with their bodies, ids, receipt handles and timing attributes.
     */
    public function batch_receive_message($num_of_msg, $polling_wait_seconds = null)
    {
        // The null check is strict (inside withPollingWait) so that an explicit 0 is
        // honoured, exactly as in receive_message(); a loose `!= null` would treat 0
        // as "not provided".
        $params = $this->withPollingWait(
            ['queueName' => $this->queue_name, 'numOfMsg' => $num_of_msg],
            $polling_wait_seconds
        );

        $received = [];
        foreach ($this->cmq_client->batch_receive_message($params) as $entry) {
            $received[] = $this->toMessage($entry);
        }

        return $received;
    }

    /**
     * Delete a single message.
     *
     * @param string $receipt_handle Receipt handle returned by the most recent operation on the message.
     */
    public function delete_message($receipt_handle)
    {
        $params = [
            'queueName'     => $this->queue_name,
            'receiptHandle' => $receipt_handle,
        ];
        $this->cmq_client->delete_message($params);
    }

    /**
     * Delete several messages in one request.
     *
     * @param string[] $receipt_handle_list Receipt handles of the messages, as returned by batch_receive_message().
     */
    public function batch_delete_message($receipt_handle_list)
    {
        $params = ['queueName' => $this->queue_name];

        // Handles are passed as numbered keys starting at 1: receiptHandle.1, receiptHandle.2, ...
        $index = 1;
        foreach ($receipt_handle_list as $receipt_handle) {
            $params['receiptHandle.'.$index] = $receipt_handle;
            $index++;
        }

        $this->cmq_client->batch_delete_message($params);
    }

    /**
     * Build the parameter array shared by create() and set_attributes().
     *
     * @param QueueMeta $queue_meta Source of the attribute values.
     *
     * @return array Request parameters; `maxMsgHeapNum` is included only when it is greater than 0.
     */
    private function attributeParams($queue_meta)
    {
        $params = [
            'queueName'           => $this->queue_name,
            'pollingWaitSeconds'  => $queue_meta->pollingWaitSeconds,
            'visibilityTimeout'   => $queue_meta->visibilityTimeout,
            'maxMsgSize'          => $queue_meta->maxMsgSize,
            'msgRetentionSeconds' => $queue_meta->msgRetentionSeconds,
            'rewindSeconds'       => $queue_meta->rewindSeconds,
        ];

        if ($queue_meta->maxMsgHeapNum > 0) {
            $params['maxMsgHeapNum'] = $queue_meta->maxMsgHeapNum;
        }

        return $params;
    }

    /**
     * Add the polling-wait fields to a receive request.
     *
     * @param array    $params               Request parameters built so far.
     * @param int|null $polling_wait_seconds Caller-supplied wait time, or null if none was given.
     *
     * @return array The parameters with the polling-wait fields added.
     */
    private function withPollingWait(array $params, $polling_wait_seconds)
    {
        if ($polling_wait_seconds !== null) {
            $params['UserpollingWaitSeconds'] = $polling_wait_seconds;
            $params['pollingWaitSeconds'] = $polling_wait_seconds;
        } else {
            // NOTE(review): 30 is presumably a client-side default wait; confirm in CMQClient.
            $params['UserpollingWaitSeconds'] = 30;
        }

        return $params;
    }

    /**
     * Build a Message from one received-message entry of a client response.
     *
     * @param array $resp Entry holding msgBody, msgId, receiptHandle and the timing fields.
     *
     * @return Message
     */
    private function toMessage($resp)
    {
        $msg = new Message();
        $msg->msgBody = $this->decodeBody($resp['msgBody']);
        $msg->msgId = $resp['msgId'];
        $msg->receiptHandle = $resp['receiptHandle'];
        $msg->enqueueTime = $resp['enqueueTime'];
        $msg->nextVisibleTime = $resp['nextVisibleTime'];
        $msg->dequeueCount = $resp['dequeueCount'];
        $msg->firstDequeueTime = $resp['firstDequeueTime'];

        return $msg;
    }

    /**
     * @param string $body Raw message body.
     *
     * @return string The body, base64-encoded when encoding is enabled.
     */
    private function encodeBody($body)
    {
        return $this->encoding ? base64_encode($body) : $body;
    }

    /**
     * @param string $body Message body as received.
     *
     * @return string|false The body, base64-decoded when encoding is enabled.
     */
    private function decodeBody($body)
    {
        return $this->encoding ? base64_decode($body) : $body;
    }
}
||
| 335 |