MysqliAsync::__construct()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
namespace BenTools\SimpleDBAL\Model\Adapter\Mysqli;
4
5
use GuzzleHttp\Promise\Promise;
6
use GuzzleHttp\Promise\PromiseInterface;
7
use mysqli;
8
use mysqli_result;
9
use mysqli_sql_exception;
10
11
/**
 * Process-wide pool of asynchronous mysqli queries.
 *
 * Each call to {@see MysqliAsync::query()} sends a query with MYSQLI_ASYNC and
 * returns a promise. Waiting on any of those promises polls the pooled
 * connections until every pending query has been settled.
 */
final class MysqliAsync implements \Countable
{
    /**
     * Pending queries; each entry is ['l' => mysqli connection, 'p' => promise].
     *
     * @var array
     */
    private $queries   = [];

    /**
     * Number of queries settled during the current wait cycle.
     *
     * @var int
     */
    private $processed = 0;

    /**
     * @var MysqliAsync|null
     */
    private static $instance;

    /**
     * Whole seconds passed to mysqli_poll() as timeout.
     *
     * @var int
     */
    private static $sleep = 0;

    /**
     * Microseconds passed to mysqli_poll() as timeout.
     *
     * @var int
     */
    private static $usleep = 50000;

    /**
     * MysqliAsync constructor.
     * Disabled to force singleton use.
     */
    private function __construct()
    {
    }

    /**
     * Sends $query asynchronously on $cnx and returns a promise for its result.
     *
     * The promise is resolved with a mysqli_result when the query produces a
     * result set, with the connection itself when it does not, and rejected
     * with a mysqli_sql_exception when the query fails.
     *
     * @param string $query
     * @param mysqli $cnx
     * @return Promise
     */
    public static function query(string $query, mysqli $cnx)
    {
        $promise = new Promise(static function () {
            self::getInstance()->wait();
        });

        // If the query could not even be sent there is nothing to poll for:
        // fail the promise right away instead of queueing a connection that
        // has no pending asynchronous result.
        if (false === $cnx->query($query, MYSQLI_ASYNC)) {
            $promise->reject(new mysqli_sql_exception(mysqli_error($cnx), mysqli_errno($cnx)));

            return $promise;
        }

        self::getInstance()->queries[] = [
            'l' => $cnx,
            'p' => $promise,
        ];

        return $promise;
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return MysqliAsync
     */
    public static function getInstance()
    {
        if (null === self::$instance) {
            self::$instance = new self;
        }

        return self::$instance;
    }

    /**
     * Poll every $wait seconds.
     * To poll every 100ms, call pollEvery(0.100).
     *
     * @param float $wait Non-negative, finite number of seconds.
     * @throws \InvalidArgumentException When $wait is negative, NAN or infinite.
     */
    public static function pollEvery(float $wait)
    {
        if (!is_finite($wait) || $wait < 0) {
            throw new \InvalidArgumentException('Poll interval must be a finite, non-negative number of seconds.');
        }

        // Split arithmetically: going through the string form of the float
        // loses leading zeros of the fraction (0.05 would become 0.5) and
        // breaks on scientific notation.
        $seconds      = (int) floor($wait);
        $microseconds = (int) round(($wait - $seconds) * 1000000);

        // Rounding may carry a fraction such as 0.9999999 up to a full second.
        if ($microseconds >= 1000000) {
            $seconds++;
            $microseconds -= 1000000;
        }

        self::$sleep  = $seconds;
        self::$usleep = $microseconds;
    }

    /**
     * Extracts the connection from a queue entry.
     *
     * @param array $query
     * @return mysqli
     */
    private function getConnection(array $query): mysqli
    {
        return $query['l'];
    }

    /**
     * Extracts the promise from a queue entry.
     *
     * @param array $query
     * @return PromiseInterface
     */
    private function getPromise(array $query): PromiseInterface
    {
        return $query['p'];
    }

    /**
     * Resets current pool.
     */
    private function reset()
    {
        $this->queries   = [];
        $this->processed = 0;
    }

    /**
     * Wait for pending queries to complete.
     *
     * Polls the queued connections and settles each promise as soon as its
     * own connection is reported ready, so one slow query does not delay
     * reaping the others. Loops until the queue is empty.
     */
    private function wait()
    {
        while (!empty($this->queries)) {
            // mysqli_poll() filters these arrays in place, so rebuild them on every pass.
            $links = $errors = $reject = [];
            foreach ($this->queries as $query) {
                $links[] = $errors[] = $reject[] = $this->getConnection($query);
            }

            $ready = mysqli_poll($links, $errors, $reject, self::$sleep, self::$usleep);

            if (false === $ready) {
                // Polling itself failed: retrying would spin forever, so fail
                // everything that is still pending and stop.
                $failure = new mysqli_sql_exception('mysqli_poll() failed while waiting for asynchronous queries.');
                foreach ($this->queries as $query) {
                    $this->getPromise($query)->reject($failure);
                }
                break;
            }

            foreach ($this->queries as $key => $query) {
                $cnx     = $this->getConnection($query);
                $promise = $this->getPromise($query);

                if (in_array($cnx, $links, true) || in_array($cnx, $errors, true)) {
                    // Result (or error) is available: reaping will not block.
                    $this->settle($promise, $cnx);
                } elseif (in_array($cnx, $reject, true)) {
                    // The connection has no outstanding asynchronous query,
                    // so it would never become ready.
                    $promise->reject(
                        new mysqli_sql_exception('Connection has no pending asynchronous query.')
                    );
                } else {
                    // Still running; look at it again on the next poll.
                    continue;
                }

                $this->processed++;
                unset($this->queries[$key]);
            }
        }

        $this->reset();
    }

    /**
     * Reaps the asynchronous result of $cnx and settles $promise with it.
     *
     * @param PromiseInterface $promise
     * @param mysqli           $cnx
     */
    private function settle(PromiseInterface $promise, mysqli $cnx)
    {
        try {
            $result = $cnx->reap_async_query();
        } catch (mysqli_sql_exception $e) {
            $promise->reject($e);

            return;
        }

        if ($result instanceof mysqli_result) {
            $promise->resolve($result);

            return;
        }

        // No result set: either the statement failed or it returns no rows.
        $errNo  = mysqli_errno($cnx);
        $errStr = mysqli_error($cnx);

        if (0 !== $errNo || '' !== $errStr) {
            $promise->reject(new mysqli_sql_exception($errStr, $errNo));

            return;
        }

        $promise->resolve($cnx);
    }

    /**
     * Number of queries currently waiting to be settled.
     *
     * @inheritDoc
     */
    public function count(): int
    {
        return count($this->queries);
    }
}
153