Completed
Push — master ( 03835b...a56aa2 )
by Emil
02:19
created

QueueProcessor::setDoctrine()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
1
<?php
2
3
namespace Glooby\TaskBundle\Queue;
4
5
use Doctrine\Common\Persistence\ManagerRegistry;
6
use Glooby\TaskBundle\Model\QueuedTaskInterface;
7
use Symfony\Component\Console\Output\OutputInterface;
8
use Symfony\Component\Process\Process;
9
10
/**
11
 * @author Emil Kilhage
12
 */
13
class QueueProcessor
{
    /**
     * Value passed to the repository's findPending() call in process().
     *
     * @var int|null Null until setLimit() has been called.
     */
    private $limit;

    /**
     * When false, child commands are started with "--env=prod".
     *
     * @var bool
     */
    private $debug = false;

    /**
     * Child processes that have been started and not yet observed as finished.
     *
     * @var Process[]
     */
    private $processes = [];

    /**
     * Optional sink for the child processes' output and the started command lines.
     *
     * @var OutputInterface|null
     */
    protected $output;

    /**
     * Registry used to look up the queued-task repository.
     *
     * @var ManagerRegistry|null
     */
    protected $doctrine;

    /**
     * Injects the Doctrine registry; must be called before process().
     *
     * The parameter is intentionally left without a native type hint so that
     * existing callers and subclasses keep working unchanged.
     *
     * @param ManagerRegistry $doctrine
     */
    public function setDoctrine($doctrine)
    {
        $this->doctrine = $doctrine;
    }

    /**
     * Sets the output that receives child process output as it arrives.
     *
     * @param OutputInterface $output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }

    /**
     * Toggles debug mode; outside debug mode children run with "--env=prod".
     *
     * @param bool $debug
     */
    public function setDebug(bool $debug)
    {
        $this->debug = $debug;
    }

    /**
     * Sets the value handed to the repository's findPending() call.
     *
     * @param int $limit
     */
    public function setLimit(int $limit)
    {
        $this->limit = $limit;
    }

    /**
     * Starts one child process per pending queued task and blocks until all
     * of them have finished.
     *
     * @throws \LogicException When no Doctrine registry has been injected.
     * @throws \Exception
     */
    public function process()
    {
        // Fail with a catchable, descriptive exception instead of a fatal
        // "member function on null" error when the setter was never called.
        if (null === $this->doctrine) {
            throw new \LogicException(
                'No Doctrine registry has been set; call setDoctrine() before process().'
            );
        }

        $queueRepo = $this->doctrine->getManager()
            ->getRepository('GloobyTaskBundle:QueuedTask');

        foreach ($queueRepo->findPending($this->limit) as $queuedTask) {
            $this->start($queuedTask);
        }

        $this->wait();
    }

    /**
     * Builds the extra command-line flags appended to every child command.
     *
     * @return string "--env=prod" outside debug mode, otherwise an empty string.
     */
    private function getProcessParams(): string
    {
        return $this->debug ? '' : '--env=prod';
    }

    /**
     * Blocks until every started process has stopped running.
     *
     * Finished processes are removed from the list. Their buffered output is
     * echoed only when no OutputInterface is set: with one set, the callback
     * registered in start() has already written every chunk to it, so echoing
     * again would print the output twice.
     */
    private function wait()
    {
        while (count($this->processes) > 0) {
            // foreach iterates over a copy of the array, so unsetting entries
            // of the property inside the loop is safe.
            foreach ($this->processes as $i => $process) {
                if ($process->isRunning()) {
                    continue;
                }

                unset($this->processes[$i]);

                if (null === $this->output) {
                    echo $process->getOutput();
                }
            }

            // Only pause between polls while something is still running, so
            // already-finished processes do not cost an extra second.
            if (count($this->processes) > 0) {
                sleep(1);
            }
        }
    }

    /**
     * Launches the "task:run" console command for one queued task in a
     * background process and registers it for wait().
     *
     * @param QueuedTaskInterface $queuedTask
     */
    private function start(QueuedTaskInterface $queuedTask)
    {
        // The command is handed to Process as a single command-line string,
        // so every interpolated value is escaped as one shell argument.
        $command = sprintf(
            'php -d memory_limit=%s app/console task:run --id=%s %s',
            escapeshellarg((string) ini_get('memory_limit')),
            escapeshellarg((string) $queuedTask->getId()),
            $this->getProcessParams()
        );

        $process = new Process($command);
        // NOTE(review): per the Symfony Process documentation a timeout of 0
        // disables the timeout; confirm against the installed version.
        $process->setTimeout(0);
        // The closure is bound to this instance, so it sees whichever output
        // is set at the moment a chunk arrives.
        $process->start(function ($type, $data) {
            if (null !== $this->output) {
                $this->output->write($data);
            }
        });

        $this->processes[] = $process;

        if (null !== $this->output) {
            $this->output->writeln($command);
        }
    }
}
153