GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#158)
by Bernardo Vieira da
12:24
created

GearmanClient::doJob()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 3
crap 2
1
<?php
2
3
/**
4
 * Gearman Bundle for Symfony2
5
 *
6
 * For the full copyright and license information, please view the LICENSE
7
 * file that was distributed with this source code.
8
 *
9
 * Feel free to edit as you please, and have fun.
10
 *
11
 * @author Marc Morera <[email protected]>
12
 */
13
14
namespace Mmoreram\GearmanBundle\Service;
15
16
use Mmoreram\GearmanBundle\Dispatcher\GearmanCallbacksDispatcher;
17
use Mmoreram\GearmanBundle\GearmanMethods;
18
use Mmoreram\GearmanBundle\Generator\UniqueJobIdentifierGenerator;
19
use Mmoreram\GearmanBundle\Module\JobStatus;
20
use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService;
21
22
/**
23
 * GearmanClient. Implementation of AbstractGearmanService
24
 *
25
 * @since 2.3.1
26
 */
27
class GearmanClient extends AbstractGearmanService
28
{
29
    /**
30
     * @var \GearmanClient
31
     *
32
     * Gearman native client instance
33
     */
34
    protected $gearmanClient;
35
36
    /**
37
     * @var GearmanCallbacksDispatcher
38
     *
39
     * Gearman callbacks dispatcher
40
     */
41
    protected $gearmanCallbacksDispatcher;
42
43
    /**
44
     * @var array
45
     *
46
     * Server set to define in what server must connect to
47
     */
48
    protected $servers = array();
49
50
    /**
51
     * @var array
52
     *
53
     * task structure to store all about called tasks
54
     */
55
    protected $taskStructure = array();
56
57
    /**
58
     * @var array
59
     *
60
     * Set default servers
61
     */
62
    protected $defaultServers;
63
64
    /**
65
     * @var array
66
     *
67
     * Set default settings
68
     */
69
    protected $settings;
70
71
    /**
72
     * @var UniqueJobIdentifierGenerator
73
     *
74
     * Unique Job Intefier Generator
75
     */
76
    protected $uniqueJobIdentifierGenerator;
77
78
    /**
79
     * @var int
80
     *
81
     * Return code from internal client.
82
     */
83
    protected $returnCode;
84
85
    /**
86
     * @return \GearmanClient
87
     */
88
    public function getNativeClient(){
89
        if ($this->gearmanClient === null){
90
            $this->gearmanClient = new \GearmanClient();
91
        }
92
        return $this->gearmanClient;
93
    }
94
95
    /**
96
     * Init tasks structure
97
     *
98
     * @return GearmanClient self Object
99
     */
100 1
    public function initTaskStructure()
101
    {
102 1
        $this->taskStructure = array();
103
104 1
        return $this;
105
    }
106
107
    /**
108
     * Set  default servers
109
     *
110
     * @param array $defaultServers Default servers
111
     *
112
     * @return GearmanClient self Object
113
     */
114 1
    public function setDefaultServers(array $defaultServers)
115
    {
116 1
        $this->defaultServers = $defaultServers;
117
118 1
        return $this;
119
    }
120
121
    /**
122
     * Set UniqueJobIdentifierGenerator object
123
     *
124
     * @param UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator Unique Job Intefier Generator
125
     *
126
     * @return GearmanClient self Object
127
     */
128 1
    public function setUniqueJobIdentifierGenerator(UniqueJobIdentifierGenerator $uniqueJobIdentifierGenerator)
129
    {
130 1
        $this->uniqueJobIdentifierGenerator = $uniqueJobIdentifierGenerator;
131
132 1
        return $this;
133
    }
134
135
    /**
136
     * Set gearman callbacks
137
     *
138
     * @param GearmanCallbacksDispatcher $gearmanCallbacksDispatcher Gearman callbacks dispatcher
139
     *
140
     * @return GearmanClient self Object
141
     */
142 1
    public function setGearmanCallbacksDispatcher(GearmanCallbacksDispatcher $gearmanCallbacksDispatcher)
143
    {
144 1
        $this->gearmanCallbacksDispatcher = $gearmanCallbacksDispatcher;
145
146 1
        return $this;
147
    }
148
149
    /**
150
     * Set default settings
151
     *
152
     * @param array $settings
153
     *
154
     * @return GearmanClient self Object
155
     */
156 1
    public function setDefaultSettings($settings)
157
    {
158 1
        $this->settings = $settings;
159
160 1
        return $this;
161
    }
162
163
    /**
164
     * Set server to client. Empty all servers and set this one
165
     *
166
     * @param string $servername Server name (must be ip)
167
     * @param int    $port       Port of server. By default 4730
168
     *
169
     * @return GearmanClient Returns self object
170
     */
171
    public function setServer($servername, $port = 4730)
172
    {
173
        $this
174
            ->clearServers()
175
            ->addServer($servername, $port);
176
177
        return $this;
178
    }
179
180
    /**
181
     * Add server to client
182
     *
183
     * @param string $servername Server name (must be ip)
184
     * @param int    $port       Port of server. By default 4730
185
     *
186
     * @return GearmanClient Returns self object
187
     */
188
    public function addServer($servername, $port = 4730)
189
    {
190
        $this->servers[] = array(
191
            'host' => $servername,
192
            'port' => $port,
193
        );
194
195
        return $this;
196
    }
197
198
    /**
199
     * Clear server list
200
     *
201
     * @return GearmanClient Returns self object
202
     */
203
    public function clearServers()
204
    {
205
        $this->servers = array();
206
207
        return $this;
208
    }
209
210
    /**
211
     * Get real worker from job name and enqueues the action given one
212
     *     method.
213
     *
214
     * @param string $jobName A GearmanBundle registered function the worker is to execute
215
     * @param string $params  Parameters to send to job as string
216
     * @param string $method  Method to execute
217
     * @param string $unique  A unique ID used to identify a particular task
218
     *
219
     * @return mixed Return result of the call. If worker is not valid, return false
220
     */
221
    protected function enqueue($jobName, $params, $method, $unique)
222
    {
223
        $worker = $this->getJob($jobName);
224
225
        $unique = $this
226
            ->uniqueJobIdentifierGenerator
227
            ->generateUniqueKey($jobName, $params, $unique, $method);
228
229
        return $worker
230
            ? $this->doEnqueue($worker, $params, $method, $unique)
231
            : false;
232
    }
233
234
    /**
235
     * Execute a GearmanClient call given a worker, params and a method.
236
     *
237
     * If he GarmanClient call is asyncronous, result value will be a handler.
238
     * Otherwise, will return job result.
239
     *
240
     * @param array  $worker Worker definition
241
     * @param string $params Parameters to send to job as string
242
     * @param string $method Method to execute
243
     * @param string $unique A unique ID used to identify a particular task
244
     *
245
     * @return mixed Return result of the GearmanClient call
246
     */
247
    protected function doEnqueue(array $worker, $params, $method, $unique)
248
    {
249
        $gearmanClient = $this->getNativeClient();
250
        $this->assignServers($gearmanClient);
251
252
        $result = $gearmanClient->$method($worker['job']['realCallableName'], $params, $unique);
253
        $this->returnCode = $gearmanClient->returnCode();
254
255
        $this->gearmanClient = null;
256
257
        return $result;
258
    }
259
260
    /**
261
     * Given a GearmanClient, set all included servers
262
     *
263
     * @param \GearmanClient $gearmanClient Object to include servers
264
     *
265
     * @return GearmanClient Returns self object
266
     */
267
    protected function assignServers(\GearmanClient $gearmanClient)
268
    {
269
        $servers = $this->defaultServers;
270
271
        if (!empty($this->servers)) {
272
273
            $servers = $this->servers;
274
        }
275
276
        /**
277
         * We include each server into gearman client
278
         */
279
        foreach ($servers as $server) {
280
281
            $gearmanClient->addServer($server['host'], $server['port']);
282
        }
283
284
        return $this;
285
    }
286
287
    /**
288
     * Job methods
289
     */
290
291
    /**
292
     * Runs a single task and returns some result, depending of method called.
293
     * Method called depends of default callable method setted on gearman
294
     * settings or overwritted on work or job annotations
295
     *
296
     * @param string $name   A GearmanBundle registered function the worker is to execute
297
     * @param string $params Parameters to send to job as string
298
     * @param string $unique A unique ID used to identify a particular task
299
     *
300
     * @return mixed result depending of method called.
301
     */
302
    public function callJob($name, $params = '', $unique = null)
303
    {
304
        $worker = $this->getJob($name);
305
        $methodCallable = $worker['job']['defaultMethod'];
306
307
        return $this->enqueue($name, $params, $methodCallable, $unique);
308
    }
309
310
    /**
311
     * Runs a single task and returns a string representation of the result.
312
     * It is up to the GearmanClient and GearmanWorker to agree on the format of
313
     * the result.
314
     *
315
     * The GearmanClient::do() method is deprecated as of pecl/gearman 1.0.0.
316
     * Use GearmanClient::doNormal().
317
     *
318
     * @param string $name   A GearmanBundle registered function the worker is to execute
319
     * @param string $params Parameters to send to job as string
320
     * @param string $unique A unique ID used to identify a particular task
321
     *
322
     * @return string A string representing the results of running a task.
323
     * @deprecated
324
     */
325
    public function doJob($name, $params = '', $unique = null)
326
    {
327
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
328
    }
329
330
    /**
331
     * Runs a single task and returns a string representation of the result.
332
     * It is up to the GearmanClient and GearmanWorker to agree on the format of
333
     * the result.
334
     *
335
     * @param string $name   A GearmanBundle registered function the worker is to execute
336
     * @param string $params Parameters to send to job as string
337
     * @param string $unique A unique ID used to identify a particular task
338
     *
339
     * @return string A string representing the results of running a task.
340
     */
341
    public function doNormalJob($name, $params = '', $unique = null)
342
    {
343
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DONORMAL, $unique);
344
    }
