1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Uecode\Bundle\QPushBundle\Provider; |
4
|
|
|
|
5
|
|
|
use Doctrine\Common\Cache\Cache; |
6
|
|
|
use Monolog\Logger; |
7
|
|
|
use Uecode\Bundle\QPushBundle\Message\Message; |
8
|
|
|
use Uecode\Bundle\QPushBundle\Entity\DoctrineMessage; |
9
|
|
|
|
10
|
|
|
/**
 * Queue provider that stores messages as DoctrineMessage entities.
 *
 * Messages are persisted through the injected client (used here as an entity
 * manager: persist/flush/getRepository) and are flagged as "delivered" once
 * they have been received or deleted.
 */
class DoctrineProvider extends AbstractProvider
{
    /**
     * The client passed to the constructor; used as the entity manager.
     *
     * @var mixed
     */
    protected $em;

    /**
     * Repository for the DoctrineMessage entity, obtained from the client.
     *
     * @var mixed
     */
    protected $repository;

    /**
     * Constructor for Provider classes
     *
     * @param string $name    Name of the Queue the provider is for
     * @param array  $options An array of configuration options for the Queue
     * @param mixed  $client  A Queue Client for the provider; it must expose
     *                        getRepository(), persist() and flush()
     * @param Cache  $cache   An instance of Doctrine\Common\Cache\Cache
     * @param Logger $logger  An instance of Monolog\Logger
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name       = $name;
        $this->options    = $options;
        $this->cache      = $cache;
        $this->logger     = $logger;
        $this->em         = $client;
        $this->repository = $this->em->getRepository('Uecode\Bundle\QPushBundle\Entity\DoctrineMessage');
    }

    /**
     * Returns the name of the Queue that this Provider is for
     *
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Returns the Queue Name prefixed with the QPush Prefix
     *
     * If a Queue name is explicitly set in the configuration, use just that
     * name - which is beneficial for reusing existing queues not created by
     * qpush. Otherwise, build the name from the qpush prefix and the queue name.
     *
     * @return string
     */
    public function getNameWithPrefix()
    {
        if (!empty($this->options['queue_name'])) {
            return $this->options['queue_name'];
        }

        return sprintf("%s_%s", self::QPUSH_PREFIX, $this->name);
    }

    /**
     * Returns the Queue Provider name
     *
     * @return string
     */
    public function getProvider()
    {
        return 'Doctrine';
    }

    /**
     * Returns the Provider's Configuration Options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * Returns the Cache service
     *
     * @return Cache
     */
    public function getCache()
    {
        return $this->cache;
    }

    /**
     * Returns the Logger service
     *
     * @return Logger
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Returns the DoctrineMessage repository obtained in the constructor
     *
     * @return mixed|null The repository, or null when none is set
     */
    public function getRepository()
    {
        return $this->repository ?: null;
    }

    /**
     * Creates the Queue
     *
     * Intentionally a no-op in this provider: nothing is provisioned here,
     * messages are simply persisted when they are published.
     */
    public function create()
    {
    }

    /**
     * Publishes a message to the Queue
     *
     * The message is stored as a new, undelivered DoctrineMessage for this
     * queue and flushed immediately.
     *
     * @param array $message The message to queue
     * @param array $options An array of options that override the queue
     *                       defaults (not used by this provider)
     *
     * @return string The id of the stored message, or an empty string when
     *                no entity manager is available
     */
    public function publish(array $message, array $options = [])
    {
        if (!$this->em) {
            return '';
        }

        $doctrineMessage = new DoctrineMessage();
        $doctrineMessage->setQueue($this->name)
            ->setDelivered(false)
            ->setMessage($message)
            ->setLength(strlen(serialize($message)));

        $this->em->persist($doctrineMessage);
        $this->em->flush();

        return (string) $doctrineMessage->getId();
    }

    /**
     * Polls the Queue for Messages
     *
     * Fetches every undelivered message stored for this queue, wraps each one
     * in a Message and flags it as delivered before flushing.
     *
     * @param array $options An array of options that override the queue
     *                       defaults (not used by this provider)
     *
     * @return array A list of Message objects; empty when no entity manager
     *               is available or nothing is pending
     */
    public function receive(array $options = [])
    {
        if (!$this->em) {
            return [];
        }

        $doctrineMessages = $this->repository->findBy([
            'delivered' => false,
            'queue'     => $this->name,
        ]);

        $messages = [];
        foreach ($doctrineMessages as $doctrineMessage) {
            $messages[] = new Message($doctrineMessage->getId(), $doctrineMessage->getMessage(), []);
            $doctrineMessage->setDelivered(true);
        }
        $this->em->flush();

        return $messages;
    }

    /**
     * Deletes the Queue Message
     *
     * The stored row is not removed; the message is flagged as delivered so
     * that receive() no longer returns it. Unknown ids are ignored.
     *
     * @param mixed $id A message identifier or resource
     */
    public function delete($id)
    {
        $found = $this->repository->findById($id);

        // Doctrine's magic findBy* finders return a list of matches, whereas a
        // custom repository method could return a single entity; accept both
        // instead of calling an entity method on an array.
        $doctrineMessage = is_array($found) ? reset($found) : $found;

        // Nothing matched (null, or false from reset() on an empty list).
        if (!$doctrineMessage) {
            return;
        }

        $doctrineMessage->setDelivered(true);
        $this->em->flush();
    }

    /**
     * Destroys a Queue
     *
     * Removes every stored message that belongs to this queue.
     *
     * @return bool Always true once the delete query has run
     */
    public function destroy()
    {
        $qb = $this->repository->createQueryBuilder('dm');
        $qb->delete();
        $qb->where('dm.queue = :queue');
        $qb->setParameter('queue', $this->name);
        $qb->getQuery()->getResult();

        return true;
    }
}
216
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.