1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Dtc\QueueBundle\Command; |
4
|
|
|
|
5
|
|
|
use Doctrine\Common\Persistence\ObjectManager; |
6
|
|
|
use Dtc\QueueBundle\Model\Job; |
7
|
|
|
use Dtc\QueueBundle\Model\Run; |
8
|
|
|
use Psr\Log\LoggerInterface; |
9
|
|
|
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; |
10
|
|
|
use Symfony\Component\Console\Input\InputArgument; |
11
|
|
|
use Symfony\Component\Console\Input\InputInterface; |
12
|
|
|
use Symfony\Component\Console\Input\InputOption; |
13
|
|
|
use Symfony\Component\Console\Output\OutputInterface; |
14
|
|
|
|
15
|
|
|
class RunCommand extends ContainerAwareCommand |
16
|
|
|
{ |
17
|
|
|
/** @var ObjectManager */ |
18
|
|
|
protected $runManager; |
19
|
|
|
|
20
|
|
|
/** @var Run $run */ |
21
|
|
|
protected $run; |
22
|
|
|
|
23
|
|
|
/** @var string */ |
24
|
|
|
protected $runClass; |
25
|
|
|
|
26
|
|
|
/** @var OutputInterface */ |
27
|
|
|
protected $output; |
28
|
|
|
|
29
|
|
|
/** @var LoggerInterface */ |
30
|
|
|
protected $logger; |
31
|
|
|
|
32
|
|
|
protected function configure() |
33
|
|
|
{ |
34
|
|
|
$this |
35
|
|
|
->setName('dtc:queue:run') |
36
|
|
|
->setDefinition( |
37
|
|
|
array( |
38
|
|
|
new InputArgument('worker_name', InputArgument::OPTIONAL, 'Name of worker', null), |
39
|
|
|
new InputArgument('method', InputArgument::OPTIONAL, 'DI method of worker', null), |
40
|
|
|
new InputOption( |
41
|
|
|
'id', |
42
|
|
|
'i', |
43
|
|
|
InputOption::VALUE_REQUIRED, |
44
|
|
|
'Id of Job to run', |
45
|
|
|
null |
46
|
|
|
), |
47
|
|
|
new InputOption( |
48
|
|
|
'max_count', |
49
|
|
|
'm', |
50
|
|
|
InputOption::VALUE_REQUIRED, |
51
|
|
|
'Maximum number of jobs to work on before exiting', |
52
|
|
|
null |
53
|
|
|
), |
54
|
|
|
new InputOption( |
55
|
|
|
'duration', |
56
|
|
|
'd', |
57
|
|
|
InputOption::VALUE_REQUIRED, |
58
|
|
|
'Duration to run for in seconds', |
59
|
|
|
null |
60
|
|
|
), |
61
|
|
|
new InputOption( |
62
|
|
|
'timeout', |
63
|
|
|
't', |
64
|
|
|
InputOption::VALUE_REQUIRED, |
65
|
|
|
'Process timeout in seconds (hard exit of process regardless)', |
66
|
|
|
3600 |
67
|
|
|
), |
68
|
|
|
new InputOption( |
69
|
|
|
'nano_sleep', |
70
|
|
|
's', |
71
|
|
|
InputOption::VALUE_REQUIRED, |
72
|
|
|
'If using duration, this is the time to sleep when there\'s no jobs in nanoseconds', |
73
|
|
|
500000000 |
74
|
|
|
), |
75
|
|
|
new InputOption( |
76
|
|
|
'logger', |
77
|
|
|
'l', |
78
|
|
|
InputOption::VALUE_REQUIRED, |
79
|
|
|
'Log using the logger service specified, or output to console if null (or an invalid logger service id) is passed in' |
80
|
|
|
), |
81
|
|
|
) |
82
|
|
|
) |
83
|
|
|
->setDescription('Start up a job in queue'); |
84
|
|
|
} |
85
|
|
|
|
86
|
|
|
protected function runJobById($start, $jobId) |
87
|
|
|
{ |
88
|
|
|
$this->runStart($start); |
89
|
|
|
$container = $this->getContainer(); |
90
|
|
|
$jobManager = $container->get('dtc_queue.job_manager'); |
91
|
|
|
$workerManager = $container->get('dtc_queue.worker_manager'); |
92
|
|
|
|
93
|
|
|
$job = $jobManager->getRepository()->find($jobId); |
94
|
|
|
if (!$job) { |
95
|
|
|
$this->log('error', "Job id is not found: {$jobId}"); |
96
|
|
|
$this->runStop($start); |
97
|
|
|
|
98
|
|
|
return; |
99
|
|
|
} |
100
|
|
|
|
101
|
|
|
$job = $workerManager->runJob($job); |
102
|
|
|
$this->reportJob($job); |
103
|
|
|
$this->run->setProcessed(1); |
104
|
|
|
$this->runStop($start); |
105
|
|
|
|
106
|
|
|
return; |
107
|
|
|
} |
108
|
|
|
|
109
|
|
|
private function validateIntNull($varName, $var, $pow) |
110
|
|
|
{ |
111
|
|
|
if (null === $var) { |
112
|
|
|
return null; |
113
|
|
|
} |
114
|
|
|
if (!ctype_digit(strval($var))) { |
115
|
|
|
throw new \Exception("$varName must be an integer"); |
116
|
|
|
} |
117
|
|
|
|
118
|
|
|
if (strval(intval($var)) !== strval($var) || $var <= 0 || $var >= pow(2, $pow)) { |
119
|
|
|
throw new \Exception("$varName must be an base 10 integer within 2^32"); |
120
|
|
|
} |
121
|
|
|
|
122
|
|
|
return intval($var); |
123
|
|
|
} |
124
|
|
|
|
125
|
|
|
public function log($level, $msg, array $context = []) |
126
|
|
|
{ |
127
|
|
|
if ($this->logger) { |
128
|
|
|
$this->logger->$level($msg, $context); |
129
|
|
|
|
130
|
|
|
return; |
131
|
|
|
} |
132
|
|
|
|
133
|
|
|
$date = new \DateTime(); |
134
|
|
|
$this->output->write("[$level] [".$date->format('c').'] '.$msg); |
135
|
|
|
if ($context) { |
|
|
|
|
136
|
|
|
$this->output->write(print_r($context, true)); |
137
|
|
|
} |
138
|
|
|
$this->output->writeln(''); |
139
|
|
|
} |
140
|
|
|
|
141
|
|
|
protected function execute(InputInterface $input, OutputInterface $output) |
142
|
|
|
{ |
143
|
|
|
$start = microtime(true); |
144
|
|
|
$this->output = $output; |
145
|
|
|
$container = $this->getContainer(); |
146
|
|
|
$workerName = $input->getArgument('worker_name'); |
147
|
|
|
$methodName = $input->getArgument('method'); |
148
|
|
|
$maxCount = $input->getOption('max_count', null); |
|
|
|
|
149
|
|
|
$duration = $input->getOption('duration', null); |
|
|
|
|
150
|
|
|
$processTimeout = $input->getOption('timeout', 3600); |
|
|
|
|
151
|
|
|
$nanoSleep = $input->getOption('nano_sleep', 500000000); |
|
|
|
|
152
|
|
|
$loggerService = $input->getOption('logger'); |
153
|
|
|
|
154
|
|
|
if ($container->has($loggerService)) { |
155
|
|
|
$this->logger = $container->get($loggerService); |
156
|
|
|
} |
157
|
|
|
|
158
|
|
|
$maxCount = $this->validateIntNull('max_count', $maxCount, 32); |
159
|
|
|
$duration = $this->validateIntNull('duration', $duration, 32); |
160
|
|
|
$processTimeout = $this->validateIntNull('timeout', $processTimeout, 32); |
161
|
|
|
$nanoSleep = $this->validateIntNull('nano_sleep', $nanoSleep, 63); |
162
|
|
|
|
163
|
|
|
if (null !== $duration && null !== $processTimeout && $duration >= $processTimeout) { |
164
|
|
|
$this->log('info', "duration ($duration) >= to process timeout ($processTimeout), so doubling process timeout to: ".(2 * $processTimeout)); |
165
|
|
|
$processTimeout *= 2; |
166
|
|
|
} |
167
|
|
|
|
168
|
|
|
if (null === $maxCount && null === $duration) { |
169
|
|
|
$maxCount = 1; |
170
|
|
|
} |
171
|
|
|
// Check to see if there are other instances |
172
|
|
|
set_time_limit($processTimeout); // Set timeout on the process |
173
|
|
|
|
174
|
|
|
if ($jobId = $input->getOption('id')) { |
175
|
|
|
return $this->runJobById($start, $jobId); // Run a single job |
176
|
|
|
} |
177
|
|
|
|
178
|
|
|
return $this->runLoop($start, $workerName, $methodName, $nanoSleep, $maxCount, $duration); |
179
|
|
|
} |
180
|
|
|
|
181
|
|
|
protected function runLoop($start, $workerName, $methodName, $nanoSleep, $maxCount, $duration) |
182
|
|
|
{ |
183
|
|
|
$container = $this->getContainer(); |
184
|
|
|
$workerManager = $container->get('dtc_queue.worker_manager'); |
185
|
|
|
$workerManager->setLoggingFunc([$this, 'log']); |
186
|
|
|
$this->runStart($start, $maxCount, $duration); |
187
|
|
|
try { |
188
|
|
|
$this->log('info', 'Staring up a new job...'); |
189
|
|
|
|
190
|
|
|
$endTime = null; |
191
|
|
|
if ($duration) { |
192
|
|
|
$interval = new \DateInterval("PT${duration}S"); |
193
|
|
|
$endTime = $this->run->getStartedAt()->add($interval); |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
$currentJob = 1; |
197
|
|
|
$noMoreJobsToRun = false; |
198
|
|
|
do { |
199
|
|
|
$this->run->setLastHeartbeatAt(new \DateTime()); |
200
|
|
|
$this->run->setElapsed(microtime(true) - $start); |
201
|
|
|
if ($this->runManager) { |
202
|
|
|
$this->runManager->persist($this->run); |
203
|
|
|
$this->runManager->flush(); |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
$job = $workerManager->run($workerName, $methodName, true, $this->run->getId()); |
207
|
|
|
if ($job) { |
208
|
|
|
$noMoreJobsToRun = false; |
209
|
|
|
$this->reportJob($job); |
210
|
|
|
$this->run->setProcessed($currentJob); |
211
|
|
|
if ($this->runManager) { |
212
|
|
|
$this->runManager->persist($this->run); |
213
|
|
|
$this->runManager->flush(); |
214
|
|
|
} |
215
|
|
|
++$currentJob; |
216
|
|
|
} else { |
217
|
|
|
if (!$noMoreJobsToRun) { |
218
|
|
|
$this->log('info', 'No more jobs to run ('.($currentJob - 1).' processed so far).'); |
219
|
|
|
$noMoreJobsToRun = true; |
220
|
|
|
} |
221
|
|
|
if ($maxCount && !$duration) { |
222
|
|
|
// time to finish |
223
|
|
|
$this->runStop($start); |
224
|
|
|
|
225
|
|
|
return 0; |
226
|
|
|
} |
227
|
|
|
$nanoSleepTime = function_exists('random_int') ? random_int(0, $nanoSleep) : mt_rand(0, $nanoSleep); |
228
|
|
|
time_nanosleep(0, $nanoSleepTime); // 500ms ?? |
|
|
|
|
229
|
|
|
} |
230
|
|
|
} while ((null === $maxCount || $currentJob <= $maxCount) && (null === $duration || (new \DateTime()) < $endTime)); |
231
|
|
|
} catch (\Exception $e) { |
232
|
|
|
// Uncaught error: possibly with QueueBundle itself |
233
|
|
|
$this->log('critical', $e->getMessage(), $e->getTrace()); |
234
|
|
|
} |
235
|
|
|
$this->runStop($start); |
236
|
|
|
|
237
|
|
|
return 0; |
238
|
|
|
} |
239
|
|
|
|
240
|
|
|
/** |
241
|
|
|
* Sets up the runManager (document / entity persister) if appropriate. |
242
|
|
|
* |
243
|
|
|
* @param $maxCount |
244
|
|
|
* @param $duration |
245
|
|
|
*/ |
246
|
|
|
protected function runStart($start, $maxCount = null, $duration = null) |
247
|
|
|
{ |
248
|
|
|
$container = $this->getContainer(); |
249
|
|
|
$this->runClass = $container->getParameter('dtc_queue.class_run'); |
250
|
|
|
$defaultManager = $container->getParameter('dtc_queue.default_manager'); |
251
|
|
|
if ('mongodb' == $defaultManager && $container->has('dtc_queue.document_manager')) { |
252
|
|
|
$this->runManager = $container->get('dtc_queue.document_manager'); |
253
|
|
|
} elseif ('orm' == $defaultManager && $container->has('dtc_queue.entity_manager')) { |
254
|
|
|
$this->runManager = $container->get('dtc_queue.entity_manager'); |
255
|
|
|
} |
256
|
|
|
|
257
|
|
|
$this->run = new $this->runClass(); |
258
|
|
|
$startDate = \DateTime::createFromFormat('U.u', $start); |
259
|
|
|
$this->run->setLastHeartbeatAt($startDate); |
260
|
|
|
$this->run->setStartedAt($startDate); |
261
|
|
|
if (null !== $maxCount) { |
262
|
|
|
$this->run->setMaxCount($maxCount); |
263
|
|
|
} |
264
|
|
|
$timeEnd = null; |
|
|
|
|
265
|
|
|
if (null !== $duration) { |
266
|
|
|
$this->run->setDuration($duration); |
267
|
|
|
} |
268
|
|
|
$this->run->setHostname(gethostname()); |
269
|
|
|
$this->run->setPid(getmypid()); |
270
|
|
|
$this->run->setProcessed(0); |
271
|
|
|
if ($this->runManager) { |
272
|
|
|
$this->runManager->persist($this->run); |
273
|
|
|
$this->runManager->flush(); |
274
|
|
|
} |
275
|
|
|
} |
276
|
|
|
|
277
|
|
|
protected function runStop($start) |
278
|
|
|
{ |
279
|
|
|
$end = microtime(true); |
280
|
|
|
$endTime = \DateTime::createFromFormat('U.u', $end); |
281
|
|
|
$this->run->setEndedAt($endTime); |
|
|
|
|
282
|
|
|
$this->run->setElapsed($end - $start); |
283
|
|
|
if ($this->runManager) { |
284
|
|
|
$this->runManager->remove($this->run); |
285
|
|
|
$this->runManager->flush(); |
286
|
|
|
} |
287
|
|
|
$this->log('info', 'Ended with '.$this->run->getProcessed().' job(s) processed over '.strval($this->run->getElapsed()).' seconds.'); |
288
|
|
|
} |
289
|
|
|
|
290
|
|
|
protected function reportJob(Job $job) |
291
|
|
|
{ |
292
|
|
|
if (Job::STATUS_ERROR == $job->getStatus()) { |
293
|
|
|
$message = "Error with job id: {$job->getId()}\n".$job->getMessage(); |
294
|
|
|
$this->log('error', $message); |
295
|
|
|
} |
296
|
|
|
|
297
|
|
|
$message = "Finished job id: {$job->getId()} in {$job->getElapsed()} seconds\n"; |
298
|
|
|
$this->log('info', $message); |
299
|
|
|
} |
300
|
|
|
} |
301
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.