1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Freyo\LaravelQueueCMQ\Queue; |
4
|
|
|
|
5
|
|
|
use Freyo\LaravelQueueCMQ\Queue\Driver\Account; |
6
|
|
|
use Freyo\LaravelQueueCMQ\Queue\Driver\CMQServerException; |
7
|
|
|
use Freyo\LaravelQueueCMQ\Queue\Driver\Message; |
8
|
|
|
use Freyo\LaravelQueueCMQ\Queue\Driver\Topic; |
9
|
|
|
use Freyo\LaravelQueueCMQ\Queue\Jobs\CMQJob; |
10
|
|
|
use Illuminate\Contracts\Queue\Queue as QueueContract; |
11
|
|
|
use Illuminate\Queue\Queue; |
12
|
|
|
use Illuminate\Support\Arr; |
13
|
|
|
|
14
|
|
|
/**
 * Laravel queue connection backed by CMQ queues and, optionally, CMQ topics.
 *
 * When the topic option "enable" is truthy, pushed messages are published to a
 * topic; otherwise they are sent to a queue. Messages are always popped from a
 * queue.
 */
class CMQQueue extends Queue implements QueueContract
{
    /**
     * CMQ server error code that pop() treats as "no message available".
     */
    const CMQ_QUEUE_NO_MESSAGE_CODE = 7000;

    /**
     * Values of the topic "filter" option recognised by pushRaw().
     */
    const CMQ_TOPIC_TAG_FILTER_NAME = 'msgtag';
    const CMQ_TOPIC_ROUTING_FILTER_NAME = 'routing';

    /**
     * Queue and topic option arrays, read from $config['options'].
     *
     * @var array
     */
    protected $queueOptions;
    protected $topicOptions;

    /**
     * Accounts used to resolve queue and topic drivers.
     *
     * @var Account
     */
    private $queueAccount;
    private $topicAccount;

    /**
     * Options for "plain" mode, read from $config['plain'] (empty when absent).
     *
     * @var array
     */
    protected $plainOptions;

    /**
     * @param Account $queueAccount account used by getQueue()
     * @param Account $topicAccount account used by getTopic()
     * @param array   $config       must contain ['options']['queue'] and
     *                              ['options']['topic']; 'plain' is optional
     */
    public function __construct(Account $queueAccount, Account $topicAccount, array $config)
    {
        $this->queueAccount = $queueAccount;
        $this->topicAccount = $topicAccount;

        $this->queueOptions = $config['options']['queue'];
        $this->topicOptions = $config['options']['topic'];

        $this->plainOptions = Arr::get($config, 'plain', []);
    }

    /**
     * Whether plain mode is enabled (the "enable" plain option, default false).
     *
     * In plain mode push() and later() send $job->getPayload() as-is instead
     * of building a payload with createPayload().
     *
     * @return bool
     */
    public function isPlain()
    {
        // Cast so the documented bool contract holds for values such as "1" or 1.
        return (bool) Arr::get($this->plainOptions, 'enable', false);
    }

    /**
     * Get the "job" plain option.
     *
     * @return string|null null when the option is not configured
     */
    public function getPlainJob()
    {
        return Arr::get($this->plainOptions, 'job');
    }

    /**
     * Get the size of the queue.
     *
     * Reads the activeMsgNum attribute reported by the queue driver.
     *
     * @param string $queue
     *
     * @return int
     */
    public function size($queue = null)
    {
        $attributes = $this->getQueue($queue)->get_attributes();

        return (int) $attributes->activeMsgNum;
    }

