Completed
Pull Request — master (#110)
by
unknown
05:54
created

DoctrineProvider::create()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
cc 1
eloc 1
nc 1
nop 0
crap 2
1
<?php
2
3
/**
4
 * Copyright Talisman Innovations Ltd. (2016). All rights reserved
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Talisman Innovations Ltd. (2016)
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Provider;
24
25
use Doctrine\Common\Cache\Cache;
26
use Monolog\Logger;
27
use Uecode\Bundle\QPushBundle\Message\Message;
28
use Uecode\Bundle\QPushBundle\Entity\DoctrineMessage;
29
30
class DoctrineProvider extends AbstractProvider
31
{
32
33
    protected $em;
34
    protected $repository;
35
36
    /**
37
     * Constructor for Provider classes
38
     *
39
     * @param string $name    Name of the Queue the provider is for
40
     * @param array  $options An array of configuration options for the Queue
41
     * @param mixed  $client  A Queue Client for the provider
42
     * @param Cache  $cache   An instance of Doctrine\Common\Cache\Cache
43
     * @param Logger $logger  An instance of Symfony\Bridge\Mongolog\Logger
44
     */
45
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
46
    {
47
        $this->name = $name;
48
        $this->options = $options;
49
        $this->cache = $cache;
50
        $this->logger = $logger;
51
        $this->em = $client;
52
        $this->repository = $this->em->getRepository('Uecode\Bundle\QPushBundle\Entity\DoctrineMessage');
53
    }
54
55
    /**
56
     * Returns the name of the Queue that this Provider is for
57
     *
58
     * @return string
59
     */
60
    public function getName()
61
    {
62
        return $this->name;
63
    }
64
65
    /**
66
     * Returns the Queue Provider name
67
     *
68
     * @return string
69
     */
70
    public function getProvider()
71
    {
72
        return 'Doctrine';
73
    }
74
75
    /**
76
     * Returns the Provider's Configuration Options
77
     *
78
     * @return array
79
     */
80
    public function getOptions()
81
    {
82
        return $this->options;
83
    }
84
85
    /**
86
     * Returns the Cache service
87
     *
88
     * @return Cache
89
     */
90
    public function getCache()
91
    {
92
        return $this->cache;
93
    }
94
95
    /**
96
     * Returns the Logger service
97
     *
98
     * @return Logger
99
     */
100
    public function getLogger()
101
    {
102
        return $this->logger;
103
    }
104
105
    /**
106
     * Get repository
107
     * 
108
     * @return array
109
     */
110
    public function getRepository()
111
    {
112
        if (!$this->repository) {
113
            return;
114
        }
115
        return $this->repository;
116
    }
117
118
    /**
119
     * Creates the Queue
120
     *
121
     * All Create methods are idempotent, if the resource exists, the current ARN
122
     * will be returned
123
     */
124
    public function create()
125
    {
126
        
127
    }
128
129
    /**
130
     * Publishes a message to the Queue
131
     *
132
     * This method should return a string MessageId or Response
133
     *
134
     * @param array $message The message to queue
135
     * @param  array $options An array of options that override the queue defaults
136
     *
137
     * @return string
138
     */
139
    public function publish(array $message, array $options = [])
140
    {
141
        if (!$this->em) {
142
            return '';
143
        }
144
145
        $doctrineMessage = new DoctrineMessage();
146
        $doctrineMessage->setQueue($this->name)
147
                ->setDelivered(false)
148
                ->setMessage($message)
149
                ->setLength(strlen(serialize($message)));
150
151
        $this->em->persist($doctrineMessage);
152
        $this->em->flush();
153
154
        return (string) $doctrineMessage->getId();
155
    }
156
157
    /**
158
     * Polls the Queue for Messages
159
     *
160
     * Depending on the Provider, this method may keep the connection open for
161
     * a configurable amount of time, to allow for long polling.  In most cases,
162
     * this method is not meant to be used to long poll indefinitely, but should
163
     * return in reasonable amount of time
164
     *
165
     * @param  array $options An array of options that override the queue defaults
166
     *
167
     * @return array
168
     */
169
    public function receive(array $options = [])
170
    {
171
        if (!$this->em) {
172
            return [];
173
        }
174
175
        $doctrineMessages = $this->repository->findBy(
176
                [
177
            'delivered' => false,
178
            'queue' => $this->name
179
                ], [
180
            'id' => 'ASC'
181
                ]
182
        );
183
184
        $messages = [];
185
        foreach ($doctrineMessages as $doctrineMessage) {
186
            $messages[] = new Message($doctrineMessage->getId(), $doctrineMessage->getMessage(), []);
187
            $doctrineMessage->setDelivered(true);
188
        }
189
        $this->em->flush();
190
191
        return $messages;
192
    }
193
194
    /**
195
     * Deletes the Queue Message
196
     *
197
     * @param mixed $id A message identifier or resource
198
     */
199
    public function delete($id)
200
    {
201
        $doctrineMessage = $this->repository->findById($id);
202
        $doctrineMessage->setDelivered(true);
203
        $this->em->flush();
204
    }
205
206
    /**
207
     * Destroys a Queue and clears any Queue related Cache
208
     *
209
     * @return bool
210
     */
211
    public function destroy()
212
    {
213
        $qb = $this->repository->createQueryBuilder('dm');
214
        $qb->delete();
215
        $qb->where('dm.queue = :queue');
216
        $qb->setParameter('queue', $this->name);
217
        $qb->getQuery()->getResult();
218
    }
219
220
    /**
221
     * Returns a specific message
222
     * 
223
     * @param integer $id
224
     * 
225
     * @return Message
226
     */
227
    public function getById($id)
228
    {
229
        return $this->repository->find($id);
230
    }
231
232
    /*
233
     * Returns a query of the message queue
234
     * 
235
     * @param string $contains
236
     * @param DateTime $from
237
     * @param DateTime $to
238
     * 
239
     * @return Query
240
     */
241
     
242
     public function findBy($contains = null, $from = null, $to = null)
243
     {
244
        
245
        $qb = $this->repository->createQueryBuilder('p');
246
        $qb->select('p');
247
        $qb->where('p.queue = :queue');
248
        $qb->setParameter('queue', $this->name);
249
250
        if ($contains !== null) {
251
            $qb->andWhere('p.message LIKE :contains');
252
            $qb->setParameter('contains', '%'.$contains.'%');
253
        }
254
        
255
        if ($from !== null) {
256
            $qb->andWhere('p.created >= :from');
257
            $qb->setParameter('from', $from);
258
        }
259
        
260
        if ($to !== null) {
261
            $qb->andWhere('p.created <= :to');
262
            $qb->setParameter('to', $to);
263
        }
264
265
        return $qb->getQuery();
266
    }
267
}
268