KsqlResult   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 90
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Importance

Changes 0
Metric Value
wmc 11
lcom 1
cbo 10
dl 0
loc 90
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
A result() 0 12 2
C detectEntity() 0 66 9
1
<?php
2
declare(strict_types=1);
3
4
namespace Ytake\KsqlClient\Result;
5
6
use Ytake\KsqlClient\Entity\AbstractKsql;
7
use Ytake\KsqlClient\Entity\Description;
8
use Ytake\KsqlClient\Entity\EntityInterface;
9
use Ytake\KsqlClient\Entity\FieldSchema;
10
use Ytake\KsqlClient\Entity\KsqlCollection;
11
use Ytake\KsqlClient\Entity\KsqlErrorMessage;
12
use Ytake\KsqlClient\Entity\KsqlStatementErrorMessage;
13
use Ytake\KsqlClient\Entity\Queries;
14
use Ytake\KsqlClient\Entity\RunningQuery;
15
16
/**
17
 * Class KsqlResult
18
 */
19
class KsqlResult extends AbstractResult
20
{
21
    /**
22
     * @return EntityInterface|KsqlCollection
23
     */
24
    public function result(): EntityInterface
25
    {
26
        $decode = \GuzzleHttp\json_decode(
27
            $this->response->getBody()->getContents(), true
28
        );
29
        $collect = new KsqlCollection();
30
        foreach ($decode as $row) {
31
            $collect->addKsql($this->detectEntity($row));
32
        }
33
34
        return $collect;
35
    }
36
37
    /**
38
     * @param array $row
39
     *
40
     * @return AbstractKsql
41
     */
42
    protected function detectEntity(array $row): AbstractKsql
43
    {
44
        if (isset($row['queries'])) {
45
            $queries = [];
46
            foreach ($row['queries']['queries'] as $query) {
47
                $queries[] = new RunningQuery(
48
                    $query['statementText'],
49
                    $query['sinks'],
50
                    $query['id']
51
                );
52
            }
53
54
            return new Queries($row['queries']['statementText'], $queries);
55
        }
56
57
        if (isset($row['description'])) {
58
            $read = $write = $schema = [];
59
            foreach ($row['description']['readQueries'] as $query) {
60
                $read[] = new RunningQuery(
61
                    $query['statementText'],
62
                    $query['sinks'],
63
                    $query['id']
64
                );
65
            }
66
            foreach ($row['description']['writeQueries'] as $query) {
67
                $write[] = new RunningQuery(
68
                    $query['statementText'],
69
                    $query['sinks'],
70
                    $query['id']
71
                );
72
            }
73
            foreach ($row['description']['schema'] as $query) {
74
                $schema[] = new FieldSchema(
75
                    $query['name'],
76
                    $query['type']
77
                );
78
            }
79
            $description = $row['description'];
80
81
            return new Description(
82
                $row['description']['statementText'],
83
                $description['name'],
84
                $read,
85
                $write,
86
                $schema,
87
                $description['type'],
88
                $description['key'],
89
                $description['timestamp'],
90
                $description['statistics'],
91
                $description['errorStats'],
92
                $description['extended'],
93
                $description['replication'],
94
                $description['partitions']
95
            );
96
        }
97
        if (isset($row['error'])) {
98
            if (isset($row['error']['errorMessage'])) {
99
                $errorMessage = $row['error']['errorMessage'];
100
101
                return new KsqlStatementErrorMessage(
102
                    $row['error']['statementText'],
103
                    new KsqlErrorMessage($errorMessage['message'], $errorMessage['stackTrace'])
104
                );
105
            }
106
        }
107
    }
108
}
109