1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bernard\Driver\IronMQ; |
4
|
|
|
|
5
|
|
|
use Bernard\Driver\AbstractPrefetchDriver; |
6
|
|
|
use IronMQ\IronMQ; |
7
|
|
|
|
8
|
|
|
/**
 * Implements a Driver for use with Iron MQ:
 * https://github.com/iron-io/iron_mq_php.
 *
 * Messages are fetched from IronMQ in batches and handed out one at a time
 * through the cache provided by the parent prefetch driver.
 */
final class Driver extends AbstractPrefetchDriver
{
    /**
     * Client used for every call to the IronMQ service.
     *
     * @var IronMQ
     */
    private $ironmq;

    /**
     * @param IronMQ   $ironmq   client used to talk to IronMQ
     * @param int|null $prefetch forwarded unchanged to the parent constructor
     */
    public function __construct(IronMQ $ironmq, $prefetch = null)
    {
        parent::__construct($prefetch);

        $this->ironmq = $ironmq;
    }

    /**
     * Collects the names of all queues by walking the paginated queue listing.
     *
     * {@inheritdoc}
     */
    public function listQueues()
    {
        $perPage = 100;
        $queueNames = [];
        $page = 0;

        while ($queues = $this->ironmq->getQueues($page, $perPage)) {
            // array_merge() appends and renumbers integer keys. The array
            // union operator (+) would keep the keys of the first page and
            // silently drop every later page, because each page is keyed
            // from 0 again.
            $queueNames = array_merge($queueNames, $this->pluck($queues, 'name'));

            // A full page means there is probably another one; a short page
            // is the last one.
            if (count($queues) < $perPage) {
                break;
            }

            ++$page;
        }

        return $queueNames;
    }

    /**
     * Intentionally does nothing: no explicit creation call is made here.
     * NOTE(review): presumably IronMQ creates queues implicitly on first
     * post - confirm against the client documentation.
     *
     * {@inheritdoc}
     */
    public function createQueue($queueName)
    {
    }

    /**
     * Returns the `size` reported for the queue, or null when the client
     * returns no (falsy) queue information.
     *
     * {@inheritdoc}
     */
    public function countMessages($queueName)
    {
        $info = $this->ironmq->getQueue($queueName);

        if ($info) {
            return $info->size;
        }

        // Made explicit: the previous implementation fell off the end of the
        // method, which also yields null.
        return null;
    }

    /**
     * Posts the message to the named queue.
     *
     * {@inheritdoc}
     */
    public function pushMessage($queueName, $message)
    {
        $this->ironmq->postMessage($queueName, $message);
    }

    /**
     * Serves a message from the local cache when one is available; otherwise
     * fetches up to `prefetch` messages from IronMQ, caches them as
     * [body, id] pairs and returns the next cached entry.
     *
     * Returns [null, null] when IronMQ delivers no messages.
     *
     * $duration is forwarded as the fourth argument of getMessages().
     * NOTE(review): presumably this is the long-poll wait in seconds - confirm.
     *
     * {@inheritdoc}
     */
    public function popMessage($queueName, $duration = 5)
    {
        if ($message = $this->cache->pop($queueName)) {
            return $message;
        }

        $timeout = IronMQ::GET_MESSAGE_TIMEOUT;

        $messages = $this->ironmq->getMessages($queueName, $this->prefetch, $timeout, $duration);

        if (!$messages) {
            return [null, null];
        }

        foreach ($messages as $message) {
            $this->cache->push($queueName, [$message->body, $message->id]);
        }

        return $this->cache->pop($queueName);
    }

    /**
     * Deletes the message identified by $receipt from the queue. The receipt
     * is the message id handed out by popMessage().
     *
     * {@inheritdoc}
     */
    public function acknowledgeMessage($queueName, $receipt)
    {
        $this->ironmq->deleteMessage($queueName, $receipt);
    }

    /**
     * IronMQ does not support an offset when peeking messages, so $index is
     * accepted for interface compatibility but ignored.
     *
     * Returns the bodies of up to $limit messages, or an empty array when the
     * client returns nothing.
     *
     * {@inheritdoc}
     */
    public function peekQueue($queueName, $index = 0, $limit = 20)
    {
        if ($messages = $this->ironmq->peekMessages($queueName, $limit)) {
            return $this->pluck($messages, 'body');
        }

        return [];
    }

    /**
     * Deletes the named queue.
     *
     * {@inheritdoc}
     */
    public function removeQueue($queueName)
    {
        $this->ironmq->deleteQueue($queueName);
    }

    /**
     * Exposes the configured prefetch value.
     *
     * {@inheritdoc}
     */
    public function info()
    {
        return [
            'prefetch' => $this->prefetch,
        ];
    }

    /**
     * The missing array_pluck but for objects array.
     *
     * Reads $property from every object and returns the values under the
     * same keys as the input array.
     *
     * @param array  $objects
     * @param string $property
     *
     * @return array
     */
    protected function pluck(array $objects, $property)
    {
        // Static closure: nothing from $this is needed inside it.
        $function = static function ($object) use ($property) {
            return $object->$property;
        };

        return array_map($function, $objects);
    }
}
155
|
|
|
|
This method has been deprecated. The supplier of the class has supplied an explanatory message.
The explanatory message should give you some clue as to whether and when the method will be removed from the class and what other method or class to use instead.