DoctrineProvider   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 244
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 11

Importance

Changes 6
Bugs 3 Features 0
Metric Value
wmc 21
c 6
b 3
f 0
lcom 1
cbo 11
dl 0
loc 244
rs 10

12 Methods

Rating   Name   Duplication   Size   Complexity  
A setFactory() 0 4 1
A setEventDispatcher() 0 4 1
A getProvider() 0 4 1
A __construct() 0 14 1
A setRepositoryForClass() 0 5 1
A create() 0 5 1
B publish() 0 24 3
B receive() 0 45 5
A delete() 0 10 2
B destroy() 0 25 3
A onKernelTerminate() 0 6 1
A isCommandLineInterface() 0 4 1
1
<?php
2
3
namespace DoS\QueueBundle\Provider;
4
5
use Doctrine\Common\Cache\Cache;
6
use Doctrine\ORM\EntityManager;
7
use DoS\QueueBundle\Model\QueueMessageInterface;
8
use DoS\ResourceBundle\Doctrine\ORM\EntityRepository;
9
use DoS\ResourceBundle\Factory\FactoryInterface;
10
use Symfony\Bridge\Monolog\Logger;
11
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
12
use Uecode\Bundle\QPushBundle\Event\Events;
13
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
14
use Uecode\Bundle\QPushBundle\Message\Message;
15
use Uecode\Bundle\QPushBundle\Provider\AbstractProvider;
16
17
/**
 * QPush provider that stores queue messages as Doctrine ORM entities.
 *
 * Messages are persisted on publish() and buffered in memory; the buffer is
 * consumed by onKernelTerminate(), or a receive() is triggered immediately when
 * running on the CLI. Received messages are dispatched through the event
 * dispatcher and then removed from the database.
 */
class DoctrineProvider extends AbstractProvider
{
    /**
     * @var array
     */
    protected $queues = [];

    /**
     * The Doctrine entity manager (passed to the constructor as $client).
     * The property name is historical; it is NOT the event dispatcher.
     *
     * @var EntityManager
     */
    protected $dispatcher;

    /**
     * @var EventDispatcherInterface
     */
    protected $eventDispatcher;

    /**
     * Repository for the message entity class, set by setRepositoryForClass().
     * The misspelled name is kept because the property is protected and may be
     * referenced by subclasses.
     *
     * @var EntityRepository
     */
    protected $repisotory;

    /**
     * @var FactoryInterface
     */
    protected $factory;

    /**
     * Class name of the message entity, set by setRepositoryForClass().
     *
     * @var string
     */
    protected $dataClass;

    /**
     * Messages published through this instance that have not been received
     * yet, indexed by message id.
     *
     * @var array
     */
    protected $queueBuffers = [];

    /**
     * @var array
     */
    protected $queueOptions = [];

    /**
     * When true, publish() triggers an immediate receive() if running on the CLI.
     *
     * @var bool
     */
    protected $postponeOnCli = true;

    /**
     * @param string $name    Queue name.
     * @param array  $options Queue options; merged over the defaults below.
     * @param mixed  $client  Stored in $dispatcher and used as the entity manager.
     * @param Cache  $cache
     * @param Logger $logger
     */
    public function __construct($name, array $options, $client, Cache $cache, Logger $logger)
    {
        $defaults = [
            'messages_to_receive' => 1,
            'fifo_receive' => true,
            'logging_enabled' => true,
        ];

        $this->name = $name;
        $this->options = array_merge($defaults, $options);
        $this->dispatcher = $client;
        $this->cache = $cache;
        $this->logger = $logger;
    }

    /**
     * Sets the message entity class and resolves its repository from the
     * entity manager.
     *
     * @param string $dataClass
     */
    public function setRepositoryForClass($dataClass)
    {
        $this->dataClass = $dataClass;
        $this->repisotory = $this->dispatcher->getRepository($this->dataClass);
    }

    /**
     * @param FactoryInterface $factory
     */
    public function setFactory(FactoryInterface $factory)
    {
        $this->factory = $factory;
    }

    /**
     * Logs the creation and returns a new object from the factory.
     *
     * @return QueueMessageInterface
     */
    public function create()
    {
        $this->log(200, "Queue has been created.");

        return $this->factory->createNew();
    }