345
346
    /**
347
     * Runs a task in the background, returning a job handle which can be used
348
     * to get the status of the running task.
349
     *
350
     * @param string $name   A GearmanBundle registered function the worker is to execute
351
     * @param string $params Parameters to send to job as string
352
     * @param string $unique A unique ID used to identify a particular task
353
     *
354
     * @return string Job handle for the submitted task.
355
     */
356
    public function doBackgroundJob($name, $params = '', $unique = null)
357
    {
358
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOBACKGROUND, $unique);
359
    }
360
361
    /**
362
     * Runs a single high priority task and returns a string representation of
363
     * the result.
364
     *
365
     * It is up to the GearmanClient and GearmanWorker to agree on the format of
366
     * the result.
367
     *
368
     * High priority tasks will get precedence over normal and low priority
369
     * tasks in the job queue.
370
     *
371
     * @param string $name   A GearmanBundle registered function the worker is to execute
372
     * @param string $params Parameters to send to job as string
373
     * @param string $unique A unique ID used to identify a particular task
374
     *
375
     * @return string A string representing the results of running a task.
376
     */
377
    public function doHighJob($name, $params = '', $unique = null)
378
    {
379
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGH, $unique);
380
    }
381
382
    /**
383
     * Runs a high priority task in the background, returning a job handle which
384
     * can be used to get the status of the running task.
385
     *
386
     * High priority tasks take precedence over normal and low priority tasks in
387
     * the job queue.
388
     *
389
     * @param string $name   A GearmanBundle registered function the worker is to execute
390
     * @param string $params Parameters to send to job as string
391
     * @param string $unique A unique ID used to identify a particular task
392
     *
393
     * @return string The job handle for the submitted task.
394
     */
