Completed
Pull Request — master (#110)
by
unknown
05:11
created

DoctrineProvider::getOptions()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 3
Bugs 0 Features 1
Metric Value
c 3
b 0
f 1
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
/**
 * Copyright Talisman Innovations Ltd. (2016). All rights reserved
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @package     qpush-bundle
 * @copyright   Talisman Innovations Ltd. (2016)
 * @license     Apache License, Version 2.0
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Doctrine\Common\Cache\Cache;
26
use Monolog\Logger;
27
use Uecode\Bundle\QPushBundle\Message\Message;
28
use Uecode\Bundle\QPushBundle\Entity\DoctrineMessage;
29
30
/**
 * Queue provider that stores messages as DoctrineMessage entities.
 *
 * Messages are persisted through the client handed to the constructor, which
 * this class uses as a Doctrine entity manager. Each message row carries the
 * queue name and a "delivered" flag; receiving or deleting a message sets
 * that flag rather than removing the row.
 */
class DoctrineProvider extends AbstractProvider
{
    /**
     * The client passed to the constructor. This class calls persist(),
     * flush(), getRepository(), getConnection() and getClassMetadata() on it.
     *
     * @var mixed
     */
    protected $em;

    /**
     * Repository for the message entity, obtained from the client.
     *
     * @var object|null
     */
    protected $repository;

    /**
     * Fully-qualified class name of the entity that stores queued messages.
     *
     * @var string
     */
    protected static $entityName = 'Uecode\Bundle\QPushBundle\Entity\DoctrineMessage';

    /**
     * Constructor for Provider classes
     *
     * @param string $name    Name of the Queue the provider is for
     * @param array  $options An array of configuration options for the Queue
     * @param mixed  $client  A Queue Client for the provider (used as the entity manager)
     * @param Cache  $cache   An instance of Doctrine\Common\Cache\Cache
     * @param Logger $logger  An instance of Monolog\Logger
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $this->name = $name;
        $this->options = $options;
        $this->cache = $cache;
        $this->logger = $logger;
        $this->em = $client;
        $this->repository = $this->em->getRepository(self::$entityName);
    }

    /**
     * Returns the name of the Queue that this Provider is for
     *
     * @return string
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Returns the Queue Provider name
     *
     * @return string
     */
    public function getProvider()
    {
        return 'Doctrine';
    }

    /**
     * Returns the Provider's Configuration Options
     *
     * @return array
     */
    public function getOptions()
    {
        return $this->options;
    }

    /**
     * Returns the Cache service
     *
     * @return Cache
     */
    public function getCache()
    {
        return $this->cache;
    }

    /**
     * Returns the Logger service
     *
     * @return Logger
     */
    public function getLogger()
    {
        return $this->logger;
    }

    /**
     * Returns the repository used to load and query queue messages
     *
     * @return object|null The repository obtained in the constructor, or null
     *                     when none is set
     */
    public function getRepository()
    {
        if (!$this->repository) {
            return null;
        }

        return $this->repository;
    }

    /**
     * Creates the Queue
     *
     * Nothing is actually created here: the method only checks whether the
     * table mapped to the message entity already exists.
     *
     * @return bool True when the underlying table exists
     */
    public function create()
    {
        $sm = $this->em->getConnection()->getSchemaManager();
        $table = $this->em->getClassMetadata(self::$entityName)->getTableName();

        return $sm->tablesExist([$table]);
    }

    /**
     * Publishes a message to the Queue
     *
     * Stores the message as a new, undelivered DoctrineMessage for this queue
     * and flushes it immediately so that an id is available.
     *
     * @param array $message The message to queue
     * @param array $options An array of options that override the queue defaults
     *                       (not used by this provider)
     *
     * @return string The id of the stored message, or an empty string when no
     *                entity manager is available
     */
    public function publish(array $message, array $options = [])
    {
        if (!$this->em) {
            return '';
        }

        $doctrineMessage = new DoctrineMessage();
        $doctrineMessage->setQueue($this->name)
            ->setDelivered(false)
            ->setMessage($message)
            ->setLength(strlen(serialize($message)));

        $this->em->persist($doctrineMessage);
        $this->em->flush();

        return (string) $doctrineMessage->getId();
    }

    /**
     * Polls the Queue for Messages
     *
     * Fetches every undelivered message of this queue in ascending id order,
     * wraps each one in a Message object and marks it as delivered. There is
     * no long polling: the method returns as soon as the query has run.
     *
     * @param array $options An array of options that override the queue defaults
     *                       (not used by this provider)
     *
     * @return Message[]
     */
    public function receive(array $options = [])
    {
        if (!$this->em) {
            return [];
        }

        $doctrineMessages = $this->repository->findBy(
            ['delivered' => false, 'queue' => $this->name],
            ['id' => 'ASC']
        );

        $messages = [];
        foreach ($doctrineMessages as $doctrineMessage) {
            $messages[] = new Message($doctrineMessage->getId(), $doctrineMessage->getMessage(), []);
            $doctrineMessage->setDelivered(true);
        }
        $this->em->flush();

        return $messages;
    }

    /**
     * Deletes the Queue Message
     *
     * The row is kept; the message is only flagged as delivered so that
     * receive() no longer returns it.
     *
     * @param mixed $id A message identifier or resource
     *
     * @return bool True when the message was found and flagged, false when no
     *              message with that id exists
     */
    public function delete($id)
    {
        // Look up by primary key with find(), as getById() does. The magic
        // findById() finder resolves to findBy() and yields an array, on which
        // setDelivered() cannot be called.
        $doctrineMessage = $this->repository->find($id);
        if (!$doctrineMessage) {
            return false;
        }

        $doctrineMessage->setDelivered(true);
        $this->em->flush();

        return true;
    }

    /**
     * Destroys the Queue by deleting every stored message that belongs to it
     *
     * @return bool Always true
     */
    public function destroy()
    {
        $qb = $this->repository->createQueryBuilder('dm');
        $qb->delete();
        $qb->where('dm.queue = :queue');
        $qb->setParameter('queue', $this->name);
        $qb->getQuery()->execute();

        return true;
    }

    /**
     * Returns a specific message
     *
     * @param integer $id
     *
     * @return DoctrineMessage|null The stored entity as returned by the
     *                              repository's find(), or null when not found
     */
    public function getById($id)
    {
        return $this->repository->find($id);
    }

    /**
     * Returns a query of the message queue
     *
     * The query is built but not executed. It is always restricted to this
     * queue; each non-null argument adds one further condition.
     *
     * @param string|null    $contains Substring the message column must contain (LIKE %...%)
     * @param \DateTime|null $from     Lower bound (inclusive) for the created column
     * @param \DateTime|null $to       Upper bound (inclusive) for the created column
     *
     * @return mixed The query object produced by the query builder's getQuery()
     */
    public function findBy($contains = null, $from = null, $to = null)
    {
        $qb = $this->repository->createQueryBuilder('p');
        $qb->select('p');
        $qb->where('p.queue = :queue');
        $qb->setParameter('queue', $this->name);

        if ($contains !== null) {
            $qb->andWhere('p.message LIKE :contains');
            $qb->setParameter('contains', '%' . $contains . '%');
        }

        if ($from !== null) {
            $qb->andWhere('p.created >= :from');
            $qb->setParameter('from', $from);
        }

        if ($to !== null) {
            $qb->andWhere('p.created <= :to');
            $qb->setParameter('to', $to);
        }

        return $qb->getQuery();
    }
}
273