Completed
Push — master ( 6af005...8c6e98 )
by Alessandro
05:47
created

InfluxDbEventListener::onConsoleTerminate()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Algatux\InfluxDbBundle\Events\Listeners;
6
7
use Algatux\InfluxDbBundle\Events\AbstractDeferredInfluxDbEvent;
8
use Algatux\InfluxDbBundle\Events\AbstractInfluxDbEvent;
9
use Algatux\InfluxDbBundle\Events\DeferredUdpEvent;
10
use Algatux\InfluxDbBundle\Events\UdpEvent;
11
use InfluxDB\Database;
12
use InfluxDB\Point;
13
14
/**
 * Receives InfluxDB point events for a single named connection and either
 * writes their points straight away or buffers them until one of the
 * terminate hooks flushes the buffer.
 *
 * @internal
 */
final class InfluxDbEventListener
{
    const STORAGE_KEY_UDP = 'udp';
    const STORAGE_KEY_HTTP = 'http';

    /**
     * Name of the connection this listener is responsible for.
     *
     * @var string
     */
    private $connection;

    /**
     * Whether events that carry no connection name are handled by this listener.
     *
     * @var bool
     */
    private $isDefault;

    /**
     * Database used for non-UDP writes.
     *
     * @var Database
     */
    private $httpDatabase;

    /**
     * Database used for UDP writes; null when none was passed to the constructor.
     *
     * @var Database|null
     */
    private $udpDatabase;

    /**
     * Deferred points, indexed first by STORAGE_KEY_* and then by precision.
     *
     * @var array
     */
    private $storage;

    /**
     * NOTE(review): $udpDatabase is implicitly nullable through its null
     * default; an explicit "?Database" would need PHP >= 7.1 — confirm the
     * minimum supported version before changing the signature.
     *
     * @param string        $connection
     * @param bool          $isDefault
     * @param Database      $httpDatabase
     * @param Database|null $udpDatabase
     */
    public function __construct(
        string $connection,
        bool $isDefault,
        Database $httpDatabase,
        Database $udpDatabase = null
    ) {
        $this->connection = $connection;
        $this->isDefault = $isDefault;
        $this->httpDatabase = $httpDatabase;
        $this->udpDatabase = $udpDatabase;
        $this->initStorage();
    }

    /**
     * Handles an event carrying points: deferred events are buffered, every
     * other event is written immediately.
     *
     * @param AbstractInfluxDbEvent $event
     *
     * @return bool false when the event is not addressed to this listener's
     *              connection, true once its points were buffered or written
     *
     * @throws \RuntimeException when a UDP event arrives but no UDP database is set
     */
    public function onPointsCollected(AbstractInfluxDbEvent $event): bool
    {
        if (!$this->isConcerned($event)) {
            return false;
        }

        // Fail before buffering so a UDP bucket is never filled without a UDP database.
        $this->testUdpConnection($event);

        if ($event instanceof AbstractDeferredInfluxDbEvent) {
            $this->addEventPointsToStorage($event);

            return true;
        }

        $this->writeEventPoints($event);

        return true;
    }

    /**
     * Writes every buffered point (UDP buckets first, then HTTP buckets) and
     * empties the buffer afterwards.
     *
     * The buffer is only reset after all writes returned; if a write throws,
     * the buffered points are left in place.
     *
     * @return bool always true
     */
    public function onProcessTerminate(): bool
    {
        foreach ($this->storage[self::STORAGE_KEY_UDP] as $precision => $points) {
            $this->writeUdpPoints($points, $precision);
        }

        foreach ($this->storage[self::STORAGE_KEY_HTTP] as $precision => $points) {
            $this->writeHttpPoints($points, $precision);
        }

        // Reset the storage after writing points.
        $this->initStorage();

        return true;
    }

    /**
     * Flushes the buffer; delegates to onProcessTerminate().
     *
     * @return bool
     */
    public function onKernelTerminate(): bool
    {
        return $this->onProcessTerminate();
    }

    /**
     * Flushes the buffer; delegates to onProcessTerminate().
     *
     * @return bool
     */
    public function onConsoleTerminate(): bool
    {
        return $this->onProcessTerminate();
    }

    /**
     * Replaces the buffer with one empty bucket per storage key.
     */
    private function initStorage()
    {
        $this->storage = [
            self::STORAGE_KEY_UDP => [],
            self::STORAGE_KEY_HTTP => [],
        ];
    }

    /**
     * Writes points through the UDP database.
     *
     * @param Point[] $points
     * @param string  $precision
     */
    private function writeUdpPoints(array $points, string $precision)
    {
        $this->udpDatabase->writePoints($points, $precision);
    }

    /**
     * Writes points through the HTTP database.
     *
     * @param Point[] $points
     * @param string  $precision
     */
    private function writeHttpPoints(array $points, string $precision)
    {
        $this->httpDatabase->writePoints($points, $precision);
    }

    /**
     * Appends the event's points to the bucket matching its transport
     * (UDP for DeferredUdpEvent, HTTP otherwise) and its precision.
     *
     * @param AbstractInfluxDbEvent $event
     */
    private function addEventPointsToStorage(AbstractInfluxDbEvent $event)
    {
        $typeKey = $event instanceof DeferredUdpEvent ? self::STORAGE_KEY_UDP : self::STORAGE_KEY_HTTP;
        $precision = $event->getPrecision();
        $points = $event->getPoints();

        if (array_key_exists($precision, $this->storage[$typeKey])) {
            $this->storage[$typeKey][$precision] = array_merge($this->storage[$typeKey][$precision], $points);

            return;
        }

        // First points for this precision: store them as they are.
        $this->storage[$typeKey][$precision] = $points;
    }

    /**
     * Tells whether the event targets this listener: either it names this
     * listener's connection, or it names none and this listener is the default.
     *
     * @param AbstractInfluxDbEvent $event
     *
     * @return bool
     */
    private function isConcerned(AbstractInfluxDbEvent $event): bool
    {
        $eventConnection = $event->getConnection();

        return $this->connection === $eventConnection
            || (null === $eventConnection && $this->isDefault);
    }

    /**
     * Rejects UDP events (immediate or deferred) when no UDP database is set.
     *
     * @param AbstractInfluxDbEvent $event
     *
     * @throws \RuntimeException when the event needs UDP and no UDP database is set
     */
    private function testUdpConnection(AbstractInfluxDbEvent $event)
    {
        if (null === $this->udpDatabase && ($event instanceof UdpEvent || $event instanceof DeferredUdpEvent)) {
            throw new \RuntimeException(
                'No UDP connection available for database "'.$this->httpDatabase->getName().'". '
                .'You must enable it on the configuration to use it.'
            );
        }
    }

    /**
     * Writes the event's points immediately, over UDP for UdpEvent and over
     * HTTP for any other event.
     *
     * @param AbstractInfluxDbEvent $event
     */
    private function writeEventPoints(AbstractInfluxDbEvent $event)
    {
        $points = $event->getPoints();
        $precision = $event->getPrecision();

        if ($event instanceof UdpEvent) {
            $this->writeUdpPoints($points, $precision);

            return;
        }

        $this->writeHttpPoints($points, $precision);
    }
}
201