Completed
Push — master ( 27dee6...22d68a )
by Stanislav
05:48
created

InfluxDbClient::buildStatus()   B

Complexity

Conditions 3
Paths 4

Size

Total Lines 28
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 28
rs 8.8571
cc 3
eloc 17
nc 4
nop 2
1
<?php
2
3
namespace Popstas\Transmission\Console;
4
5
use GuzzleHttp\Exception\ConnectException;
6
use InfluxDB;
7
use Martial\Transmission\API\Argument\Torrent;
8
use Popstas\Transmission\Console\Helpers\TorrentUtils;
9
use Psr\Log\LoggerInterface;
10
11
/**
 * Thin wrapper around an InfluxDB client that stores and queries
 * per-torrent upload statistics in the `uploaded` measurement and
 * per-host status counters in the `status` measurement.
 */
class InfluxDbClient
{
    /**
     * @var InfluxDB\Client $influxDb
     */
    private $influxDb;

    /**
     * Lazily resolved database handle, see getDatabase().
     *
     * @var InfluxDB\Database
     */
    private $database;

    /**
     * Optional logger; when it is not injected, log() is a no-op.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Name of the database selected by connectDatabase().
     *
     * @var string
     */
    private $databaseName;

    /**
     * @param InfluxDB\Client $influxDb
     * @param string $databaseName
     */
    public function __construct(InfluxDB\Client $influxDb, $databaseName)
    {
        $this->influxDb = $influxDb;
        $this->databaseName = $databaseName;
    }

    /**
     * Returns the database handle, connecting on first use.
     *
     * @return InfluxDB\Database $database
     */
    private function getDatabase()
    {
        if (!isset($this->database)) {
            $this->database = $this->connectDatabase();
        }
        return $this->database;
    }

    /**
     * Overrides the database handle (e.g. to inject a prepared instance).
     *
     * @param InfluxDB\Database $database
     */
    public function setDatabase($database)
    {
        $this->database = $database;
    }

    /**
     * Injects a logger.
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Forwards a message to the injected logger, if there is one.
     *
     * @param string $level
     * @param string $message
     * @param array $context
     */
    private function log($level, $message, $context = [])
    {
        if ($this->logger !== null) {
            $this->logger->log($level, $message, $context);
        }
    }

    /**
     * Wraps a value into a single-quoted InfluxQL string literal.
     *
     * Backslashes and single quotes are backslash-escaped so that a value
     * such as a torrent name containing an apostrophe cannot terminate the
     * literal early and break (or alter) the query.
     *
     * @param string $value
     * @return string
     */
    private function quoteString($value)
    {
        return "'" . strtr((string) $value, ['\\' => '\\\\', "'" => "\\'"]) . "'";
    }

    /**
     * Selects the configured database, creating it when it does not exist.
     *
     * When a database handle is already set it is returned as is and no
     * request is made.
     *
     * @return InfluxDB\Database
     * @throws \RuntimeException when the existence check fails with a ConnectException
     */
    public function connectDatabase()
    {
        if (isset($this->database)) {
            return $this->database;
        }

        $database = $this->influxDb->selectDB($this->databaseName);

        try {
            $databaseExists = $database->exists();
        } catch (ConnectException $e) {
            // Keep the original exception attached so the root cause stays visible.
            throw new \RuntimeException('InfluxDb connection error: ' . $e->getMessage(), 0, $e);
        }

        if (!$databaseExists) {
            $this->log('info', 'Database ' . $this->databaseName . ' not exists, creating');
            $database->create(null, false);
        }

        return $database;
    }

    /**
     * Builds an `uploaded` point for a single torrent.
     *
     * The point value is the torrent's UPLOAD_EVER counter. The
     * `uploaded_last` field is the difference to the last stored value for
     * the same host/torrent; when there is no previous value, or the
     * difference is negative, the full UPLOAD_EVER counter is used instead.
     *
     * @param array $torrent
     * @param string $transmissionHost
     * @return InfluxDB\Point
     */
    public function buildPoint(array $torrent, $transmissionHost)
    {
        $age = TorrentUtils::getTorrentAge($torrent);
        $lastPoint = $this->getLastPoint($torrent, $transmissionHost);
        $uploadEver = $torrent[Torrent\Get::UPLOAD_EVER];

        $tagsData = [
            'host'             => $transmissionHost,
            'torrent_name'     => $torrent[Torrent\Get::NAME],
        ];

        // Without a usable previous value the whole counter counts as "new".
        $uploadedDerivative = $uploadEver;
        if (isset($lastPoint['last'])) {
            $uploadedDiff = $uploadEver - $lastPoint['last'];
            if ($uploadedDiff >= 0) {
                $uploadedDerivative = $uploadedDiff;
            }
        }

        $fieldsData = [
            'uploaded_last'       => $uploadedDerivative,
            'downloaded'          => $torrent[Torrent\Get::TOTAL_SIZE],
            'age'                 => $age,
            // $age is divided by, so a zero/empty age yields 0 instead.
            'uploaded_per_day'    => $age ? intval($uploadEver / $age * 86400) : 0,
        ];

        return new InfluxDB\Point(
            'uploaded',
            $uploadEver,
            $tagsData,
            $fieldsData,
            time()
        );
    }