395
    public function doHighBackgroundJob($name, $params = '', $unique = null)
396
    {
397
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOHIGHBACKGROUND, $unique);
398
    }
399
400
    /**
401
     * Runs a single low priority task and returns a string representation of
402
     * the result.
403
     *
404
     * It is up to the GearmanClient and GearmanWorker to agree on the format of
405
     * the result.
406
     *
407
     * Normal and high priority tasks will get precedence over low priority
408
     * tasks in the job queue.
409
     *
410
     * @param string $name   A GearmanBundle registered function the worker is to execute
411
     * @param string $params Parameters to send to job as string
412
     * @param string $unique A unique ID used to identify a particular task
413
     *
414
     * @return string A string representing the results of running a task.
415
     */
416
    public function doLowJob($name, $params = '', $unique = null)
417
    {
418
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOW, $unique);
419
    }
420
421
    /**
422
     * Runs a low priority task in the background, returning a job handle which
423
     * can be used to get the status of the running task.
424
     *
425
     * Normal and high priority tasks will get precedence over low priority
426
     * tasks in the job queue.
427
     *
428
     * @param string $name   A GearmanBundle registered function the worker is to execute
429
     * @param string $params Parameters to send to job as string
430
     * @param string $unique A unique ID used to identify a particular task
431
     *
432
     * @return string The job handle for the submitted task.
433
     */
