Completed
Push — master ( a56aa2...d72998 )
by Emil
02:17
created

QueueProcessor::createProcess()   A

Complexity

Conditions 3
Paths 1

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 20
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 12
nc 1
nop 1
1
<?php
2
3
namespace Glooby\TaskBundle\Queue;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Glooby\TaskBundle\Model\QueuedTaskInterface;
7
use Symfony\Component\Console\Output\OutputInterface;
8
use Symfony\Component\Process\Process;
9
10
/**
11
 * @author Emil Kilhage
12
 */
13
class QueueProcessor
14
{
15
    /**
16
     * @var int
17
     */
18
    private $limit;
19
20
    /**
21
     * @var bool
22
     */
23
    private $debug;
24
25
    /**
26
     * @var Process[]
27
     */
28
    private $processes = [];
29
30
    /**
31
     * @var OutputInterface
32
     */
33
    protected $output;
34
35
    /**
36
     * @var ManagerRegistry
37
     */
38
    protected $doctrine;
39
40
    /**
41
     * @param ManagerRegistry $doctrine
42
     */
43
    public function setDoctrine($doctrine)
44
    {
45
        $this->doctrine = $doctrine;
46
    }
47
48
    /**
49
     * @param OutputInterface $output
50
     */
51
    public function setOutput(OutputInterface $output)
52
    {
53
        $this->output = $output;
54
    }
55
56
    /**
57
     * @param boolean $debug
58
     */
59
    public function setDebug($debug)
60
    {
61
        $this->debug = $debug;
62
    }
63
64
    /**
65
     * @param int $limit
66
     */
67
    public function setLimit($limit)
68
    {
69
        $this->limit = $limit;
70
    }
71
72
    /**
73
     * @throws \Exception
74
     */
75
    public function process()
76
    {
77
        $queueRepo = $this->doctrine->getManager()
78
            ->getRepository('GloobyTaskBundle:QueuedTask');
79
80
        foreach ($queueRepo->findPending($this->limit) as $queuedTask) {
81
            $this->start($queuedTask);
82
        }
83
84
        $this->wait();
85
    }
86
87
    /**
88
     * @return string
89
     */
90
    private function getProcessParams()
91
    {
92
        $params = [];
93
94
        if (!$this->debug) {
95
            $params[] = '--env=prod';
96
        }
97
98
        return implode(' ', $params);
99
    }
100
101
    /**
102
     *
103
     */
104
    private function wait()
105
    {
106
        while (count($this->processes) > 0) {
107
            sleep(1);
108
109
            foreach ($this->processes as $i => $process) {
110
                if (!$process->isRunning()) {
111
                    unset($this->processes[$i]);
112
                    echo $process->getOutput();
113
                }
114
            }
115
        }
116
    }
117
118
    /**
119
     * @param QueuedTaskInterface $queuedTask
120
     */
121
    private function start(QueuedTaskInterface $queuedTask)
122
    {
123
        $command = $this->createCommand($queuedTask);
124
        $process = $this->createProcess($command);
125
126
        $this->processes[] = $process;
127
128
        if (null !== $this->output) {
129
            $this->output->writeln("$command");
130
        }
131
    }
132
133
    /**
134
     * @param string $command
135
     * @return Process
136
     */
137
    private function createProcess($command)
138
    {
139
        $that = $this;
140
        $nl = false;
141
142
        $process = new Process($command);
143
        $process->setTimeout(0);
144
        $process->start(function ($type, $data) use ($that, &$nl) {
145
            if (null !== $that->output) {
146
                if ($nl) {
147
                    $nl = false;
148
                    $that->output->write("\n");
149
                }
150
151
                $that->output->write($data);
152
            }
153
        });
154
155
        return $process;
156
    }
157
158
    /**
159
     * @param QueuedTaskInterface $queuedTask
160
     * @return string
161
     */
162
    private function createCommand(QueuedTaskInterface $queuedTask)
163
    {
164
        $command = sprintf(
165
            'php -d memory_limit=%s app/console task:run --id=%s %s',
166
            ini_get('memory_limit'),
167
            $queuedTask->getId(),
168
            $this->getProcessParams()
169
        );
170
        return $command;
171
    }
172
}
173