Completed
Push — master ( 8784f3...9e63c5 )
by Peter
13:05
created

RunCommand::attachConsoleHandler()   B

Complexity

Conditions 5
Paths 4

Size

Total Lines 14
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 14
ccs 0
cts 12
cp 0
rs 8.8571
cc 5
eloc 7
nc 4
nop 2
crap 30
1
<?php
2
3
namespace TreeHouse\WorkerBundle\Command;
4
5
use Psr\Log\LoggerInterface;
6
use Symfony\Bridge\Monolog\Handler\ConsoleHandler;
7
use Symfony\Component\Console\Command\Command;
8
use Symfony\Component\Console\Input\InputInterface;
9
use Symfony\Component\Console\Input\InputOption;
10
use Symfony\Component\Console\Output\OutputInterface;
11
use TreeHouse\WorkerBundle\Exception\AbortException;
12
use TreeHouse\WorkerBundle\QueueManager;
13
use TreeHouse\WorkerBundle\WorkerEvents;
14
15
class RunCommand extends Command
16
{
17
    /**
18
     * @var QueueManager
19
     */
20
    protected $manager;
21
22
    /**
23
     * @var OutputInterface
24
     */
25
    protected $output;
26
27
    /**
28
     * @param QueueManager $queueManager
29
     */
30
    public function __construct(QueueManager $queueManager)
31
    {
32
        $this->manager = $queueManager;
33
34
        parent::__construct();
35
    }
36
37
    /**
38
     * @inheritdoc
39
     */
40
    protected function configure()
41
    {
42
        $this
43
            ->setName('worker:run')
44
            ->setDescription('Starts a worker')
45
            ->addOption('action', 'a', InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Selects actions to run, defaults to all')
46
            ->addOption('exclude', null, InputOption::VALUE_OPTIONAL | InputOption::VALUE_IS_ARRAY, 'Excludes actions to run')
47
            ->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Maximum number of jobs to execute', 20)
48
            ->addOption('max-memory', 'm', InputOption::VALUE_OPTIONAL, 'Maximum amount of memory to use (in MB). The worker will try to stop before this limit is reached. Set to 0 for infinite.', 0)
49
            ->addOption('max-time', 't', InputOption::VALUE_OPTIONAL, 'Maximum running time in seconds. Set to 0 for infinite', 0)
50
            ->addOption('batch-size', 'b', InputOption::VALUE_OPTIONAL, 'Number of jobs to process before completing a batch', 15)
51
            ->addOption('min-duration', 'd', InputOption::VALUE_OPTIONAL, 'Number of seconds to the worker process should minimal take to run', 15)
52
        ;
53
    }
54
55
    /**
56
     * @inheritdoc
57
     */
58
    protected function execute(InputInterface $input, OutputInterface $output)
59
    {
60
        $this->output = $output;
61
62
        $dispatcher   = $this->manager->getDispatcher();
63
64
        $maxMemory   = intval($input->getOption('max-memory')) * 1024 * 1024;
65
        $maxTime     = intval($input->getOption('max-time'));
66
        $maxJobs     = intval($input->getOption('limit'));
67
        $batchSize   = intval($input->getOption('batch-size'));
68
        $minDuration = intval($input->getOption('min-duration'));
69
70
        $logger = $this->manager->getLogger();
71
72
        $this->attachConsoleHandler($logger, $output);
73
74
        // configure pheanstalk to watch the right tubes
75
        $this->watchActions($input->getOption('action'), $input->getOption('exclude'));
76
77
        $start         = time();
78
        $jobsCompleted = 0;
79
80
        // wait for job, timeout after 1 minute
81
        $timeout = 60;
82
        $this->output(sprintf('Waiting at most <info>%d seconds</info> for a reserved job...', $timeout));
83
84
        $exit = 0;
85
        while ($job = $this->manager->get($timeout)) {
86
            $stats = $this->manager->getJobStats($job);
87
88
            $timeStart = microtime(true) * 1000;
89
            $memStart = memory_get_usage(true);
90
91
            try {
92
                $this->output(
93
                    sprintf(
94
                        'Working job <info>%d</info> for action <comment>%s</comment> with payload <info>%s</info>',
95
                        $job->getId(),
96
                        $stats['tube'],
97
                        $job->getData()
98
                    )
99
                );
100
101
                $result = $this->manager->executeJob($job);
102
            } catch (AbortException $e) {
103
                $message = 'Worker aborted ' . ($e->getReason() ? ('with reason: ' . $e->getReason()) : 'without a given reason');
104
                $this->output($message);
105
106
                $exit = 1;
107
108
                break;
109
            }
110
111
            $duration = microtime(true) * 1000 - $timeStart;
112
            $usage    = memory_get_usage(true) - $memStart;
113
            $message = sprintf(
114
                'Completed job <info>%d</info> in <comment>%dms</comment> using <comment>%s</comment> with result: <info>%s</info>',
115
                $job->getId(),
116
                $duration,
117
                $this->formatBytes($usage),
118
                json_encode($result, JSON_UNESCAPED_SLASHES)
119
            );
120
            $this->output($message);
121
122
            ++$jobsCompleted;
123
124
            // intermediate flush
125
            if ($jobsCompleted % $batchSize === 0) {
126
                $this->output('Batch complete', OutputInterface::VERBOSITY_VERBOSE);
127
                $dispatcher->dispatch(WorkerEvents::FLUSH);
128
            }
129
130
            if ($jobsCompleted >= $maxJobs) {
131
                $this->output(sprintf('Maximum number of jobs completed (%d)', $maxJobs), OutputInterface::VERBOSITY_VERBOSE);
132
133
                break;
134
            }
135
136
            if (($maxMemory > 0) && memory_get_usage(true) > $maxMemory) {
137
                $this->output(
138
                    sprintf('Memory peak of %dMB reached (peak: %sMB)', $maxMemory / 1024 / 1024, memory_get_usage(true) / 1024 / 1024),
139
                    OutputInterface::VERBOSITY_VERBOSE
140
                );
141
142
                break;
143
            }
144
145
            if (($maxTime > 0) && ((time() - $start) > $maxTime)) {
146
                $this->output(
147
                    sprintf('Maximum execution time of %ds reached', $maxTime),
148
                    OutputInterface::VERBOSITY_VERBOSE
149
                );
150
151
                break;
152
            }
153
        }
154
155
        // flush remaining
156
        $dispatcher->dispatch(WorkerEvents::FLUSH);
157
158
        // make sure worker doesn't quit to quickly, or supervisor will mark it
159
        // as a failed restart, and put the worker in FATAL state.
160
        $duration = time() - $start;
161
        if ($duration < $minDuration) {
162
            $this->output(sprintf('Sleeping until worker has run for at least %s seconds', $minDuration));
163
            sleep($minDuration - $duration);
164
        }
165
166
        $this->output('Shutting down worker');
167
168
        return $exit;
169
    }
170
171
    /**
172
     * @param LoggerInterface $logger
173
     * @param OutputInterface $output
174
     */
175
    protected function attachConsoleHandler(LoggerInterface $logger, OutputInterface $output)
176
    {
177
        if (!class_exists('Monolog\\Logger') || !$logger instanceof \Monolog\Logger) {
178
            return;
179
        }
180
181
        foreach ($logger->getHandlers() as $handler) {
182
            if ($handler instanceof ConsoleHandler) {
183
                return;
184
            }
185
        }
186
187
        $logger->pushHandler(new ConsoleHandler($output));
188
    }
189
190
    /**
191
     * @param string[] $include
192
     * @param string[] $exclude
193
     */
194
    protected function watchActions(array $include = [], array $exclude = [])
195
    {
196
        $actions = array_keys($this->manager->getExecutors());
197
198
        if (empty($include)) {
199
            $include = $actions;
0 ignored issues
show
Coding Style introduced by
Consider using a different name than the parameter $include. This often makes code more readable.
Loading history...
200
        }
201
202 View Code Duplication
        if (!empty($diff = array_diff($include, $actions))) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
203
            throw new \InvalidArgumentException(sprintf('Action(s) "%s" are not defined by QueueManager', implode(', ', $diff)));
204
        }
205
206 View Code Duplication
        if (!empty($diff = array_diff($exclude, $actions))) {
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
207
            throw new \InvalidArgumentException(sprintf('Filter(s) "%s" are not defined by QueueManager', implode(', ', $diff)));
208
        }
209
210
        $include = array_diff($include, $exclude);
0 ignored issues
show
Coding Style introduced by
Consider using a different name than the parameter $include. This often makes code more readable.
Loading history...
211
212
        if (empty($include)) {
213
            throw new \InvalidArgumentException('No actions specified to run');
214
        }
215
216
        // watch only these actions
217
        $this->manager->watchOnly($include);
218
    }
219
220
    /**
221
     * @param string $msg
222
     * @param int    $threshold
223
     */
224
    protected function output($msg, $threshold = OutputInterface::VERBOSITY_NORMAL)
225
    {
226
        if ($this->output->getVerbosity() >= $threshold) {
227
            $this->output->writeln(sprintf('[%s] %s', date('Y-m-d H:i:s'), $msg));
228
        }
229
    }
230
231
    /**
232
     * @param int $bytes
233
     *
234
     * @return string
235
     */
236 View Code Duplication
    private function formatBytes($bytes)
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
237
    {
238
        $bytes = (int) $bytes;
0 ignored issues
show
Coding Style introduced by
Consider using a different name than the parameter $bytes. This often makes code more readable.
Loading history...
239
240
        if ($bytes > 1024*1024) {
241
            return round($bytes/1024/1024, 2).'MB';
242
        } elseif ($bytes > 1024) {
243
            return round($bytes/1024, 2).'KB';
244
        }
245
246
        return $bytes . 'B';
247
    }
248
}
249