434
    public function doLowBackgroundJob($name, $params = '', $unique = null)
435
    {
436
        return $this->enqueue($name, $params, GearmanMethods::GEARMAN_METHOD_DOLOWBACKGROUND, $unique);
437
    }
438
439
    /**
440
     * Fetches the Status of a special Background Job.
441
     *
442
     * @param string $idJob The job handle string
443
     *
444
     * @return JobStatus Job status
445
     */
446
    public function getJobStatus($idJob)
447
    {
448
        $gearmanClient = $this->getNativeClient();
449
        $this->assignServers($gearmanClient);
450
        $statusData = $gearmanClient->jobStatus($idJob);
451
452
        $jobStatus = new JobStatus($statusData);
453
454
        $this->gearmanClient = null;
455
456
        return $jobStatus;
457
    }
458
459
    /**
460
     * Gets the return code from the last run job.
461
     *
462
     * @return int
463
     */
464
    public function getReturnCode()
465
    {
466
        return $this->returnCode;
467
    }
468
469
    /**
470
     * Task methods
471
     */
472
473
    /**
474
     * Adds a task to be run in parallel with other tasks.
475
     * Call this method for all the tasks to be run in parallel, then call
476
     * GearmanClient::runTasks() to perform the work.
477
     *
478
     * Note that enough workers need to be available for the tasks to all run in
479
     * parallel.
480
     *
481
     * @param string $name     A GermanBundle registered function to be executed
482
     * @param string $params   Parameters to send to task as string
483
     * @param Mixed  &$context Application context to associate with a task
484
     * @param string $unique   A unique ID used to identify a particular task
485
     *
486
     * @return GearmanClient Return this object
487
     */
488
    public function addTask($name, $params = '', &$context = null, $unique = null)
489
    {
490
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASK);
491
492
        return $this;
493
    }
494
495
    /**
496
     * Adds a high priority task to be run in parallel with other tasks.
497
     * Call this method for all the high priority tasks to be run in parallel,
498
     * then call GearmanClient::runTasks() to perform the work.
499
     *
500
     * Tasks with a high priority will be selected from the queue before those
501
     * of normal or low priority.
502
     *
503
     * @param string $name     A GermanBundle registered function to be executed
504
     * @param string $params   Parameters to send to task as string
505
     * @param Mixed  &$context Application context to associate with a task
506
     * @param string $unique   A unique ID used to identify a particular task
507
     *
508
     * @return GearmanClient Return this object
509
     */
510
    public function addTaskHigh($name, $params = '', &$context = null, $unique = null)
511
    {
512
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGH);
513
514
        return $this;
515
    }
516
517
    /**
518
     * Adds a low priority background task to be run in parallel with other
519
     * tasks.
520
     *
521
     * Call this method for all the tasks to be run in parallel, then call
522
     * GearmanClient::runTasks() to perform the work.
523
     *
524
     * Tasks with a low priority will be selected from the queue after those of
525
     * normal or low priority.
526
     *
527
     * @param string $name     A GermanBundle registered function to be executed
528
     * @param string $params   Parameters to send to task as string
529
     * @param Mixed  &$context Application context to associate with a task
530
     * @param string $unique   A unique ID used to identify a particular task
531
     *
532
     * @return GearmanClient Return this object
533
     */
534
    public function addTaskLow($name, $params = '', &$context = null, $unique = null)
535
    {
536
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOW);
537
538
        return $this;
539
    }
540
541
    /**
542
     * Adds a background task to be run in parallel with other tasks
543
     * Call this method for all the tasks to be run in parallel, then call
544
     * GearmanClient::runTasks() to perform the work.
545
     *
546
     * @param string $name     A GermanBundle registered function to be executed
547
     * @param string $params   Parameters to send to task as string
548
     * @param Mixed  &$context Application context to associate with a task
549
     * @param string $unique   A unique ID used to identify a particular task
550
     *
551
     * @return GearmanClient Return this object
552
     */
553
    public function addTaskBackground($name, $params = '', &$context = null, $unique = null)
554
    {
555
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKBACKGROUND);
556
557
        return $this;
558
    }
