Completed
Push — master ( 67a92e...b03a00 )
by Grégoire
01:54
created

src/Entity/MessageManager.php (1 issue)

Upgrade to new PHP Analysis Engine

These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more

1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Sonata Project package.
7
 *
8
 * (c) Thomas Rabaix <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Sonata\NotificationBundle\Entity;
15
16
use Doctrine\ORM\QueryBuilder;
17
use Sonata\CoreBundle\Model\BaseEntityManager;
18
use Sonata\DatagridBundle\Pager\Doctrine\Pager;
19
use Sonata\DatagridBundle\ProxyQuery\Doctrine\ProxyQuery;
20
use Sonata\NotificationBundle\Model\MessageInterface;
21
use Sonata\NotificationBundle\Model\MessageManagerInterface;
22
23
class MessageManager extends BaseEntityManager implements MessageManagerInterface
0 ignored issues
show
Deprecated Code introduced by
The class Sonata\CoreBundle\Model\BaseEntityManager has been deprecated with message: since 3.x, to be removed in 4.0.

This class, trait or interface has been deprecated. The supplier of the file has supplied an explanatory message.

The explanatory message should give you some clue as to whether and when the type will be removed from the class and what other constant to use instead.

Loading history...
24
{
25
    /**
26
     * {@inheritdoc}
27
     */
28
    public function save($message, $andFlush = true): void
29
    {
30
        //Hack for ConsumerHandlerCommand->optimize()
31
        if ($message->getId() && !$this->em->getUnitOfWork()->isInIdentityMap($message)) {
32
            $message = $this->getEntityManager()->getUnitOfWork()->merge($message);
33
        }
34
35
        parent::save($message, $andFlush);
36
    }
37
38
    /**
39
     * {@inheritdoc}
40
     */
41
    public function findByTypes(array $types, $state, $batchSize)
42
    {
43
        $params = [];
44
        $query = $this->prepareStateQuery($state, $types, $batchSize, $params);
45
46
        $query->setParameters($params);
47
48
        return $query->getQuery()->execute();
49
    }
50
51
    /**
52
     * {@inheritdoc}
53
     */
54
    public function findByAttempts(array $types, $state, $batchSize, $maxAttempts = null, $attemptDelay = 10)
55
    {
56
        $params = [];
57
        $query = $this->prepareStateQuery($state, $types, $batchSize, $params);
58
59
        if ($maxAttempts) {
60
            $query
61
                ->andWhere('m.restartCount < :maxAttempts')
62
                ->andWhere('m.updatedAt < :delayDate');
63
64
            $params['maxAttempts'] = $maxAttempts;
65
            $now = new \DateTime();
66
            $params['delayDate'] = $now->add(\DateInterval::createFromDateString(($attemptDelay * -1).' second'));
67
        }
68
69
        $query->setParameters($params);
70
71
        return $query->getQuery()->execute();
72
    }
73
74
    /**
75
     * {@inheritdoc}
76
     */
77
    public function countStates()
78
    {
79
        $tableName = $this->getEntityManager()->getClassMetadata($this->class)->table['name'];
80
81
        $stm = $this->getConnection()->query(
82
            sprintf('SELECT state, count(state) as cnt FROM %s GROUP BY state', $tableName)
83
        );
84
85
        $states = [
86
            MessageInterface::STATE_DONE => 0,
87
            MessageInterface::STATE_ERROR => 0,
88
            MessageInterface::STATE_IN_PROGRESS => 0,
89
            MessageInterface::STATE_OPEN => 0,
90
        ];
91
92
        while ($data = $stm->fetch()) {
93
            $states[$data['state']] = $data['cnt'];
94
        }
95
96
        return $states;
97
    }
98
99
    /**
100
     * {@inheritdoc}
101
     */
102
    public function cleanup($maxAge): void
103
    {
104
        $tableName = $this->getEntityManager()->getClassMetadata($this->class)->table['name'];
105
106
        $date = new \DateTime('now');
107
        $date->sub(new \DateInterval(sprintf('PT%sS', $maxAge)));
108
109
        $qb = $this->getRepository()->createQueryBuilder('message')
110
            ->delete()
111
            ->where('message.state = :state')
112
            ->andWhere('message.completedAt < :date')
113
            ->setParameter('state', MessageInterface::STATE_DONE)
114
            ->setParameter('date', $date);
115
116
        $qb->getQuery()->execute();
117
    }
118
119
    /**
120
     * {@inheritdoc}
121
     */
122
    public function cancel(MessageInterface $message, $force = false): void
123
    {
124
        if (($message->isRunning() || $message->isError()) && !$force) {
125
            return;
126
        }
127
128
        $message->setState(MessageInterface::STATE_CANCELLED);
129
130
        $this->save($message);
131
    }
132
133
    /**
134
     * {@inheritdoc}
135
     */
136
    public function restart(MessageInterface $message)
137
    {
138
        if ($message->isOpen() || $message->isRunning() || $message->isCancelled()) {
139
            return;
140
        }
141
142
        $this->cancel($message, true);
143
144
        $newMessage = clone $message;
145
        $newMessage->setRestartCount($message->getRestartCount() + 1);
146
        $newMessage->setType($message->getType());
147
148
        return $newMessage;
149
    }
150
151
    /**
152
     * {@inheritdoc}
153
     */
154
    public function getPager(array $criteria, $page, $limit = 10, array $sort = [])
155
    {
156
        $query = $this->getRepository()
157
            ->createQueryBuilder('m')
158
            ->select('m');
159
160
        $fields = $this->getEntityManager()->getClassMetadata($this->class)->getFieldNames();
161
        foreach ($sort as $field => $direction) {
162
            if (!\in_array($field, $fields)) {
163
                throw new \RuntimeException(sprintf("Invalid sort field '%s' in '%s' class", $field, $this->class));
164
            }
165
        }
166
        if (0 == \count($sort)) {
167
            $sort = ['type' => 'ASC'];
168
        }
169
        foreach ($sort as $field => $direction) {
170
            $query->orderBy(sprintf('m.%s', $field), strtoupper($direction));
171
        }
172
173
        $parameters = [];
174
175
        if (isset($criteria['type'])) {
176
            $query->andWhere('m.type = :type');
177
            $parameters['type'] = $criteria['type'];
178
        }
179
180
        if (isset($criteria['state'])) {
181
            $query->andWhere('m.state = :state');
182
            $parameters['state'] = $criteria['state'];
183
        }
184
185
        $query->setParameters($parameters);
186
        $pager = new Pager();
187
188
        $pager->setMaxPerPage($limit);
189
        $pager->setQuery(new ProxyQuery($query));
190
        $pager->setPage($page);
191
        $pager->init();
192
193
        return $pager;
194
    }
195
196
    /**
197
     * @param int   $state
198
     * @param array $types
199
     * @param int   $batchSize
200
     * @param array $parameters
201
     *
202
     * @return QueryBuilder
203
     */
204
    protected function prepareStateQuery($state, $types, $batchSize, &$parameters)
205
    {
206
        $query = $this->getRepository()
207
            ->createQueryBuilder('m')
208
            ->where('m.state = :state')
209
            ->orderBy('m.createdAt');
210
211
        $parameters['state'] = $state;
212
213
        if (\count($types) > 0) {
214
            if (array_key_exists('exclude', $types) || array_key_exists('include', $types)) {
215
                if (array_key_exists('exclude', $types)) {
216
                    $query->andWhere('m.type NOT IN (:exclude)');
217
                    $parameters['exclude'] = $types['exclude'];
218
                }
219
220
                if (array_key_exists('include', $types)) {
221
                    $query->andWhere('m.type IN (:include)');
222
                    $parameters['include'] = $types['include'];
223
                }
224
            } else { // BC
225
                $query->andWhere('m.type IN (:types)');
226
                $parameters['types'] = $types;
227
            }
228
        }
229
230
        $query->setMaxResults($batchSize);
231
232
        return $query;
233
    }
234
}
235