This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | namespace TreeHouse\WorkerBundle\Command; |
||
4 | |||
5 | use Psr\Log\LoggerInterface; |
||
6 | use Symfony\Bridge\Monolog\Handler\ConsoleHandler; |
||
7 | use Symfony\Component\Console\Command\Command; |
||
8 | use Symfony\Component\Console\Input\InputInterface; |
||
9 | use Symfony\Component\Console\Input\InputOption; |
||
10 | use Symfony\Component\Console\Output\OutputInterface; |
||
11 | use TreeHouse\WorkerBundle\Exception\AbortException; |
||
12 | use TreeHouse\WorkerBundle\QueueManager; |
||
13 | use TreeHouse\WorkerBundle\WorkerEvents; |
||
14 | |||
15 | class RunCommand extends Command |
||
16 | { |
||
17 | /** |
||
18 | * @var QueueManager |
||
19 | */ |
||
20 | protected $manager; |
||
21 | |||
22 | /** |
||
23 | * @var OutputInterface |
||
24 | */ |
||
25 | protected $output; |
||
26 | |||
27 | /** |
||
28 | * @param QueueManager $queueManager |
||
29 | */ |
||
30 | public function __construct(QueueManager $queueManager) |
||
31 | { |
||
32 | $this->manager = $queueManager; |
||
33 | |||
34 | parent::__construct(); |
||
35 | } |
||
36 | |||
37 | /** |
||
38 | * @inheritdoc |
||
39 | */ |
||
40 | protected function configure() |
||
41 | { |
||
42 | $this |
||
43 | ->setName('worker:run') |
||
44 | ->setDescription('Starts a worker') |
||
45 | ->addOption('action', 'a', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Selects actions to run, defaults to all') |
||
46 | ->addOption('exclude', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Excludes actions to run') |
||
47 | ->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Maximum number of jobs to execute', 20) |
||
48 | ->addOption('max-memory', 'm', InputOption::VALUE_OPTIONAL, 'Maximum amount of memory to use (in MB). The worker will try to stop before this limit is reached. Set to 0 for infinite.', 0) |
||
49 | ->addOption('max-time', 't', InputOption::VALUE_OPTIONAL, 'Maximum running time in seconds. Set to 0 for infinite', 0) |
||
50 | ->addOption('batch-size', 'b', InputOption::VALUE_OPTIONAL, 'Number of jobs to process before completing a batch', 15) |
||
51 | ->addOption('min-duration', 'd', InputOption::VALUE_OPTIONAL, 'Number of seconds to the worker process should minimal take to run', 15) |
||
52 | ->addOption('timeout', null, InputOption::VALUE_OPTIONAL, 'Maximum number of seconds to wait for a job. Below 0 for infinite', 60) |
||
53 | ; |
||
54 | } |
||
55 | |||
56 | /** |
||
57 | * @inheritdoc |
||
58 | */ |
||
59 | protected function execute(InputInterface $input, OutputInterface $output) |
||
60 | { |
||
61 | $this->output = $output; |
||
62 | |||
63 | $dispatcher = $this->manager->getDispatcher(); |
||
64 | |||
65 | $maxMemory = intval($input->getOption('max-memory')) * 1024 * 1024; |
||
66 | $maxTime = intval($input->getOption('max-time')); |
||
67 | $maxJobs = intval($input->getOption('limit')); |
||
68 | $batchSize = intval($input->getOption('batch-size')); |
||
69 | $minDuration = intval($input->getOption('min-duration')); |
||
70 | $timeout = intval($input->getOption('timeout')); |
||
71 | |||
72 | $logger = $this->manager->getLogger(); |
||
73 | |||
74 | $this->attachConsoleHandler($logger, $output); |
||
75 | |||
76 | // configure pheanstalk to watch the right tubes |
||
77 | $this->watchActions($input->getOption('action'), $input->getOption('exclude')); |
||
78 | |||
79 | $start = time(); |
||
80 | $jobsCompleted = 0; |
||
81 | |||
82 | $this->output(sprintf('Waiting at most <info>%d seconds</info> for a reserved job...', $timeout)); |
||
83 | |||
84 | $exit = 0; |
||
85 | while ($job = $this->manager->get($timeout)) { |
||
86 | $stats = $this->manager->getJobStats($job); |
||
87 | |||
88 | $timeStart = microtime(true) * 1000; |
||
89 | $memStart = memory_get_usage(true); |
||
90 | |||
91 | try { |
||
92 | $this->output( |
||
93 | sprintf( |
||
94 | 'Working job <info>%d</info> for action <comment>%s</comment> with payload <info>%s</info>', |
||
95 | $job->getId(), |
||
96 | $stats['tube'], |
||
97 | $job->getData() |
||
98 | ) |
||
99 | ); |
||
100 | |||
101 | $result = $this->manager->executeJob($job); |
||
102 | } catch (AbortException $e) { |
||
103 | $message = 'Worker aborted ' . ($e->getReason() ? ('with reason: ' . $e->getReason()) : 'without a given reason'); |
||
104 | $this->output($message); |
||
105 | |||
106 | $exit = 1; |
||
107 | |||
108 | break; |
||
109 | } |
||
110 | |||
111 | $duration = microtime(true) * 1000 - $timeStart; |
||
112 | $usage = memory_get_usage(true) - $memStart; |
||
113 | $message = sprintf( |
||
114 | 'Completed job <info>%d</info> in <comment>%dms</comment> using <comment>%s</comment> with result: <info>%s</info>', |
||
115 | $job->getId(), |
||
116 | $duration, |
||
117 | $this->formatBytes($usage), |
||
118 | json_encode($result, JSON_UNESCAPED_SLASHES) |
||
119 | ); |
||
120 | $this->output($message); |
||
121 | |||
122 | ++$jobsCompleted; |
||
123 | |||
124 | // intermediate flush |
||
125 | if ($jobsCompleted % $batchSize === 0) { |
||
126 | $this->output('Batch complete', OutputInterface::VERBOSITY_VERBOSE); |
||
127 | $dispatcher->dispatch(WorkerEvents::FLUSH); |
||
128 | } |
||
129 | |||
130 | if ($jobsCompleted >= $maxJobs) { |
||
131 | $this->output(sprintf('Maximum number of jobs completed (%d)', $maxJobs), OutputInterface::VERBOSITY_VERBOSE); |
||
132 | |||
133 | break; |
||
134 | } |
||
135 | |||
136 | if (($maxMemory > 0) && memory_get_usage(true) > $maxMemory) { |
||
137 | $this->output( |
||
138 | sprintf('Memory peak of %dMB reached (peak: %sMB)', $maxMemory / 1024 / 1024, memory_get_usage(true) / 1024 / 1024), |
||
139 | OutputInterface::VERBOSITY_VERBOSE |
||
140 | ); |
||
141 | |||
142 | break; |
||
143 | } |
||
144 | |||
145 | if (($maxTime > 0) && ((time() - $start) > $maxTime)) { |
||
146 | $this->output( |
||
147 | sprintf('Maximum execution time of %ds reached', $maxTime), |
||
148 | OutputInterface::VERBOSITY_VERBOSE |
||
149 | ); |
||
150 | |||
151 | break; |
||
152 | } |
||
153 | } |
||
154 | |||
155 | // flush remaining |
||
156 | $dispatcher->dispatch(WorkerEvents::FLUSH); |
||
157 | |||
158 | // make sure worker doesn't quit to quickly, or supervisor will mark it |
||
159 | // as a failed restart, and put the worker in FATAL state. |
||
160 | $duration = time() - $start; |
||
161 | if ($duration < $minDuration) { |
||
162 | $this->output(sprintf('Sleeping until worker has run for at least %s seconds', $minDuration)); |
||
163 | sleep($minDuration - $duration); |
||
164 | } |
||
165 | |||
166 | $this->output('Shutting down worker'); |
||
167 | |||
168 | return $exit; |
||
169 | } |
||
170 | |||
171 | /** |
||
172 | * @param LoggerInterface $logger |
||
173 | * @param OutputInterface $output |
||
174 | */ |
||
175 | protected function attachConsoleHandler(LoggerInterface $logger, OutputInterface $output) |
||
176 | { |
||
177 | if (!class_exists('Monolog\\Logger') || !$logger instanceof \Monolog\Logger) { |
||
178 | return; |
||
179 | } |
||
180 | |||
181 | foreach ($logger->getHandlers() as $handler) { |
||
182 | if ($handler instanceof ConsoleHandler) { |
||
183 | return; |
||
184 | } |
||
185 | } |
||
186 | |||
187 | $logger->pushHandler(new ConsoleHandler($output)); |
||
188 | } |
||
189 | |||
190 | /** |
||
191 | * @param string[] $include |
||
192 | * @param string[] $exclude |
||
193 | */ |
||
194 | protected function watchActions(array $include = [], array $exclude = []) |
||
195 | { |
||
196 | $actions = array_keys($this->manager->getExecutors()); |
||
197 | |||
198 | if (empty($include)) { |
||
199 | $include = $actions; |
||
0 ignored issues
–
show
Coding Style
introduced
by
Loading history...
|
|||
200 | } |
||
201 | |||
202 | View Code Duplication | if (!empty($diff = array_diff($include, $actions))) { |
|
0 ignored issues
–
show
This code seems to be duplicated across your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
203 | throw new \InvalidArgumentException(sprintf('Action(s) "%s" are not defined by QueueManager', implode(', ', $diff))); |
||
204 | } |
||
205 | |||
206 | View Code Duplication | if (!empty($diff = array_diff($exclude, $actions))) { |
|
0 ignored issues
–
show
This code seems to be duplicated across your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
207 | throw new \InvalidArgumentException(sprintf('Filter(s) "%s" are not defined by QueueManager', implode(', ', $diff))); |
||
208 | } |
||
209 | |||
210 | $include = array_diff($include, $exclude); |
||
0 ignored issues
–
show
|
|||
211 | |||
212 | if (empty($include)) { |
||
213 | throw new \InvalidArgumentException('No actions specified to run'); |
||
214 | } |
||
215 | |||
216 | // watch only these actions |
||
217 | $this->manager->watchOnly($include); |
||
218 | } |
||
219 | |||
220 | /** |
||
221 | * @param string $msg |
||
222 | * @param int $threshold |
||
223 | */ |
||
224 | protected function output($msg, $threshold = OutputInterface::VERBOSITY_NORMAL) |
||
225 | { |
||
226 | if ($this->output->getVerbosity() >= $threshold) { |
||
227 | $this->output->writeln(sprintf('[%s] %s', date('Y-m-d H:i:s'), $msg)); |
||
228 | } |
||
229 | } |
||
230 | |||
231 | /** |
||
232 | * @param int $bytes |
||
233 | * |
||
234 | * @return string |
||
235 | */ |
||
236 | View Code Duplication | private function formatBytes($bytes) |
|
0 ignored issues
–
show
This method seems to be duplicated in your project.
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation. You can also find more detailed suggestions in the “Code” section of your repository.
Loading history...
|
|||
237 | { |
||
238 | $bytes = (int) $bytes; |
||
0 ignored issues
–
show
|
|||
239 | |||
240 | if ($bytes > 1024*1024) { |
||
241 | return round($bytes/1024/1024, 2).'MB'; |
||
242 | } elseif ($bytes > 1024) { |
||
243 | return round($bytes/1024, 2).'KB'; |
||
244 | } |
||
245 | |||
246 | return $bytes . 'B'; |
||
247 | } |
||
248 | } |
||
249 |