559
560
    /**
561
     * Adds a high priority background task to be run in parallel with other
562
     * tasks.
563
     *
564
     * Call this method for all the tasks to be run in parallel, then call
565
     * GearmanClient::runTasks() to perform the work.
566
     *
567
     * Tasks with a high priority will be selected from the queue before those
568
     * of normal or low priority.
569
     *
570
     * @param string $name     A GermanBundle registered function to be executed
571
     * @param string $params   Parameters to send to task as string
572
     * @param Mixed  &$context Application context to associate with a task
573
     * @param string $unique   A unique ID used to identify a particular task
574
     *
575
     * @return GearmanClient Return this object
576
     */
577
    public function addTaskHighBackground($name, $params = '', &$context = null, $unique = null)
578
    {
579
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKHIGHBACKGROUND);
580
581
        return $this;
582
    }
583
584
    /**
585
     * Adds a low priority background task to be run in parallel with other
586
     * tasks.
587
     *
588
     * Call this method for all the tasks to be run in parallel, then call
589
     * GearmanClient::runTasks() to perform the work.
590
     *
591
     * Tasks with a low priority will be selected from the queue after those of
592
     * normal or high priority.
593
     *
594
     * @param string $name     A GermanBundle registered function to be executed
595
     * @param string $params   Parameters to send to task as string
596
     * @param Mixed  &$context Application context to associate with a task
597
     * @param string $unique   A unique ID used to identify a particular task
598
     *
599
     * @return GearmanClient Return this object
600
     */
601
    public function addTaskLowBackground($name, $params = '', &$context = null, $unique = null)
602
    {
603
        $this->enqueueTask($name, $params, $context, $unique, GearmanMethods::GEARMAN_METHOD_ADDTASKLOWBACKGROUND);
604
605
        return $this;
606
    }
607
608
    /**
609
     * Adds a task into the structure of tasks with included type of call
610
     *
611
     * @param string $name    A GermanBundle registered function to be executed
612
     * @param string $params  Parameters to send to task as string
613
     * @param Mixed  $context Application context to associate with a task
614
     * @param string $unique  A unique ID used to identify a particular task
615
     * @param string $method  Method to perform
616
     *
617
     * @return GearmanClient Return this object
618
     */
619
    protected function enqueueTask($name, $params, &$context, $unique, $method)
620
    {
621
        $contextReference = array('context' => &$context);
622
        $task = array(
623
            'name'    => $name,
624
            'params'  => $params,
625
            'context' => $contextReference,
626
            'unique'  => $this->uniqueJobIdentifierGenerator->generateUniqueKey($name, $params, $unique, $method),
627
            'method'  => $method,
628
        );
629
630
        $this->addTaskToStructure($task);
631
632
        return $this;
633
    }
634
635
    /**
636
     * Appends a task structure into taskStructure array
637
     *
638
     * @param array $task Task structure
639
     *
640
     * @return GearmanClient Return this object
641
     */
642
    protected function addTaskToStructure(array $task)
643
    {
644
        $this->taskStructure[] = $task;
645
646
        return $this;
647
    }
648
649
    /**
650
     * For a set of tasks previously added with
651
     *
652
     * GearmanClient::addTask(),
653
     * GearmanClient::addTaskHigh(),
654
     * GearmanClient::addTaskLow(),
655
     * GearmanClient::addTaskBackground(),
656
     * GearmanClient::addTaskHighBackground(),
657
     * GearmanClient::addTaskLowBackground(),
658
     *
659
     * this call starts running the tasks in parallel.
660
     * Note that enough workers need to be available for the tasks to all run in parallel
661
     *
662
     * @return boolean run tasks result
663
     */
664
    public function runTasks()
665
    {
666
        $gearmanClient = $this->getNativeClient();
667
        $this->assignServers($gearmanClient);
668
669
        if ($this->settings['callbacks']) {
670
671
            $this->gearmanCallbacksDispatcher->assignTaskCallbacks($gearmanClient);
672
        }
673
674
        foreach ($this->taskStructure as $task) {
675
676
            $type = $task['method'];
677
            $jobName = $task['name'];
678
            $worker = $this->getJob($jobName);
679
680
            if (false !== $worker) {
681
682
                $gearmanClient->$type(
683
                    $worker['job']['realCallableName'],
684
                    $task['params'],
685
                    $task['context'],
686
                    $task['unique']
687
                );
688
            }
689
        }
690
691
        $this->initTaskStructure();
692
693
        $this->gearmanClient = null;
694
695
        return $gearmanClient->runTasks();
696
    }
697
}
698