    /**
     * Builds a `status` point with the number of torrents per status.
     *
     * The fields contain a `total` counter plus one counter per status
     * name. Counters for status ids 0..6 are always present, even when no
     * torrent has that status.
     *
     * @param array $torrentList list of torrent arrays, each with a STATUS entry
     * @param string $transmissionHost
     * @return InfluxDB\Point
     */
    public function buildStatus(array $torrentList, $transmissionHost)
    {
        $tagsData = [
            'host' => $transmissionHost,
        ];

        $fieldsData = [
            'total' => 0,
        ];

        for ($statusId = 0; $statusId < 7; $statusId++) {
            $fieldsData[TransmissionClient::getTorrentStatusName($statusId)] = 0;
        }

        foreach ($torrentList as $torrent) {
            $fieldsData['total']++;
            $statusName = TransmissionClient::getTorrentStatusName($torrent[Torrent\Get::STATUS]);
            // A status outside the pre-seeded range still gets counted,
            // without incrementing an undefined array key.
            if (!isset($fieldsData[$statusName])) {
                $fieldsData[$statusName] = 0;
            }
            $fieldsData[$statusName]++;
        }

        return new InfluxDB\Point(
            'status',
            null,
            $tagsData,
            $fieldsData,
            time()
        );
    }

    /**
     * Fetches the last stored `uploaded` value for a torrent on a host.
     *
     * @param array $torrent
     * @param string $transmissionHost
     * @return array first point of the result set, or an empty array when there is none
     */
    public function getLastPoint(array $torrent, $transmissionHost)
    {
        $results = $this->getDatabase()->getQueryBuilder()
            ->last('value')
            ->from('uploaded')
            ->where([
                'host=' . $this->quoteString($transmissionHost),
                'torrent_name=' . $this->quoteString($torrent[Torrent\Get::NAME]),
            ])
            ->getResultSet()
            ->getPoints();

        return count($results) ? $results[0] : [];
    }

    /**
     * Logs every point at debug level and writes all of them to the database.
     *
     * @param array $points list of points to write
     * @param string $precision timestamp precision, seconds by default
     * @return mixed result of the underlying Database::writePoints() call
     */
    public function writePoints($points, $precision = InfluxDB\Database::PRECISION_SECONDS)
    {
        foreach ($points as $point) {
            $this->log('debug', 'Send point: {point}', ['point' => $point]);
        }
        return $this->getDatabase()->writePoints($points, $precision);
    }

    /**
     * Builds one `uploaded` point per torrent and writes them in one call.
     *
     * @param array $torrentList
     * @param string $transmissionHost
     * @return mixed result of writePoints(); logged as success when truthy
     */
    public function sendTorrentPoints(array $torrentList, $transmissionHost)
    {
        $points = [];
        foreach ($torrentList as $torrent) {
            $points[] = $this->buildPoint($torrent, $transmissionHost);
        }
        $isSuccess = $this->writePoints($points);
        $this->log('info', 'InfluxDB write ' . ($isSuccess ? 'success' : 'failed'));
        return $isSuccess;
    }

    /**
     * Sums a field of the `uploaded` measurement.
     *
     * Every filter is optional: the torrent name is applied when the
     * torrent array has one, the host when it is non-empty and the time
     * window when $lastDays is non-zero.
     *
     * NOTE(review): $fieldName is placed into the query as an identifier
     * without escaping, so it must come from trusted code.
     *
     * @param array $torrent
     * @param string $fieldName
     * @param string $transmissionHost
     * @param int $lastDays
     * @return int sum of the field, 0 when the query returns no points
     */
    public function getTorrentSum(array $torrent, $fieldName, $transmissionHost = '', $lastDays = 0)
    {
        $where = [];

        if (isset($torrent[Torrent\Get::NAME])) {
            $where[] = 'torrent_name = ' . $this->quoteString($torrent[Torrent\Get::NAME]);
        }

        if ($transmissionHost) {
            $where[] = 'host = ' . $this->quoteString($transmissionHost);
        }

        if ($lastDays) {
            $fromTimestamp = strtotime('-' . $lastDays . ' days');
            $fromDate = date('c', $fromTimestamp);
            $where[] = "time >= '$fromDate'";
        }

        $results = $this->getDatabase()->getQueryBuilder()
            ->from('uploaded')
            ->select("sum($fieldName) as $fieldName")
            ->where($where)
            ->getResultSet()
            ->getPoints();

        $this->log('debug', $this->influxDb->getLastQuery());

        if (!empty($results)) {
            return $results[0][$fieldName];
        }
        return 0;
    }
}
248