QueuePool::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
namespace Wanghanlin\QueuePool;
4
5
use Illuminate\Support\ProcessUtils;
6
use Symfony\Component\Process\Process;
7
use Symfony\Component\Process\PhpExecutableFinder;
8
9
class QueuePool
10
{
11
    /**
12
     * The command working path.
13
     *
14
     * @var string
15
     */
16
    protected $commandPath;
17
18
    /**
19
     * The environment the workers should run under.
20
     *
21
     * @var string
22
     */
23
    protected $environment;
24
25
    /**
26
     * The worker processes that started.
27
     *
28
     * @var array
29
     */
30
    protected $processes;
31
32
    /**
33
     * The amount of seconds to wait before polling the queue.
34
     *
35
     * @var int
36
     */
37
    protected $sleep = 3;
38
39
    /**
40
     * The amount of times to try a job before logging it failed.
41
     *
42
     * @var int
43
     */
44
    protected $maxTries = 0;
45
46
    /**
47
     * The queue worker command line.
48
     *
49
     * @var string
50
     */
51
    protected $workerCommand;
52
53
    /**
54
     * The output handler callback.
55
     *
56
     * @var \Closure|null
57
     */
58
    protected $outputHandler;
59
60
    /**
61
     * Create a new queue listener.
62
     *
63
     * @param  string  $commandPath
64
     * @return void
0 ignored issues
show
Comprehensibility Best Practice introduced by
Adding a @return annotation to constructors is generally not recommended as a constructor does not have a meaningful return value.

Adding a @return annotation to a constructor is not recommended, since a constructor does not have a meaningful return value.

Please refer to the PHP core documentation on constructors.

Loading history...
65
     */
66
    public function __construct($commandPath)
67
    {
68
        $this->commandPath = $commandPath;
69
        $this->workerCommand = $this->buildCommandTemplate();
70
    }
71
72
    /**
73
     * Build the environment specific worker command.
74
     *
75
     * @return string
76
     */
77
    protected function buildCommandTemplate()
78
    {
79
        $command = 'queue:work %s --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
80
81
        return "{$this->phpBinary()} {$this->artisanBinary()} {$command}";
82
    }
83
84
    /**
85
     * Get the PHP binary.
86
     *
87
     * @return string
88
     */
89
    protected function phpBinary()
90
    {
91
        return ProcessUtils::escapeArgument(
92
            (new PhpExecutableFinder)->find(false)
0 ignored issues
show
Security Bug introduced by
It seems like (new \Symfony\Component\...eFinder())->find(false) targeting Symfony\Component\Proces...xecutableFinder::find() can also be of type false; however, Illuminate\Support\ProcessUtils::escapeArgument() does only seem to accept string, did you maybe forget to handle an error condition?
Loading history...
93
        );
94
    }
95
96
    /**
97
     * Get the Artisan binary.
98
     *
99
     * @return string
100
     */
101
    protected function artisanBinary()
102
    {
103
        return defined('ARTISAN_BINARY')
104
            ? ProcessUtils::escapeArgument(ARTISAN_BINARY)
105
            : ProcessUtils::escapeArgument('artisan');
106
    }
107
108
    /**
109
     * Start workers.
110
     *
111
     * @param  string  $connection
112
     * @param  string  $queue
113
     * @param  QueuePoolOption  $options
114
     * @return void
115
     */
116
    public function start($connection, $queue, QueuePoolOption $options)
117
    {
118
        $processes = $this->makeProcesses($connection, $queue, $options);
119
120
        $this->setProcesses($processes);
121
122
        while (true) {
123
            $this->runProcesses($options->memory);
124
        }
125
    }
126
127
    /**
128
     * Create an array of Symfony processes.
129
     *
130
     * @param $connection
131
     * @param $queue
132
     * @param QueuePoolOption $options
133
     * @return array
134
     */
135
    public function makeProcesses($connection, $queue, QueuePoolOption $options)
136
    {
137
        $processes = [];
138
139
        foreach (range(1, $options->workers) as $key) {
140
            $processes[$key] = $this->makeProcess($connection, $queue, $options);
141
        }
142
143
        return $processes;
144
    }
145
146
    /**
147
     * Create a new Symfony process for the worker.
148
     *
149
     * @param  string  $connection
150
     * @param  string  $queue
151
     * @param  QueuePoolOption  $options
152
     * @return \Symfony\Component\Process\Process
153
     */
154
    public function makeProcess($connection, $queue, QueuePoolOption $options)
155
    {
156
        $command = $this->workerCommand;
157
158
        // If the environment is set, we will append it to the command string so the
159
        // workers will run under the specified environment. Otherwise, they will
160
        // just run under the production environment which is not always right.
161
        if (isset($options->environment)) {
162
            $command = $this->addEnvironment($command, $options);
163
        }
164
165
        // Next, we will just format out the worker commands with all of the various
166
        // options available for the command. This will produce the final command
167
        // line that we will pass into a Symfony process object for processing.
168
        $command = $this->formatCommand(
169
            $command, $connection, $queue, $options
170
        );
171
172
        return new Process(
173
            $command, $this->commandPath, null, null, $options->timeout
174
        );
175
    }
176
177
    /**
178
     * Add the environment option to the given command.
179
     *
180
     * @param  string  $command
181
     * @param  QueuePoolOption  $options
182
     * @return string
183
     */
184
    protected function addEnvironment($command, QueuePoolOption $options)
185
    {
186
        return $command.' --env='.ProcessUtils::escapeArgument($options->environment);
187
    }
188
189
    /**
190
     * Format the given command with the listener options.
191
     *
192
     * @param $command
193
     * @param $connection
194
     * @param $queue
195
     * @param QueuePoolOption $options
196
     * @return string
197
     */
198
    protected function formatCommand($command, $connection, $queue, QueuePoolOption $options)
199
    {
200
        return sprintf(
201
            $command,
202
            ProcessUtils::escapeArgument($connection),
203
            ProcessUtils::escapeArgument($queue),
204
            $options->delay, $options->memory,
205
            $options->sleep, $options->maxTries
206
        );
207
    }
208
209
    /**
210
     * Run worker processes.
211
     *
212
     * @param  int  $memory
213
     * @return void
214
     */
215
    public function runProcesses($memory)
216
    {
217
        $processes = $this->getProcesses();
218
219
        array_walk($processes, function ($process, $key) {
220
            if (! $process->isRunning()) {
221
                $process->start(function ($type, $line) use ($key) {
222
                    $line = "[Worker $key]: $line";
223
                    $this->handleWorkerOutput($type, $line);
224
                });
225
            }
226
        });
227
228
        // Once we have run the job we'll go check if the memory limit has been exceeded
229
        // for the script. If it has, we will kill this script so the process manager
230
        // will restart this with a clean slate of memory automatically on exiting.
231
        if ($this->memoryExceeded($memory)) {
232
            $this->stop();
233
        }
234
    }
235
236
    /**
237
     * Get processes.
238
     *
239
     * @return array
240
     */
241
    public function getProcesses()
242
    {
243
        return $this->processes;
244
    }
245
246
    /**
247
     * Set processes.
248
     *
249
     * @param array $processes
250
     */
251
    public function setProcesses($processes)
252
    {
253
        $this->processes = $processes;
254
    }
255
256
    /**
257
     * Handle output from the worker process.
258
     *
259
     * @param  int  $type
260
     * @param  string  $line
261
     * @return void
262
     */
263
    protected function handleWorkerOutput($type, $line)
264
    {
265
        if (isset($this->outputHandler)) {
266
            call_user_func($this->outputHandler, $type, $line);
267
        }
268
    }
269
270
    /**
271
     * Determine if the memory limit has been exceeded.
272
     *
273
     * @param  int  $memoryLimit
274
     * @return bool
275
     */
276
    public function memoryExceeded($memoryLimit)
277
    {
278
        return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
279
    }
280
281
    /**
282
     * Stop listening and bail out of the script.
283
     *
284
     * @return void
285
     */
286
    public function stop()
287
    {
288
        die;
0 ignored issues
show
Coding Style Compatibility introduced by
The method stop() contains an exit expression.

An exit expression should only be used in rare cases. For example, if you write a short command line script.

In most cases however, using an exit expression makes the code untestable and often causes incompatibilities with other libraries. Thus, unless you are absolutely sure it is required here, we recommend to refactor your code to avoid its usage.

Loading history...
289
    }
290
291
    /**
292
     * Set the output handler callback.
293
     *
294
     * @param  \Closure  $outputHandler
295
     * @return void
296
     */
297
    public function setOutputHandler(\Closure $outputHandler)
298
    {
299
        $this->outputHandler = $outputHandler;
300
    }
301
}
302