QueueWorker::run()   F
last analyzed

Complexity

Conditions 31
Paths 1541

Size

Total Lines 248
Code Lines 105

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 992

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 31
eloc 105
c 1
b 0
f 0
nc 1541
nop 0
dl 0
loc 248
ccs 0
cts 124
cp 0
crap 992
rs 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
/**
4
 * \AppserverIo\Appserver\MessageQueue\QueueWorker
5
 *
6
 * NOTICE OF LICENSE
7
 *
8
 * This source file is subject to the Open Software License (OSL 3.0)
9
 * that is available through the world-wide-web at this URL:
10
 * http://opensource.org/licenses/osl-3.0.php
11
 *
12
 * PHP version 5
13
 *
14
 * @author    Tim Wagner <[email protected]>
15
 * @copyright 2015 TechDivision GmbH <[email protected]>
16
 * @license   http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0)
17
 * @link      https://github.com/appserver-io/appserver
18
 * @link      http://www.appserver.io
19
 */
20
21
namespace AppserverIo\Appserver\MessageQueue;
22
23
use AppserverIo\Storage\GenericStackable;
24
use AppserverIo\Psr\Naming\InitialContext;
25
use AppserverIo\Psr\Pms\JobInterface;
26
use AppserverIo\Psr\Pms\MessageInterface;
27
use AppserverIo\Psr\Pms\PriorityKeyInterface;
28
use AppserverIo\Psr\Application\ApplicationInterface;
29
use AppserverIo\Messaging\Utils\StateActive;
30
use AppserverIo\Messaging\Utils\StateFailed;
31
use AppserverIo\Messaging\Utils\StateInProgress;
32
use AppserverIo\Messaging\Utils\StatePaused;
33
use AppserverIo\Messaging\Utils\StateProcessed;
34
use AppserverIo\Messaging\Utils\StateToProcess;
35
use AppserverIo\Messaging\Utils\StateUnknown;
36
use AppserverIo\Appserver\Core\AbstractDaemonThread;
37
use AppserverIo\Appserver\Core\Utilities\EnumState;
38
use AppserverIo\Appserver\Core\Environment;
39
use AppserverIo\Appserver\Core\Utilities\EnvironmentKeys;
40
use AppserverIo\Psr\Servlet\SessionUtils;
41
use AppserverIo\Appserver\Core\Utilities\LoggerUtils;
42
43
/**
44
 * A message queue worker implementation listening to a queue, defined in the passed application.
45
 *
46
 * @author    Tim Wagner <[email protected]>
47
 * @copyright 2015 TechDivision GmbH <[email protected]>
48
 * @license   http://opensource.org/licenses/osl-3.0.php Open Software License (OSL 3.0)
49
 * @link      https://github.com/appserver-io/appserver
50
 * @link      http://www.appserver.io
51
 *
52
 * @property \AppserverIo\Psr\Application\ApplicationInterface                 $application     The application instance with the queue manager/locator
53
 * @property \AppserverIo\Storage\GenericStackable                             $jobsToExecute   The storage for the jobs to be executed
54
 * @property \AppserverIo\Storage\GenericStackable                             $messages        The storage for the messages
55
 * @property \AppserverIo\Psr\Pms\PriorityKeyInterface                         $priorityKey     The priority of this queue worker
56
 * @property \AppserverIo\Appserver\MessageQueue\QueueManagerSettingsInterface $managerSettings The queue settings
57
 */
