Completed
Push — master ( a38962...fdce7f )
by Danny
04:51
created

RabbitMqManager::wait()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 8
cp 0
rs 9.6666
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 0
crap 12
1
<?php
2
3
namespace MyOnlineStore\Bundle\RabbitMqManagerBundle\Manager;
4
5
use MyOnlineStore\Bundle\RabbitMqManagerBundle\Services\Supervisor;
6
use Symfony\Component\Templating\EngineInterface;
7
8
class RabbitMqManager implements RabbitMqManagerInterface
9
{
10
    /**
11
     * @var Supervisor
12
     */
13
    private $supervisor;
14
15
    /**
16
     * @var EngineInterface
17
     */
18
    private $templating;
19
20
    /**
21
     * @var array
22
     */
23
    private $config;
24
25
    /**
26
     * @var string
27
     */
28
    private $application;
29
30
    /**
31
     * @param Supervisor      $supervisor
32
     * @param EngineInterface $templating
33
     * @param array           $config
34
     * @param string          $application
35
     */
36
    public function __construct(Supervisor $supervisor, EngineInterface $templating, array $config, $application)
37
    {
38
        $this->supervisor = $supervisor;
39
        $this->templating = $templating;
40
        $this->config = $config;
41
        $this->application = $application;
42
    }
43
44
    /**
45
     * @inheritdoc
46
     */
47
    public function generate()
48
    {
49
        // create directory structure
50
        foreach (['worker', 'conf.d', 'logs'] as $directory) {
51
            $path = sprintf('%s/%s/%s', $this->config['path'], $this->application, $directory);
52
53
            if (!is_dir($path)) {
54
                mkdir($path, 0755, true);
55
            }
56
        }
57
58
        // get absolute (root)path
59
        if (!$path = realpath(sprintf('%s/%s', $this->config['path'], $this->application))) {
60
            throw new \RuntimeException(sprintf(
61
                'path "%s/%s" does not exist.',
62
                $this->config['path'],
63
                $this->application
64
            ));
65
        }
66
67
        // create the supervisord.conf configuration file
68
        file_put_contents(
69
            sprintf('%s/%s', $path, 'supervisord.conf'),
70
            $this->templating->render(
71
                'RabbitMqManagerBundle:Supervisor:supervisor.conf.twig', [
72
                    'path' => $path,
73
                ]
74
            )
75
        );
76
77
        // remove old configuration files
78
        $this->cleanDir(sprintf('%s/%s', $path, 'conf.d'));
79
        $this->cleanDir(sprintf('%s/%s', $path, 'worker'));
80
81
        foreach (['consumers', 'rpc_servers'] as $type) {
82
            foreach ($this->config[$type] as $name => $consumer) {
83
                if (!isset($consumer['worker']['queue']['routing'])) {
84
                    $this->writeConfig(
85
                        sprintf('%s_%s', $type, $name),
86
                        $path,
87
                        $consumer
88
                    );
89
90
                    continue;
91
                }
92
93
                foreach ($consumer['worker']['queue']['routing'] as $index => $route) {
94
                    $this->writeConfig(
95
                        sprintf('%s_%s_%s', $type, $name, $index),
96
                        $path,
97
                        $consumer,
98
                        $route
99
                    );
100
                }
101
            }
102
        }
103
    }
104
105
    /**
106
     * @param string $name
107
     * @param string $path
108
     * @param array  $consumer
109
     * @param null   $route
110
     */
111
    private function writeConfig($name, $path, array $consumer, $route = null)
112
    {
113
        if ('cli-consumer' === $consumer['processor']) {
114
            // write additional cli-consumer config
115
            file_put_contents(
116
                $consumerConfiguration = sprintf('%s/worker/%s.conf', $path, $name),
117
                $this->templating->render('RabbitMqManagerBundle:Supervisor:consumer.conf.twig', [
118
                    'path' => $path,
119
                    'routing_key' => $route,
120
                    'consumer' => $consumer,
121
                ])
122
            );
123
124
            $content = $this->templating->render('RabbitMqManagerBundle:Supervisor/processor:cli-consumer.conf.twig', [
125
                'path' => $path,
126
                'configuration' => $consumerConfiguration,
127
                'consumer' => $consumer,
128
            ]);
129
        } else {
130
            $consumer['command']['arguments'][] = sprintf('--messages=%s', $consumer['messages']);
131
132
            if (null !== $route) {
133
                $consumer['command']['arguments'][] = sprintf('--route=%s', $route);
134
            }
135
136
            $content = $this->templating->render('RabbitMqManagerBundle:Supervisor/processor:bundle.conf.twig', [
137
                'path' => $path,
138
                'consumer' => $consumer,
139
            ]);
140
        }
141
142
        file_put_contents(
143
            sprintf('%s/conf.d/%s.conf', $path, $name),
144
            $content
145
        );
146
    }
147
148
    /**
149
     * @inheritdoc
150
     */
151
    public function stop()
152
    {
153
        $this->kill('', true);
154
    }
155
156
    /**
157
     * @inheritdoc
158
     */
159
    public function start()
160
    {
161
        $this->supervisor->run();
162
        $this->supervisor->reloadAndUpdate();
163
    }
164
165
    /**
166
     * @inheritdoc
167
     */
168
    public function hup()
169
    {
170
        $this->kill('HUP');
171
    }
172
173
    /**
174
     * @inheritdoc
175
     */
176
    public function kill($signal = '', $waitForProcessToDisappear = false)
177
    {
178
        $pid = $this->getSupervisorPid();
179
        if (!empty($pid) && $this->isProcessRunning($pid)) {
180
            if (!empty($signal)) {
181
                $signal = sprintf('-%s', $signal);
182
            }
183
184
            $command = sprintf('kill %s %d', $signal, $pid);
185
186
            passthru($command);
187
188
            if ($waitForProcessToDisappear) {
189
                $this->wait();
190
            }
191
        }
192
    }
193
194
    /**
195
     * @inheritdoc
196
     */
197
    public function wait()
198
    {
199
        $pid = $this->getSupervisorPid();
200
        if (!empty($pid)) {
201
            while ($this->isProcessRunning($pid)) {
202
                sleep(1);
203
            }
204
        }
205
    }
206
207
    /**
208
     * Check if a process with the given pid is running
209
     *
210
     * @param int $pid
211
     * @return bool
212
     */
213
    private function isProcessRunning($pid) {
214
        $state = array();
215
        exec(sprintf('ps %d', $pid), $state);
216
217
        /*
218
         * ps will return at least one row, the column labels.
219
         * If the process is running ps will return a second row with its status.
220
         */
221
        return 1 < count($state);
222
    }
223
224
    /**
225
     * Determines the supervisord process id
226
     *
227
     * @return null|int
228
     */
229
    private function getSupervisorPid() {
230
231
        $pidPath = sprintf('%s/%s/%s', realpath($this->config['path']), $this->application, 'supervisord.pid');
232
233
        $pid = null;
234
        if (is_file($pidPath) && is_readable($pidPath)) {
235
            $pid = (int)file_get_contents($pidPath);
236
        }
237
238
        return $pid;
239
    }
240
241
    /**
242
     * @param string $path
243
     */
244
    private function cleanDir($path)
245
    {
246
        /** @var \SplFileInfo $item */
247
        foreach (new \DirectoryIterator($path) as $item) {
248
            if ($item->isDir()) {
249
                continue;
250
            }
251
252
            if ('conf' !== $item->getExtension()) {
253
                continue;
254
            }
255
256
            unlink($item->getRealPath());
257
        }
258
    }
259
}
260