1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Wanghanlin\QueuePool; |
4
|
|
|
|
5
|
|
|
use Illuminate\Console\Command; |
6
|
|
|
|
7
|
|
|
class QueuePoolCommand extends Command |
8
|
|
|
{ |
9
|
|
|
/** |
10
|
|
|
* The console command name. |
11
|
|
|
* |
12
|
|
|
* @var string |
13
|
|
|
*/ |
14
|
|
|
protected $signature = 'queue:pool |
15
|
|
|
{connection? : The name of the queue connection to work} |
16
|
|
|
{--workers= : The amount of the workers to start} |
17
|
|
|
{--queue= : The names of the queues to work} |
18
|
|
|
{--delay=0 : The number of seconds to delay failed jobs} |
19
|
|
|
{--force : Force the worker to run even in maintenance mode} |
20
|
|
|
{--memory=128 : The memory limit in megabytes} |
21
|
|
|
{--sleep=3 : Number of seconds to sleep when no job is available} |
22
|
|
|
{--timeout=60 : The number of seconds a child process can run} |
23
|
|
|
{--tries=0 : Number of times to attempt a job before logging it failed}'; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* The console command description. |
27
|
|
|
* |
28
|
|
|
* @var string |
29
|
|
|
*/ |
30
|
|
|
protected $description = 'Start a pool of workers to process the queue'; |
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* The queue pool instance. |
34
|
|
|
* |
35
|
|
|
* @var QueuePool |
36
|
|
|
*/ |
37
|
|
|
protected $pool; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* Create a new queue work command. |
41
|
|
|
* |
42
|
|
|
* @param QueuePool $pool |
43
|
|
|
* @return void |
44
|
|
|
*/ |
45
|
|
|
public function __construct(QueuePool $pool) |
46
|
|
|
{ |
47
|
|
|
parent::__construct(); |
48
|
|
|
$this->setOutputHandler($this->pool = $pool); |
49
|
|
|
} |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* Execute the console command. |
53
|
|
|
* |
54
|
|
|
* @return void |
55
|
|
|
*/ |
56
|
|
|
public function handle() |
57
|
|
|
{ |
58
|
|
|
// We need to get the right queue for the connection which is set in the queue |
59
|
|
|
// configuration file for the application. We will pull it based on the set |
60
|
|
|
// connection being run for the queue operation currently being executed. |
61
|
|
|
$queue = $this->getQueue( |
62
|
|
|
$connection = $this->input->getArgument('connection') |
63
|
|
|
); |
64
|
|
|
$this->pool->start( |
65
|
|
|
$connection, $queue, $this->gatherOptions() |
66
|
|
|
); |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
/** |
70
|
|
|
* Get the name of the queue connection to listen on. |
71
|
|
|
* |
72
|
|
|
* @param string $connection |
73
|
|
|
* @return string |
74
|
|
|
*/ |
75
|
|
|
protected function getQueue($connection) |
76
|
|
|
{ |
77
|
|
|
$connection = $connection ?: $this->laravel['config']['queue.default']; |
78
|
|
|
|
79
|
|
|
return $this->input->getOption('queue') ?: $this->laravel['config']->get( |
80
|
|
|
"queue.connections.{$connection}.queue", 'default' |
81
|
|
|
); |
82
|
|
|
} |
83
|
|
|
|
84
|
|
|
/** |
85
|
|
|
* Get the listener options for the command. |
86
|
|
|
* |
87
|
|
|
* @return QueuePoolOption |
88
|
|
|
*/ |
89
|
|
|
protected function gatherOptions() |
90
|
|
|
{ |
91
|
|
|
return new QueuePoolOption( |
92
|
|
|
$this->option('workers'), $this->option('env'), |
|
|
|
|
93
|
|
|
$this->option('delay'), $this->option('memory'), |
94
|
|
|
$this->option('timeout'), $this->option('sleep'), |
95
|
|
|
$this->option('tries'), $this->option('force') |
96
|
|
|
); |
97
|
|
|
} |
98
|
|
|
|
99
|
|
|
/** |
100
|
|
|
* Set the options on the queue listener. |
101
|
|
|
* |
102
|
|
|
* @param QueuePool $pool |
103
|
|
|
* @return void |
104
|
|
|
*/ |
105
|
|
|
protected function setOutputHandler(QueuePool $pool) |
106
|
|
|
{ |
107
|
|
|
$pool->setOutputHandler(function ($type, $line) { |
108
|
|
|
$this->output->write($line); |
109
|
|
|
}); |
110
|
|
|
} |
111
|
|
|
} |
112
|
|
|
|
This check looks at variables that are passed out again to other methods.
If the outgoing method call has stricter type requirements than the method itself, an issue is raised.
An additional type check may prevent trouble.