    /**
     * @param EventDispatcherInterface $eventDispatcher
     */
    public function setEventDispatcher(EventDispatcherInterface $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * {@inheritdoc}
     */
    public function getProvider()
    {
        return 'DoctrineORM';
    }

    /**
     * {@inheritdoc}
     *
     * Persists the message, buffers it for onKernelTerminate() and, on the CLI
     * (when $postponeOnCli is set), triggers a receive() right away.
     *
     * @return mixed The id of the persisted message entity.
     */
    public function publish(array $message, array $options = [])
    {
        $publishStart = microtime(true);

        $queue = $this->create();
        $queue->setBody($message);
        $queue->setName($this->getNameWithPrefix());

        $this->dispatcher->persist($queue);
        $this->dispatcher->flush($queue);

        // Read the id once, after the flush, and reuse it below.
        $messageId = $queue->getId();
        $this->queueBuffers[$messageId] = $queue;

        $this->log(200, "Message has been published.", [
            'message_id' => $messageId,
            'publish_time' => microtime(true) - $publishStart,
        ]);

        // if the message is generated from the cli the message is handled
        // directly as there is no kernel.terminate in cli
        if ($this->postponeOnCli && $this->isCommandLineInterface()) {
            $this->receive(['messages_to_receive' => 1]);
        }

        return $messageId;
    }

    /**
     * {@inheritdoc}
     *
     * When $options['queues'] is a non-empty list of message entities, those
     * are processed; otherwise messages are loaded from the repository by
     * queue name. Each message is dispatched as a MessageEvent and removed.
     *
     * @return Message[]
     */
    public function receive(array $options = [])
    {
        // Merge into a local copy: per-call overrides must not leak into the
        // provider's configured options (or keep entities referenced there).
        $settings = array_merge($this->options, $options);

        if (!empty($options['queues'])) {
            $results = $options['queues'];
        } else {
            $results = $this->repisotory->createQueryBuilder('o')
                ->where('o.name = :name')
                ->setParameter('name', $this->getNameWithPrefix())
                ->orderBy('o.id', $settings['fifo_receive'] ? 'ASC' : 'DESC')
                ->setMaxResults($settings['messages_to_receive'])
                ->getQuery()
                ->getResult();
        }

        if (0 === count($results)) {
            $this->log(200, "No messages found in queue.");

            return [];
        }

        $messages = [];

        /** @var QueueMessageInterface $message */
        foreach ($results as $message) {
            $messageId = $message->getId();
            $message->setReceivedAt(new \DateTime());

            $received = new Message($messageId, $message->getBody(), []);
            $messages[] = $received;

            $this->log(200, "Message has been received.", ['message_id' => $messageId]);

            $this->eventDispatcher->dispatch(
                Events::Message($this->name),
                new MessageEvent($this->name, $received)
            );

            // Received, then deleted. Drop it from the buffer as well so it is
            // not dispatched a second time by onKernelTerminate().
            unset($this->queueBuffers[$messageId]);
            $this->dispatcher->remove($message);
        }

        $this->dispatcher->flush();

        return $messages;
    }

    /**
     * {@inheritdoc}
     *
     * @return bool True when the message existed and was removed, false otherwise.
     */
    public function delete($id)
    {
        $message = $this->repisotory->find($id);

        if (!$message) {
            $this->log(400, "Queue did not exist", ['message_id' => $id]);

            return false;
        }

        unset($this->queueBuffers[$id]);
        $this->dispatcher->remove($message);
        $this->dispatcher->flush($message);
        $this->log(200, "Message deleted.", ['message_id' => $id]);

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Deletes every stored message of this queue and removes the queue's
     * cache entry.
     *
     * @return bool
     *
     * @throws \Exception Any exception from the delete query whose message
     *                    does not contain "Queue not found".
     */
    public function destroy()
    {
        // Catch `queue not found` exceptions, throw the rest.
        try {
            // Build the DELETE on the entity manager with an explicit entity
            // class and execute it; building the query alone deletes nothing.
            $this->dispatcher->createQueryBuilder()
                ->delete($this->dataClass, 'o')
                ->where('o.name = :name')
                ->setParameter('name', $this->getNameWithPrefix())
                ->getQuery()
                ->execute();
        } catch (\Exception $e) {
            if (false === strpos($e->getMessage(), "Queue not found")) {
                throw $e;
            }

            $this->log(400, "Queue did not exist");
        }

        $this->cache->delete($this->getNameWithPrefix());

        $this->log(200, "Queue has been destroyed.");

        return true;
    }

    /**
     * Receives the messages buffered by publish().
     *
     * With an empty buffer receive() falls back to loading from the repository.
     */
    public function onKernelTerminate()
    {
        // Detach the buffer before receiving so that messages published by
        // listeners during dispatch are not wiped afterwards.
        $queues = $this->queueBuffers;
        $this->queueBuffers = [];

        $this->receive(['queues' => $queues]);
    }

    /**
     * Check whether this Backend is run on the CLI.
     *
     * @return bool
     */
    protected function isCommandLineInterface()
    {
        return 'cli' === PHP_SAPI;
    }
}
261