DoctrineWorkerRunner   A
last analyzed

Complexity

Total Complexity 27

Size/Duplication

Total Lines 280
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 27
lcom 1
cbo 9
dl 0
loc 280
ccs 110
cts 110
cp 1
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A run() 0 15 4
A runIteration() 0 18 4
A fetchNext() 0 16 3
A processNext() 0 18 3
A fetchNextCommandData() 0 22 3
A prepareFetchNextSql() 0 10 1
A sleep() 0 4 1
A setProcessed() 0 17 2
A setProcessedFailure() 0 19 1
A prepareOptions() 0 3 1
A checkRunTimes() 0 4 2
1
<?php
2
3
/*
4
 * To change this license header, choose License Headers in Project Properties.
5
 * To change this template file, choose Tools | Templates
6
 * and open the template in the editor.
7
 */
8
9
namespace Gendoria\CommandQueueDoctrineDriverBundle\Worker;
10
11
use DateTime;
12
use Doctrine\DBAL\Connection;
13
use Doctrine\DBAL\DBALException;
14
use Doctrine\DBAL\LockMode;
15
use Exception;
16
use Gendoria\CommandQueue\Worker\WorkerRunnerInterface;
17
use Gendoria\CommandQueueDoctrineDriverBundle\Worker\Exception\FetchException;
18
use Psr\Log\LoggerInterface;
19
use Psr\Log\NullLogger;
20
use Symfony\Component\Console\Output\NullOutput;
21
use Symfony\Component\Console\Output\OutputInterface;
22
23
/**
 * Worker runner which polls a database table (through Doctrine DBAL) for queued commands.
 *
 * Each iteration locks and fetches one pending row for the configured pool, flags it as
 * processed, passes it to the worker and deletes it on success. A command whose processing
 * fails is rescheduled with a growing delay and discarded after too many failures.
 *
 * @author Tomasz Struczyński <[email protected]>
 */
class DoctrineWorkerRunner implements WorkerRunnerInterface
{

    /**
     * Number of maximum consecutive failed fetch operations before worker will fail permanently.
     *
     * @var integer
     */
    const MAX_FAILED_FETCHES = 10;

    /**
     * Maximum process failures before command is discarded.
     *
     * @var integer
     */
    const MAX_FAILED_PROCESS_ATTEMPTS = 10;

    /**
     * Doctrine worker.
     *
     * @var DoctrineWorker
     */
    private $worker;

    /**
     * Database connection.
     *
     * @var Connection
     */
    private $connection;

    /**
     * Table name.
     *
     * @var string
     */
    private $tableName;

    /**
     * Pool.
     *
     * @var string
     */
    private $pool;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Number of consecutive failed fetches.
     *
     * @var integer
     */
    private $failedFetches = 0;

    /**
     * Current options.
     *
     * @var array
     */
    private $options = array();

    /**
     * Default option values, merged with the options passed to run().
     *
     * - run_times:       number of iterations to perform; null means no limit.
     * - exit_if_empty:   when true, the runner stops as soon as no command is pending.
     * - sleep_intervals: minimum and maximum sleep time (passed to usleep(), so microseconds).
     *
     * @var array
     */
    private $optionsProto = array(
        'run_times' => null,
        'exit_if_empty' => false,
        'sleep_intervals' => array(3000000, 6000000),
    );

    /**
     * Class constructor.
     *
     * @param DoctrineWorker $worker
     * @param Connection $connection
     * @param string $tableName
     * @param string $pool
     * @param LoggerInterface|null $logger Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(DoctrineWorker $worker, Connection $connection, $tableName, $pool, LoggerInterface $logger = null)
    {
        $this->worker = $worker;
        $this->connection = $connection;
        $this->tableName = $tableName;
        $this->pool = $pool;
        $this->logger = $logger ? $logger : new NullLogger();
    }

    /**
     * Run the worker loop until the iteration limit is reached or an iteration asks to stop.
     *
     * @param array $options Runtime options overriding the defaults (run_times, exit_if_empty, sleep_intervals).
     * @param OutputInterface|null $output Console output; a NullOutput is used when omitted.
     * @return void
     */
    public function run(array $options, OutputInterface $output = null)
    {
        if (!$output) {
            $output = new NullOutput();
        }
        $this->prepareOptions($options);

        $optionsDump = print_r($this->options, true);
        // The verbosity flag belongs to writeln(), not to sprintf().
        $output->writeln(sprintf("Worker run with options: %s", $optionsDump), OutputInterface::VERBOSITY_VERBOSE);
        $this->logger->debug(sprintf("Worker run with options: %s", $optionsDump));

        $this->connection->setTransactionIsolation(Connection::TRANSACTION_SERIALIZABLE);

        while ($this->checkRunTimes() && $this->runIteration($output, (bool)$this->options['exit_if_empty'])) {
            $this->logger->debug('Doctrine worker tick.');
        }
    }

