Reactor::execQuery()   A
last analyzed

Complexity

Conditions 3
Paths 5

Size

Total Lines 13
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 3

Importance

Changes 3
Bugs 1 Features 1
Metric Value
c 3
b 1
f 1
dl 0
loc 13
ccs 9
cts 9
cp 1
rs 9.4285
cc 3
eloc 9
nc 5
nop 1
crap 3
1
<?php
2
3
/**
4
 * The MIT License (MIT)
5
 *
6
 * Copyright (c) 2015 Repo2
7
 *
8
 * Permission is hereby granted, free of charge, to any person obtaining a copy
9
 * of this software and associated documentation files (the "Software"), to deal
10
 * in the Software without restriction, including without limitation the rights
11
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12
 * copies of the Software, and to permit persons to whom the Software is
13
 * furnished to do so, subject to the following conditions:
14
 *
15
 * The above copyright notice and this permission notice shall be included in all
16
 * copies or substantial portions of the Software.
17
 *
18
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
 * SOFTWARE.
25
 */
26
27
namespace Repo2\QueryReactor;
28
29
/**
 * Runs queries through a Driver and tracks every link that currently has a
 * query in flight.
 *
 * In-flight links are kept in an SplObjectStorage where each link object is
 * associated with the Query it is executing. The Controller decides which link
 * a query runs on and which query (if any) a released link should run next.
 */
class Reactor
{
    /** @var Driver */
    private $driver;

    /** @var Controller */
    private $controller;

    /** @var \SplObjectStorage active links, each one mapped to its running Query */
    private $active;

    /**
     * @param Driver $driver
     * @param Controller $controller
     */
    public function __construct(Driver $driver, Controller $controller)
    {
        $this->active = new \SplObjectStorage();
        $this->controller = $controller;
        $this->driver = $driver;
    }

    /**
     * Starts a query on the link chosen by the controller.
     *
     * When the controller yields no link, nothing is started here
     * (NOTE(review): presumably the controller keeps the query for later and
     * hands it back through getQuery() -- confirm against Controller).
     * Any \Exception raised while obtaining the link or sending the query is
     * passed to the query's reject() instead of propagating.
     *
     * @param Query $query
     * @return Reactor this instance, for chaining
     */
    public function execQuery(Query $query)
    {
        try {
            $link = $this->controller->getLink($this->driver, $query);
            if (!$link) {
                return $this;
            }

            $this->driver->query($link, $query->getExpression());
            $this->active->attach($link, $query);
        } catch (\Exception $failure) {
            $query->reject($failure);
        }

        return $this;
    }

    /**
     * Starts every query produced by the iterator, in iteration order.
     *
     * @param \Iterator $iterator
     * @return Reactor this instance, for chaining
     */
    public function execIterator(\Iterator $iterator)
    {
        foreach ($iterator as $pending) {
            $this->execQuery($pending);
        }

        return $this;
    }

    /**
     * Releases a link whose current query has finished.
     *
     * The controller is asked for a follow-up query for this link. If there is
     * one, it is sent on the same link and replaces the link's entry in the
     * active set; if sending fails, the link is dropped from the active set
     * and the follow-up query is rejected. If there is none, the link is
     * simply dropped from the active set.
     *
     * @param mixed $link
     */
    private function free($link)
    {
        $next = $this->controller->getQuery($this->driver, $link);

        if (!$next) {
            $this->active->detach($link);
            return;
        }

        try {
            $this->driver->query($link, $next->getExpression());
            // attach() on an already-present link overwrites its associated query.
            $this->active->attach($link, $next);
        } catch (\Exception $failure) {
            $this->active->detach($link);
            $next->reject($failure);
        }
    }

    /**
     * Performs one polling round over all active links.
     *
     * Links reported in the driver's error list are handled first, then the
     * links reported as readable.
     */
    private function poll()
    {
        // Snapshot the links up front: the handlers below mutate the active set.
        $links = iterator_to_array($this->active, false);

        list($readable, $failed) = $this->driver->poll($links);

        foreach ($failed as $link) {
            $this->rejectFailed($link);
        }

        foreach ($readable as $link) {
            $this->consume($link);
        }
    }

    /**
     * Handles a link the driver reported as errored: the link is released and
     * its query is rejected with the driver's error for that link.
     *
     * @param mixed $link
     */
    private function rejectFailed($link)
    {
        // Both values must be captured before free(), which may re-use the link.
        $query = $this->active[$link];
        $reason = $this->driver->error($link);

        $this->free($link);
        $query->reject($reason);
    }

    /**
     * Handles a link the driver reported as readable.
     *
     * The result is read and the link released; the query is then resolved
     * with the result. If reading throws, the link is released and the query
     * is rejected instead. Whatever resolve() returns is executed when it is a
     * Query or an \Iterator; any other value is ignored.
     *
     * @param mixed $link
     */
    private function consume($link)
    {
        $query = $this->active[$link];

        try {
            $result = $this->driver->read($link);
        } catch (\Exception $failure) {
            $this->free($link);
            $query->reject($failure);
            return;
        }

        // Release the link before resolving so follow-up queries can be started.
        $this->free($link);

        $followUp = $query->resolve($result);
        if ($followUp instanceof Query) {
            $this->execQuery($followUp);
        } elseif ($followUp instanceof \Iterator) {
            $this->execIterator($followUp);
        }
    }

    /**
     * Keeps polling until no link remains in the active set.
     */
    public function await()
    {
        while (count($this->active) > 0) {
            $this->poll();
        }
    }
}
142