1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace Seasx\SeasLogger\Kafka\Socket; |
5
|
|
|
|
6
|
|
|
use Co; |
7
|
|
|
use Exception; |
8
|
|
|
use SplQueue; |
9
|
|
|
|
10
|
|
|
/**
 * Class Pool
 *
 * Coroutine-aware pool of SocketIO connections.
 *
 * Idle connections are kept in a FIFO queue. When the pool has already handed
 * out `active` connections, callers of getConnection() are parked (their
 * coroutine id is recorded and the coroutine yields) until release() returns a
 * connection and resumes the oldest waiter.
 *
 * @package Seasx\SeasLogger\Kafka\Socket
 */
final class Pool
{
    /** @var int Number of connections created by getConnection() and counted against `active`. */
    private $currentCount = 0;

    /** @var SplQueue Idle connections ready to be handed out. */
    private $queue;

    /** @var int Maximum number of parked coroutines; 0 disables the limit. */
    private $maxWait = 0;

    /** @var int Passed to SocketIO::createConnection() as 'retry'. */
    private $retry = 3;

    /** @var int Passed to SocketIO::createConnection() as 'sleep'. */
    private $waitReconnect = 1;

    /** @var SplQueue Ids of coroutines waiting for a connection, oldest first. */
    private $waitStack;

    /** @var int Upper bound on pooled connections. */
    private $active = 3;

    /** @var string Passed to SocketIO::createConnection() as 'uri'. */
    private $uri = 'localhost:9092';

    /** @var int|null Passed to SocketIO::createConnection() as 'timeout'. */
    private $timeout = null;

    /**
     * Pool constructor.
     *
     * Recognised keys: maxWait, retry, waitReconnect, active, uri, timeout.
     * Any other key (including non-string keys and the names of internal
     * bookkeeping properties) is ignored, so configuration can never corrupt
     * the pool's own state.
     *
     * @param array $configs Option name => value.
     */
    public function __construct(array $configs)
    {
        $configurable = ['maxWait', 'retry', 'waitReconnect', 'active', 'uri', 'timeout'];

        foreach ($configs as $name => $value) {
            // Strict comparison also rejects integer keys of list-style arrays.
            if (in_array($name, $configurable, true)) {
                $this->$name = $value;
            }
        }

        $this->queue = new SplQueue();
        $this->waitStack = new SplQueue();
    }

    /**
     * Hand a connection back to the pool.
     *
     * The connection is queued only while the idle queue holds fewer than
     * `active` entries; otherwise it is silently dropped. After queuing, the
     * oldest waiting coroutine (if any) is resumed so it can pick it up.
     *
     * @param SocketIO $connection
     */
    public function release(SocketIO $connection)
    {
        if ($this->queue->count() >= $this->active) {
            return;
        }

        $this->queue->push($connection);

        if ($this->waitStack->count() > 0) {
            Co::resume($this->waitStack->shift());
        }
    }

    /**
     * Obtain a connection, reusing an idle one when possible.
     *
     * Order of preference: an idle pooled connection; otherwise, if the pool
     * is below `active`, a newly created connection; otherwise the calling
     * coroutine is suspended until release() wakes it up.
     *
     * @return SocketIO
     * @throws Exception  When `maxWait` is positive and that many coroutines are already waiting.
     * @throws \Throwable Whatever connection creation / reconnect raised (rethrown unchanged).
     */
    public function getConnection(): SocketIO
    {
        if (!$this->queue->isEmpty()) {
            return $this->queue->shift();
        }

        if ($this->currentCount >= $this->active) {
            // ">=" so that at most maxWait coroutines are ever parked.
            if ($this->maxWait > 0 && $this->waitStack->count() >= $this->maxWait) {
                throw new Exception('Connection pool queue is full');
            }

            $this->waitStack->push((int)Co::getCid());
            Co::yield();

            // release() queued a connection before resuming this coroutine.
            return $this->queue->shift();
        }

        // Reserve the slot before connecting. NOTE(review): connection setup
        // presumably can suspend this coroutine; reserving first keeps other
        // coroutines from also passing the limit check in the meantime.
        $this->currentCount++;

        try {
            $connection = $this->createConnection();
            if ($connection->check() === false) {
                $connection->reconnect();
            }
        } catch (\Throwable $e) {
            // Give the slot back, otherwise repeated failures would exhaust
            // the pool and park every later caller forever.
            $this->currentCount--;

            throw $e;
        }

        return $connection;
    }

    /**
     * Build a new SocketIO using this pool's connection settings.
     *
     * This does not touch the pool's bookkeeping; getConnection() is
     * responsible for counting the connection.
     *
     * @return SocketIO
     * @throws Exception
     */
    public function createConnection(): SocketIO
    {
        $socket = new SocketIO();
        $socket->createConnection([
            'uri' => $this->uri,
            'retry' => $this->retry,
            'sleep' => $this->waitReconnect,
            'timeout' => $this->timeout
        ]);

        return $socket;
    }
}
This check looks for function or method calls that always return null and whose return value is used.
The method
getObject()
can return nothing but null, so it makes no sense to use the return value. The reason is most likely that a function or method is incomplete or has been reduced for debug purposes.