Completed
Push — master ( 55a9c0...ed128c )
by Emil
14:56
created

QueueProcessor::process()   B

Complexity

Conditions 5
Paths 3

Size

Total Lines 38
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 38
rs 8.439
c 0
b 0
f 0
cc 5
eloc 23
nc 3
nop 0
1
<?php
2
3
namespace Glooby\TaskBundle\Queue;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Glooby\TaskBundle\Entity\QueuedTask;
7
use Symfony\Component\Console\Output\OutputInterface;
8
use Symfony\Component\Process\Process;
9
10
/**
11
 * @author Emil Kilhage
12
 */
13
class QueueProcessor
14
{
15
    /**
16
     * @var int
17
     */
18
    private $limit;
19
20
    /**
21
     * @var bool
22
     */
23
    private $debug;
24
25
    /**
26
     * @var Process[]
27
     */
28
    private $processes = [];
29
30
    /**
31
     * @var OutputInterface
32
     */
33
    protected $output;
34
35
    /**
36
     * @var ManagerRegistry
37
     */
38
    protected $doctrine;
39
40
    /**
41
     * @param ManagerRegistry $doctrine
42
     */
43
    public function setDoctrine($doctrine)
44
    {
45
        $this->doctrine = $doctrine;
46
    }
47
48
    /**
49
     * @param OutputInterface $output
50
     */
51
    public function setOutput(OutputInterface $output)
52
    {
53
        $this->output = $output;
54
    }
55
56
    /**
57
     * @param boolean $debug
58
     */
59
    public function setDebug(bool $debug)
60
    {
61
        $this->debug = $debug;
62
    }
63
64
    /**
65
     * @param int $limit
66
     */
67
    public function setLimit(int $limit)
68
    {
69
        $this->limit = $limit;
70
    }
71
72
    /**
73
     * @throws \Exception
74
     */
75
    public function process()
76
    {
77
        $queueRepo = $this->doctrine->getManager()
78
            ->getRepository('GloobyTaskBundle:QueuedTask');
79
80
        $that = $this;
81
82
        foreach ($queueRepo->findPending($this->limit) as $queuedTask) {
83
            $command = sprintf(
84
                'php -d memory_limit=%s app/console task:run --id=%s %s',
85
                ini_get('memory_limit'),
86
                $queuedTask->getId(),
87
                $this->getProcessParams()
88
            );
89
90
            $nl = false;
91
            $process = new Process($command);
92
            $process->setTimeout(0);
93
            $process->start(function ($type, $data) use ($that, &$nl) {
94
                if (null !== $that->output) {
95
                    if ($nl) {
96
                        $nl = false;
97
                        $that->output->write("\n");
98
                    }
99
100
                    $that->output->write($data);
101
                }
102
            });
103
104
            $this->processes[] = $process;
105
106
            if (null !== $that->output) {
107
                $this->output->writeln("$command");
108
            }
109
        }
110
111
        $this->wait();
112
    }
113
114
    /**
115
     * @return string
116
     */
117
    private function getProcessParams()
118
    {
119
        $params = [];
120
121
        if (!$this->debug) {
122
            $params[] = '--env=prod';
123
        }
124
125
        return implode(' ', $params);
126
    }
127
128
    /**
129
     *
130
     */
131
    private function wait()
132
    {
133
        while (count($this->processes) > 0) {
134
            sleep(1);
135
136
            foreach ($this->processes as $i => $process) {
137
                if (!$process->isRunning()) {
138
                    unset($this->processes[$i]);
139
                    echo $process->getOutput();
140
                }
141
            }
142
        }
143
    }
144
}
145