Completed
Push — master ( 36822d...38098f )
by Danny
04:15
created

RabbitMqSupervisor::stop()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace MyOnlineStore\Bundle\RabbitMqManagerBundle\Services;
4
5
use Symfony\Component\Templating\EngineInterface;
6
7
/**
 * Generates the supervisord configuration files for the configured RabbitMQ
 * consumers / RPC servers of one application and controls the matching
 * supervisord process (start, stop, HUP).
 *
 * All files live below "<config path>/<application>/".
 */
class RabbitMqSupervisor
{
    /**
     * @var Supervisor
     */
    private $supervisor;

    /**
     * Renders the *.conf.twig templates into configuration file contents.
     *
     * @var EngineInterface
     */
    private $templating;

    /**
     * Bundle configuration; this class reads the keys "path", "consumers" and "rpc_servers".
     *
     * @var array
     */
    private $config;

    /**
     * Application name, used as sub directory below the configured path.
     *
     * @var string
     */
    private $application;

    /**
     * @param Supervisor      $supervisor
     * @param EngineInterface $templating
     * @param array           $config
     * @param string          $application
     */
    public function __construct(Supervisor $supervisor, EngineInterface $templating, array $config, $application)
    {
        $this->supervisor = $supervisor;
        $this->templating = $templating;
        $this->config = $config;
        $this->application = $application;
    }

    /**
     * Generate all supervisor worker configuration files
     *
     * Creates the "worker", "conf.d" and "logs" directories, writes supervisord.conf,
     * removes previously generated *.conf files and writes one configuration per
     * consumer / RPC server (or one per routing key when a consumer defines routing).
     *
     * @throws \RuntimeException when a directory or file cannot be created or the root path cannot be resolved
     */
    public function generate()
    {
        $root = sprintf('%s/%s', $this->config['path'], $this->application);

        // create directory structure
        foreach (['worker', 'conf.d', 'logs'] as $directory) {
            $this->ensureDirectory(sprintf('%s/%s', $root, $directory));
        }

        // get absolute (root)path
        if (!$path = realpath($root)) {
            throw new \RuntimeException(sprintf(
                'path "%s/%s" does not exist.',
                $this->config['path'],
                $this->application
            ));
        }

        // create the supervisord.conf configuration file
        $this->writeFile(
            sprintf('%s/%s', $path, 'supervisord.conf'),
            $this->templating->render(
                'RabbitMqManagerBundle:Supervisor:supervisor.conf.twig', [
                    'path' => $path,
                ]
            )
        );

        // remove old configuration files
        $this->cleanDir(sprintf('%s/%s', $path, 'conf.d'));
        $this->cleanDir(sprintf('%s/%s', $path, 'worker'));

        foreach (['consumers', 'rpc_servers'] as $type) {
            // a section that is missing or empty simply has nothing to generate
            if (empty($this->config[$type])) {
                continue;
            }

            foreach ($this->config[$type] as $name => $consumer) {
                if (!isset($consumer['worker']['queue']['routing'])) {
                    $this->writeConfig(sprintf('%s_%s', $type, $name), $path, $consumer);

                    continue;
                }

                // one supervisor program per routing key
                foreach ($consumer['worker']['queue']['routing'] as $index => $route) {
                    $this->writeConfig(sprintf('%s_%s_%s', $type, $name, $index), $path, $consumer, $route);
                }
            }
        }
    }

    /**
     * Render and write the supervisor program configuration for a single consumer.
     *
     * For the "cli-consumer" processor an additional consumer configuration file is
     * written to the "worker" directory and referenced from the program configuration.
     * For every other processor the message limit (and the route, when given) is
     * appended to the command arguments.
     *
     * @param string      $name     file name (without extension) of the generated configuration
     * @param string      $path     absolute root path of the application's supervisor files
     * @param array       $consumer consumer configuration
     * @param string|null $route    routing key, or null when the consumer has no routing
     *
     * @throws \RuntimeException when a file cannot be written
     */
    private function writeConfig($name, $path, array $consumer, $route = null)
    {
        if ('cli-consumer' === $consumer['processor']) {
            // write additional cli-consumer config
            $consumerConfiguration = sprintf('%s/worker/%s.conf', $path, $name);

            $this->writeFile(
                $consumerConfiguration,
                $this->templating->render('RabbitMqManagerBundle:Supervisor:consumer.conf.twig', [
                    'path' => $path,
                    'routing_key' => $route,
                    'consumer' => $consumer,
                ])
            );

            $content = $this->templating->render('RabbitMqManagerBundle:Supervisor/processor:cli-consumer.conf.twig', [
                'path' => $path,
                'configuration' => $consumerConfiguration,
                'consumer' => $consumer,
            ]);
        } else {
            $consumer['command']['arguments'][] = sprintf('--messages=%s', $consumer['messages']);

            if (null !== $route) {
                $consumer['command']['arguments'][] = sprintf('--route=%s', $route);
            }

            $content = $this->templating->render('RabbitMqManagerBundle:Supervisor/processor:bundle.conf.twig', [
                'path' => $path,
                'consumer' => $consumer,
            ]);
        }

        $this->writeFile(sprintf('%s/conf.d/%s.conf', $path, $name), $content);
    }