    /**
     * Push a new job onto the queue.
     *
     * @param string|object $job
     * @param mixed         $data
     * @param string        $queue
     *
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $payload = $this->isPlain()
            ? $job->getPayload()
            : $this->createPayloadCompat($job, $data, $queue);

        return $this->pushRaw($payload, $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * When parseQueue() resolves to a topic, the message body is published
     * with either a tag list or a routing key derived from $queue, depending
     * on the topic "filter" option. Otherwise the message is sent to the
     * queue, honouring the optional "delay" entry of $options.
     *
     * @param string $payload
     * @param string $queue
     * @param array  $options
     *
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        $message = new Message($payload);

        $driver = $this->parseQueue($queue);

        if ($driver instanceof Topic) {
            $filter = $this->topicOptions['filter'];

            // The cast keeps the historical result for a null $queue (a single
            // empty tag) without the PHP 8.1+ deprecation for passing null.
            $vTagList = $filter === self::CMQ_TOPIC_TAG_FILTER_NAME
                ? explode(',', (string) $queue)
                : [];

            $routingKey = $filter === self::CMQ_TOPIC_ROUTING_FILTER_NAME
                ? $queue
                : null;

            return $driver->publish_message($message->msgBody, $vTagList, $routingKey);
        }

        // Arr::get() replaces the global array_get() helper, which was
        // deprecated in Laravel 5.8 and removed in 6.0.
        return $driver->send_message($message, Arr::get($options, 'delay', 0));
    }

    /**
     * Push a new job onto the queue after a delay.
     *
     * @param \DateTimeInterface|\DateInterval|int $delay
     * @param string|object                        $job
     * @param mixed                                $data
     * @param string                               $queue
     *
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        $payload = $this->isPlain()
            ? $job->getPayload()
            : $this->createPayloadCompat($job, $data, $queue);

        // Use getSeconds() when the parent class provides it, else secondsUntil().
        $seconds = method_exists($this, 'getSeconds')
            ? $this->getSeconds($delay)
            : $this->secondsUntil($delay);

        return $this->pushRaw($payload, $queue, ['delay' => $seconds]);
    }

    /**
     * Pop the next job off of the queue.
     *
     * @param string $queue
     *
     * @throws CMQServerException for any server error other than "no message"
     *
     * @return \Illuminate\Contracts\Queue\Job|null null when no message is available
     */
    public function pop($queue = null)
    {
        try {
            $driver = $this->getQueue($queue);
            $message = $driver->receive_message($this->queueOptions['polling_wait_seconds']);
        } catch (CMQServerException $e) {
            // An empty queue is reported as an error code; it is not a failure.
            if ($e->getCode() == self::CMQ_QUEUE_NO_MESSAGE_CODE) {
                return null;
            }

            throw $e;
        }

        return new CMQJob($this->container, $this, $message, $driver);
    }

    /**
     * Get the queue.
     *
     * @param string $queue falls back to the "name" queue option when empty
     *
     * @return Driver\Queue
     */
    public function getQueue($queue = null)
    {
        return $this->queueAccount->get_queue($queue ?: $this->queueOptions['name']);
    }

    /**
     * Get the topic.
     *
     * @param string $topic falls back to the "name" topic option when empty
     *
     * @return Driver\Topic
     */
    public function getTopic($topic = null)
    {
        return $this->topicAccount->get_topic($topic ?: $this->topicOptions['name']);
    }

    /**
     * Parse name to topic or queue.
     *
     * Returns a topic when the topic "enable" option is truthy (the configured
     * topic name takes precedence over $queue); otherwise returns a queue
     * ($queue takes precedence over the configured queue name).
     *
     * @param string $queue
     *
     * @return Driver\Queue|Driver\Topic
     */
    public function parseQueue($queue = null)
    {
        if ($this->topicOptions['enable']) {
            return $this->getTopic($this->topicOptions['name'] ?: $queue);
        }

        return $this->getQueue($queue ?: $this->queueOptions['name']);
    }

    /**
     * Build a job payload with whichever createPayload() signature the
     * installed framework declares.
     *
     * @param string|object $job
     * @param mixed         $data
     * @param string|null   $queue
     *
     * @return string
     */
    private function createPayloadCompat($job, $data, $queue)
    {
        $reflection = new \ReflectionMethod($this, 'createPayload');

        // Laravel >= 5.7 declares createPayload($job, $queue, $data);
        // earlier releases declare createPayload($job, $data).
        if ($reflection->getNumberOfParameters() === 3) {
            return $this->createPayload($job, $queue, $data);
        }

        return $this->createPayload($job, $data);
    }
}
224
|
|
|
|
This function has been deprecated. The supplier of the function has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the function will be removed and what other function to use instead.