    /**
     * Perform a single fetch-and-process iteration.
     *
     * @param OutputInterface $output
     * @param boolean $exitIfEmpty When true, an empty fetch result stops the worker instead of sleeping.
     * @return boolean True if the worker should continue with the next iteration, false if it should stop.
     */
    private function runIteration(OutputInterface $output, $exitIfEmpty = false)
    {
        try {
            $commandData = $this->fetchNext($output);
            if (is_array($commandData)) {
                $this->processNext($commandData, $output);
            } elseif ($exitIfEmpty) {
                return false;
            } else {
                $output->writeln("No messages to process yet.", OutputInterface::VERBOSITY_DEBUG);
                //Sleep to prevent hammering database
                $this->sleep();
            }
            return true;
        } catch (Exception $e) {
            // Any exception reaching this point ends the run; record it instead of stopping silently.
            $this->logger->error('Doctrine worker iteration failed, stopping worker.', array('exception' => $e));
            return false;
        }
    }

    /**
     * Fetch next command data to process.
     *
     * A failed fetch is reported, followed by a sleep and a false return value, unless the
     * counter of consecutive failures already exceeds MAX_FAILED_FETCHES - then the
     * exception is rethrown.
     *
     * @param OutputInterface $output
     * @return array|boolean Command data, or false when nothing was fetched.
     * @throws FetchException Thrown, when fetch failed permanently.
     */
    private function fetchNext(OutputInterface $output)
    {
        try {
            //Get one new row for update
            return $this->fetchNextCommandData();
        } catch (FetchException $e) {
            $this->logger->error('Exception while processing message.', array('exception' => $e));
            $previous = $e->getPrevious();
            $output->writeln(sprintf(
                "<error>Error when processing message: %s</error>\n%s\n\n%s",
                $e->getMessage(),
                $e->getTraceAsString(),
                // Guard against an exception created without a previous one.
                $previous ? $previous->getTraceAsString() : ''
            ));
            if ($this->failedFetches > self::MAX_FAILED_FETCHES) {
                $output->writeln("<error>Too many consequent fetch errors - exiting.</error>");
                throw $e;
            }
            $this->sleep();
            return false;
        }
    }

    /**
     * Pass fetched command data to the worker and remove the row on success.
     *
     * When processing throws, the row is either deleted (once its failure counter reaches
     * MAX_FAILED_PROCESS_ATTEMPTS - 1) or rescheduled with an incremented failure counter.
     *
     * @param array $commandData Row data; the 'id' and 'failed_no' keys are read here.
     * @param OutputInterface $output
     * @return void
     */
    private function processNext($commandData, OutputInterface $output)
    {
        try {
            $output->writeln("Processing command.", OutputInterface::VERBOSITY_DEBUG);
            $this->worker->process($commandData);
            $this->connection->delete($this->tableName, array('id' => $commandData['id']));
        } catch (Exception $e) {
            $this->logger->error('Exception while processing message.', array('exception' => $e));
            $output->writeln(sprintf("<error>Error when processing message: %s</error>\n%s", $e->getMessage(), $e->getTraceAsString()));
            if ($commandData['failed_no'] >= self::MAX_FAILED_PROCESS_ATTEMPTS - 1) {
                $this->logger->error('Command failed too many times, discarding.');
                $output->writeln("<error>Command failed too many times, discarding.</error>");
                $this->connection->delete($this->tableName, array('id' => $commandData['id']));
            } else {
                $this->setProcessedFailure($commandData['id'], $commandData['failed_no'] + 1);
            }
        }
    }

