1 | <?php |
||
28 | class DoctrineWorkerRunner implements WorkerRunnerInterface |
||
29 | { |
||
30 | |||
31 | /** |
||
32 | * Number of maximum consecutive failed fetch operations before worker will fail permanently. |
||
33 | * |
||
34 | * @var integer |
||
35 | */ |
||
36 | const MAX_FAILED_FETCHES = 10; |
||
37 | |||
38 | /** |
||
39 | * Maximum process failures before command is discarded. |
||
40 | * |
||
41 | * @var integer |
||
42 | */ |
||
43 | const MAX_FAILED_PROCESS_ATTEMPTS = 10; |
||
44 | |||
45 | /** |
||
46 | * Doctrine worker. |
||
47 | * |
||
48 | * @var DoctrineWorker |
||
49 | */ |
||
50 | private $worker; |
||
51 | |||
52 | /** |
||
53 | * Database connection. |
||
54 | * |
||
55 | * @var Connection |
||
56 | */ |
||
57 | private $connection; |
||
58 | |||
59 | /** |
||
60 | * Table name. |
||
61 | * |
||
62 | * @var string |
||
63 | */ |
||
64 | private $tableName; |
||
65 | |||
66 | /** |
||
67 | * Pool. |
||
68 | * |
||
69 | * @var string |
||
70 | */ |
||
71 | private $pool; |
||
72 | |||
73 | /** |
||
74 | * Logger. |
||
75 | * |
||
76 | * @var LoggerInterface |
||
77 | */ |
||
78 | private $logger; |
||
79 | |||
80 | /** |
||
81 | * Number of consecutive failed fetches. |
||
82 | * |
||
83 | * @var integer |
||
84 | */ |
||
85 | private $failedFetches = 0; |
||
86 | |||
87 | /** |
||
88 | * Current options. |
||
89 | * |
||
90 | * @var array |
||
91 | */ |
||
92 | private $options = array(); |
||
93 | |||
94 | private $optionsProto = array( |
||
95 | 'run_times' => null, |
||
96 | 'exit_if_empty' => false, |
||
97 | 'sleep_intervals' => array(3000000, 6000000), |
||
98 | ); |
||
99 | |||
100 | /** |
||
101 | * Class constructor. |
||
102 | * |
||
103 | * @param DoctrineWorker $worker |
||
104 | * @param Connection $connection |
||
105 | * @param string $tableName |
||
106 | * @param string $pool |
||
107 | */ |
||
108 | 5 | public function __construct(DoctrineWorker $worker, Connection $connection, $tableName, $pool, LoggerInterface $logger = null) |
|
116 | |||
117 | 5 | public function run(array $options, OutputInterface $output = null) |
|
132 | |||
133 | 5 | private function runIteration(OutputInterface $output, $exitIfEmpty = false) |
|
151 | |||
152 | /** |
||
153 | * Fetch next command data to process. |
||
154 | * |
||
155 | * @param OutputInterface $output |
||
156 | * @return array|boolean |
||
157 | * @throws FetchException Thrown, when fetch failed permanently. |
||
158 | */ |
||
159 | 5 | private function fetchNext(OutputInterface $output) |
|
175 | |||
176 | 3 | private function processNext($commandData, OutputInterface $output) |
|
194 | |||
195 | /** |
||
196 | * Fetch next command data. |
||
197 | * |
||
198 | * @return array|boolean Array with command data, if there are pending commands; false otherwise. |
||
199 | * @throws FetchException Thrown, when fetch resulted in an error (lock impossible). |
||
200 | */ |
||
201 | 5 | private function fetchNextCommandData() |
|
223 | |||
224 | 5 | private function prepareFetchNextSql() |
|
234 | |||
235 | /** |
||
236 | * Sleep to prevent hammering database. |
||
237 | * |
||
238 | * Sleep interval is somewhat random to minify risk of database collisions. |
||
239 | * |
||
240 | * @return void |
||
241 | */ |
||
242 | 3 | private function sleep() |
|
246 | |||
247 | /** |
||
248 | * Set processed status for row. |
||
249 | * |
||
250 | * @param integer $id |
||
251 | * @return void |
||
252 | * @throws DBALException Thrown, if race condition has been detected or other database error occurred. |
||
253 | */ |
||
254 | 4 | private function setProcessed($id) |
|
271 | |||
272 | /** |
||
273 | * Update processed status for row. |
||
274 | * |
||
275 | * @param integer $id |
||
276 | * @param integer $failedRetries |
||
277 | * @return integer |
||
278 | */ |
||
279 | 1 | private function setProcessedFailure($id, $failedRetries) |
|
298 | |||
299 | 5 | private function prepareOptions($options) { |
|
302 | |||
303 | 5 | private function checkRunTimes() |
|
307 | } |
||
308 |