This project does not seem to handle request data directly as such no vulnerable execution paths were found.
include
, or for example
via PHP's auto-loading mechanism.
These results are based on our legacy PHP analysis, consider migrating to our new PHP analysis engine instead. Learn more
1 | <?php |
||
2 | |||
3 | /** |
||
4 | * Gearman Bundle for Symfony2 / Symfony3 |
||
5 | * |
||
6 | * For the full copyright and license information, please view the LICENSE |
||
7 | * file that was distributed with this source code. |
||
8 | * |
||
9 | * Feel free to edit as you please, and have fun. |
||
10 | * |
||
11 | * @author Marc Morera <[email protected]> |
||
12 | */ |
||
13 | |||
14 | namespace Mkk\GearmanBundle\Service; |
||
15 | |||
16 | use Symfony\Component\Console\Output\NullOutput; |
||
17 | use Symfony\Component\Console\Output\OutputInterface; |
||
18 | use Symfony\Component\DependencyInjection\ContainerAwareInterface; |
||
19 | use Symfony\Component\DependencyInjection\ContainerAwareTrait; |
||
20 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
21 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||
22 | use Mkk\GearmanBundle\Command\Util\GearmanOutputAwareInterface; |
||
23 | use Mkk\GearmanBundle\Event\GearmanWorkExecutedEvent; |
||
24 | use Mkk\GearmanBundle\Event\GearmanWorkStartingEvent; |
||
25 | use Mkk\GearmanBundle\GearmanEvents; |
||
26 | use Mkk\GearmanBundle\Service\Abstracts\AbstractGearmanService; |
||
27 | use Mkk\GearmanBundle\Exceptions\ServerConnectionException; |
||
28 | |||
29 | /** |
||
30 | * Gearman execute methods. All Worker methods |
||
31 | */ |
||
32 | class GearmanExecute extends AbstractGearmanService implements ContainerAwareInterface |
||
33 | { |
||
34 | use ContainerAwareTrait; |
||
35 | |||
36 | /** |
||
37 | * @var EventDispatcherInterface |
||
38 | * |
||
39 | * EventDispatcher instance |
||
40 | */ |
||
41 | protected $eventDispatcher; |
||
42 | |||
43 | /** |
||
44 | * @var OutputInterface |
||
45 | * |
||
46 | * Output instance |
||
47 | */ |
||
48 | protected $output; |
||
49 | |||
50 | /** |
||
51 | * @var OptionsResolver |
||
52 | */ |
||
53 | protected $executeOptionsResolver; |
||
0 ignored issues
–
show
|
|||
54 | |||
55 | /** |
||
56 | * Boolean to track if a system signal has been received |
||
57 | * @var boolean |
||
58 | */ |
||
59 | protected $stopWorkSignalReceived; |
||
0 ignored issues
–
show
|
|||
60 | |||
61 | /** |
||
62 | * Bucket with worker objects configuration for PECL |
||
63 | * @var array |
||
64 | */ |
||
65 | protected $workersBucket = array(); |
||
66 | |||
67 | /** |
||
68 | * Construct method |
||
69 | * |
||
70 | * @param GearmanCacheWrapper $gearmanCacheWrapper GearmanCacheWrapper |
||
71 | * @param array $defaultSettings The default settings for the bundle |
||
72 | */ |
||
73 | 2 | public function __construct(GearmanCacheWrapper $gearmanCacheWrapper, array $defaultSettings) |
|
74 | { |
||
75 | 2 | parent::__construct($gearmanCacheWrapper, $defaultSettings); |
|
76 | |||
77 | 2 | $this->executeOptionsResolver = new OptionsResolver(); |
|
78 | 2 | $this->executeOptionsResolver |
|
79 | 2 | ->setDefaults(array( |
|
80 | 2 | 'iterations' => null, |
|
81 | 'minimum_execution_time' => null, |
||
82 | 'timeout' => null, |
||
83 | )) |
||
84 | 2 | ->setAllowedTypes('iterations', array('null', 'scalar')) |
|
85 | 2 | ->setAllowedTypes('minimum_execution_time', array('null', 'scalar')) |
|
86 | 2 | ->setAllowedTypes('timeout', array('null', 'scalar')); |
|
87 | |||
88 | |||
89 | 2 | $this->stopWorkSignalReceived = false; |
|
90 | |||
91 | /** |
||
92 | * If the pcntl_signal exists, subscribe to the terminate and restart events for graceful worker stops. |
||
93 | */ |
||
94 | 2 | if(false !== function_exists('pcntl_signal')) |
|
95 | { |
||
96 | declare(ticks = 1); |
||
97 | 2 | pcntl_signal(SIGTERM, array($this,"handleSystemSignal")); |
|
98 | 2 | pcntl_signal(SIGHUP, array($this,"handleSystemSignal")); |
|
99 | |||
100 | } |
||
101 | 2 | } |
|
102 | |||
103 | /** |
||
104 | * Toggles that work should be stopped, we only subscribe to SIGTERM and SIGHUP |
||
105 | * @param int $signo Signal number |
||
106 | */ |
||
107 | public function handleSystemSignal($signo) |
||
108 | { |
||
109 | $this->stopWorkSignalReceived = true; |
||
110 | } |
||
111 | |||
112 | /** |
||
113 | * Set event dispatcher |
||
114 | * |
||
115 | * @param EventDispatcherInterface $eventDispatcher |
||
116 | * |
||
117 | * @return GearmanExecute self Object |
||
118 | */ |
||
119 | 2 | public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) |
|
120 | { |
||
121 | 2 | $this->eventDispatcher = $eventDispatcher; |
|
122 | |||
123 | 2 | return $this; |
|
124 | } |
||
125 | |||
126 | /** |
||
127 | * Set output |
||
128 | * |
||
129 | * @param OutputInterface $output |
||
130 | * |
||
131 | * @return GearmanExecute self Object |
||
132 | */ |
||
133 | 6 | public function setOutput(OutputInterface $output) |
|
134 | { |
||
135 | 6 | $this->output = $output; |
|
136 | |||
137 | 6 | return $this; |
|
138 | } |
||
139 | |||
140 | /** |
||
141 | * Executes a job given a jobName and given settings and annotations of job |
||
142 | * |
||
143 | * @param string $jobName Name of job to be executed |
||
144 | * @param array $options Array of options passed to the callback |
||
145 | * @param \GearmanWorker $gearmanWorker Worker instance to use |
||
146 | */ |
||
147 | 1 | public function executeJob($jobName, array $options = array(), \GearmanWorker $gearmanWorker = null) |
|
148 | { |
||
149 | 1 | $worker = $this->getJob($jobName); |
|
150 | |||
151 | 1 | if (false !== $worker) { |
|
152 | 1 | $this->callJob($worker, $options, $gearmanWorker); |
|
153 | } |
||
154 | 1 | } |
|
155 | |||
156 | /** |
||
157 | * Given a worker, execute GearmanWorker function defined by job. |
||
158 | * |
||
159 | * @param array $worker Worker definition |
||
160 | * @param array $options Array of options passed to the callback |
||
161 | * @param \GearmanWorker $gearmanWorker Worker instance to use |
||
162 | * |
||
163 | * @throws ServerConnectionException if a connection to a server was not possible. |
||
164 | * |
||
165 | * @return GearmanExecute self Object |
||
166 | */ |
||
167 | 1 | private function callJob(Array $worker, array $options = array(), \GearmanWorker $gearmanWorker = null) |
|
168 | { |
||
169 | 1 | if(is_null($gearmanWorker)) { |
|
170 | $gearmanWorker = new \GearmanWorker; |
||
171 | } |
||
172 | |||
173 | 1 | if (isset($worker['job'])) { |
|
174 | |||
175 | 1 | $jobs = array($worker['job']); |
|
176 | 1 | $iterations = $worker['job']['iterations']; |
|
177 | 1 | $minimumExecutionTime = $worker['job']['minimumExecutionTime']; |
|
178 | 1 | $timeout = $worker['job']['timeout']; |
|
179 | 1 | $successes = $this->addServers($gearmanWorker, $worker['job']['servers']); |
|
180 | |||
181 | } else { |
||
182 | |||
183 | $jobs = $worker['jobs']; |
||
184 | $iterations = $worker['iterations']; |
||
185 | $minimumExecutionTime = $worker['minimumExecutionTime']; |
||
186 | $timeout = $worker['timeout']; |
||
187 | $successes = $this->addServers($gearmanWorker, $worker['servers']); |
||
188 | } |
||
189 | |||
190 | 1 | $options = $this->executeOptionsResolver->resolve($options); |
|
191 | |||
192 | 1 | $iterations = $options['iterations'] ?: $iterations; |
|
193 | 1 | $minimumExecutionTime = $options['minimum_execution_time'] ?: $minimumExecutionTime; |
|
194 | 1 | $timeout = $options['timeout'] ?: $timeout; |
|
195 | |||
196 | 1 | if (count($successes) < 1) { |
|
197 | if ($minimumExecutionTime > 0) { |
||
198 | sleep($minimumExecutionTime); |
||
199 | } |
||
200 | throw new ServerConnectionException('Worker was unable to connect to any server.'); |
||
201 | } |
||
202 | |||
203 | 1 | $objInstance = $this->createJob($worker); |
|
204 | |||
205 | /** |
||
206 | * Start the timer before running the worker. |
||
207 | */ |
||
208 | 1 | $time = time(); |
|
209 | 1 | $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations, $timeout); |
|
210 | |||
211 | /** |
||
212 | * If there is a minimum expected duration, wait out the remaining period if there is any. |
||
213 | */ |
||
214 | 1 | if ($minimumExecutionTime > 0) { |
|
215 | $now = time(); |
||
216 | $remaining = $minimumExecutionTime - ($now - $time); |
||
217 | |||
218 | if ($remaining > 0) { |
||
219 | sleep($remaining); |
||
220 | } |
||
221 | } |
||
222 | |||
223 | 1 | return $this; |
|
224 | } |
||
225 | |||
226 | /** |
||
227 | * Given a worker settings, return Job instance |
||
228 | * |
||
229 | * @param array $worker Worker settings |
||
230 | * |
||
231 | * @return Object Job instance |
||
232 | */ |
||
233 | 1 | private function createJob(array $worker) |
|
234 | { |
||
235 | /** |
||
236 | * If service is defined, we must retrieve this class with dependency injection |
||
237 | * |
||
238 | * Otherwise we just create it with a simple new() |
||
239 | */ |
||
240 | 1 | if ($worker['service']) { |
|
241 | |||
242 | $objInstance = $this->container->get($worker['service']); |
||
243 | |||
244 | } else { |
||
245 | |||
246 | 1 | $objInstance = new $worker['className']; |
|
247 | |||
248 | /** |
||
249 | * If instance of given object is instanceof |
||
250 | * ContainerAwareInterface, we inject full container by calling |
||
251 | * container setter. |
||
252 | * |
||
253 | * @see https://github.com/mmoreram/gearman-bundle/pull/12 |
||
254 | */ |
||
255 | 1 | if ($objInstance instanceof ContainerAwareInterface) { |
|
256 | |||
257 | $objInstance->setContainer($this->container); |
||
258 | } |
||
259 | } |
||
260 | |||
261 | 1 | return $objInstance; |
|
262 | } |
||
263 | |||
264 | /** |
||
265 | * Given a GearmanWorker and an instance of Job, run it |
||
266 | * |
||
267 | * @param \GearmanWorker $gearmanWorker Gearman Worker |
||
268 | * @param Object $objInstance Job instance |
||
269 | * @param array $jobs Array of jobs to subscribe |
||
270 | * @param integer $iterations Number of iterations |
||
271 | * @param integer $timeout Timeout |
||
272 | * |
||
273 | * @return GearmanExecute self Object |
||
274 | */ |
||
275 | 1 | private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations, $timeout = null) |
|
276 | { |
||
277 | /** |
||
278 | * Set the output of this instance, this should allow workers to use the console output. |
||
279 | */ |
||
280 | 1 | if ($objInstance instanceof GearmanOutputAwareInterface) { |
|
281 | $objInstance->setOutput($this->output ? : new NullOutput()); |
||
282 | } |
||
283 | |||
284 | /** |
||
285 | * Every job defined in worker is added into GearmanWorker |
||
286 | */ |
||
287 | 1 | foreach ($jobs as $job) { |
|
288 | |||
289 | /** |
||
290 | * worker needs to have it's context into separated memory space; |
||
291 | * if it's passed as a value, then garbage collector remove the target |
||
292 | * what causes a segfault (see https://github.com/wcgallego/pecl-gearman/issues/19) |
||
293 | */ |
||
294 | 1 | $this->workersBucket[$job['realCallableName']] = array( |
|
295 | 1 | 'job_object_instance' => $objInstance, |
|
296 | 1 | 'job_method' => $job['methodName'], |
|
297 | 1 | 'jobs' => $jobs |
|
298 | ); |
||
299 | |||
300 | 1 | $gearmanWorker->addFunction( |
|
301 | 1 | $job['realCallableName'], |
|
302 | 1 | array($this, 'handleJob') |
|
303 | ); |
||
304 | } |
||
305 | |||
306 | /** |
||
307 | * If iterations value is 0, is like worker will never die |
||
308 | */ |
||
309 | 1 | $alive = (0 === $iterations); |
|
310 | |||
311 | 1 | if ($timeout > 0) { |
|
312 | $gearmanWorker->setTimeout($timeout * 1000); |
||
313 | } |
||
314 | |||
315 | /** |
||
316 | * Executes GearmanWorker with all jobs defined |
||
317 | */ |
||
318 | 1 | while (false === $this->stopWorkSignalReceived && $gearmanWorker->work()) { |
|
319 | |||
320 | 1 | $iterations--; |
|
321 | |||
322 | 1 | $event = new GearmanWorkExecutedEvent($jobs, $iterations, $gearmanWorker->returnCode()); |
|
323 | 1 | $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_EXECUTED, $event); |
|
324 | |||
325 | 1 | if ($gearmanWorker->returnCode() != GEARMAN_SUCCESS) { |
|
326 | |||
327 | break; |
||
328 | } |
||
329 | |||
330 | /** |
||
331 | * Only finishes its execution if alive is false and iterations |
||
332 | * arrives to 0 |
||
333 | */ |
||
334 | 1 | if (!$alive && $iterations <= 0) { |
|
335 | |||
336 | 1 | break; |
|
337 | } |
||
338 | } |
||
339 | 1 | } |
|
340 | |||
341 | /** |
||
342 | * Adds into worker all defined Servers. |
||
343 | * If any is defined, performs default method |
||
344 | * |
||
345 | * @param \GearmanWorker $gmworker Worker to perform configuration |
||
346 | * @param array $servers Servers array |
||
347 | * |
||
348 | * @throws ServerConnectionException if a connection to a server was not possible. |
||
349 | * |
||
350 | * @return array Successfully added servers |
||
351 | */ |
||
352 | 1 | private function addServers(\GearmanWorker $gmworker, array $servers) |
|
353 | { |
||
354 | 1 | $successes = array(); |
|
355 | |||
356 | 1 | if (!empty($servers)) { |
|
357 | |||
358 | foreach ($servers as $server) { |
||
359 | if ($gmworker->addServer($server['host'], $server['port'])) { |
||
360 | $successes[] = $server; |
||
361 | } |
||
362 | } |
||
363 | } else { |
||
364 | 1 | if ($gmworker->addServer()) { |
|
365 | 1 | $successes[] = array('127.0.0.1', 4730); |
|
366 | } |
||
367 | } |
||
368 | |||
369 | 1 | return $successes; |
|
370 | } |
||
371 | |||
372 | /** |
||
373 | * Executes a worker given a workerName subscribing all his jobs inside and |
||
374 | * given settings and annotations of worker and jobs |
||
375 | * |
||
376 | * @param string $workerName Name of worker to be executed |
||
377 | */ |
||
378 | public function executeWorker($workerName, array $options = array()) |
||
379 | { |
||
380 | $worker = $this->getWorker($workerName); |
||
381 | |||
382 | if (false !== $worker) { |
||
383 | |||
384 | $this->callJob($worker, $options); |
||
385 | } |
||
386 | } |
||
387 | |||
388 | /** |
||
389 | * Wrapper function handler for all registered functions |
||
390 | * This allows us to do some nice logging when jobs are started/finished |
||
391 | * |
||
392 | * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206 |
||
393 | * |
||
394 | * @param \GearmanJob $job |
||
395 | * |
||
396 | * @return mixed |
||
397 | */ |
||
398 | 1 | public function handleJob(\GearmanJob $job) |
|
399 | { |
||
400 | 1 | if(!isset($this->workersBucket[$job->functionName()])){ |
|
401 | $context = false; |
||
402 | }else{ |
||
403 | 1 | $context = $this->workersBucket[$job->functionName()]; |
|
404 | } |
||
405 | |||
406 | if ( |
||
407 | 1 | !is_array($context) |
|
408 | 1 | || !array_key_exists('job_object_instance', $context) |
|
409 | 1 | || !array_key_exists('job_method', $context) |
|
410 | ) { |
||
411 | throw new \InvalidArgumentException('$context shall be an array with job_object_instance and job_method key.'); |
||
412 | } |
||
413 | |||
414 | 1 | $event = new GearmanWorkStartingEvent($context['jobs']); |
|
415 | 1 | $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_STARTING, $event); |
|
416 | |||
417 | 1 | $result = call_user_func_array( |
|
418 | 1 | array($context['job_object_instance'], $context['job_method']), |
|
419 | 1 | array($job, $context) |
|
420 | ); |
||
421 | |||
422 | /** |
||
423 | * Workaround for PECL bug #17114 |
||
424 | * http://pecl.php.net/bugs/bug.php?id=17114 |
||
425 | */ |
||
426 | 1 | $type = gettype($result); |
|
427 | 1 | settype($result, $type); |
|
428 | |||
429 | 1 | return $result; |
|
430 | } |
||
431 | } |
||
432 |
Very long variable names usually make code harder to read. It is therefore recommended not to make variable names too verbose.