    /**
     * Fetch next command data.
     *
     * Within one transaction: selects one pending row, flags it as processed and commits.
     * A successful fetch of a row resets the consecutive failure counter.
     *
     * @return array|boolean Array with command data, if there are pending commands; false otherwise.
     * @throws FetchException Thrown, when fetch resulted in an error (lock impossible).
     */
    private function fetchNextCommandData()
    {
        try {
            $this->connection->beginTransaction();
            $row = $this->connection->fetchAssoc(
                $this->prepareFetchNextSql(),
                array(0, $this->pool, new DateTime()),
                array('integer', 'string', 'datetime')
            );

            if (empty($row)) {
                $this->connection->commit();
                return false;
            }

            $this->setProcessed($row['id']);

            $this->connection->commit();
            $this->failedFetches = 0;
            return $row;
        } catch (Exception $e) {
            try {
                $this->connection->rollBack();
            } catch (Exception $rollbackException) {
                // A failing rollback must not replace the original error, nor skip the
                // failure counter and the FetchException wrapping below.
                $this->logger->error('Rollback after failed fetch did not succeed.', array('exception' => $rollbackException));
            }
            $this->failedFetches++;
            throw new FetchException("Exception while fetching data: ".$e->getMessage(), 500, $e);
        }
    }

    /**
     * Build the SQL selecting the oldest (lowest id) pending row with a write lock.
     *
     * Placeholders, in order: processed flag, pool name, upper bound for process_after.
     *
     * @return string
     */
    private function prepareFetchNextSql()
    {
        $platform = $this->connection->getDatabasePlatform();
        $sqlProto = "SELECT * "
            . " FROM " . $platform->appendLockHint($this->tableName, LockMode::PESSIMISTIC_WRITE)
            . " WHERE processed = ? AND pool = ? AND process_after <= ? "
            . " ORDER BY id ASC"
        ;
        return $platform->modifyLimitQuery($sqlProto, 1) . " " . $platform->getWriteLockSQL();
    }

    /**
     * Sleep to prevent hammering database.
     *
     * Sleep interval is picked at random between the two configured 'sleep_intervals'
     * values to minimize the risk of database collisions.
     *
     * @return void
     */
    private function sleep()
    {
        usleep(mt_rand($this->options['sleep_intervals'][0], $this->options['sleep_intervals'][1]));
    }

    /**
     * Set processed status for row.
     *
     * @param integer $id
     * @return void
     * @throws DBALException Thrown, if race condition has been detected (the update did not
     *                       affect exactly one row) or other database error occurred.
     */
    private function setProcessed($id)
    {
        $parameters = array(
            'processed' => true,
            'id' => (int)$id,
        );
        $types = array(
            'processed' => 'boolean',
            'id' => 'smallint'
        );
        $updateSql = "UPDATE " . $this->tableName
            . " SET processed = :processed"
            . " WHERE id = :id";
        if ($this->connection->executeUpdate($updateSql, $parameters, $types) !== 1) {
            throw new DBALException("Race condition detected. Aborting.");
        }
    }

    /**
     * Release a row after a failed processing attempt and schedule its retry.
     *
     * The row is flagged as not processed again, its failure counter is stored and
     * process_after is moved 20 seconds per failure into the future.
     *
     * @param integer $id
     * @param integer $failedRetries Failure count to store for the row.
     * @return integer Number of affected rows.
     */
    private function setProcessedFailure($id, $failedRetries)
    {
        // Built in the default timezone (not via '@timestamp', which forces UTC) so the
        // stored value is comparable with the "new DateTime()" used when fetching rows.
        $processAfter = new DateTime();
        $processAfter->setTimestamp(time() + 20 * $failedRetries);

        $parameters = array(
            'processed' => false,
            'failed_no' => (int)$failedRetries,
            'id' => (int)$id,
            'process_after' => $processAfter,
        );
        $types = array(
            'processed' => 'boolean',
            'failed_no' => 'integer',
            'id' => 'smallint',
            'process_after' => 'datetime',
        );
        $updateSql = "UPDATE " . $this->tableName
            . " SET processed = :processed, failed_no = :failed_no, process_after = :process_after"
            . " WHERE id = :id";
        return $this->connection->executeUpdate($updateSql, $parameters, $types);
    }

    /**
     * Merge the given options over the defaults and store them as current options.
     *
     * @param array $options
     * @return void
     */
    private function prepareOptions($options)
    {
        $this->options = array_merge($this->optionsProto, $options);
    }

    /**
     * Check whether another iteration is allowed and consume one from the 'run_times' budget.
     *
     * @return boolean True when 'run_times' is null (no limit) or was not yet zero.
     */
    private function checkRunTimes()
    {
        if ($this->options['run_times'] === null) {
            return true;
        }
        // Post-decrement: the comparison sees the value before it is reduced.
        return $this->options['run_times']-- != 0;
    }
}
308