1
|
|
|
<?php |
2
|
|
|
namespace PHPDaemon\IPCManager; |
3
|
|
|
|
4
|
|
|
use PHPDaemon\Core\Daemon; |
5
|
|
|
use PHPDaemon\Network\Connection; |
6
|
|
|
|
7
|
|
|
/**
 * Master-side connection to a single worker process.
 *
 * Wire format (both directions): a 4-byte big-endian unsigned length
 * (pack 'N') followed by that many bytes of igbinary-serialized payload.
 * Decoded payloads are arrays with an 'op' key which selects the action
 * performed in onPacket().
 */
class MasterPoolConnection extends Connection
{
    /**
     * Connection state: the 4-byte length header has been consumed and the
     * packet body of $packetLength bytes is being awaited.
     */
    const STATE_CONTENT = 1;

    /** @var array Not referenced inside this class. */
    public $instancesCount = [];

    /** @var null */
    protected $timeout = null;

    /** @var int initial value of the minimal amount of bytes in buffer */
    protected $lowMark = 4;

    /** @var int initial value of the maximum amount of bytes in buffer */
    protected $highMark = 0xFFFF;

    /** @var mixed Worker id taken from the 'start' packet; unset until that packet arrives. */
    protected $workerId;

    /** @var int Body length (in bytes) of the packet currently being read. */
    protected $packetLength;

    /**
     * Called when the connection is finished.
     *
     * Removes this connection from the pool's worker registry and tells the
     * application instance that the set of workers changed.
     *
     * @return void
     */
    public function onFinish()
    {
        // Only drop the registry entry when it still points at this very
        // connection: a worker id may have been re-registered by a newer
        // connection, and a connection that never sent 'start' has nothing
        // to remove.
        if (
            $this->workerId !== null
            && isset($this->pool->workers[$this->workerId])
            && $this->pool->workers[$this->workerId] === $this
        ) {
            unset($this->pool->workers[$this->workerId]);
        }
        $this->pool->appInstance->updatedWorkers();
    }

    /**
     * Serializes a packet and writes it to the peer, prefixed with its
     * 4-byte big-endian byte length.
     *
     * @param mixed $p Packet to send (serialized with igbinary).
     * @return void
     */
    public function sendPacket($p)
    {
        $data = igbinary_serialize($p);
        $this->write(pack('N', mb_orig_strlen($data)) . $data);
    }

    /**
     * Called when new data received.
     *
     * Consumes as many complete length-prefixed packets as the buffer holds
     * and hands each decoded payload to onPacket(). Returns as soon as the
     * next header or body is not fully available yet.
     *
     * @return void
     */
    public function onRead()
    {
        while (true) {
            if ($this->state === self::STATE_ROOT) {
                $header = $this->readExact(4);
                if ($header === false) {
                    return; // not ready yet
                }
                $u = unpack('N', $header);
                $this->packetLength = $u[1];
                $this->state = self::STATE_CONTENT;
            }

            if ($this->state === self::STATE_CONTENT) {
                $packet = $this->readExact($this->packetLength);
                if ($packet === false) {
                    // Ask to be woken only once the whole body is buffered.
                    $this->setWatermark($this->packetLength, $this->packetLength);
                    return; // not ready yet
                }
                // Back to the defaults used while waiting for a header.
                $this->setWatermark(4, 0xFFFF);
                $this->state = self::STATE_ROOT;
                // NOTE(review): igbinary_unserialize() can instantiate objects;
                // this assumes the peer is a trusted worker process - confirm.
                $this->onPacket(igbinary_unserialize($packet));
            }
        }
    }

    /**
     * Dispatches a decoded packet according to its 'op' field.
     *
     * Handled operations:
     *  - 'start':            registers this connection under $p['workerId'].
     *  - 'broadcastCall':    forwards the packet as 'call' to every worker.
     *  - 'directCall':       forwards the packet as 'call' to $p['workerId'].
     *  - 'singleCall':       forwards the packet as 'call' to the first worker.
     *  - 'addIncludedFiles': adds a file watch for each entry of $p['files'].
     *
     * Non-array payloads, payloads without 'op' and unknown operations are
     * ignored.
     *
     * @param mixed $p Decoded packet.
     * @return void
     */
    protected function onPacket($p)
    {
        if (!is_array($p) || !isset($p['op'])) {
            return;
        }

        if ($p['op'] === 'start') {
            if (!isset($p['workerId'])) {
                // Without an id the connection cannot be registered.
                return;
            }
            $this->workerId = $p['workerId'];
            $this->pool->workers[$this->workerId] = $this;
            $this->pool->appInstance->updatedWorkers();
        } elseif ($p['op'] === 'broadcastCall') {
            $p['op'] = 'call';
            foreach ($this->pool->workers as $worker) {
                $worker->sendPacket($p);
            }
        } elseif ($p['op'] === 'directCall') {
            $p['op'] = 'call';
            if (!isset($p['workerId']) || !isset($this->pool->workers[$p['workerId']])) {
                Daemon::$process->log('directCall(). not sent.');
                return;
            }
            $this->pool->workers[$p['workerId']]->sendPacket($p);
        } elseif ($p['op'] === 'singleCall') {
            $p['op'] = 'call';
            // Deliver to the first registered worker only.
            foreach ($this->pool->workers as $worker) {
                $worker->sendPacket($p);
                return;
            }
            Daemon::$process->log('singleCall(). not sent.');
        } elseif ($p['op'] === 'addIncludedFiles') {
            if (!isset($p['files']) || !is_array($p['files'])) {
                return;
            }
            foreach ($p['files'] as $file) {
                Daemon::$process->fileWatcher->addWatch($file, $this->workerId);
            }
        }
    }
}
115
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.