58
class QueueWorker extends AbstractDaemonThread
59
{
60
61
    /**
62
     * Injects the priority of the queue worker.
63
     *
64
     * @param \AppserverIo\Psr\Pms\PriorityKeyInterface $priorityKey The priority of this queue worker
65
     *
66
     * @return void
67
     */
68
    public function injectPriorityKey(PriorityKeyInterface $priorityKey)
69
    {
70
        $this->priorityKey = $priorityKey;
71
    }
72
73
    /**
74
     * Inject the storage for the messages.
75
     *
76
     * @param \AppserverIo\Storage\GenericStackable $messages The storage for the messages
77
     *
78
     * @return void
79
     */
80
    public function injectMessages(GenericStackable $messages)
81
    {
82
        $this->messages = $messages;
83
    }
84
85
    /**
86
     * Inject the storage for the jobs to be executed.
87
     *
88
     * @param \AppserverIo\Storage\GenericStackable $jobsToExecute The storage for the jobs to be executed
89
     *
90
     * @return void
91
     */
92
    public function injectJobsToExecute(GenericStackable $jobsToExecute)
93
    {
94
        $this->jobsToExecute = $jobsToExecute;
95
    }
96
97
    /**
98
     * Inject the application instance the worker is bound to.
99
     *
100
     * @param \AppserverIo\Psr\Application\ApplicationInterface $application The application instance
101
     *
102
     * @return void
103
     */
104
    public function injectApplication(ApplicationInterface $application)
105
    {
106
        $this->application = $application;
107
    }
108
109
    /**
110
     * Injects the queue manager settings.
111
     *
112
     * @param \AppserverIo\Appserver\MessageQueue\QueueManagerSettingsInterface $managerSettings The queue manager settings
113
     *
114
     * @return void
115
     */
116
    public function injectManagerSettings(QueueManagerSettingsInterface $managerSettings)
117
    {
118
        $this->managerSettings = $managerSettings;
119
    }
120
121
    /**
122
     * Returns the application instance the worker is bound to.
123
     *
124
     * @return \AppserverIo\Psr\Application\ApplicationInterface The application instance
125
     */
126
    public function getApplication()
127
    {
128
        return $this->application;
129
    }
130
131
    /**
132
     * Return's the queue manager settings.
133
     *
134
     * @return \AppserverIo\Appserver\MessageQueue\QueueManagerSettingsInterface The queue manager settings
135
     */
136
    public function getManagerSettings()
137
    {
138
        return $this->managerSettings;
139
    }
140
141
    /**
142
     * Returns the default timeout.
143
     *
144
     * Reduce CPU load depending on the queues priority, whereas priority
145
     * can be 1, 2 or 3 actually, so possible values for usleep are:
146
     *
147
     * PriorityHigh:         100 === 0.0001 s
148
     * PriorityMedium:    10.000 === 0.01 s
149
     * PriorityLow:    1.000.000 === 1 s
150
     *
151
     * @return integer The default timeout in microseconds
152
     * @see \AppserverIo\Appserver\Core\AbstractDaemonThread::getDefaultTimeout()
153
     */
154
    public function getQueueTimeout()
155
    {
156
        return pow(10, $this->priorityKey->getPriority() * 2);
157
    }
158
159
    /**
160
     * Attaches a job for the passed wrapper to the worker instance.
161
     *
162
     * @param \stdClass $jobWrapper The job wrapper to attach the job for
163
     *
164
     * @return void
165
     */
166
    public function attach(\stdClass $jobWrapper)
167
    {
168
169
        // attach the job wrapper
170
        $this->synchronized(function (QueueWorker $self, \stdClass $jw) {
171
            $self->jobsToExecute[$jw->jobId] = $jw;
172
        }, $this, $jobWrapper);
173
    }
174
175
    /**
176
     * This is a very basic method to log some stuff by using the error_log() method of PHP.
177
     *
178
     * @param mixed  $level   The log level to use
179
     * @param string $message The message we want to log
180
     * @param array  $context The context we of the message
181
     *
182
     * @return void
183
     */
184
    public function log($level, $message, array $context = array())
185
    {
186
        LoggerUtils::log($level, $message, $context);
187
    }
188
189
    /**
190
     * This method will be invoked before the while() loop starts and can be used
191
     * to implement some bootstrap functionality.
192
     *
193
     * @return void
194
     */
195
    public function bootstrap()
196
    {
197
198
        // register the default autoloader
199
        require SERVER_AUTOLOADER;
200
201
        // synchronize the application instance and register the class loaders
202
        $application = $this->application;
203
        $application->registerClassLoaders();
0 ignored issues
show
Bug introduced by
The method registerClassLoaders() does not exist on AppserverIo\Psr\Application\ApplicationInterface. It seems like you code against a sub-type of AppserverIo\Psr\Application\ApplicationInterface such as AppserverIo\Appserver\Application\Application. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

203
        $application->/** @scrutinizer ignore-call */ 
204
                      registerClassLoaders();
Loading history...
204
205
        // register the applications annotation registries
206
        $application->registerAnnotationRegistries();
0 ignored issues
show
Bug introduced by
The method registerAnnotationRegistries() does not exist on AppserverIo\Psr\Application\ApplicationInterface. It seems like you code against a sub-type of AppserverIo\Psr\Application\ApplicationInterface such as AppserverIo\Appserver\Application\Application. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

206
        $application->/** @scrutinizer ignore-call */ 
207
                      registerAnnotationRegistries();
Loading history...
207
208
        // add the application instance to the environment
209
        Environment::singleton()->setAttribute(EnvironmentKeys::APPLICATION, $application);
210
211
        // create s simulated request/session ID whereas session equals request ID
212
        Environment::singleton()->setAttribute(EnvironmentKeys::SESSION_ID, $sessionId = SessionUtils::generateRandomString());
213
        Environment::singleton()->setAttribute(EnvironmentKeys::REQUEST_ID, $sessionId);
214
215
        // try to load the profile logger
216
        if ($this->profileLogger = $application->getInitialContext()->getLogger(\AppserverIo\Logger\LoggerUtils::PROFILE)) {
0 ignored issues
show
Bug introduced by
The method getInitialContext() does not exist on AppserverIo\Psr\Application\ApplicationInterface. It seems like you code against a sub-type of AppserverIo\Psr\Application\ApplicationInterface such as AppserverIo\Appserver\Application\Application. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

216
        if ($this->profileLogger = $application->/** @scrutinizer ignore-call */ getInitialContext()->getLogger(\AppserverIo\Logger\LoggerUtils::PROFILE)) {
Loading history...
Bug Best Practice introduced by
The property profileLogger does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Loading history...
217
            $this->profileLogger->appendThreadContext(sprintf('queue-worker-%s', $this->priorityKey));
0 ignored issues
show
Bug introduced by
$this->priorityKey of type AppserverIo\Psr\Pms\PriorityKeyInterface is incompatible with the type string expected by parameter $args of sprintf(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

217
            $this->profileLogger->appendThreadContext(sprintf('queue-worker-%s', /** @scrutinizer ignore-type */ $this->priorityKey));
Loading history...
218
        }
219
    }
220
221
    /**
222
     * We process the messages/jobs here.
223
     *
224
     * @return void
225
     */
226
    public function run()
227
    {
228
229
        try {
230
            // register shutdown handler
231
            register_shutdown_function($this->getDefaultShutdownMethod());
232
233
            // bootstrap the daemon
234
            $this->bootstrap();
235
236
            // synchronize the application instance and register the class loaders
237
            $application = $this->application;
238
239
            // mark the daemon as successfully shutdown
240
            $this->synchronized(function ($self) {
241
                $self->state = EnumState::get(EnumState::RUNNING);
242
            }, $this);
243
244
            // create local instances of the storages
245
            $messages = $this->messages;
246
            $priorityKey = $this->priorityKey;
247
            $jobsToExecute = $this->jobsToExecute;
248
249
            // load the maximum number of jobs to process in parallel
250
            $maximumJobsToProcess = $this->getManagerSettings()->getMaximumJobsToProcess();
251
252
            // initialize the arrays for the message states and the jobs executing
253
            $jobsExecuting = array();
254
            $messagesFailed = array();
255
            $retryCounter = array();
256
            $callbacksToExecute = array();
257
258
            // keep the daemon running
259
            while ($this->keepRunning()) {
260
                // iterate over all job wrappers
261
                foreach ($jobsToExecute as $jobWrapper) {
262
                    try {
263
                        // load the message
264
                        $message = $messages[$jobWrapper->jobId];
265
266
                        // check if we've a message found
267
                        if ($message instanceof MessageInterface) {
268
                            // check the message state
269
                            switch ($message->getState()->getState()) {
270
271
                                // message is active and ready to be processed
272
                                case StateActive::KEY:
273
274
                                    // set the new state now
275
                                    $message->setState(StateToProcess::get());
276
277
                                    break;
278
279
                                // message is paused or in progress
280
                                case StatePaused::KEY:
281
282
                                    // invoke the callbacks for the state
283
                                    if ($message->hasCallbacks($message->getState())) {
284
                                        $callbacksToExecute[] = new Callback(clone $message, $application);
285
                                    }
286
287
                                    // log a message that we've a message that has been paused
288
                                    \info(sprintf('Message %s has been paused', $message->getMessageId()));
289
290
                                    break;
291
292
                                case StateInProgress::KEY:
293
294
                                    // query whether or not the job is still available
295
                                    if (isset($jobsExecuting[$message->getMessageId()])) {
296
                                        // make sure the job has been finished
297
                                        if ($jobsExecuting[$message->getMessageId()] instanceof JobInterface && $jobsExecuting[$message->getMessageId()]->isFinished()) {
298
                                            // log a message that the job is still in progress
299
                                            \info(sprintf('Job %s has been finished', $message->getMessageId()));
300
301
                                            // set the new state now
302
                                            $message->setState($jobsExecuting[$message->getMessageId()]->getMessage()->getState());
303
304
                                        } else {
305
                                            // log a message that the job is still in progress
306
                                            \info(sprintf('Job %s is still in progress', $message->getMessageId()));
307
                                        }
308
309
                                    } else {
310
                                        // log a message that the job is still in progress
311
                                        \critical(sprintf('Message %s has been deleted, but should still be there', $message->getMessageId()));
312
                                    }
313
314
                                    break;
315
316
                                // message failed
317
                                case StateFailed::KEY:
318
319
                                    // remove the old job from the queue
320
                                    unset($jobsExecuting[$message->getMessageId()]);
321
322
                                    // query whether or not the message has to be processed again
323
                                    if (isset($messagesFailed[$message->getMessageId()]) && $message->getRetryCounter() > 0) {
324
                                        // query whether or not the message has to be processed now
325
                                        if ($messagesFailed[$message->getMessageId()] < time() && $retryCounter[$message->getMessageId()] < $message->getRetryCounter()) {
326
                                            // retry to process the message
327
                                            $message->setState(StateToProcess::get());
328
329
                                            // update the execution time and raise the retry counter
330
                                            $messagesFailed[$message->getMessageId()] = time() + $message->getRetryTimeout($retryCounter[$message->getMessageId()]);
331
                                            $retryCounter[$message->getMessageId()]++;
332
333
                                        } elseif ($messagesFailed[$message->getMessageId()] < time() && $retryCounter[$message->getMessageId()] === $message->getRetryCounter()) {
334
                                            // log a message that we've a message with a unknown state
335
                                            \critical(sprintf('Message %s finally failed after %d retries', $message->getMessageId(), $retryCounter[$message->getMessageId()]));
336
337
                                            // stop executing the job because we've reached the maximum number of retries
338
                                            unset($jobsToExecute[$messageId = $message->getMessageId()]);
339
                                            unset($messagesFailed[$messageId]);
340
                                            unset($retryCounter[$messageId]);
341
342
                                            // invoke the callbacks for the state
343
                                            if ($message->hasCallbacks($message->getState())) {
344
                                                $callbacksToExecute[] = new Callback(clone $message, $application);
345
                                            }
346
347
                                        } else {
348
                                            // wait for the next try here
349
                                        }
350
351
                                    } elseif (!isset($messagesFailed[$message->getMessageId()]) && $message->getRetryCounter() > 0) {
352
                                        // first retry, so we've to initialize the next execution time and the retry counter
353
                                        $retryCounter[$message->getMessageId()] = 0;
354
                                        $messagesFailed[$message->getMessageId()] = time() + $message->getRetryTimeout($retryCounter[$message->getMessageId()]);
355
356
                                    } else {
357
                                        // log a message that we've a message with a unknown state
358
                                        \critical(sprintf('Message %s failed with NO retries', $message->getMessageId()));
359
360
                                        // stop executing the job because we've reached the maximum number of retries
361
                                        unset($jobsToExecute[$messageId = $message->getMessageId()]);
0 ignored issues
show
Unused Code introduced by
The assignment to $messageId is dead and can be removed.
Loading history...
362
363
                                        // invoke the callbacks for the state
364
                                        if ($message->hasCallbacks($message->getState())) {
365
                                            $callbacksToExecute[] = new Callback(clone $message, $application);
366
                                        }
367
                                    }
368
369
                                    break;
370
371
                                case StateToProcess::KEY:
372
373
                                    // count messages in queue
374
                                    $inQueue = sizeof($jobsExecuting);
375
376
                                    // we only process 200 jobs in parallel
377
                                    if ($inQueue < $maximumJobsToProcess) {
378
                                        // set the new message state now
379
                                        $message->setState(StateInProgress::get());
380
381
                                        // start the job and add it to the internal array
382
                                        $jobsExecuting[$message->getMessageId()] = new Job(clone $message, $application);
383
384
                                    } else {
385
                                        // log a message that queue is actually full
386
                                        \info(sprintf('Job queue full - (%d jobs/%d msg wait)', $inQueue, sizeof($messages)));
387
388
                                        // if the job queue is full, restart iteration to remove processed jobs from queue first
389
                                        continue 2;
390
                                    }
391
392
                                    break;
393
394
                                // message processing has been successfully processed
395
                                case StateProcessed::KEY:
396
397
                                    // invoke the callbacks for the state
398
                                    if ($message->hasCallbacks($message->getState())) {
399
                                        $callbacksToExecute[] = new Callback(clone $message, $application);
400
                                    }
401
402
                                    // remove the job from the queue with jobs that has to be executed
403
                                    unset($jobsToExecute[$messageId = $message->getMessageId()]);
404
405
                                    // also remove the job + the message from the queue
406
                                    unset($jobsExecuting[$messageId]);
407
                                    unset($messages[$messageId]);
408
409
                                    break;
410
411
                                // message is in an unknown state -> this is weired and should never happen!
412
                                case StateUnknown::KEY:
413
414
                                    // log a message that we've a message with a unknown state
415
                                    \critical(sprintf('Message %s has state %s', $message->getMessageId(), $message->getState()));
0 ignored issues
show
Bug introduced by
$message->getState() of type AppserverIo\Psr\Pms\StateKeyInterface is incompatible with the type string expected by parameter $args of sprintf(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

415
                                    \critical(sprintf('Message %s has state %s', $message->getMessageId(), /** @scrutinizer ignore-type */ $message->getState()));
Loading history...
416
417
                                    // set new state now
418
                                    $message->setState(StateFailed::get());
419
420
                                    break;
421
422
                                // we don't know the message state -> this is weired and should never happen!
423
                                default:
424
425
                                    // set the failed message state
426
                                    $message->setState(StateFailed::get());
427
428
                                    // log a message that we've a message with an invalid state
429
                                    \critical(sprintf('Message %s has an invalid state', $message->getMessageId()));
430
431
                                    break;
432
                            }
433
                        }
434
435
                        // add the message back to the stack (because we've a copy here)
436
                        if (isset($messages[$message->getMessageId()])) {
437
                            $messages[$jobWrapper->jobId] = $message;
438
                        }
439
440
                        // catch all exceptions
441
                    } catch (\Exception $e) {
442
                        $application->getInitialContext()->getSystemLogger()->critical($e->__toString());
443
                    }
444
445
                    // reduce CPU load depending on queue priority
446
                    $this->iterate($this->getQueueTimeout());
447
                }
448
449
                // reduce CPU load after each iteration
450
                $this->iterate($this->getDefaultTimeout());
451
452
                // profile the size of the session pool
453
                if ($this->profileLogger) {
454
                    $this->profileLogger->debug(
455
                        sprintf(
456
                            'Processed queue worker with priority %s, size of queue size is: %d',
457
                            $priorityKey,
0 ignored issues
show
Bug introduced by
$priorityKey of type AppserverIo\Psr\Pms\PriorityKeyInterface is incompatible with the type string expected by parameter $args of sprintf(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

457
                            /** @scrutinizer ignore-type */ $priorityKey,
Loading history...
458
                            sizeof($jobsToExecute)
459
                        )
460
                    );
461
                }
462
            }
463
464
            // clean up the instances and free memory
465
            $this->cleanUp();
466
467
            // mark the daemon as successfully shutdown
468
            $this->synchronized(function ($self) {
469
                $self->state = EnumState::get(EnumState::SHUTDOWN);
470
            }, $this);
471
472
        } catch (\Exception $e) {
473
            \error($e->__toString());
474
        }
475
    }
476
}
477