1 | <?php |
||
13 | class DoctrineDriver implements \Bernard\Driver |
||
14 | { |
||
15 | protected $connection; |
||
16 | |||
17 | /** |
||
18 | * {@inheritdoc} |
||
19 | */ |
||
20 | public function __construct(Connection $connection) |
||
24 | |||
25 | /** |
||
26 | * {@inheritdoc} |
||
27 | */ |
||
28 | public function listQueues() |
||
35 | |||
/**
 * Registers a queue by name, leaving the table untouched when a row with
 * that name is already present.
 *
 * The lookup and the insert against `bernard_queues` run inside a single
 * transaction.
 *
 * {@inheritdoc}
 */
public function createQueue($queueName)
{
    $connection = $this->connection;

    // Unit of work run inside the transaction: look the name up, insert only if absent.
    $insertIfMissing = function () use ($connection, $queueName) {
        $qb = $connection->createQueryBuilder();

        $lookup = $qb
            ->select('name')
            ->from('bernard_queues', 'bq')
            ->where($qb->expr()->eq('bq.name', ':name'))
            ->setParameter('name', $queueName)
            ->execute();

        // A fetched row means the queue was already created; nothing to do then.
        if (!$lookup->fetch()) {
            $connection->insert('bernard_queues', ['name' => $queueName]);
        }
    };

    try {
        $connection->transactional($insertIfMissing);
    } catch (ConstraintViolationException $alreadyCreated) {
        // There is no portable INSERT ... IGNORE syntax across SQL servers, so a
        // constraint violation raised by the insert (the original note attributes
        // it to the primary key) is deliberately ignored.
    }
}
||
64 | |||
65 | /** |
||
66 | * {@inheritdoc} |
||
67 | */ |
||
68 | public function countMessages($queueName) |
||
77 | |||
78 | /** |
||
79 | * {@inheritdoc} |
||
80 | */ |
||
81 | public function pushMessage($queueName, $message) |
||
93 | |||
94 | /** |
||
95 | * {@inheritdoc} |
||
96 | */ |
||
97 | public function popMessage($queueName, $duration = 5) |
||
120 | /** |
||
121 | * {@inheritdoc} |
||
122 | */ |
||
123 | public function acknowledgeMessage($queueName, $receipt) |
||
127 | |||
128 | /** |
||
129 | * {@inheritdoc} |
||
130 | */ |
||
131 | public function peekQueue($queueName, $index = 0, $limit = 20) |
||
144 | |||
145 | /** |
||
146 | * {@inheritdoc} |
||
147 | */ |
||
148 | public function removeQueue($queueName) |
||
153 | |||
154 | /** |
||
155 | * {@inheritdoc} |
||
156 | */ |
||
157 | public function info() |
||
165 | |||
166 | /** |
||
167 | * Execute the actual query and process the response |
||
168 | * |
||
169 | * @param string $queueName |
||
170 | * |
||
171 | * @return array|null |
||
172 | */ |
||
173 | protected function doPopMessage($queueName) |
||
190 | } |
||
191 |