Completed
Push — master ( 3f24d7...0040e8 )
by Tomasz
06:15
created

DoctrineWorkerRunner::prepareOptions()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 3
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
/*
4
 * To change this license header, choose License Headers in Project Properties.
5
 * To change this template file, choose Tools | Templates
6
 * and open the template in the editor.
7
 */
8
9
namespace Gendoria\CommandQueueDoctrineDriverBundle\Worker;
10
11
use DateTime;
12
use Doctrine\DBAL\Connection;
13
use Doctrine\DBAL\DBALException;
14
use Doctrine\DBAL\LockMode;
15
use Exception;
16
use Gendoria\CommandQueue\Worker\WorkerRunnerInterface;
17
use Gendoria\CommandQueueDoctrineDriverBundle\Worker\Exception\FetchException;
18
use Psr\Log\LoggerInterface;
19
use Psr\Log\NullLogger;
20
use Symfony\Component\Console\Output\NullOutput;
21
use Symfony\Component\Console\Output\OutputInterface;
22
23
/**
24
 * Description of DoctrineWorkerRunner
25
 *
26
 * @author Tomasz Struczyński <[email protected]>
27
 */
28
class DoctrineWorkerRunner implements WorkerRunnerInterface
29
{
30
31
    /**
32
     * Number of maximum consecutive failed fetch operations before worker will fail permanently.
33
     * 
34
     * @var integer
35
     */
36
    const MAX_FAILED_FETCHES = 10;
37
    
38
    /**
39
     * Maximum process failures before command is discarded.
40
     * 
41
     * @var integer
42
     */
43
    const MAX_FAILED_PROCESS_ATTEMPTS = 10;
44
    
45
    /**
46
     * Doctrine worker.
47
     * 
48
     * @var DoctrineWorker
49
     */
50
    private $worker;
51
52
    /**
53
     * Database connection.
54
     * 
55
     * @var Connection
56
     */
57
    private $connection;
58
59
    /**
60
     * Table name.
61
     * 
62
     * @var string
63
     */
64
    private $tableName;
65
66
    /**
67
     * Pool.
68
     * 
69
     * @var string
70
     */
71
    private $pool;
72
73
    /**
74
     * Logger.
75
     * 
76
     * @var LoggerInterface
77
     */
78
    private $logger;
79
    
80
    /**
81
     * Number of consecutive failed fetches.
82
     * 
83
     * @var integer
84
     */
85
    private $failedFetches = 0;
86
    
87
    /**
88
     * Current options.
89
     * 
90
     * @var array
91
     */
92
    private $options = array();
93
    
94
    private $optionsProto = array(
95
        'run_times' => null,
96
        'exit_if_empty' => false,
97
        'sleep_intervals' => array(3000000, 6000000),
98
    );
99
100
    /**
101
     * Class constructor.
102
     * 
103
     * @param DoctrineWorker $worker
104
     * @param Connection $connection
105
     * @param string $tableName
106
     * @param string $pool
107
     */
108 5
    public function __construct(DoctrineWorker $worker, Connection $connection, $tableName, $pool, LoggerInterface $logger = null)
109
    {
110 5
        $this->worker = $worker;
111 5
        $this->connection = $connection;
112 5
        $this->tableName = $tableName;
113 5
        $this->pool = $pool;
114 5
        $this->logger = $logger ? $logger : new NullLogger();
115 5
    }
116
117 5
    public function run(array $options, OutputInterface $output = null)
118
    {
119 5
        if (!$output) {
120 1
            $output = new NullOutput();
121 1
        }
122 5
        $this->prepareOptions($options);
123 5
        $output->writeln(sprintf("Worker run with options: %s", print_r($this->options, true), OutputInterface::VERBOSITY_VERBOSE));
124 5
        $this->logger->debug(sprintf("Worker run with options: %s", print_r($this->options, true)));
125
        
126 5
        $this->connection->setTransactionIsolation(Connection::TRANSACTION_SERIALIZABLE);
127
        
128 5
        while ($this->checkRunTimes() && $this->runIteration($output, (bool)$this->options['exit_if_empty'])) {
129 4
            $this->logger->debug('Doctrine worker tick.');
130 4
        }
131 5
    }
132
    
133 5
    private function runIteration(OutputInterface $output, $exitIfEmpty = false)
134
    {
135
        try {
136 5
            $commandData = $this->fetchNext($output);
137 5
            if (is_array($commandData)) {
138 3
                $this->processNext($commandData, $output);
139 5
            } elseif ($exitIfEmpty) {
140 1
                return false;
141
            } else {
142 3
                $output->writeln("No messages to process yet.", OutputInterface::VERBOSITY_DEBUG);
143
                //Sleep to prevent hammering database
144 3
                $this->sleep();
145
            }
146 4
            return true;
147 1
        } catch (Exception $e) {
148 1
            return false;
149
        }
150
    }
151
    
152
    /**
153
     * Fetch next command data to process.
154
     * 
155
     * @param OutputInterface $output
156
     * @return array|boolean
157
     * @throws FetchException Thrown, when fetch failed permanently.
158
     */
159 5
    private function fetchNext(OutputInterface $output)
160
    {
161
        try {
162
            //Get one new row for update
163 5
            return $this->fetchNextCommandData();
164 1
        } catch (FetchException $e) {
165 1
            $this->logger->error('Exception while processing message.', array($e));
166 1
            $output->writeln(sprintf("<error>Error when processing message: %s</error>\n%s\n\n%s", $e->getMessage(), $e->getTraceAsString(), $e->getPrevious()->getTraceAsString()));
167 1
            if ($this->failedFetches > self::MAX_FAILED_FETCHES) {
168 1
                $output->writeln("<error>Too many consequent fetch errors - exiting.</error>");
169 1
                throw $e;
170
            }
171 1
            $this->sleep();
172 1
            return false;
173
        }
174
    }
175
176 3
    private function processNext($commandData, OutputInterface $output)
177
    {
178
        try {
179 3
            $output->writeln("Processing command.", OutputInterface::VERBOSITY_DEBUG);
180 3
            $this->worker->process($commandData);
181 2
            $this->connection->delete($this->tableName, array('id' => $commandData['id']));
182 3
        } catch (\Exception $e) {
183 1
            $this->logger->error('Exception while processing message.', array($e));
184 1
            $output->writeln(sprintf("<error>Error when processing message: %s</error>\n%s", $e->getMessage(), $e->getTraceAsString()));
185 1
            if ($commandData['failed_no'] >= self::MAX_FAILED_PROCESS_ATTEMPTS-1) {
186 1
                $this->logger->error('Command failed too many times, discarding.');
187 1
                $output->writeln(sprintf("<error>Command failed too many times, discarding.</error>"));
188 1
                $this->connection->delete($this->tableName, array('id' => $commandData['id']));
189 1
            } else {
190 1
                $this->setProcessedFailure($commandData['id'], $commandData['failed_no']+1);
191
            }
192
        }
193 3
    }
194
195
    /**
196
     * Fetch next command data.
197
     * 
198
     * @return array|boolean Array with command data, if there are pending commands; false otherwise.
199
     * @throws FetchException Thrown, when fetch resulted in an error (lock impossible).
200
     */
201 5
    private function fetchNextCommandData()
202
    {
203 5
        $platform = $this->connection->getDatabasePlatform();
204
        try {
205 5
            $this->connection->beginTransaction();
206
            $sqlProto = "SELECT * "
207 5
                . " FROM " . $platform->appendLockHint($this->tableName, LockMode::PESSIMISTIC_WRITE)
208 5
                . " WHERE processed = ? AND pool = ? AND process_after <= ? "
209 5
                . " ORDER BY id ASC"
210 5
            ;
211 5
            $sql = $platform->modifyLimitQuery($sqlProto, 1) . " " . $platform->getWriteLockSQL();
212 5
            $row = $this->connection->fetchAssoc($sql, array(0, $this->pool, new DateTime()), array('integer', 'string', 'datetime'));
213
214 5
            if (empty($row)) {
215 3
                $this->connection->commit();
216 3
                return $row;
217
            }
218
219 4
            $rows = $this->setProcessed($row['id']);
220
221 4
            if ($rows !== 1) {
222 1
                throw new DBALException("Race condition detected. Aborting.");
223
            }
224 3
            $this->connection->commit();
225 3
            $this->failedFetches = 0;
226 3
            return $row;
227 1
        } catch (Exception $e) {
228 1
            $this->connection->rollBack();
229 1
            $this->failedFetches++;
230 1
            throw new FetchException("Exception while fetching data: ".$e->getMessage(), 500, $e);
231
        }
232
    }
233
    
234
    /**
235
     * Sleep to prevent hammering database.
236
     * 
237
     * Sleep interval is somewhat random to minify risk of database collisions.
238
     * 
239
     * @return void
240
     */
241 3
    private function sleep()
242
    {
243 3
        usleep(mt_rand($this->options['sleep_intervals'][0], $this->options['sleep_intervals'][1]));
244 3
    }
245
    
246
    /**
247
     * Set processed status for row.
248
     * 
249
     * @param integer $id
250
     * @return integer
251
     */
252 4
    private function setProcessed($id)
253
    {
254
        $parameters = array(
255 4
            'processed' => true,
256 4
            'id' => (int)$id,
257 4
        );
258
        $types = array(
259 4
            'processed' => 'boolean',
260
            'id' => 'smallint'
261 4
        );
262 4
        $updateSql = "UPDATE " . $this->tableName
263 4
            . " SET processed = :processed"
264 4
            . " WHERE id = :id";
265 4
        return $this->connection->executeUpdate($updateSql, $parameters, $types);
266
    }
267
    
268
    /**
269
     * Update processed status for row.
270
     * 
271
     * @param integer $id
272
     * @param integer $failedRetries
273
     * @return integer
274
     */
275 1
    private function setProcessedFailure($id, $failedRetries)
276
    {
277
        $parameters = array(
278 1
            'processed' => false,
279 1
            'failed_no' => (int)$failedRetries,
280 1
            'id' => (int)$id,
281 1
            'process_after' => new DateTime('@'.(time()+20*$failedRetries)),
282 1
        );
283
        $types = array(
284 1
            'processed' => 'boolean',
285 1
            'failed_no' => 'integer',
286 1
            'id' => 'smallint',
287 1
            'process_after' => 'datetime',
288 1
        );
289 1
        $updateSql = "UPDATE " . $this->tableName
290 1
            . " SET processed = :processed, failed_no = :failed_no, process_after = :process_after"
291 1
            . " WHERE id = :id";
292 1
        return $this->connection->executeUpdate($updateSql, $parameters, $types);
293
    }
294
    
295 5
    private function prepareOptions($options) {
296 5
        $this->options = array_merge($this->optionsProto, $options);
297 5
    }
298
    
299 5
    private function checkRunTimes()
300
    {
301 5
        if ($this->options['run_times'] !== null && $this->options['run_times']-- == 0) {
0 ignored issues
show
Unused Code introduced by
This if statement, and the following return statement can be replaced with return !($this->options[...s['run_times']-- == 0);.
Loading history...
302 3
            return false;
303
        }
304 5
        return true;
305
    }
306
}
307