    /**
     * Stop supervisord and all processes
     *
     * Sends the default kill signal and blocks until the process is gone.
     */
    public function stop()
    {
        $this->kill('', true);
    }

    /**
     * Start supervisord and all processes
     */
    public function start()
    {
        $this->supervisor->run();
        $this->supervisor->reloadAndUpdate();
    }

    /**
     * Send -HUP to supervisord to gracefully restart all processes
     */
    public function hup()
    {
        $this->kill('HUP');
    }

    /**
     * Send kill signal to supervisord
     *
     * Does nothing when no pid is known or the process is not running.
     *
     * @param string $signal                    signal name or number without leading dash; empty for the default signal
     * @param bool   $waitForProcessToDisappear block until the supervisord process is gone
     */
    public function kill($signal = '', $waitForProcessToDisappear = false)
    {
        $pid = $this->getSupervisorPid();
        if (empty($pid) || !$this->isProcessRunning($pid)) {
            return;
        }

        $command = 'kill';
        if (!empty($signal)) {
            // the signal is supplied by the caller, so quote it before it reaches the shell
            $command .= ' '.escapeshellarg(sprintf('-%s', $signal));
        }
        $command .= sprintf(' %d', $pid);

        passthru($command);

        if ($waitForProcessToDisappear) {
            $this->wait();
        }
    }

    /**
     * Wait for supervisord process to disappear
     *
     * Polls once per second; returns immediately when no pid is known.
     */
    public function wait()
    {
        $pid = $this->getSupervisorPid();
        if (empty($pid)) {
            return;
        }

        while ($this->isProcessRunning($pid)) {
            sleep(1);
        }
    }

    /**
     * Check if a process with the given pid is running
     *
     * @param int $pid
     *
     * @return bool
     */
    private function isProcessRunning($pid)
    {
        $state = [];
        exec(sprintf('ps %d', $pid), $state);

        /*
         * ps will return at least one row, the column labels.
         * If the process is running ps will return a second row with its status.
         */
        return 1 < count($state);
    }

    /**
     * Determines the supervisord process id
     *
     * Reads "<config path>/<application>/supervisord.pid".
     *
     * @return null|int the pid, or null when the base path or pid file is missing,
     *                  unreadable, or does not contain a positive number
     */
    private function getSupervisorPid()
    {
        $basePath = realpath($this->config['path']);
        if (false === $basePath) {
            // without this guard the pid file would be looked up relative to the filesystem root
            return null;
        }

        $pidPath = sprintf('%s/%s/%s', $basePath, $this->application, 'supervisord.pid');
        if (!is_file($pidPath) || !is_readable($pidPath)) {
            return null;
        }

        $pid = (int) file_get_contents($pidPath);

        // 0 and negative values are no process ids; passed to kill they would address process groups
        return 0 < $pid ? $pid : null;
    }

    /**
     * Remove all "*.conf" files located directly in the given directory.
     *
     * Directories are skipped. Symbolic links are removed themselves; their
     * targets are left untouched.
     *
     * @param string $path
     */
    private function cleanDir($path)
    {
        /** @var \SplFileInfo $item */
        foreach (new \DirectoryIterator($path) as $item) {
            if ($item->isDir() || 'conf' !== $item->getExtension()) {
                continue;
            }

            // getPathname() is the entry inside $path; getRealPath() would resolve symlinks
            unlink($item->getPathname());
        }
    }

    /**
     * Create the directory (recursively) unless it already exists.
     *
     * @param string $directory
     *
     * @throws \RuntimeException when the directory cannot be created
     */
    private function ensureDirectory($directory)
    {
        // the trailing is_dir() covers another process creating the directory in the meantime
        if (!is_dir($directory) && !mkdir($directory, 0755, true) && !is_dir($directory)) {
            throw new \RuntimeException(sprintf('Unable to create directory "%s".', $directory));
        }
    }

    /**
     * Write content to a file and fail loudly when that is not possible.
     *
     * @param string $file
     * @param string $content
     *
     * @throws \RuntimeException when the file cannot be written
     */
    private function writeFile($file, $content)
    {
        if (false === file_put_contents($file, $content)) {
            throw new \RuntimeException(sprintf('Unable to write file "%s".', $file));
        }
    }
}
263